From 7b09e1c41e846c0ba1a0686653cdf5c3101c0c17 Mon Sep 17 00:00:00 2001 From: iLLiCiTiT Date: Wed, 11 Dec 2019 12:24:06 +0100 Subject: [PATCH 1/8] formatting --- pype/ftrack/ftrack_server/lib.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/pype/ftrack/ftrack_server/lib.py b/pype/ftrack/ftrack_server/lib.py index 748937c7bd..fd4c1fe7b9 100644 --- a/pype/ftrack/ftrack_server/lib.py +++ b/pype/ftrack/ftrack_server/lib.py @@ -49,7 +49,9 @@ def ftrack_events_mongo_settings(): def get_ftrack_event_mongo_info(): - host, port, database, username, password, collection, auth_db = ftrack_events_mongo_settings() + host, port, database, username, password, collection, auth_db = ( + ftrack_events_mongo_settings() + ) user_pass = "" if username and password: user_pass = "{}:{}@".format(username, password) From 4497a89f53198adfeb60d6ea663fd9e266f0ef5e Mon Sep 17 00:00:00 2001 From: iLLiCiTiT Date: Wed, 11 Dec 2019 12:24:36 +0100 Subject: [PATCH 2/8] Storer EventHub moved to lib --- pype/ftrack/ftrack_server/lib.py | 29 +++++++++++++++++++++ pype/ftrack/ftrack_server/session_storer.py | 27 ------------------- 2 files changed, 29 insertions(+), 27 deletions(-) diff --git a/pype/ftrack/ftrack_server/lib.py b/pype/ftrack/ftrack_server/lib.py index fd4c1fe7b9..80c147d400 100644 --- a/pype/ftrack/ftrack_server/lib.py +++ b/pype/ftrack/ftrack_server/lib.py @@ -99,3 +99,32 @@ def check_ftrack_url(url, log_errors=True): print('DEBUG: Ftrack server {} is accessible.'.format(url)) return url + + +class StorerEventHub(ftrack_api.event.hub.EventHub): + def __init__(self, *args, **kwargs): + self.sock = kwargs.pop("sock") + super(StorerEventHub, self).__init__(*args, **kwargs) + + def _handle_packet(self, code, packet_identifier, path, data): + """Override `_handle_packet` which extend heartbeat""" + code_name = self._code_name_mapping[code] + if code_name == "heartbeat": + # Reply with heartbeat. + self.sock.sendall(b"storer") + return self._send_packet(self._code_name_mapping['heartbeat']) + + elif code_name == "connect": + event = ftrack_api.event.base.Event( + topic="pype.storer.started", + data={}, + source={ + "id": self.id, + "user": {"username": self._api_user} + } + ) + self._event_queue.put(event) + + return super(StorerEventHub, self)._handle_packet( + code, packet_identifier, path, data + ) diff --git a/pype/ftrack/ftrack_server/session_storer.py b/pype/ftrack/ftrack_server/session_storer.py index 0b44d7d3a1..29abf329f0 100644 --- a/pype/ftrack/ftrack_server/session_storer.py +++ b/pype/ftrack/ftrack_server/session_storer.py @@ -14,33 +14,6 @@ import ftrack_api.event from ftrack_api.logging import LazyLogMessage as L -class StorerEventHub(ftrack_api.event.hub.EventHub): - def __init__(self, *args, **kwargs): - self.sock = kwargs.pop("sock") - super(StorerEventHub, self).__init__(*args, **kwargs) - - def _handle_packet(self, code, packet_identifier, path, data): - """Override `_handle_packet` which extend heartbeat""" - code_name = self._code_name_mapping[code] - if code_name == "heartbeat": - # Reply with heartbeat. - self.sock.sendall(b"storer") - return self._send_packet(self._code_name_mapping['heartbeat']) - - elif code_name == "connect": - event = ftrack_api.event.base.Event( - topic="pype.storer.started", - data={}, - source={ - "id": self.id, - "user": {"username": self._api_user} - } - ) - self._event_queue.put(event) - - return super(StorerEventHub, self)._handle_packet( - code, packet_identifier, path, data - ) class StorerSession(ftrack_api.session.Session): From d714f5fb780c7c7db13e4610be385590e373778a Mon Sep 17 00:00:00 2001 From: iLLiCiTiT Date: Wed, 11 Dec 2019 12:25:25 +0100 Subject: [PATCH 3/8] Processor Eventub moved to lib --- pype/ftrack/ftrack_server/lib.py | 115 ++++++++++++++++++ .../ftrack/ftrack_server/session_processor.py | 110 ----------------- 2 files changed, 115 insertions(+), 110 deletions(-) diff --git a/pype/ftrack/ftrack_server/lib.py b/pype/ftrack/ftrack_server/lib.py index 80c147d400..091df72a98 100644 --- a/pype/ftrack/ftrack_server/lib.py +++ b/pype/ftrack/ftrack_server/lib.py @@ -128,3 +128,118 @@ class StorerEventHub(ftrack_api.event.hub.EventHub): return super(StorerEventHub, self)._handle_packet( code, packet_identifier, path, data ) + + +class ProcessEventHub(ftrack_api.event.hub.EventHub): + url, database, table_name = get_ftrack_event_mongo_info() + + is_table_created = False + pypelog = Logger().get_logger("Session Processor") + + def __init__(self, *args, **kwargs): + self.dbcon = DbConnector( + mongo_url=self.url, + database_name=self.database, + table_name=self.table_name + ) + self.sock = kwargs.pop("sock") + super(ProcessEventHub, self).__init__(*args, **kwargs) + + def prepare_dbcon(self): + try: + self.dbcon.install() + self.dbcon._database.list_collection_names() + except pymongo.errors.AutoReconnect: + self.pypelog.error( + "Mongo server \"{}\" is not responding, exiting.".format( + os.environ["AVALON_MONGO"] + ) + ) + sys.exit(0) + + except pymongo.errors.OperationFailure: + self.pypelog.error(( + "Error with Mongo access, probably permissions." + "Check if exist database with name \"{}\"" + " and collection \"{}\" inside." + ).format(self.database, self.table_name)) + self.sock.sendall(b"MongoError") + sys.exit(0) + + def wait(self, duration=None): + """Overriden wait + + Event are loaded from Mongo DB when queue is empty. Handled event is + set as processed in Mongo DB. + """ + started = time.time() + self.prepare_dbcon() + while True: + try: + event = self._event_queue.get(timeout=0.1) + except queue.Empty: + if not self.load_events(): + time.sleep(0.5) + else: + try: + self._handle(event) + self.dbcon.update_one( + {"id": event["id"]}, + {"$set": {"pype_data.is_processed": True}} + ) + except pymongo.errors.AutoReconnect: + self.pypelog.error(( + "Mongo server \"{}\" is not responding, exiting." + ).format(os.environ["AVALON_MONGO"])) + sys.exit(0) + # Additional special processing of events. + if event['topic'] == 'ftrack.meta.disconnected': + break + + if duration is not None: + if (time.time() - started) > duration: + break + + def load_events(self): + """Load not processed events sorted by stored date""" + ago_date = datetime.datetime.now() - datetime.timedelta(days=3) + result = self.dbcon.delete_many({ + "pype_data.stored": {"$lte": ago_date}, + "pype_data.is_processed": True + }) + + not_processed_events = self.dbcon.find( + {"pype_data.is_processed": False} + ).sort( + [("pype_data.stored", pymongo.ASCENDING)] + ) + + found = False + for event_data in not_processed_events: + new_event_data = { + k: v for k, v in event_data.items() + if k not in ["_id", "pype_data"] + } + try: + event = ftrack_api.event.base.Event(**new_event_data) + except Exception: + self.logger.exception(L( + 'Failed to convert payload into event: {0}', + event_data + )) + continue + found = True + self._event_queue.put(event) + + return found + + def _handle_packet(self, code, packet_identifier, path, data): + """Override `_handle_packet` which skip events and extend heartbeat""" + code_name = self._code_name_mapping[code] + if code_name == "event": + return + if code_name == "heartbeat": + self.sock.sendall(b"processor") + return self._send_packet(self._code_name_mapping["heartbeat"]) + + return super()._handle_packet(code, packet_identifier, path, data) diff --git a/pype/ftrack/ftrack_server/session_processor.py b/pype/ftrack/ftrack_server/session_processor.py index 133719bab4..a17f919969 100644 --- a/pype/ftrack/ftrack_server/session_processor.py +++ b/pype/ftrack/ftrack_server/session_processor.py @@ -24,116 +24,6 @@ from pypeapp import Logger log = Logger().get_logger("Session processor") -class ProcessEventHub(ftrack_api.event.hub.EventHub): - url, database, table_name = get_ftrack_event_mongo_info() - - is_table_created = False - - def __init__(self, *args, **kwargs): - self.dbcon = DbConnector( - mongo_url=self.url, - database_name=self.database, - table_name=self.table_name - ) - self.sock = kwargs.pop("sock") - super(ProcessEventHub, self).__init__(*args, **kwargs) - - def prepare_dbcon(self): - try: - self.dbcon.install() - self.dbcon._database.list_collection_names() - except pymongo.errors.AutoReconnect: - log.error("Mongo server \"{}\" is not responding, exiting.".format( - os.environ["AVALON_MONGO"] - )) - sys.exit(0) - - except pymongo.errors.OperationFailure: - log.error(( - "Error with Mongo access, probably permissions." - "Check if exist database with name \"{}\"" - " and collection \"{}\" inside." - ).format(self.database, self.table_name)) - self.sock.sendall(b"MongoError") - sys.exit(0) - - def wait(self, duration=None): - """Overriden wait - - Event are loaded from Mongo DB when queue is empty. Handled event is - set as processed in Mongo DB. - """ - started = time.time() - self.prepare_dbcon() - while True: - try: - event = self._event_queue.get(timeout=0.1) - except queue.Empty: - if not self.load_events(): - time.sleep(0.5) - else: - try: - self._handle(event) - self.dbcon.update_one( - {"id": event["id"]}, - {"$set": {"pype_data.is_processed": True}} - ) - except pymongo.errors.AutoReconnect: - log.error(( - "Mongo server \"{}\" is not responding, exiting." - ).format(os.environ["AVALON_MONGO"])) - sys.exit(0) - # Additional special processing of events. - if event['topic'] == 'ftrack.meta.disconnected': - break - - if duration is not None: - if (time.time() - started) > duration: - break - - def load_events(self): - """Load not processed events sorted by stored date""" - ago_date = datetime.datetime.now() - datetime.timedelta(days=3) - result = self.dbcon.delete_many({ - "pype_data.stored": {"$lte": ago_date}, - "pype_data.is_processed": True - }) - - not_processed_events = self.dbcon.find( - {"pype_data.is_processed": False} - ).sort( - [("pype_data.stored", pymongo.ASCENDING)] - ) - - found = False - for event_data in not_processed_events: - new_event_data = { - k: v for k, v in event_data.items() - if k not in ["_id", "pype_data"] - } - try: - event = ftrack_api.event.base.Event(**new_event_data) - except Exception: - self.logger.exception(L( - 'Failed to convert payload into event: {0}', - event_data - )) - continue - found = True - self._event_queue.put(event) - - return found - - def _handle_packet(self, code, packet_identifier, path, data): - """Override `_handle_packet` which skip events and extend heartbeat""" - code_name = self._code_name_mapping[code] - if code_name == "event": - return - if code_name == "heartbeat": - self.sock.sendall(b"processor") - return self._send_packet(self._code_name_mapping["heartbeat"]) - - return super()._handle_packet(code, packet_identifier, path, data) class ProcessSession(ftrack_api.session.Session): From edacecf04152ee36e2e6047747fbc69ba4dfc75f Mon Sep 17 00:00:00 2001 From: iLLiCiTiT Date: Wed, 11 Dec 2019 12:27:59 +0100 Subject: [PATCH 4/8] sessions from processor and storer moved to lib and added possiblity to set EventHub class in init --- pype/ftrack/ftrack_server/lib.py | 178 +++++++++++++ .../ftrack/ftrack_server/session_processor.py | 182 ------------- pype/ftrack/ftrack_server/session_storer.py | 242 ------------------ 3 files changed, 178 insertions(+), 424 deletions(-) delete mode 100644 pype/ftrack/ftrack_server/session_processor.py delete mode 100644 pype/ftrack/ftrack_server/session_storer.py diff --git a/pype/ftrack/ftrack_server/lib.py b/pype/ftrack/ftrack_server/lib.py index 091df72a98..edd3cee09b 100644 --- a/pype/ftrack/ftrack_server/lib.py +++ b/pype/ftrack/ftrack_server/lib.py @@ -1,10 +1,32 @@ import os +import sys +import logging +import getpass +import atexit +import tempfile +import threading +import datetime +import time +import queue +import pymongo + import requests +import ftrack_api +import ftrack_api.session +import ftrack_api.cache +import ftrack_api.operation +import ftrack_api._centralized_storage_scenario +import ftrack_api.event +from ftrack_api.logging import LazyLogMessage as L try: from urllib.parse import urlparse, parse_qs except ImportError: from urlparse import urlparse, parse_qs +from pypeapp import Logger + +from pype.ftrack.lib.custom_db_connector import DbConnector + def ftrack_events_mongo_settings(): host = None @@ -243,3 +265,159 @@ class ProcessEventHub(ftrack_api.event.hub.EventHub): return self._send_packet(self._code_name_mapping["heartbeat"]) return super()._handle_packet(code, packet_identifier, path, data) +class SocketSession(ftrack_api.session.Session): + '''An isolated session for interaction with an ftrack server.''' + def __init__( + self, server_url=None, api_key=None, api_user=None, auto_populate=True, + plugin_paths=None, cache=None, cache_key_maker=None, + auto_connect_event_hub=None, schema_cache_path=None, + plugin_arguments=None, sock=None, Eventhub=None + ): + super(ftrack_api.session.Session, self).__init__() + self.logger = logging.getLogger( + __name__ + '.' + self.__class__.__name__ + ) + self._closed = False + + if server_url is None: + server_url = os.environ.get('FTRACK_SERVER') + + if not server_url: + raise TypeError( + 'Required "server_url" not specified. Pass as argument or set ' + 'in environment variable FTRACK_SERVER.' + ) + + self._server_url = server_url + + if api_key is None: + api_key = os.environ.get( + 'FTRACK_API_KEY', + # Backwards compatibility + os.environ.get('FTRACK_APIKEY') + ) + + if not api_key: + raise TypeError( + 'Required "api_key" not specified. Pass as argument or set in ' + 'environment variable FTRACK_API_KEY.' + ) + + self._api_key = api_key + + if api_user is None: + api_user = os.environ.get('FTRACK_API_USER') + if not api_user: + try: + api_user = getpass.getuser() + except Exception: + pass + + if not api_user: + raise TypeError( + 'Required "api_user" not specified. Pass as argument, set in ' + 'environment variable FTRACK_API_USER or one of the standard ' + 'environment variables used by Python\'s getpass module.' + ) + + self._api_user = api_user + + # Currently pending operations. + self.recorded_operations = ftrack_api.operation.Operations() + self.record_operations = True + + self.cache_key_maker = cache_key_maker + if self.cache_key_maker is None: + self.cache_key_maker = ftrack_api.cache.StringKeyMaker() + + # Enforce always having a memory cache at top level so that the same + # in-memory instance is returned from session. + self.cache = ftrack_api.cache.LayeredCache([ + ftrack_api.cache.MemoryCache() + ]) + + if cache is not None: + if callable(cache): + cache = cache(self) + + if cache is not None: + self.cache.caches.append(cache) + + self._managed_request = None + self._request = requests.Session() + self._request.auth = ftrack_api.session.SessionAuthentication( + self._api_key, self._api_user + ) + + self.auto_populate = auto_populate + + # Fetch server information and in doing so also check credentials. + self._server_information = self._fetch_server_information() + + # Now check compatibility of server based on retrieved information. + self.check_server_compatibility() + + # Construct event hub and load plugins. + if Eventhub is None: + Eventhub = ftrack_api.event.hub.EventHub + self._event_hub = Eventhub( + self._server_url, + self._api_user, + self._api_key, + sock=sock + ) + + self._auto_connect_event_hub_thread = None + if auto_connect_event_hub in (None, True): + # Connect to event hub in background thread so as not to block main + # session usage waiting for event hub connection. + self._auto_connect_event_hub_thread = threading.Thread( + target=self._event_hub.connect + ) + self._auto_connect_event_hub_thread.daemon = True + self._auto_connect_event_hub_thread.start() + + # To help with migration from auto_connect_event_hub default changing + # from True to False. + self._event_hub._deprecation_warning_auto_connect = ( + auto_connect_event_hub is None + ) + + # Register to auto-close session on exit. + atexit.register(self.close) + + self._plugin_paths = plugin_paths + if self._plugin_paths is None: + self._plugin_paths = os.environ.get( + 'FTRACK_EVENT_PLUGIN_PATH', '' + ).split(os.pathsep) + + self._discover_plugins(plugin_arguments=plugin_arguments) + + # TODO: Make schemas read-only and non-mutable (or at least without + # rebuilding types)? + if schema_cache_path is not False: + if schema_cache_path is None: + schema_cache_path = os.environ.get( + 'FTRACK_API_SCHEMA_CACHE_PATH', tempfile.gettempdir() + ) + + schema_cache_path = os.path.join( + schema_cache_path, 'ftrack_api_schema_cache.json' + ) + + self.schemas = self._load_schemas(schema_cache_path) + self.types = self._build_entity_type_classes(self.schemas) + + ftrack_api._centralized_storage_scenario.register(self) + + self._configure_locations() + self.event_hub.publish( + ftrack_api.event.base.Event( + topic='ftrack.api.session.ready', + data=dict( + session=self + ) + ), + synchronous=True + ) diff --git a/pype/ftrack/ftrack_server/session_processor.py b/pype/ftrack/ftrack_server/session_processor.py deleted file mode 100644 index a17f919969..0000000000 --- a/pype/ftrack/ftrack_server/session_processor.py +++ /dev/null @@ -1,182 +0,0 @@ -import logging -import os -import atexit -import datetime -import tempfile -import threading -import time -import requests -import queue -import pymongo - -import ftrack_api -import ftrack_api.session -import ftrack_api.cache -import ftrack_api.operation -import ftrack_api._centralized_storage_scenario -import ftrack_api.event -from ftrack_api.logging import LazyLogMessage as L - -from pype.ftrack.lib.custom_db_connector import DbConnector -from pype.ftrack.ftrack_server.lib import get_ftrack_event_mongo_info -from pypeapp import Logger - -log = Logger().get_logger("Session processor") - - - - -class ProcessSession(ftrack_api.session.Session): - '''An isolated session for interaction with an ftrack server.''' - def __init__( - self, server_url=None, api_key=None, api_user=None, auto_populate=True, - plugin_paths=None, cache=None, cache_key_maker=None, - auto_connect_event_hub=None, schema_cache_path=None, - plugin_arguments=None, sock=None - ): - super(ftrack_api.session.Session, self).__init__() - self.logger = logging.getLogger( - __name__ + '.' + self.__class__.__name__ - ) - self._closed = False - - if server_url is None: - server_url = os.environ.get('FTRACK_SERVER') - - if not server_url: - raise TypeError( - 'Required "server_url" not specified. Pass as argument or set ' - 'in environment variable FTRACK_SERVER.' - ) - - self._server_url = server_url - - if api_key is None: - api_key = os.environ.get( - 'FTRACK_API_KEY', - # Backwards compatibility - os.environ.get('FTRACK_APIKEY') - ) - - if not api_key: - raise TypeError( - 'Required "api_key" not specified. Pass as argument or set in ' - 'environment variable FTRACK_API_KEY.' - ) - - self._api_key = api_key - - if api_user is None: - api_user = os.environ.get('FTRACK_API_USER') - if not api_user: - try: - api_user = getpass.getuser() - except Exception: - pass - - if not api_user: - raise TypeError( - 'Required "api_user" not specified. Pass as argument, set in ' - 'environment variable FTRACK_API_USER or one of the standard ' - 'environment variables used by Python\'s getpass module.' - ) - - self._api_user = api_user - - # Currently pending operations. - self.recorded_operations = ftrack_api.operation.Operations() - self.record_operations = True - - self.cache_key_maker = cache_key_maker - if self.cache_key_maker is None: - self.cache_key_maker = ftrack_api.cache.StringKeyMaker() - - # Enforce always having a memory cache at top level so that the same - # in-memory instance is returned from session. - self.cache = ftrack_api.cache.LayeredCache([ - ftrack_api.cache.MemoryCache() - ]) - - if cache is not None: - if callable(cache): - cache = cache(self) - - if cache is not None: - self.cache.caches.append(cache) - - self._managed_request = None - self._request = requests.Session() - self._request.auth = ftrack_api.session.SessionAuthentication( - self._api_key, self._api_user - ) - - self.auto_populate = auto_populate - - # Fetch server information and in doing so also check credentials. - self._server_information = self._fetch_server_information() - - # Now check compatibility of server based on retrieved information. - self.check_server_compatibility() - - # Construct event hub and load plugins. - self._event_hub = ProcessEventHub( - self._server_url, - self._api_user, - self._api_key, - sock=sock - ) - - self._auto_connect_event_hub_thread = None - if auto_connect_event_hub in (None, True): - # Connect to event hub in background thread so as not to block main - # session usage waiting for event hub connection. - self._auto_connect_event_hub_thread = threading.Thread( - target=self._event_hub.connect - ) - self._auto_connect_event_hub_thread.daemon = True - self._auto_connect_event_hub_thread.start() - - # To help with migration from auto_connect_event_hub default changing - # from True to False. - self._event_hub._deprecation_warning_auto_connect = ( - auto_connect_event_hub is None - ) - - # Register to auto-close session on exit. - atexit.register(self.close) - - self._plugin_paths = plugin_paths - if self._plugin_paths is None: - self._plugin_paths = os.environ.get( - 'FTRACK_EVENT_PLUGIN_PATH', '' - ).split(os.pathsep) - - self._discover_plugins(plugin_arguments=plugin_arguments) - - # TODO: Make schemas read-only and non-mutable (or at least without - # rebuilding types)? - if schema_cache_path is not False: - if schema_cache_path is None: - schema_cache_path = os.environ.get( - 'FTRACK_API_SCHEMA_CACHE_PATH', tempfile.gettempdir() - ) - - schema_cache_path = os.path.join( - schema_cache_path, 'ftrack_api_schema_cache.json' - ) - - self.schemas = self._load_schemas(schema_cache_path) - self.types = self._build_entity_type_classes(self.schemas) - - ftrack_api._centralized_storage_scenario.register(self) - - self._configure_locations() - self.event_hub.publish( - ftrack_api.event.base.Event( - topic='ftrack.api.session.ready', - data=dict( - session=self - ) - ), - synchronous=True - ) diff --git a/pype/ftrack/ftrack_server/session_storer.py b/pype/ftrack/ftrack_server/session_storer.py deleted file mode 100644 index 29abf329f0..0000000000 --- a/pype/ftrack/ftrack_server/session_storer.py +++ /dev/null @@ -1,242 +0,0 @@ -import logging -import os -import atexit -import tempfile -import threading -import requests - -import ftrack_api -import ftrack_api.session -import ftrack_api.cache -import ftrack_api.operation -import ftrack_api._centralized_storage_scenario -import ftrack_api.event -from ftrack_api.logging import LazyLogMessage as L - - - - -class StorerSession(ftrack_api.session.Session): - '''An isolated session for interaction with an ftrack server.''' - def __init__( - self, server_url=None, api_key=None, api_user=None, auto_populate=True, - plugin_paths=None, cache=None, cache_key_maker=None, - auto_connect_event_hub=None, schema_cache_path=None, - plugin_arguments=None, sock=None - ): - '''Initialise session. - - *server_url* should be the URL of the ftrack server to connect to - including any port number. If not specified attempt to look up from - :envvar:`FTRACK_SERVER`. - - *api_key* should be the API key to use for authentication whilst - *api_user* should be the username of the user in ftrack to record - operations against. If not specified, *api_key* should be retrieved - from :envvar:`FTRACK_API_KEY` and *api_user* from - :envvar:`FTRACK_API_USER`. - - If *auto_populate* is True (the default), then accessing entity - attributes will cause them to be automatically fetched from the server - if they are not already. This flag can be changed on the session - directly at any time. - - *plugin_paths* should be a list of paths to search for plugins. If not - specified, default to looking up :envvar:`FTRACK_EVENT_PLUGIN_PATH`. - - *cache* should be an instance of a cache that fulfils the - :class:`ftrack_api.cache.Cache` interface and will be used as the cache - for the session. It can also be a callable that will be called with the - session instance as sole argument. The callable should return ``None`` - if a suitable cache could not be configured, but session instantiation - can continue safely. - - .. note:: - - The session will add the specified cache to a pre-configured layered - cache that specifies the top level cache as a - :class:`ftrack_api.cache.MemoryCache`. Therefore, it is unnecessary - to construct a separate memory cache for typical behaviour. Working - around this behaviour or removing the memory cache can lead to - unexpected behaviour. - - *cache_key_maker* should be an instance of a key maker that fulfils the - :class:`ftrack_api.cache.KeyMaker` interface and will be used to - generate keys for objects being stored in the *cache*. If not specified, - a :class:`~ftrack_api.cache.StringKeyMaker` will be used. - - If *auto_connect_event_hub* is True then embedded event hub will be - automatically connected to the event server and allow for publishing and - subscribing to **non-local** events. If False, then only publishing and - subscribing to **local** events will be possible until the hub is - manually connected using :meth:`EventHub.connect - `. - - .. note:: - - The event hub connection is performed in a background thread to - improve session startup time. If a registered plugin requires a - connected event hub then it should check the event hub connection - status explicitly. Subscribing to events does *not* require a - connected event hub. - - Enable schema caching by setting *schema_cache_path* to a folder path. - If not set, :envvar:`FTRACK_API_SCHEMA_CACHE_PATH` will be used to - determine the path to store cache in. If the environment variable is - also not specified then a temporary directory will be used. Set to - `False` to disable schema caching entirely. - - *plugin_arguments* should be an optional mapping (dict) of keyword - arguments to pass to plugin register functions upon discovery. If a - discovered plugin has a signature that is incompatible with the passed - arguments, the discovery mechanism will attempt to reduce the passed - arguments to only those that the plugin accepts. Note that a warning - will be logged in this case. - - ''' - super(ftrack_api.session.Session, self).__init__() - self.logger = logging.getLogger( - __name__ + '.' + self.__class__.__name__ - ) - self._closed = False - - if server_url is None: - server_url = os.environ.get('FTRACK_SERVER') - - if not server_url: - raise TypeError( - 'Required "server_url" not specified. Pass as argument or set ' - 'in environment variable FTRACK_SERVER.' - ) - - self._server_url = server_url - - if api_key is None: - api_key = os.environ.get( - 'FTRACK_API_KEY', - # Backwards compatibility - os.environ.get('FTRACK_APIKEY') - ) - - if not api_key: - raise TypeError( - 'Required "api_key" not specified. Pass as argument or set in ' - 'environment variable FTRACK_API_KEY.' - ) - - self._api_key = api_key - - if api_user is None: - api_user = os.environ.get('FTRACK_API_USER') - if not api_user: - try: - api_user = getpass.getuser() - except Exception: - pass - - if not api_user: - raise TypeError( - 'Required "api_user" not specified. Pass as argument, set in ' - 'environment variable FTRACK_API_USER or one of the standard ' - 'environment variables used by Python\'s getpass module.' - ) - - self._api_user = api_user - - # Currently pending operations. - self.recorded_operations = ftrack_api.operation.Operations() - self.record_operations = True - - self.cache_key_maker = cache_key_maker - if self.cache_key_maker is None: - self.cache_key_maker = ftrack_api.cache.StringKeyMaker() - - # Enforce always having a memory cache at top level so that the same - # in-memory instance is returned from session. - self.cache = ftrack_api.cache.LayeredCache([ - ftrack_api.cache.MemoryCache() - ]) - - if cache is not None: - if callable(cache): - cache = cache(self) - - if cache is not None: - self.cache.caches.append(cache) - - self._managed_request = None - self._request = requests.Session() - self._request.auth = ftrack_api.session.SessionAuthentication( - self._api_key, self._api_user - ) - - self.auto_populate = auto_populate - - # Fetch server information and in doing so also check credentials. - self._server_information = self._fetch_server_information() - - # Now check compatibility of server based on retrieved information. - self.check_server_compatibility() - - # Construct event hub and load plugins. - self._event_hub = StorerEventHub( - self._server_url, - self._api_user, - self._api_key, - sock=sock - ) - - self._auto_connect_event_hub_thread = None - if auto_connect_event_hub in (None, True): - # Connect to event hub in background thread so as not to block main - # session usage waiting for event hub connection. - self._auto_connect_event_hub_thread = threading.Thread( - target=self._event_hub.connect - ) - self._auto_connect_event_hub_thread.daemon = True - self._auto_connect_event_hub_thread.start() - - # To help with migration from auto_connect_event_hub default changing - # from True to False. - self._event_hub._deprecation_warning_auto_connect = ( - auto_connect_event_hub is None - ) - - # Register to auto-close session on exit. - atexit.register(self.close) - - self._plugin_paths = plugin_paths - if self._plugin_paths is None: - self._plugin_paths = os.environ.get( - 'FTRACK_EVENT_PLUGIN_PATH', '' - ).split(os.pathsep) - - self._discover_plugins(plugin_arguments=plugin_arguments) - - # TODO: Make schemas read-only and non-mutable (or at least without - # rebuilding types)? - if schema_cache_path is not False: - if schema_cache_path is None: - schema_cache_path = os.environ.get( - 'FTRACK_API_SCHEMA_CACHE_PATH', tempfile.gettempdir() - ) - - schema_cache_path = os.path.join( - schema_cache_path, 'ftrack_api_schema_cache.json' - ) - - self.schemas = self._load_schemas(schema_cache_path) - self.types = self._build_entity_type_classes(self.schemas) - - ftrack_api._centralized_storage_scenario.register(self) - - self._configure_locations() - self.event_hub.publish( - ftrack_api.event.base.Event( - topic='ftrack.api.session.ready', - data=dict( - session=self - ) - ), - synchronous=True - ) From dd52e6594db97395f4ace8c39c0d873518f54d09 Mon Sep 17 00:00:00 2001 From: iLLiCiTiT Date: Wed, 11 Dec 2019 12:29:28 +0100 Subject: [PATCH 5/8] used moved session and event hubs in subprocesses --- .../ftrack_server/sub_event_processor.py | 13 ++++---- pype/ftrack/ftrack_server/sub_event_storer.py | 30 ++++++++++--------- 2 files changed, 22 insertions(+), 21 deletions(-) diff --git a/pype/ftrack/ftrack_server/sub_event_processor.py b/pype/ftrack/ftrack_server/sub_event_processor.py index 6ada787223..9c971ca916 100644 --- a/pype/ftrack/ftrack_server/sub_event_processor.py +++ b/pype/ftrack/ftrack_server/sub_event_processor.py @@ -1,12 +1,9 @@ -import os import sys -import datetime import signal import socket -import pymongo from ftrack_server import FtrackServer -from pype.ftrack.ftrack_server.session_processor import ProcessSession +from pype.ftrack.ftrack_server.lib import SocketSession, ProcessEventHub from pypeapp import Logger log = Logger().get_logger("Event processor") @@ -24,12 +21,14 @@ def main(args): sock.sendall(b"CreatedProcess") try: - session = ProcessSession(auto_connect_event_hub=True, sock=sock) - server = FtrackServer('event') + session = SocketSession( + auto_connect_event_hub=True, sock=sock, Eventhub=ProcessEventHub + ) + server = FtrackServer("event") log.debug("Launched Ftrack Event processor") server.run_server(session) - except Exception as exc: + except Exception: log.error("Event server crashed. See traceback below", exc_info=True) finally: diff --git a/pype/ftrack/ftrack_server/sub_event_storer.py b/pype/ftrack/ftrack_server/sub_event_storer.py index 4828b10bfa..11cda0e487 100644 --- a/pype/ftrack/ftrack_server/sub_event_storer.py +++ b/pype/ftrack/ftrack_server/sub_event_storer.py @@ -7,22 +7,22 @@ import pymongo import ftrack_api from ftrack_server import FtrackServer -from pype.ftrack.ftrack_server.lib import get_ftrack_event_mongo_info +from pype.ftrack.ftrack_server.lib import ( + get_ftrack_event_mongo_info, + SocketSession, + StorerEventHub +) from pype.ftrack.lib.custom_db_connector import DbConnector -from session_storer import StorerSession from pypeapp import Logger log = Logger().get_logger("Event storer") + +class SessionFactory: + session = None + + url, database, table_name = get_ftrack_event_mongo_info() - - -class SessionClass: - def __init__(self): - self.session = None - - -session_obj = SessionClass() dbcon = DbConnector( mongo_url=url, database_name=database, @@ -75,7 +75,7 @@ def launch(event): def trigger_sync(event): - session = session_obj.session + session = SessionFactory.session if session is None: log.warning("Session is not set. Can't trigger Sync to avalon action.") return True @@ -93,7 +93,7 @@ def trigger_sync(event): "$set": {"pype_data.is_processed": True} } dbcon.update_many(query, set_dict) - + selections = [] for project in projects: if project["status"] != "active": @@ -154,8 +154,10 @@ def main(args): sock.sendall(b"CreatedStore") try: - session = StorerSession(auto_connect_event_hub=True, sock=sock) - session_obj.session = session + session = SocketSession( + auto_connect_event_hub=True, sock=sock, Eventhub=StorerEventHub + ) + SessionFactory.session = session register(session) server = FtrackServer("event") log.debug("Launched Ftrack Event storer") From 8d7f29c1bac8fc1b6cf66137e07771b42b582c1e Mon Sep 17 00:00:00 2001 From: iLLiCiTiT Date: Wed, 11 Dec 2019 12:32:03 +0100 Subject: [PATCH 6/8] fix in trigger sync method, now check event_hub id to not trigger sync on every connect event --- pype/ftrack/ftrack_server/sub_event_storer.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/pype/ftrack/ftrack_server/sub_event_storer.py b/pype/ftrack/ftrack_server/sub_event_storer.py index 11cda0e487..dfe8e21654 100644 --- a/pype/ftrack/ftrack_server/sub_event_storer.py +++ b/pype/ftrack/ftrack_server/sub_event_storer.py @@ -76,6 +76,10 @@ def launch(event): def trigger_sync(event): session = SessionFactory.session + source_id = event.get("source", {}).get("id") + if not source_id or source_id != session.event_hub.id: + return + if session is None: log.warning("Session is not set. Can't trigger Sync to avalon action.") return True From f5c326aa568a3660c4a8e612e1704b613403577e Mon Sep 17 00:00:00 2001 From: iLLiCiTiT Date: Wed, 11 Dec 2019 12:33:22 +0100 Subject: [PATCH 7/8] formatting changes --- pype/ftrack/ftrack_server/socket_thread.py | 4 ++-- pype/ftrack/ftrack_server/sub_legacy_server.py | 9 +++++---- 2 files changed, 7 insertions(+), 6 deletions(-) diff --git a/pype/ftrack/ftrack_server/socket_thread.py b/pype/ftrack/ftrack_server/socket_thread.py index d0a2868743..3309f75cd7 100644 --- a/pype/ftrack/ftrack_server/socket_thread.py +++ b/pype/ftrack/ftrack_server/socket_thread.py @@ -1,7 +1,5 @@ import os -import sys import time -import signal import socket import threading import subprocess @@ -10,7 +8,9 @@ from pypeapp import Logger class SocketThread(threading.Thread): """Thread that checks suprocess of storer of processor of events""" + MAX_TIMEOUT = 35 + def __init__(self, name, port, filepath): super(SocketThread, self).__init__() self.log = Logger().get_logger("SocketThread", "Event Thread") diff --git a/pype/ftrack/ftrack_server/sub_legacy_server.py b/pype/ftrack/ftrack_server/sub_legacy_server.py index 31f38d0404..8b7bab5e2e 100644 --- a/pype/ftrack/ftrack_server/sub_legacy_server.py +++ b/pype/ftrack/ftrack_server/sub_legacy_server.py @@ -1,4 +1,3 @@ -import os import sys import time import datetime @@ -7,7 +6,6 @@ import threading from ftrack_server import FtrackServer import ftrack_api -from ftrack_api.event.hub import EventHub from pypeapp import Logger log = Logger().get_logger("Event Server Legacy") @@ -37,7 +35,10 @@ class TimerChecker(threading.Thread): if not self.session.event_hub.connected: if not connected: - if (datetime.datetime.now() - start).seconds > self.max_time_out: + if ( + (datetime.datetime.now() - start).seconds > + self.max_time_out + ): log.error(( "Exiting event server. Session was not connected" " to ftrack server in {} seconds." @@ -61,7 +62,7 @@ class TimerChecker(threading.Thread): def main(args): check_thread = None try: - server = FtrackServer('event') + server = FtrackServer("event") session = ftrack_api.Session(auto_connect_event_hub=True) check_thread = TimerChecker(server, session) From 6e2ea0c05bc63fbcb4272e11922c202b55d39b0f Mon Sep 17 00:00:00 2001 From: iLLiCiTiT Date: Wed, 11 Dec 2019 12:34:36 +0100 Subject: [PATCH 8/8] change session check in base event handler --- pype/ftrack/lib/ftrack_base_handler.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pype/ftrack/lib/ftrack_base_handler.py b/pype/ftrack/lib/ftrack_base_handler.py index 4b57452961..8329505ffb 100644 --- a/pype/ftrack/lib/ftrack_base_handler.py +++ b/pype/ftrack/lib/ftrack_base_handler.py @@ -2,7 +2,7 @@ import functools import time from pypeapp import Logger import ftrack_api -from pype.ftrack.ftrack_server import session_processor +from pype.ftrack.ftrack_server.lib import SocketSession class MissingPermision(Exception): @@ -41,7 +41,7 @@ class BaseHandler(object): self.log = Logger().get_logger(self.__class__.__name__) if not( isinstance(session, ftrack_api.session.Session) or - isinstance(session, session_processor.ProcessSession) + isinstance(session, SocketSession) ): raise Exception(( "Session object entered with args is instance of \"{}\""