diff --git a/pype/ftrack/actions/action_sync_notes.py b/pype/ftrack/actions/action_sync_notes.py
new file mode 100644
index 0000000000..4c0788f858
--- /dev/null
+++ b/pype/ftrack/actions/action_sync_notes.py
@@ -0,0 +1,230 @@
+import os
+import sys
+import time
+import datetime
+import requests
+import tempfile
+
+from pypeapp import config
+from pype.vendor import ftrack_api
+from pype.ftrack import BaseAction
+from pype.ftrack.lib.custom_db_connector import DbConnector, ClientSession
+
+
+class SynchronizeNotes(BaseAction):
+    #: Action identifier.
+    identifier = 'sync.notes'
+    #: Action label.
+    label = 'Synchronize Notes'
+    #: Action description.
+    description = 'Synchronize notes from one Ftrack to another'
+    #: Roles that are allowed to register this action.
+    role_list = ['Administrator', 'Project Manager', 'Pypeclub']
+
+    db_con = DbConnector(
+        mongo_url=os.environ["AVALON_MONGO"],
+        database_name='notes_database',
+        table_name='notes_table'
+    )
+
+    id_key_src = 'fridge_ftrackID'
+    id_key_dst = 'kredenc_ftrackID'
+
+    def discover(self, session, entities, event):
+        '''Show the action only when all selected entities are AssetVersions.'''
+        if len(entities) == 0:
+            return False
+
+        for entity in entities:
+            if entity.entity_type.lower() != 'assetversion':
+                return False
+
+        return True
+
+    def launch(self, session, entities, event):
+        source_credentials = config.get_presets()['ftrack'].get(
+            'partnership_ftrack_cred', {}
+        )
+
+        # Session connected to the source (partner) Ftrack server
+        self.session_source = ftrack_api.Session(
+            server_url=source_credentials.get('server_url'),
+            api_key=source_credentials.get('api_key'),
+            api_user=source_credentials.get('api_user'),
+            auto_connect_event_hub=True
+        )
+
+        # Separate session to this Ftrack server, used for component uploads
+        self.session_for_components = ftrack_api.Session(
+            server_url=session.server_url,
+            api_key=session.api_key,
+            api_user=session.api_user,
+            auto_connect_event_hub=True
+        )
+
+        self.user = self.session_for_components.query(
+            'User where username is "{}"'.format(self.session.api_user)
+        ).one()
+
+        self.db_con.install()
+
+        missing_id_entities = []
+        to_sync_data = []
+        for dst_entity in entities:
+            # Ignore entities without a stored id from the source Ftrack
+            from_id = dst_entity['custom_attributes'].get(self.id_key_src)
+            if not from_id:
+                missing_id_entities.append(dst_entity.get('name', dst_entity))
+                continue
+
+            to_sync_data.append(
+                (dst_entity.entity_type, dst_entity['id'], from_id)
+            )
+
+        for entity_type, entity_id, from_id in to_sync_data:
+            av_query = 'AssetVersion where id is "{}"'.format(from_id)
+            src_entity = self.session_source.query(av_query).one()
+            src_notes = src_entity['notes']
+            self.sync_notes(src_notes, (entity_type, entity_id))
+
+        self.db_con.uninstall()
+
+        if missing_id_entities:
+            self.log.info('Entities without source Ftrack ID:')
+            self.log.info(missing_id_entities)
+
+        return True
+
+    def sync_notes(self, src_notes, dst_entity_data):
+        # Sort notes by date time
+        src_notes = sorted(src_notes, key=lambda note: note['date'])
+
+        for src_note in src_notes:
+            # Find out if the note was already synchronized
+            db_note_entity = self.db_con.find_one({
+                self.id_key_src: src_note['id']
+            })
+
+            if db_note_entity is None:
+                # Create note if not found in DB
+                dst_note_id = self.create_note(
+                    src_note, dst_entity_data
+                )
+                # Add references to DB for next sync
+                item = {
+                    self.id_key_dst: dst_note_id,
+                    self.id_key_src: src_note['id'],
+                    'content': src_note['content'],
+                    'entity_type': 'Note',
+                    'sync_date': str(datetime.date.today())
+                }
+                self.db_con.insert_one(item)
+            else:
+                dst_note_id = db_note_entity[self.id_key_dst]
+
+            replies = src_note.get('replies')
+            if not replies:
+                continue
+
+            # Synchronize replies recursively, parented under the note
+            self.sync_notes(replies, ('Note', dst_note_id))
+
+    def create_note(self, src_note, dst_entity_data):
+        # dst_entity_data - tuple(entity type, entity id)
+        dst_entity = self.session.query(
+            '{} where id is "{}"'.format(*dst_entity_data)
+        ).one()
+
+        is_reply = False
+        if dst_entity.entity_type.lower() != 'note':
+            # Find matching note category on this Ftrack (if any)
+            category = None
+            cat = src_note['category']
+            if cat:
+                cat_name = cat['name']
+                category = self.session.query(
+                    'NoteCategory where name is "{}"'.format(cat_name)
+                ).first()
+
+            new_note = dst_entity.create_note(
+                src_note['content'], self.user, category=category
+            )
+        else:
+            new_note = dst_entity.create_reply(
+                src_note['content'], self.user
+            )
+            is_reply = True
+
+        # QUESTION Should we change date to match source Ftrack?
+        new_note['date'] = src_note['date']
+
+        self.session.commit()
+        new_note_id = new_note['id']
+
+        # Components
+        if src_note['note_components']:
+            self.reupload_components(src_note, new_note_id)
+
+        # Bug in ftrack_api: when a reply is added the session must be reset
+        if is_reply:
+            self.session.reset()
+            time.sleep(0.2)
+
+        return new_note_id
+
+    def reupload_components(self, src_note, dst_note_id):
+        # Download and collect source components
+        src_server_location = self.session_source.query(
+            'Location where name is "ftrack.server"'
+        ).one()
+
+        temp_folder = tempfile.mkdtemp('note_components')
+
+        # Download each component and store its path for upload
+        paths_to_upload = []
+        count = 0
+        for note_component in src_note['note_components']:
+            count += 1
+            download_url = src_server_location.get_url(
+                note_component['component']
+            )
+
+            file_name = '{}{}{}'.format(
+                str(src_note['date'].format('YYYYMMDDHHmmss')),
+                "{:0>3}".format(count),
+                note_component['component']['file_type']
+            )
+            path = os.path.sep.join([temp_folder, file_name])
+
+            self.download_file(download_url, path)
+            paths_to_upload.append(path)
+
+        # Create downloaded components and add them to the note
+        dst_server_location = self.session_for_components.query(
+            'Location where name is "ftrack.server"'
+        ).one()
+
+        for path in paths_to_upload:
+            component = self.session_for_components.create_component(
+                path,
+                data={'name': 'My file'},
+                location=dst_server_location
+            )
+
+            # Attach the component to the note.
+            self.session_for_components.create(
+                'NoteComponent',
+                {'component_id': component['id'], 'note_id': dst_note_id}
+            )
+
+        self.session_for_components.commit()
+
+    def download_file(self, url, path):
+        content = requests.get(url, stream=True).content
+        with open(path, 'wb') as f:
+            f.write(content)
+
+
+def register(session, **kw):
+    '''Register plugin. Called when used as a plugin.'''
+
+    if not isinstance(session, ftrack_api.session.Session):
+        return
+
+    SynchronizeNotes(session).register()
diff --git a/pype/ftrack/lib/custom_db_connector.py b/pype/ftrack/lib/custom_db_connector.py
new file mode 100644
index 0000000000..505ac96610
--- /dev/null
+++ b/pype/ftrack/lib/custom_db_connector.py
@@ -0,0 +1,207 @@
+"""
+Wrapper around interactions with the database.
+
+Copy of the io module in avalon-core.
+- In this case it is not used as a singleton tied to api.Session!
+"""
+
+import os
+import time
+import errno
+import shutil
+import logging
+import tempfile
+import functools
+import contextlib
+
+import requests
+
+# Third-party dependencies
+import pymongo
+from pymongo.client_session import ClientSession
+
+
+def auto_reconnect(func):
+    """Retry a database call up to 3 times on AutoReconnect."""
+    @functools.wraps(func)
+    def decorated(*args, **kwargs):
+        obj = args[0]
+        for retry in range(3):
+            try:
+                return func(*args, **kwargs)
+            except pymongo.errors.AutoReconnect:
+                # Re-raise on the last attempt, otherwise wait and retry
+                if retry >= 2:
+                    raise
+                obj.log.error("Reconnecting..")
+                time.sleep(0.1)
+
+    return decorated
+
+
+class DbConnector:
+
+    log = logging.getLogger(__name__)
+    timeout = 1000
+
+    def __init__(self, mongo_url, database_name, table_name):
+        self._mongo_client = None
+        self._sentry_client = None
+        self._sentry_logging_handler = None
+        self._database = None
+        self._is_installed = False
+        self._mongo_url = mongo_url
+        self._database_name = database_name
+
+        self.active_table = table_name
+
+    def install(self):
+        """Establish a persistent connection to the database"""
+        if self._is_installed:
+            return
+
+        logging.basicConfig()
+
+        self._mongo_client = pymongo.MongoClient(
+            self._mongo_url,
+            serverSelectionTimeoutMS=self.timeout
+        )
+
+        for retry in range(3):
+            try:
+                t1 = time.time()
+                self._mongo_client.server_info()
+            except Exception:
+                self.log.error("Retrying..")
+                time.sleep(1)
+            else:
+                break
+        else:
+            raise IOError(
+                "ERROR: Couldn't connect to %s in "
+                "less than %.3f ms" % (self._mongo_url, self.timeout)
+            )
+
+        self.log.info("Connected to %s, delay %.3f s" % (
+            self._mongo_url, time.time() - t1
+        ))
+
+        self._database = self._mongo_client[self._database_name]
+        self._is_installed = True
+
+    def uninstall(self):
+        """Close any connection to the database"""
+        try:
+            self._mongo_client.close()
+        except AttributeError:
+            pass
+
+        self._mongo_client = None
+        self._database = None
+        self._is_installed = False
+
+    def tables(self):
+        """List available tables.
+
+        Returns:
+            generator of table names
+        """
+        collection_names = self.collections()
+        for table_name in collection_names:
+            if table_name in ("system.indexes",):
+                continue
+            yield table_name
+
+    @auto_reconnect
+    def collections(self):
+        return self._database.collection_names()
+
+    @auto_reconnect
+    def insert_one(self, item, session=None):
+        assert isinstance(item, dict), "item must be of type <dict>"
+        return self._database[self.active_table].insert_one(
+            item,
+            session=session
+        )
+
+    @auto_reconnect
+    def insert_many(self, items, ordered=True, session=None):
+        # check if all items are valid
+        assert isinstance(items, list), "`items` must be of type <list>"
+        for item in items:
+            assert isinstance(item, dict), "`item` must be of type <dict>"
+
+        return self._database[self.active_table].insert_many(
+            items,
+            ordered=ordered,
+            session=session
+        )
+
+    @auto_reconnect
+    def find(self, filter, projection=None, sort=None, session=None):
+        return self._database[self.active_table].find(
+            filter=filter,
+            projection=projection,
+            sort=sort,
+            session=session
+        )
+
+    @auto_reconnect
+    def find_one(self, filter, projection=None, sort=None, session=None):
+        assert isinstance(filter, dict), "filter must be <dict>"
+
+        return self._database[self.active_table].find_one(
+            filter=filter,
+            projection=projection,
+            sort=sort,
+            session=session
+        )
+
+    @auto_reconnect
+    def replace_one(self, filter, replacement, session=None):
+        return self._database[self.active_table].replace_one(
+            filter, replacement,
+            session=session
+        )
+
+    @auto_reconnect
+    def update_one(self, filter, update, session=None):
+        return self._database[self.active_table].update_one(
+            filter, update,
+            session=session
+        )
+
+    @auto_reconnect
+    def update_many(self, filter, update, session=None):
+        return self._database[self.active_table].update_many(
+            filter, update,
+            session=session
+        )
+
+    @auto_reconnect
+    def distinct(self, *args, **kwargs):
+        return self._database[self.active_table].distinct(
+            *args, **kwargs
+        )
+
+    @auto_reconnect
+    def drop_collection(self, name_or_collection, session=None):
+        return self._database.drop_collection(
+            name_or_collection,
+            session=session
+        )
+
+    @auto_reconnect
+    def delete_one(self, filter, collation=None, session=None):
+        return self._database[self.active_table].delete_one(
+            filter,
+            collation=collation,
+            session=session
+        )
+
+    @auto_reconnect
+    def delete_many(self, filter, collation=None, session=None):
+        return self._database[self.active_table].delete_many(
+            filter,
+            collation=collation,
+            session=session
+        )
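
For reference, a minimal sketch of the install/find_one/insert_one/uninstall cycle the action drives through the new DbConnector. It assumes AVALON_MONGO points to a reachable MongoDB and reuses the database, collection and key names hard-coded above; the note ids are placeholders, not real values.

import os
import datetime

from pype.ftrack.lib.custom_db_connector import DbConnector

db_con = DbConnector(
    mongo_url=os.environ["AVALON_MONGO"],
    database_name='notes_database',
    table_name='notes_table'
)
db_con.install()

# A synchronized note is tracked by its source-Ftrack id.
# find_one() returns None when the note has never been synchronized.
existing = db_con.find_one({'fridge_ftrackID': 'SOURCE_NOTE_ID'})
if existing is None:
    db_con.insert_one({
        'fridge_ftrackID': 'SOURCE_NOTE_ID',        # placeholder id
        'kredenc_ftrackID': 'DESTINATION_NOTE_ID',  # placeholder id
        'entity_type': 'Note',
        'sync_date': str(datetime.date.today())
    })

db_con.uninstall()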