diff --git a/pype/ftrack/__init__.py b/pype/ftrack/__init__.py index bf18979e91..45ca8384b5 100644 --- a/pype/ftrack/__init__.py +++ b/pype/ftrack/__init__.py @@ -1,2 +1,2 @@ from .lib import * -from .ftrack_server import * +from .ftrack_server import FtrackServer diff --git a/pype/ftrack/actions/action_attributes_remapper.py b/pype/ftrack/actions/action_attributes_remapper.py index 759b5765e5..a0393ece40 100644 --- a/pype/ftrack/actions/action_attributes_remapper.py +++ b/pype/ftrack/actions/action_attributes_remapper.py @@ -281,7 +281,4 @@ class AttributesRemapper(BaseAction): def register(session, plugins_presets={}): '''Register plugin. Called when used as an plugin.''' - if not isinstance(session, ftrack_api.session.Session): - return - AttributesRemapper(session, plugins_presets).register() diff --git a/pype/ftrack/actions/action_client_review_sort.py b/pype/ftrack/actions/action_client_review_sort.py index 6a659ce5e3..91926c2874 100644 --- a/pype/ftrack/actions/action_client_review_sort.py +++ b/pype/ftrack/actions/action_client_review_sort.py @@ -55,11 +55,8 @@ class ClientReviewSort(BaseAction): def register(session, plugins_presets={}): '''Register action. Called when used as an event plugin.''' - if not isinstance(session, ftrack_api.session.Session): - return - action_handler = ClientReviewSort(session, plugins_presets) - action_handler.register() + ClientReviewSort(session, plugins_presets).register() def main(arguments=None): diff --git a/pype/ftrack/actions/action_component_open.py b/pype/ftrack/actions/action_component_open.py index 33f4d38890..98d773dba6 100644 --- a/pype/ftrack/actions/action_component_open.py +++ b/pype/ftrack/actions/action_component_open.py @@ -68,12 +68,6 @@ class ComponentOpen(BaseAction): def register(session, plugins_presets={}): '''Register action. Called when used as an event plugin.''' - # Validate that session is an instance of ftrack_api.Session. If not, - # assume that register is being called from an old or incompatible API and - # return without doing anything. - if not isinstance(session, ftrack_api.session.Session): - return - ComponentOpen(session, plugins_presets).register() diff --git a/pype/ftrack/actions/action_create_cust_attrs.py b/pype/ftrack/actions/action_create_cust_attrs.py index 47a6bb5d5f..ac6dcb0fd7 100644 --- a/pype/ftrack/actions/action_create_cust_attrs.py +++ b/pype/ftrack/actions/action_create_cust_attrs.py @@ -572,12 +572,6 @@ class CustomAttributes(BaseAction): def register(session, plugins_presets={}): '''Register plugin. Called when used as an plugin.''' - # Validate that session is an instance of ftrack_api.Session. If not, - # assume that register is being called from an old or incompatible API and - # return without doing anything. - if not isinstance(session, ftrack_api.session.Session): - return - CustomAttributes(session, plugins_presets).register() diff --git a/pype/ftrack/actions/action_create_folders.py b/pype/ftrack/actions/action_create_folders.py index 269316e052..44e9741bab 100644 --- a/pype/ftrack/actions/action_create_folders.py +++ b/pype/ftrack/actions/action_create_folders.py @@ -327,9 +327,6 @@ class PartialDict(dict): def register(session, plugins_presets={}): '''Register plugin. Called when used as an plugin.''' - if not isinstance(session, ftrack_api.session.Session): - return - CreateFolders(session, plugins_presets).register() diff --git a/pype/ftrack/actions/action_create_project_structure.py b/pype/ftrack/actions/action_create_project_structure.py index 74d458b5f8..c39c717b11 100644 --- a/pype/ftrack/actions/action_create_project_structure.py +++ b/pype/ftrack/actions/action_create_project_structure.py @@ -198,9 +198,6 @@ class CreateProjectFolders(BaseAction): def register(session, plugins_presets={}): '''Register plugin. Called when used as an plugin.''' - if not isinstance(session, ftrack_api.session.Session): - return - CreateProjectFolders(session, plugins_presets).register() diff --git a/pype/ftrack/actions/action_cust_attr_doctor.py b/pype/ftrack/actions/action_cust_attr_doctor.py index af5fe2dc4a..0469b3a1e6 100644 --- a/pype/ftrack/actions/action_cust_attr_doctor.py +++ b/pype/ftrack/actions/action_cust_attr_doctor.py @@ -9,7 +9,7 @@ from pype.ftrack import BaseAction class CustomAttributeDoctor(BaseAction): - + ignore_me = True #: Action identifier. identifier = 'custom.attributes.doctor' @@ -294,9 +294,6 @@ class CustomAttributeDoctor(BaseAction): def register(session, plugins_presets={}): '''Register plugin. Called when used as an plugin.''' - if not isinstance(session, ftrack_api.session.Session): - return - CustomAttributeDoctor(session, plugins_presets).register() diff --git a/pype/ftrack/actions/action_delete_asset.py b/pype/ftrack/actions/action_delete_asset.py index 1b1e7fc905..106c81758a 100644 --- a/pype/ftrack/actions/action_delete_asset.py +++ b/pype/ftrack/actions/action_delete_asset.py @@ -85,7 +85,7 @@ class DeleteAsset(BaseAction): 'type': 'asset', 'name': entity['name'] }) - + if av_entity is None: return { 'success': False, @@ -314,12 +314,6 @@ class DeleteAsset(BaseAction): def register(session, plugins_presets={}): '''Register plugin. Called when used as an plugin.''' - # Validate that session is an instance of ftrack_api.Session. If not, - # assume that register is being called from an old or incompatible API and - # return without doing anything. - if not isinstance(session, ftrack_api.session.Session): - return - DeleteAsset(session, plugins_presets).register() diff --git a/pype/ftrack/actions/action_delete_asset_byname.py b/pype/ftrack/actions/action_delete_asset_byname.py index 2431b2311e..4a3807f8f0 100644 --- a/pype/ftrack/actions/action_delete_asset_byname.py +++ b/pype/ftrack/actions/action_delete_asset_byname.py @@ -135,12 +135,6 @@ class AssetsRemover(BaseAction): def register(session, plugins_presets={}): '''Register plugin. Called when used as an plugin.''' - # Validate that session is an instance of ftrack_api.Session. If not, - # assume that register is being called from an old or incompatible API and - # return without doing anything. - if not isinstance(session, ftrack_api.session.Session): - return - AssetsRemover(session, plugins_presets).register() diff --git a/pype/ftrack/actions/action_djvview.py b/pype/ftrack/actions/action_djvview.py index 58914fbc1e..9da12dd67c 100644 --- a/pype/ftrack/actions/action_djvview.py +++ b/pype/ftrack/actions/action_djvview.py @@ -220,8 +220,6 @@ class DJVViewAction(BaseAction): def register(session, plugins_presets={}): """Register hooks.""" - if not isinstance(session, ftrack_api.session.Session): - return DJVViewAction(session, plugins_presets).register() diff --git a/pype/ftrack/actions/action_job_killer.py b/pype/ftrack/actions/action_job_killer.py index 717f87e879..64fb99133d 100644 --- a/pype/ftrack/actions/action_job_killer.py +++ b/pype/ftrack/actions/action_job_killer.py @@ -121,12 +121,6 @@ class JobKiller(BaseAction): def register(session, plugins_presets={}): '''Register plugin. Called when used as an plugin.''' - # Validate that session is an instance of ftrack_api.Session. If not, - # assume that register is being called from an old or incompatible API and - # return without doing anything. - if not isinstance(session, ftrack_api.session.Session): - return - JobKiller(session, plugins_presets).register() diff --git a/pype/ftrack/actions/action_multiple_notes.py b/pype/ftrack/actions/action_multiple_notes.py index 6e28b7bed6..bd51cb2984 100644 --- a/pype/ftrack/actions/action_multiple_notes.py +++ b/pype/ftrack/actions/action_multiple_notes.py @@ -115,9 +115,6 @@ class MultipleNotes(BaseAction): def register(session, plugins_presets={}): '''Register plugin. Called when used as an plugin.''' - if not isinstance(session, ftrack_api.session.Session): - return - MultipleNotes(session, plugins_presets).register() diff --git a/pype/ftrack/actions/action_prepare_project.py b/pype/ftrack/actions/action_prepare_project.py index e914fa74f0..d645748f6f 100644 --- a/pype/ftrack/actions/action_prepare_project.py +++ b/pype/ftrack/actions/action_prepare_project.py @@ -372,7 +372,4 @@ class PrepareProject(BaseAction): def register(session, plugins_presets={}): '''Register plugin. Called when used as an plugin.''' - if not isinstance(session, ftrack_api.session.Session): - return - PrepareProject(session, plugins_presets).register() diff --git a/pype/ftrack/actions/action_rv.py b/pype/ftrack/actions/action_rv.py index 6b6591355f..69c6624b71 100644 --- a/pype/ftrack/actions/action_rv.py +++ b/pype/ftrack/actions/action_rv.py @@ -328,8 +328,6 @@ class RVAction(BaseAction): def register(session, plugins_presets={}): """Register hooks.""" - if not isinstance(session, ftrack_api.session.Session): - return RVAction(session, plugins_presets).register() diff --git a/pype/ftrack/actions/action_start_timer.py b/pype/ftrack/actions/action_start_timer.py index 36752a1edc..292789e9f3 100644 --- a/pype/ftrack/actions/action_start_timer.py +++ b/pype/ftrack/actions/action_start_timer.py @@ -26,7 +26,7 @@ class StartTimer(BaseAction): user.start_timer(entity, force=True) self.session.commit() - + self.log.info( "Starting Ftrack timer for task: {}".format(entity['name']) ) @@ -37,7 +37,4 @@ class StartTimer(BaseAction): def register(session, plugins_presets={}): '''Register plugin. Called when used as an plugin.''' - if not isinstance(session, ftrack_api.session.Session): - return - StartTimer(session, plugins_presets).register() diff --git a/pype/ftrack/actions/action_sync_hier_attrs_local.py b/pype/ftrack/actions/action_sync_hier_attrs_local.py index 05a70461a1..289abd0122 100644 --- a/pype/ftrack/actions/action_sync_hier_attrs_local.py +++ b/pype/ftrack/actions/action_sync_hier_attrs_local.py @@ -309,9 +309,6 @@ class SyncHierarchicalAttrs(BaseAction): def register(session, plugins_presets={}): '''Register plugin. Called when used as an plugin.''' - if not isinstance(session, ftrack_api.session.Session): - return - SyncHierarchicalAttrs(session, plugins_presets).register() diff --git a/pype/ftrack/actions/action_sync_to_avalon_local.py b/pype/ftrack/actions/action_sync_to_avalon_local.py index ddf8ed6571..61050f9883 100644 --- a/pype/ftrack/actions/action_sync_to_avalon_local.py +++ b/pype/ftrack/actions/action_sync_to_avalon_local.py @@ -263,11 +263,4 @@ class SyncToAvalon(BaseAction): def register(session, plugins_presets={}): '''Register plugin. Called when used as an plugin.''' - - # Validate that session is an instance of ftrack_api.Session. If not, - # assume that register is being called from an old or incompatible API and - # return without doing anything. - if not isinstance(session, ftrack_api.session.Session): - return - SyncToAvalon(session, plugins_presets).register() diff --git a/pype/ftrack/actions/action_test.py b/pype/ftrack/actions/action_test.py index a2bc8bf892..58f7210e3b 100644 --- a/pype/ftrack/actions/action_test.py +++ b/pype/ftrack/actions/action_test.py @@ -43,9 +43,6 @@ class TestAction(BaseAction): def register(session, plugins_presets={}): '''Register plugin. Called when used as an plugin.''' - if not isinstance(session, ftrack_api.session.Session): - return - TestAction(session, plugins_presets).register() diff --git a/pype/ftrack/actions/action_thumbnail_to_childern.py b/pype/ftrack/actions/action_thumbnail_to_childern.py index 101b678512..7d189cf652 100644 --- a/pype/ftrack/actions/action_thumbnail_to_childern.py +++ b/pype/ftrack/actions/action_thumbnail_to_childern.py @@ -68,8 +68,6 @@ class ThumbToChildren(BaseAction): def register(session, plugins_presets={}): '''Register action. Called when used as an event plugin.''' - if not isinstance(session, ftrack_api.session.Session): - return ThumbToChildren(session, plugins_presets).register() diff --git a/pype/ftrack/actions/action_thumbnail_to_parent.py b/pype/ftrack/actions/action_thumbnail_to_parent.py index c382d9303c..efafca4a96 100644 --- a/pype/ftrack/actions/action_thumbnail_to_parent.py +++ b/pype/ftrack/actions/action_thumbnail_to_parent.py @@ -90,8 +90,6 @@ class ThumbToParent(BaseAction): def register(session, plugins_presets={}): '''Register action. Called when used as an event plugin.''' - if not isinstance(session, ftrack_api.session.Session): - return ThumbToParent(session, plugins_presets).register() diff --git a/pype/ftrack/actions/action_where_run_ask.py b/pype/ftrack/actions/action_where_run_ask.py index 0351c09909..795c2664cc 100644 --- a/pype/ftrack/actions/action_where_run_ask.py +++ b/pype/ftrack/actions/action_where_run_ask.py @@ -40,7 +40,4 @@ class ActionAskWhereIRun(BaseAction): def register(session, plugins_presets={}): '''Register plugin. Called when used as an plugin.''' - if not isinstance(session, ftrack_api.session.Session): - return - ActionAskWhereIRun(session, plugins_presets).register() diff --git a/pype/ftrack/actions/action_where_run_show.py b/pype/ftrack/actions/action_where_run_show.py index 7fea23e3b7..48618f0251 100644 --- a/pype/ftrack/actions/action_where_run_show.py +++ b/pype/ftrack/actions/action_where_run_show.py @@ -80,7 +80,4 @@ class ActionShowWhereIRun(BaseAction): def register(session, plugins_presets={}): '''Register plugin. Called when used as an plugin.''' - if not isinstance(session, ftrack_api.session.Session): - return - ActionShowWhereIRun(session, plugins_presets).register() diff --git a/pype/ftrack/events/action_sync_hier_attrs.py b/pype/ftrack/events/action_sync_hier_attrs.py index c9d968ee5d..23ac319261 100644 --- a/pype/ftrack/events/action_sync_hier_attrs.py +++ b/pype/ftrack/events/action_sync_hier_attrs.py @@ -220,7 +220,7 @@ class SyncHierarchicalAttrs(BaseAction): if job['status'] in ('queued', 'running'): job['status'] = 'failed' session.commit() - + if self.interface_messages: self.show_interface_from_dict( messages=self.interface_messages, @@ -341,9 +341,6 @@ class SyncHierarchicalAttrs(BaseAction): def register(session, plugins_presets): '''Register plugin. Called when used as an plugin.''' - if not isinstance(session, ftrack_api.session.Session): - return - SyncHierarchicalAttrs(session, plugins_presets).register() diff --git a/pype/ftrack/events/action_sync_to_avalon.py b/pype/ftrack/events/action_sync_to_avalon.py index 51a4ae9475..7b5f94f216 100644 --- a/pype/ftrack/events/action_sync_to_avalon.py +++ b/pype/ftrack/events/action_sync_to_avalon.py @@ -296,9 +296,6 @@ def register(session, plugins_presets): # Validate that session is an instance of ftrack_api.Session. If not, # assume that register is being called from an old or incompatible API and # return without doing anything. - if not isinstance(session, ftrack_api.session.Session): - return - SyncToAvalon(session, plugins_presets).register() diff --git a/pype/ftrack/events/event_del_avalon_id_from_new.py b/pype/ftrack/events/event_del_avalon_id_from_new.py index 6f6320f51b..e5c4b1be45 100644 --- a/pype/ftrack/events/event_del_avalon_id_from_new.py +++ b/pype/ftrack/events/event_del_avalon_id_from_new.py @@ -53,7 +53,5 @@ class DelAvalonIdFromNew(BaseEvent): def register(session, plugins_presets): '''Register plugin. Called when used as an plugin.''' - if not isinstance(session, ftrack_api.session.Session): - return DelAvalonIdFromNew(session, plugins_presets).register() diff --git a/pype/ftrack/events/event_next_task_update.py b/pype/ftrack/events/event_next_task_update.py index 18a7abf328..51ccb2f057 100644 --- a/pype/ftrack/events/event_next_task_update.py +++ b/pype/ftrack/events/event_next_task_update.py @@ -88,7 +88,5 @@ class NextTaskUpdate(BaseEvent): def register(session, plugins_presets): '''Register plugin. Called when used as an plugin.''' - if not isinstance(session, ftrack_api.session.Session): - return NextTaskUpdate(session, plugins_presets).register() diff --git a/pype/ftrack/events/event_radio_buttons.py b/pype/ftrack/events/event_radio_buttons.py index 9c6f2d490a..917c7a49e6 100644 --- a/pype/ftrack/events/event_radio_buttons.py +++ b/pype/ftrack/events/event_radio_buttons.py @@ -36,7 +36,5 @@ class Radio_buttons(BaseEvent): def register(session, plugins_presets): '''Register plugin. Called when used as an plugin.''' - if not isinstance(session, ftrack_api.session.Session): - return Radio_buttons(session, plugins_presets).register() diff --git a/pype/ftrack/events/event_sync_hier_attr.py b/pype/ftrack/events/event_sync_hier_attr.py index 5ddc2394af..8031ec9e55 100644 --- a/pype/ftrack/events/event_sync_hier_attr.py +++ b/pype/ftrack/events/event_sync_hier_attr.py @@ -209,7 +209,5 @@ class SyncHierarchicalAttrs(BaseEvent): def register(session, plugins_presets): '''Register plugin. Called when used as an plugin.''' - if not isinstance(session, ftrack_api.session.Session): - return SyncHierarchicalAttrs(session, plugins_presets).register() diff --git a/pype/ftrack/events/event_sync_to_avalon.py b/pype/ftrack/events/event_sync_to_avalon.py index 4e7b208726..a25866be65 100644 --- a/pype/ftrack/events/event_sync_to_avalon.py +++ b/pype/ftrack/events/event_sync_to_avalon.py @@ -122,8 +122,4 @@ class Sync_to_Avalon(BaseEvent): def register(session, plugins_presets): '''Register plugin. Called when used as an plugin.''' - - if not isinstance(session, ftrack_api.session.Session): - return - Sync_to_Avalon(session, plugins_presets).register() diff --git a/pype/ftrack/events/event_test.py b/pype/ftrack/events/event_test.py index 94d99dbf67..a909aa5510 100644 --- a/pype/ftrack/events/event_test.py +++ b/pype/ftrack/events/event_test.py @@ -8,7 +8,7 @@ from pype.ftrack import BaseEvent class Test_Event(BaseEvent): ignore_me = True - + priority = 10000 def launch(self, session, event): @@ -22,7 +22,5 @@ class Test_Event(BaseEvent): def register(session, plugins_presets): '''Register plugin. Called when used as an plugin.''' - if not isinstance(session, ftrack_api.session.Session): - return Test_Event(session, plugins_presets).register() diff --git a/pype/ftrack/events/event_thumbnail_updates.py b/pype/ftrack/events/event_thumbnail_updates.py index 51bb15a4c7..ae6f8adb5e 100644 --- a/pype/ftrack/events/event_thumbnail_updates.py +++ b/pype/ftrack/events/event_thumbnail_updates.py @@ -47,7 +47,5 @@ class ThumbnailEvents(BaseEvent): def register(session, plugins_presets): '''Register plugin. Called when used as an plugin.''' - if not isinstance(session, ftrack_api.session.Session): - return ThumbnailEvents(session, plugins_presets).register() diff --git a/pype/ftrack/events/event_user_assigment.py b/pype/ftrack/events/event_user_assigment.py index fe8b331629..61699d736c 100644 --- a/pype/ftrack/events/event_user_assigment.py +++ b/pype/ftrack/events/event_user_assigment.py @@ -233,7 +233,5 @@ def register(session, plugins_presets): """ Register plugin. Called when used as an plugin. """ - if not isinstance(session, ftrack_api.session.Session): - return UserAssigmentEvent(session, plugins_presets).register() diff --git a/pype/ftrack/events/event_version_to_task_statuses.py b/pype/ftrack/events/event_version_to_task_statuses.py index 85a31383d5..66a55c0cf7 100644 --- a/pype/ftrack/events/event_version_to_task_statuses.py +++ b/pype/ftrack/events/event_version_to_task_statuses.py @@ -71,7 +71,5 @@ class VersionToTaskStatus(BaseEvent): def register(session, plugins_presets): '''Register plugin. Called when used as an plugin.''' - if not isinstance(session, ftrack_api.session.Session): - return VersionToTaskStatus(session, plugins_presets).register() diff --git a/pype/ftrack/ftrack_server/__init__.py b/pype/ftrack/ftrack_server/__init__.py index 4a64ab8848..0861a1bc08 100644 --- a/pype/ftrack/ftrack_server/__init__.py +++ b/pype/ftrack/ftrack_server/__init__.py @@ -1,7 +1 @@ from .ftrack_server import FtrackServer -from . import event_server_cli - -__all__ = [ - 'event_server_cli', - 'FtrackServer' -] diff --git a/pype/ftrack/ftrack_server/event_server_cli.py b/pype/ftrack/ftrack_server/event_server_cli.py index e06a626468..2106da3b5f 100644 --- a/pype/ftrack/ftrack_server/event_server_cli.py +++ b/pype/ftrack/ftrack_server/event_server_cli.py @@ -1,18 +1,34 @@ import os import sys +import signal +import datetime +import subprocess +import socket import argparse +import atexit +import time +from urllib.parse import urlparse + import requests from pype.vendor import ftrack_api -from pype.ftrack import credentials +from pype.ftrack.lib import credentials from pype.ftrack.ftrack_server import FtrackServer -from pypeapp import Logger - -log = Logger().get_logger('Ftrack event server', "ftrack-event-server-cli") +from pype.ftrack.ftrack_server.lib import ftrack_events_mongo_settings +import socket_thread -def check_url(url): +class MongoPermissionsError(Exception): + """Is used when is created multiple objects of same RestApi class.""" + def __init__(self, message=None): + if not message: + message = "Exiting because have issue with acces to MongoDB" + super().__init__(message) + + +def check_ftrack_url(url, log_errors=True): + """Checks if Ftrack server is responding""" if not url: - log.error('Ftrack URL is not set!') + print('ERROR: Ftrack URL is not set!') return None url = url.strip('/ ') @@ -25,24 +41,47 @@ def check_url(url): try: result = requests.get(url, allow_redirects=False) except requests.exceptions.RequestException: - log.error('Entered Ftrack URL is not accesible!') - return None + if log_errors: + print('ERROR: Entered Ftrack URL is not accesible!') + return False if (result.status_code != 200 or 'FTRACK_VERSION' not in result.headers): - log.error('Entered Ftrack URL is not accesible!') - return None + if log_errors: + print('ERROR: Entered Ftrack URL is not accesible!') + return False - log.debug('Ftrack server {} is accessible.'.format(url)) + print('DEBUG: Ftrack server {} is accessible.'.format(url)) return url + +def check_mongo_url(host, port, log_error=False): + """Checks if mongo server is responding""" + sock = None + try: + sock = socket.create_connection( + (host, port), + timeout=1 + ) + return True + except socket.error as err: + if log_error: + print("Can't connect to MongoDB at {}:{} because: {}".format( + host, port, err + )) + return False + finally: + if sock is not None: + sock.close() + + def validate_credentials(url, user, api): first_validation = True if not user: - log.error('Ftrack Username is not set! Exiting.') + print('ERROR: Ftrack Username is not set! Exiting.') first_validation = False if not api: - log.error('Ftrack API key is not set! Exiting.') + print('ERROR: Ftrack API key is not set! Exiting.') first_validation = False if not first_validation: return False @@ -55,21 +94,21 @@ def validate_credentials(url, user, api): ) session.close() except Exception as e: - log.error( - 'Can\'t log into Ftrack with used credentials:' + print( + 'ERROR: Can\'t log into Ftrack with used credentials:' ' Ftrack server: "{}" // Username: {} // API key: {}'.format( url, user, api )) return False - log.debug('Credentials Username: "{}", API key: "{}" are valid.'.format( + print('DEBUG: Credentials Username: "{}", API key: "{}" are valid.'.format( user, api )) return True def process_event_paths(event_paths): - log.debug('Processing event paths: {}.'.format(str(event_paths))) + print('DEBUG: Processing event paths: {}.'.format(str(event_paths))) return_paths = [] not_found = [] if not event_paths: @@ -87,14 +126,249 @@ def process_event_paths(event_paths): return os.pathsep.join(return_paths), not_found -def run_event_server(ftrack_url, username, api_key, event_paths): - os.environ['FTRACK_SERVER'] = ftrack_url - os.environ['FTRACK_API_USER'] = username - os.environ['FTRACK_API_KEY'] = api_key - os.environ['FTRACK_EVENTS_PATH'] = event_paths +def old_way_server(ftrack_url): + # Current file + file_path = os.path.dirname(os.path.realpath(__file__)) + + min_fail_seconds = 5 + max_fail_count = 3 + wait_time_after_max_fail = 10 + + subproc = None + subproc_path = "{}/sub_old_way.py".format(file_path) + subproc_last_failed = datetime.datetime.now() + subproc_failed_count = 0 + + ftrack_accessible = False + printed_ftrack_error = False + + while True: + if not ftrack_accessible: + ftrack_accessible = check_ftrack_url(ftrack_url) + + # Run threads only if Ftrack is accessible + if not ftrack_accessible and not printed_ftrack_error: + print("Can't access Ftrack {} <{}>".format( + ftrack_url, str(datetime.datetime.now()) + )) + if subproc is not None: + if subproc.poll() is None: + subproc.terminate() + + subproc = None + + printed_ftrack_error = True + + time.sleep(1) + continue + + printed_ftrack_error = False + + if subproc is None: + if subproc_failed_count < max_fail_count: + subproc = subprocess.Popen( + ["python", subproc_path], + stdout=subprocess.PIPE + ) + elif subproc_failed_count == max_fail_count: + print(( + "Storer failed {}times I'll try to run again {}s later" + ).format(str(max_fail_count), str(wait_time_after_max_fail))) + subproc_failed_count += 1 + elif (( + datetime.datetime.now() - subproc_last_failed + ).seconds > wait_time_after_max_fail): + subproc_failed_count = 0 + + # If thread failed test Ftrack and Mongo connection + elif subproc.poll() is not None: + subproc = None + ftrack_accessible = False + + _subproc_last_failed = datetime.datetime.now() + delta_time = (_subproc_last_failed - subproc_last_failed).seconds + if delta_time < min_fail_seconds: + subproc_failed_count += 1 + else: + subproc_failed_count = 0 + subproc_last_failed = _subproc_last_failed + + time.sleep(1) + + +def main_loop(ftrack_url): + """ This is main loop of event handling. + + Loop is handling threads which handles subprocesses of event storer and + processor. When one of threads is stopped it is tested to connect to + ftrack and mongo server. Threads are not started when ftrack or mongo + server is not accessible. When threads are started it is checked for socket + signals as heartbeat. Heartbeat must become at least once per 30sec + otherwise thread will be killed. + """ + + # Get mongo hostname and port for testing mongo connection + mongo_list = ftrack_events_mongo_settings() + mongo_hostname = mongo_list[0] + mongo_port = mongo_list[1] + + # Current file + file_path = os.path.dirname(os.path.realpath(__file__)) + + min_fail_seconds = 5 + max_fail_count = 3 + wait_time_after_max_fail = 10 + + # Threads data + storer_name = "StorerThread" + storer_port = 10001 + storer_path = "{}/sub_event_storer.py".format(file_path) + storer_thread = None + storer_last_failed = datetime.datetime.now() + storer_failed_count = 0 + + processor_name = "ProcessorThread" + processor_port = 10011 + processor_path = "{}/sub_event_processor.py".format(file_path) + processor_thread = None + processor_last_failed = datetime.datetime.now() + processor_failed_count = 0 + + ftrack_accessible = False + mongo_accessible = False + + printed_ftrack_error = False + printed_mongo_error = False + + # stop threads on exit + # TODO check if works and args have thread objects! + def on_exit(processor_thread, storer_thread): + if processor_thread is not None: + processor_thread.stop() + processor_thread.join() + processor_thread = None + + if storer_thread is not None: + storer_thread.stop() + storer_thread.join() + storer_thread = None + + atexit.register( + on_exit, processor_thread=processor_thread, storer_thread=storer_thread + ) + # Main loop + while True: + # Check if accessible Ftrack and Mongo url + if not ftrack_accessible: + ftrack_accessible = check_ftrack_url(ftrack_url) + + if not mongo_accessible: + mongo_accessible = check_mongo_url(mongo_hostname, mongo_port) + + # Run threads only if Ftrack is accessible + if not ftrack_accessible or not mongo_accessible: + if not mongo_accessible and not printed_mongo_error: + print("Can't access Mongo {}".format(mongo_url)) + + if not ftrack_accessible and not printed_ftrack_error: + print("Can't access Ftrack {}".format(ftrack_url)) + + if storer_thread is not None: + storer_thread.stop() + storer_thread.join() + storer_thread = None + + if processor_thread is not None: + processor_thread.stop() + processor_thread.join() + processor_thread = None + + printed_ftrack_error = True + printed_mongo_error = True + + time.sleep(1) + continue + + printed_ftrack_error = False + printed_mongo_error = False + + # Run backup thread which does not requeire mongo to work + if storer_thread is None: + if storer_failed_count < max_fail_count: + storer_thread = socket_thread.SocketThread( + storer_name, storer_port, storer_path + ) + storer_thread.start() + elif storer_failed_count == max_fail_count: + print(( + "Storer failed {}times I'll try to run again {}s later" + ).format(str(max_fail_count), str(wait_time_after_max_fail))) + storer_failed_count += 1 + elif (( + datetime.datetime.now() - storer_last_failed + ).seconds > wait_time_after_max_fail): + storer_failed_count = 0 + + # If thread failed test Ftrack and Mongo connection + elif not storer_thread.isAlive(): + if storer_thread.mongo_error: + raise MongoPermissionsError() + storer_thread.join() + storer_thread = None + ftrack_accessible = False + mongo_accessible = False + + _storer_last_failed = datetime.datetime.now() + delta_time = (_storer_last_failed - storer_last_failed).seconds + if delta_time < min_fail_seconds: + storer_failed_count += 1 + else: + storer_failed_count = 0 + storer_last_failed = _storer_last_failed + + if processor_thread is None: + if processor_failed_count < max_fail_count: + processor_thread = socket_thread.SocketThread( + processor_name, processor_port, processor_path + ) + processor_thread.start() + + elif processor_failed_count == max_fail_count: + print(( + "Processor failed {}times in row" + " I'll try to run again {}s later" + ).format(str(max_fail_count), str(wait_time_after_max_fail))) + processor_failed_count += 1 + + elif (( + datetime.datetime.now() - processor_last_failed + ).seconds > wait_time_after_max_fail): + processor_failed_count = 0 + + # If thread failed test Ftrack and Mongo connection + elif not processor_thread.isAlive(): + if storer_thread.mongo_error: + raise Exception( + "Exiting because have issue with acces to MongoDB" + ) + processor_thread.join() + processor_thread = None + ftrack_accessible = False + mongo_accessible = False + + _processor_last_failed = datetime.datetime.now() + delta_time = ( + _processor_last_failed - processor_last_failed + ).seconds + + if delta_time < min_fail_seconds: + processor_failed_count += 1 + else: + processor_failed_count = 0 + processor_last_failed = _processor_last_failed + + time.sleep(1) - server = FtrackServer('event') - server.run_server() def main(argv): ''' @@ -184,7 +458,11 @@ def main(argv): help="Load creadentials from apps dir", action="store_true" ) - + parser.add_argument( + '-oldway', + help="Load creadentials from apps dir", + action="store_true" + ) ftrack_url = os.environ.get('FTRACK_SERVER') username = os.environ.get('FTRACK_API_USER') api_key = os.environ.get('FTRACK_API_KEY') @@ -209,8 +487,9 @@ def main(argv): if kwargs.ftrackapikey: api_key = kwargs.ftrackapikey + oldway = kwargs.oldway # Check url regex and accessibility - ftrack_url = check_url(ftrack_url) + ftrack_url = check_ftrack_url(ftrack_url) if not ftrack_url: return 1 @@ -221,21 +500,40 @@ def main(argv): # Process events path event_paths, not_found = process_event_paths(event_paths) if not_found: - log.warning( - 'These paths were not found: {}'.format(str(not_found)) + print( + 'WARNING: These paths were not found: {}'.format(str(not_found)) ) if not event_paths: if not_found: - log.error('Any of entered paths is valid or can be accesible.') + print('ERROR: Any of entered paths is valid or can be accesible.') else: - log.error('Paths to events are not set. Exiting.') + print('ERROR: Paths to events are not set. Exiting.') return 1 if kwargs.storecred: credentials._save_credentials(username, api_key, True) - run_event_server(ftrack_url, username, api_key, event_paths) + # Set Ftrack environments + os.environ["FTRACK_SERVER"] = ftrack_url + os.environ["FTRACK_API_USER"] = username + os.environ["FTRACK_API_KEY"] = api_key + os.environ["FTRACK_EVENTS_PATH"] = event_paths + + if oldway: + return old_way_server(ftrack_url) + + return main_loop(ftrack_url) -if (__name__ == ('__main__')): +if __name__ == "__main__": + # Register interupt signal + def signal_handler(sig, frame): + print("You pressed Ctrl+C. Process ended.") + sys.exit(0) + + signal.signal(signal.SIGINT, signal_handler) + signal.signal(signal.SIGTERM, signal_handler) + if hasattr(signal, "SIGKILL"): + signal.signal(signal.SIGKILL, signal_handler) + sys.exit(main(sys.argv)) diff --git a/pype/ftrack/ftrack_server/ftrack_server.py b/pype/ftrack/ftrack_server/ftrack_server.py index e1c13cda32..12b046c510 100644 --- a/pype/ftrack/ftrack_server/ftrack_server.py +++ b/pype/ftrack/ftrack_server/ftrack_server.py @@ -126,23 +126,27 @@ class FtrackServer: msg = '"{}" - register was not successful ({})'.format( function_dict['name'], str(exc) ) - log.warning(msg) + log.warning(msg, exc_info=True) - def run_server(self): - self.session = ftrack_api.Session(auto_connect_event_hub=True,) + def run_server(self, session=None, load_files=True): + if not session: + session = ftrack_api.Session(auto_connect_event_hub=True) - paths_str = os.environ.get(self.env_key) - if paths_str is None: - log.error(( - "Env var \"{}\" is not set, \"{}\" server won\'t launch" - ).format(self.env_key, self.server_type)) - return + self.session = session - paths = paths_str.split(os.pathsep) - self.set_files(paths) + if load_files: + paths_str = os.environ.get(self.env_key) + if paths_str is None: + log.error(( + "Env var \"{}\" is not set, \"{}\" server won\'t launch" + ).format(self.env_key, self.server_type)) + return - log.info(60*"*") - log.info('Registration of actions/events has finished!') + paths = paths_str.split(os.pathsep) + self.set_files(paths) + + log.info(60*"*") + log.info('Registration of actions/events has finished!') # keep event_hub on session running self.session.event_hub.wait() diff --git a/pype/ftrack/ftrack_server/lib.py b/pype/ftrack/ftrack_server/lib.py new file mode 100644 index 0000000000..12159693fe --- /dev/null +++ b/pype/ftrack/ftrack_server/lib.py @@ -0,0 +1,68 @@ +import os +try: + from urllib.parse import urlparse, parse_qs +except ImportError: + from urlparse import urlparse, parse_qs + + +def ftrack_events_mongo_settings(): + host = None + port = None + username = None + password = None + collection = None + database = None + auth_db = "" + + if os.environ.get('FTRACK_EVENTS_MONGO_URL'): + result = urlparse(os.environ['FTRACK_EVENTS_MONGO_URL']) + + host = result.hostname + try: + port = result.port + except ValueError: + raise RuntimeError("invalid port specified") + username = result.username + password = result.password + try: + database = result.path.lstrip("/").split("/")[0] + collection = result.path.lstrip("/").split("/")[1] + except IndexError: + if not database: + raise RuntimeError("missing database name for logging") + try: + auth_db = parse_qs(result.query)['authSource'][0] + except KeyError: + # no auth db provided, mongo will use the one we are connecting to + pass + else: + host = os.environ.get('FTRACK_EVENTS_MONGO_HOST') + port = int(os.environ.get('FTRACK_EVENTS_MONGO_PORT', "0")) + database = os.environ.get('FTRACK_EVENTS_MONGO_DB') + username = os.environ.get('FTRACK_EVENTS_MONGO_USER') + password = os.environ.get('FTRACK_EVENTS_MONGO_PASSWORD') + collection = os.environ.get('FTRACK_EVENTS_MONGO_COL') + auth_db = os.environ.get('FTRACK_EVENTS_MONGO_AUTH_DB', 'avalon') + + return host, port, database, username, password, collection, auth_db + + +def get_ftrack_event_mongo_info(): + host, port, database, username, password, collection, auth_db = ftrack_events_mongo_settings() + user_pass = "" + if username and password: + user_pass = "{}:{}@".format(username, password) + + socket_path = "{}:{}".format(host, port) + + dab = "" + if database: + dab = "/{}".format(database) + + auth = "" + if auth_db: + auth = "?authSource={}".format(auth_db) + + url = "mongodb://{}{}{}{}".format(user_pass, socket_path, dab, auth) + + return url, database, collection diff --git a/pype/ftrack/ftrack_server/session_processor.py b/pype/ftrack/ftrack_server/session_processor.py new file mode 100644 index 0000000000..2f5818aab4 --- /dev/null +++ b/pype/ftrack/ftrack_server/session_processor.py @@ -0,0 +1,292 @@ +import logging +import os +import atexit +import datetime +import tempfile +import threading +import time +import requests +import queue +import pymongo + +import ftrack_api +import ftrack_api.session +import ftrack_api.cache +import ftrack_api.operation +import ftrack_api._centralized_storage_scenario +import ftrack_api.event +from ftrack_api.logging import LazyLogMessage as L + +from pype.ftrack.lib.custom_db_connector import DbConnector +from pype.ftrack.ftrack_server.lib import get_ftrack_event_mongo_info +from pypeapp import Logger + +log = Logger().get_logger("Session processor") + + +class ProcessEventHub(ftrack_api.event.hub.EventHub): + url, database, table_name = get_ftrack_event_mongo_info() + + is_table_created = False + + def __init__(self, *args, **kwargs): + self.dbcon = DbConnector( + mongo_url=self.url, + database_name=self.database, + table_name=self.table_name + ) + self.sock = kwargs.pop("sock") + super(ProcessEventHub, self).__init__(*args, **kwargs) + + def prepare_dbcon(self): + try: + self.dbcon.install() + dbcon._database.collection_names() + except pymongo.errors.AutoReconnect: + log.error("Mongo server \"{}\" is not responding, exiting.".format( + os.environ["AVALON_MONGO"] + )) + sys.exit(0) + + except pymongo.errors.OperationFailure: + log.error(( + "Error with Mongo access, probably permissions." + "Check if exist database with name \"{}\"" + " and collection \"{}\" inside." + ).format(self.database, self.table_name)) + self.sock.sendall(b"MongoError") + sys.exit(0) + + def wait(self, duration=None): + """Overriden wait + + Event are loaded from Mongo DB when queue is empty. Handled event is + set as processed in Mongo DB. + """ + started = time.time() + self.prepare_dbcon() + while True: + try: + event = self._event_queue.get(timeout=0.1) + except queue.Empty: + if not self.load_events(): + time.sleep(0.5) + else: + try: + self._handle(event) + self.dbcon.update_one( + {"id": event["id"]}, + {"$set": {"pype_data.is_processed": True}} + ) + except pymongo.errors.AutoReconnect: + log.error(( + "Mongo server \"{}\" is not responding, exiting." + ).format(os.environ["AVALON_MONGO"])) + sys.exit(0) + # Additional special processing of events. + if event['topic'] == 'ftrack.meta.disconnected': + break + + if duration is not None: + if (time.time() - started) > duration: + break + + def load_events(self): + """Load not processed events sorted by stored date""" + ago_date = datetime.datetime.now() - datetime.timedelta(days=3) + result = self.dbcon.delete_many({ + "pype_data.stored": {"$lte": ago_date}, + "pype_data.is_processed": True + }) + + not_processed_events = self.dbcon.find( + {"pype_data.is_processed": False} + ).sort( + [("pype_data.stored", pymongo.ASCENDING)] + ) + + found = False + for event_data in not_processed_events: + new_event_data = { + k: v for k, v in event_data.items() + if k not in ["_id", "pype_data"] + } + try: + event = ftrack_api.event.base.Event(**new_event_data) + except Exception: + self.logger.exception(L( + 'Failed to convert payload into event: {0}', + event_data + )) + continue + found = True + self._event_queue.put(event) + + return found + + def _handle_packet(self, code, packet_identifier, path, data): + """Override `_handle_packet` which skip events and extend heartbeat""" + code_name = self._code_name_mapping[code] + if code_name == "event": + return + if code_name == "heartbeat": + self.sock.sendall(b"processor") + return self._send_packet(self._code_name_mapping["heartbeat"]) + + return super()._handle_packet(code, packet_identifier, path, data) + + +class ProcessSession(ftrack_api.session.Session): + '''An isolated session for interaction with an ftrack server.''' + def __init__( + self, server_url=None, api_key=None, api_user=None, auto_populate=True, + plugin_paths=None, cache=None, cache_key_maker=None, + auto_connect_event_hub=None, schema_cache_path=None, + plugin_arguments=None, sock=None + ): + super(ftrack_api.session.Session, self).__init__() + self.logger = logging.getLogger( + __name__ + '.' + self.__class__.__name__ + ) + self._closed = False + + if server_url is None: + server_url = os.environ.get('FTRACK_SERVER') + + if not server_url: + raise TypeError( + 'Required "server_url" not specified. Pass as argument or set ' + 'in environment variable FTRACK_SERVER.' + ) + + self._server_url = server_url + + if api_key is None: + api_key = os.environ.get( + 'FTRACK_API_KEY', + # Backwards compatibility + os.environ.get('FTRACK_APIKEY') + ) + + if not api_key: + raise TypeError( + 'Required "api_key" not specified. Pass as argument or set in ' + 'environment variable FTRACK_API_KEY.' + ) + + self._api_key = api_key + + if api_user is None: + api_user = os.environ.get('FTRACK_API_USER') + if not api_user: + try: + api_user = getpass.getuser() + except Exception: + pass + + if not api_user: + raise TypeError( + 'Required "api_user" not specified. Pass as argument, set in ' + 'environment variable FTRACK_API_USER or one of the standard ' + 'environment variables used by Python\'s getpass module.' + ) + + self._api_user = api_user + + # Currently pending operations. + self.recorded_operations = ftrack_api.operation.Operations() + self.record_operations = True + + self.cache_key_maker = cache_key_maker + if self.cache_key_maker is None: + self.cache_key_maker = ftrack_api.cache.StringKeyMaker() + + # Enforce always having a memory cache at top level so that the same + # in-memory instance is returned from session. + self.cache = ftrack_api.cache.LayeredCache([ + ftrack_api.cache.MemoryCache() + ]) + + if cache is not None: + if callable(cache): + cache = cache(self) + + if cache is not None: + self.cache.caches.append(cache) + + self._managed_request = None + self._request = requests.Session() + self._request.auth = ftrack_api.session.SessionAuthentication( + self._api_key, self._api_user + ) + + self.auto_populate = auto_populate + + # Fetch server information and in doing so also check credentials. + self._server_information = self._fetch_server_information() + + # Now check compatibility of server based on retrieved information. + self.check_server_compatibility() + + # Construct event hub and load plugins. + self._event_hub = ProcessEventHub( + self._server_url, + self._api_user, + self._api_key, + sock=sock + ) + + self._auto_connect_event_hub_thread = None + if auto_connect_event_hub in (None, True): + # Connect to event hub in background thread so as not to block main + # session usage waiting for event hub connection. + self._auto_connect_event_hub_thread = threading.Thread( + target=self._event_hub.connect + ) + self._auto_connect_event_hub_thread.daemon = True + self._auto_connect_event_hub_thread.start() + + # To help with migration from auto_connect_event_hub default changing + # from True to False. + self._event_hub._deprecation_warning_auto_connect = ( + auto_connect_event_hub is None + ) + + # Register to auto-close session on exit. + atexit.register(self.close) + + self._plugin_paths = plugin_paths + if self._plugin_paths is None: + self._plugin_paths = os.environ.get( + 'FTRACK_EVENT_PLUGIN_PATH', '' + ).split(os.pathsep) + + self._discover_plugins(plugin_arguments=plugin_arguments) + + # TODO: Make schemas read-only and non-mutable (or at least without + # rebuilding types)? + if schema_cache_path is not False: + if schema_cache_path is None: + schema_cache_path = os.environ.get( + 'FTRACK_API_SCHEMA_CACHE_PATH', tempfile.gettempdir() + ) + + schema_cache_path = os.path.join( + schema_cache_path, 'ftrack_api_schema_cache.json' + ) + + self.schemas = self._load_schemas(schema_cache_path) + self.types = self._build_entity_type_classes(self.schemas) + + ftrack_api._centralized_storage_scenario.register(self) + + self._configure_locations() + self.event_hub.publish( + ftrack_api.event.base.Event( + topic='ftrack.api.session.ready', + data=dict( + session=self + ) + ), + synchronous=True + ) diff --git a/pype/ftrack/ftrack_server/session_storer.py b/pype/ftrack/ftrack_server/session_storer.py new file mode 100644 index 0000000000..b3201c9e4d --- /dev/null +++ b/pype/ftrack/ftrack_server/session_storer.py @@ -0,0 +1,257 @@ +import logging +import os +import atexit +import tempfile +import threading +import requests + +import ftrack_api +import ftrack_api.session +import ftrack_api.cache +import ftrack_api.operation +import ftrack_api._centralized_storage_scenario +import ftrack_api.event +from ftrack_api.logging import LazyLogMessage as L + + +class StorerEventHub(ftrack_api.event.hub.EventHub): + def __init__(self, *args, **kwargs): + self.sock = kwargs.pop("sock") + super(StorerEventHub, self).__init__(*args, **kwargs) + + def _handle_packet(self, code, packet_identifier, path, data): + """Override `_handle_packet` which extend heartbeat""" + if self._code_name_mapping[code] == "heartbeat": + # Reply with heartbeat. + self.sock.sendall(b"storer") + return self._send_packet(self._code_name_mapping['heartbeat']) + + return super(StorerEventHub, self)._handle_packet( + code, packet_identifier, path, data + ) + + +class StorerSession(ftrack_api.session.Session): + '''An isolated session for interaction with an ftrack server.''' + def __init__( + self, server_url=None, api_key=None, api_user=None, auto_populate=True, + plugin_paths=None, cache=None, cache_key_maker=None, + auto_connect_event_hub=None, schema_cache_path=None, + plugin_arguments=None, sock=None + ): + '''Initialise session. + + *server_url* should be the URL of the ftrack server to connect to + including any port number. If not specified attempt to look up from + :envvar:`FTRACK_SERVER`. + + *api_key* should be the API key to use for authentication whilst + *api_user* should be the username of the user in ftrack to record + operations against. If not specified, *api_key* should be retrieved + from :envvar:`FTRACK_API_KEY` and *api_user* from + :envvar:`FTRACK_API_USER`. + + If *auto_populate* is True (the default), then accessing entity + attributes will cause them to be automatically fetched from the server + if they are not already. This flag can be changed on the session + directly at any time. + + *plugin_paths* should be a list of paths to search for plugins. If not + specified, default to looking up :envvar:`FTRACK_EVENT_PLUGIN_PATH`. + + *cache* should be an instance of a cache that fulfils the + :class:`ftrack_api.cache.Cache` interface and will be used as the cache + for the session. It can also be a callable that will be called with the + session instance as sole argument. The callable should return ``None`` + if a suitable cache could not be configured, but session instantiation + can continue safely. + + .. note:: + + The session will add the specified cache to a pre-configured layered + cache that specifies the top level cache as a + :class:`ftrack_api.cache.MemoryCache`. Therefore, it is unnecessary + to construct a separate memory cache for typical behaviour. Working + around this behaviour or removing the memory cache can lead to + unexpected behaviour. + + *cache_key_maker* should be an instance of a key maker that fulfils the + :class:`ftrack_api.cache.KeyMaker` interface and will be used to + generate keys for objects being stored in the *cache*. If not specified, + a :class:`~ftrack_api.cache.StringKeyMaker` will be used. + + If *auto_connect_event_hub* is True then embedded event hub will be + automatically connected to the event server and allow for publishing and + subscribing to **non-local** events. If False, then only publishing and + subscribing to **local** events will be possible until the hub is + manually connected using :meth:`EventHub.connect + `. + + .. note:: + + The event hub connection is performed in a background thread to + improve session startup time. If a registered plugin requires a + connected event hub then it should check the event hub connection + status explicitly. Subscribing to events does *not* require a + connected event hub. + + Enable schema caching by setting *schema_cache_path* to a folder path. + If not set, :envvar:`FTRACK_API_SCHEMA_CACHE_PATH` will be used to + determine the path to store cache in. If the environment variable is + also not specified then a temporary directory will be used. Set to + `False` to disable schema caching entirely. + + *plugin_arguments* should be an optional mapping (dict) of keyword + arguments to pass to plugin register functions upon discovery. If a + discovered plugin has a signature that is incompatible with the passed + arguments, the discovery mechanism will attempt to reduce the passed + arguments to only those that the plugin accepts. Note that a warning + will be logged in this case. + + ''' + super(ftrack_api.session.Session, self).__init__() + self.logger = logging.getLogger( + __name__ + '.' + self.__class__.__name__ + ) + self._closed = False + + if server_url is None: + server_url = os.environ.get('FTRACK_SERVER') + + if not server_url: + raise TypeError( + 'Required "server_url" not specified. Pass as argument or set ' + 'in environment variable FTRACK_SERVER.' + ) + + self._server_url = server_url + + if api_key is None: + api_key = os.environ.get( + 'FTRACK_API_KEY', + # Backwards compatibility + os.environ.get('FTRACK_APIKEY') + ) + + if not api_key: + raise TypeError( + 'Required "api_key" not specified. Pass as argument or set in ' + 'environment variable FTRACK_API_KEY.' + ) + + self._api_key = api_key + + if api_user is None: + api_user = os.environ.get('FTRACK_API_USER') + if not api_user: + try: + api_user = getpass.getuser() + except Exception: + pass + + if not api_user: + raise TypeError( + 'Required "api_user" not specified. Pass as argument, set in ' + 'environment variable FTRACK_API_USER or one of the standard ' + 'environment variables used by Python\'s getpass module.' + ) + + self._api_user = api_user + + # Currently pending operations. + self.recorded_operations = ftrack_api.operation.Operations() + self.record_operations = True + + self.cache_key_maker = cache_key_maker + if self.cache_key_maker is None: + self.cache_key_maker = ftrack_api.cache.StringKeyMaker() + + # Enforce always having a memory cache at top level so that the same + # in-memory instance is returned from session. + self.cache = ftrack_api.cache.LayeredCache([ + ftrack_api.cache.MemoryCache() + ]) + + if cache is not None: + if callable(cache): + cache = cache(self) + + if cache is not None: + self.cache.caches.append(cache) + + self._managed_request = None + self._request = requests.Session() + self._request.auth = ftrack_api.session.SessionAuthentication( + self._api_key, self._api_user + ) + + self.auto_populate = auto_populate + + # Fetch server information and in doing so also check credentials. + self._server_information = self._fetch_server_information() + + # Now check compatibility of server based on retrieved information. + self.check_server_compatibility() + + # Construct event hub and load plugins. + self._event_hub = StorerEventHub( + self._server_url, + self._api_user, + self._api_key, + sock=sock + ) + + self._auto_connect_event_hub_thread = None + if auto_connect_event_hub in (None, True): + # Connect to event hub in background thread so as not to block main + # session usage waiting for event hub connection. + self._auto_connect_event_hub_thread = threading.Thread( + target=self._event_hub.connect + ) + self._auto_connect_event_hub_thread.daemon = True + self._auto_connect_event_hub_thread.start() + + # To help with migration from auto_connect_event_hub default changing + # from True to False. + self._event_hub._deprecation_warning_auto_connect = ( + auto_connect_event_hub is None + ) + + # Register to auto-close session on exit. + atexit.register(self.close) + + self._plugin_paths = plugin_paths + if self._plugin_paths is None: + self._plugin_paths = os.environ.get( + 'FTRACK_EVENT_PLUGIN_PATH', '' + ).split(os.pathsep) + + self._discover_plugins(plugin_arguments=plugin_arguments) + + # TODO: Make schemas read-only and non-mutable (or at least without + # rebuilding types)? + if schema_cache_path is not False: + if schema_cache_path is None: + schema_cache_path = os.environ.get( + 'FTRACK_API_SCHEMA_CACHE_PATH', tempfile.gettempdir() + ) + + schema_cache_path = os.path.join( + schema_cache_path, 'ftrack_api_schema_cache.json' + ) + + self.schemas = self._load_schemas(schema_cache_path) + self.types = self._build_entity_type_classes(self.schemas) + + ftrack_api._centralized_storage_scenario.register(self) + + self._configure_locations() + self.event_hub.publish( + ftrack_api.event.base.Event( + topic='ftrack.api.session.ready', + data=dict( + session=self + ) + ), + synchronous=True + ) diff --git a/pype/ftrack/ftrack_server/socket_thread.py b/pype/ftrack/ftrack_server/socket_thread.py new file mode 100644 index 0000000000..d0a2868743 --- /dev/null +++ b/pype/ftrack/ftrack_server/socket_thread.py @@ -0,0 +1,123 @@ +import os +import sys +import time +import signal +import socket +import threading +import subprocess +from pypeapp import Logger + + +class SocketThread(threading.Thread): + """Thread that checks suprocess of storer of processor of events""" + MAX_TIMEOUT = 35 + def __init__(self, name, port, filepath): + super(SocketThread, self).__init__() + self.log = Logger().get_logger("SocketThread", "Event Thread") + self.setName(name) + self.name = name + self.port = port + self.filepath = filepath + self.sock = None + self.subproc = None + self.connection = None + self._is_running = False + self.finished = False + + self.mongo_error = False + + def stop(self): + self._is_running = False + + def run(self): + self._is_running = True + time_socket = time.time() + # Create a TCP/IP socket + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + self.sock = sock + + # Bind the socket to the port - skip already used ports + while True: + try: + server_address = ("localhost", self.port) + sock.bind(server_address) + break + except OSError: + self.port += 1 + + self.log.debug( + "Running Socked thread on {}:{}".format(*server_address) + ) + + self.subproc = subprocess.Popen( + ["python", self.filepath, "-port", str(self.port)], + stdout=subprocess.PIPE + ) + + # Listen for incoming connections + sock.listen(1) + sock.settimeout(1.0) + while True: + if not self._is_running: + break + try: + connection, client_address = sock.accept() + time_socket = time.time() + connection.settimeout(1.0) + self.connection = connection + + except socket.timeout: + if (time.time() - time_socket) > self.MAX_TIMEOUT: + self.log.error("Connection timeout passed. Terminating.") + self._is_running = False + self.subproc.terminate() + break + continue + + try: + time_con = time.time() + # Receive the data in small chunks and retransmit it + while True: + try: + if not self._is_running: + break + try: + data = connection.recv(16) + time_con = time.time() + + except socket.timeout: + if (time.time() - time_con) > self.MAX_TIMEOUT: + self.log.error( + "Connection timeout passed. Terminating." + ) + self._is_running = False + self.subproc.terminate() + break + continue + + except ConnectionResetError: + self._is_running = False + break + + if data: + if data == b"MongoError": + self.mongo_error = True + connection.sendall(data) + + except Exception as exc: + self.log.error( + "Event server process failed", exc_info=True + ) + + finally: + # Clean up the connection + connection.close() + if self.subproc.poll() is None: + self.subproc.terminate() + + lines = self.subproc.stdout.readlines() + if lines: + print("*** Socked Thread stdout ***") + for line in lines: + os.write(1, line) + self.finished = True diff --git a/pype/ftrack/ftrack_server/sub_event_processor.py b/pype/ftrack/ftrack_server/sub_event_processor.py new file mode 100644 index 0000000000..9444fe3ff0 --- /dev/null +++ b/pype/ftrack/ftrack_server/sub_event_processor.py @@ -0,0 +1,53 @@ +import os +import sys +import datetime +import signal +import socket +import pymongo + +from ftrack_server import FtrackServer +from pype.ftrack.ftrack_server.session_processor import ProcessSession +from pypeapp import Logger + +log = Logger().get_logger("Event processor") + + +def main(args): + port = int(args[-1]) + # Create a TCP/IP socket + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + + # Connect the socket to the port where the server is listening + server_address = ("localhost", port) + log.debug("Processor connected to {} port {}".format(*server_address)) + sock.connect(server_address) + + sock.sendall(b"CreatedProcess") + try: + session = ProcessSession(auto_connect_event_hub=True, sock=sock) + server = FtrackServer('event') + log.debug("Launched Ftrack Event processor") + server.run_server(session) + + except Exception as exc: + import traceback + traceback.print_tb(exc.__traceback__) + + finally: + log.debug("First closing socket") + sock.close() + return 1 + + +if __name__ == "__main__": + # Register interupt signal + def signal_handler(sig, frame): + print("You pressed Ctrl+C. Process ended.") + sys.exit(0) + + signal.signal(signal.SIGINT, signal_handler) + signal.signal(signal.SIGTERM, signal_handler) + if hasattr(signal, "SIGKILL"): + signal.signal(signal.SIGKILL, signal_handler) + + sys.exit(main(sys.argv)) diff --git a/pype/ftrack/ftrack_server/sub_event_storer.py b/pype/ftrack/ftrack_server/sub_event_storer.py new file mode 100644 index 0000000000..6e30fb99e2 --- /dev/null +++ b/pype/ftrack/ftrack_server/sub_event_storer.py @@ -0,0 +1,118 @@ +import os +import sys +import datetime +import signal +import socket +import pymongo + +from ftrack_server import FtrackServer +from pype.ftrack.ftrack_server.lib import get_ftrack_event_mongo_info +from pype.ftrack.lib.custom_db_connector import DbConnector +from session_storer import StorerSession +from pypeapp import Logger + +log = Logger().get_logger("Event storer") + +url, database, table_name = get_ftrack_event_mongo_info() + +dbcon = DbConnector( + mongo_url=url, + database_name=database, + table_name=table_name +) + +# ignore_topics = ["ftrack.meta.connected"] +ignore_topics = [] + +def install_db(): + try: + dbcon.install() + dbcon._database.collection_names() + except pymongo.errors.AutoReconnect: + log.error("Mongo server \"{}\" is not responding, exiting.".format( + os.environ["AVALON_MONGO"] + )) + sys.exit(0) + + +def launch(event): + if event.get("topic") in ignore_topics: + return + + event_data = event._data + event_id = event["id"] + + event_data["pype_data"] = { + "stored": datetime.datetime.utcnow(), + "is_processed": False + } + + try: + # dbcon.insert_one(event_data) + dbcon.update({"id": event_id}, event_data, upsert=True) + log.debug("Event: {} stored".format(event_id)) + + except pymongo.errors.AutoReconnect: + log.error("Mongo server \"{}\" is not responding, exiting.".format( + os.environ["AVALON_MONGO"] + )) + sys.exit(0) + + except Exception as exc: + log.error( + "Event: {} failed to store".format(event_id), + exc_info=True + ) + + +def register(session): + '''Registers the event, subscribing the discover and launch topics.''' + install_db() + session.event_hub.subscribe("topic=*", launch) + + +def main(args): + port = int(args[-1]) + + # Create a TCP/IP socket + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + + # Connect the socket to the port where the server is listening + server_address = ("localhost", port) + log.debug("Storer connected to {} port {}".format(*server_address)) + sock.connect(server_address) + sock.sendall(b"CreatedStore") + + try: + session = StorerSession(auto_connect_event_hub=True, sock=sock) + register(session) + server = FtrackServer("event") + log.debug("Launched Ftrack Event storer") + server.run_server(session, load_files=False) + + except pymongo.errors.OperationFailure: + log.error(( + "Error with Mongo access, probably permissions." + "Check if exist database with name \"{}\"" + " and collection \"{}\" inside." + ).format(database, table_name)) + sock.sendall(b"MongoError") + + finally: + log.debug("First closing socket") + sock.close() + return 1 + + +if __name__ == "__main__": + # Register interupt signal + def signal_handler(sig, frame): + print("You pressed Ctrl+C. Process ended.") + sys.exit(0) + + signal.signal(signal.SIGINT, signal_handler) + signal.signal(signal.SIGTERM, signal_handler) + if hasattr(signal, "SIGKILL"): + signal.signal(signal.SIGKILL, signal_handler) + + sys.exit(main(sys.argv)) diff --git a/pype/ftrack/ftrack_server/sub_old_way.py b/pype/ftrack/ftrack_server/sub_old_way.py new file mode 100644 index 0000000000..92e7c0cf8c --- /dev/null +++ b/pype/ftrack/ftrack_server/sub_old_way.py @@ -0,0 +1,100 @@ +import os +import sys +import time +import datetime +import signal +import threading + +from ftrack_server import FtrackServer +from pype.vendor import ftrack_api +from pype.vendor.ftrack_api.event.hub import EventHub +from pypeapp import Logger + +log = Logger().get_logger("Event Server Old") + + +class TimerChecker(threading.Thread): + max_time_out = 35 + + def __init__(self, server, session): + self.server = server + self.session = session + self.is_running = False + self.failed = False + super().__init__() + + def stop(self): + self.is_running = False + + def run(self): + start = datetime.datetime.now() + self.is_running = True + connected = False + + while True: + if not self.is_running: + break + + if not self.session.event_hub.connected: + if not connected: + if (datetime.datetime.now() - start).seconds > self.max_time_out: + log.error(( + "Exiting event server. Session was not connected" + " to ftrack server in {} seconds." + ).format(self.max_time_out)) + self.failed = True + break + else: + log.error( + "Exiting event server. Event Hub is not connected." + ) + self.server.stop_session() + self.failed = True + break + else: + if not connected: + connected = True + + time.sleep(1) + + +def main(args): + check_thread = None + try: + server = FtrackServer('event') + session = ftrack_api.Session(auto_connect_event_hub=True) + + check_thread = TimerChecker(server, session) + check_thread.start() + + log.debug("Launching Ftrack Event Old Way Server") + server.run_server(session) + + except Exception as exc: + import traceback + traceback.print_tb(exc.__traceback__) + + finally: + log_info = True + if check_thread is not None: + check_thread.stop() + check_thread.join() + if check_thread.failed: + log_info = False + if log_info: + log.info("Exiting Event server subprocess") + return 1 + + +if __name__ == "__main__": + # Register interupt signal + def signal_handler(sig, frame): + print("You pressed Ctrl+C. Process ended.") + sys.exit(0) + + signal.signal(signal.SIGINT, signal_handler) + signal.signal(signal.SIGTERM, signal_handler) + if hasattr(signal, "SIGKILL"): + signal.signal(signal.SIGKILL, signal_handler) + + sys.exit(main(sys.argv)) diff --git a/pype/ftrack/lib/custom_db_connector.py b/pype/ftrack/lib/custom_db_connector.py index 505ac96610..11fd197555 100644 --- a/pype/ftrack/lib/custom_db_connector.py +++ b/pype/ftrack/lib/custom_db_connector.py @@ -13,6 +13,7 @@ import logging import tempfile import functools import contextlib +import atexit import requests @@ -20,6 +21,9 @@ import requests import pymongo from pymongo.client_session import ClientSession +class NotActiveTable(Exception): + pass + def auto_reconnect(func): """Handling auto reconnect in 3 retry times""" @functools.wraps(func) @@ -37,12 +41,23 @@ def auto_reconnect(func): return decorated +def check_active_table(func): + """Handling auto reconnect in 3 retry times""" + @functools.wraps(func) + def decorated(obj, *args, **kwargs): + if not obj.active_table: + raise NotActiveTable("Active table is not set. (This is bug)") + return func(obj, *args, **kwargs) + + return decorated + + class DbConnector: log = logging.getLogger(__name__) timeout = 1000 - def __init__(self, mongo_url, database_name, table_name): + def __init__(self, mongo_url, database_name, table_name=None): self._mongo_client = None self._sentry_client = None self._sentry_logging_handler = None @@ -53,11 +68,17 @@ class DbConnector: self.active_table = table_name + def __getattribute__(self, attr): + try: + return super().__getattribute__(attr) + except AttributeError: + return self._database[self.active_table].__getattribute__(attr) + def install(self): """Establish a persistent connection to the database""" if self._is_installed: return - + atexit.register(self.uninstall) logging.basicConfig() self._mongo_client = pymongo.MongoClient( @@ -99,6 +120,16 @@ class DbConnector: self._mongo_client = None self._database = None self._is_installed = False + atexit.unregister(self.uninstall) + + def create_table(self, name, **options): + if self.exist_table(name): + return + + return self._database.create_collection(name, **options) + + def exist_table(self, table_name): + return table_name in self.tables() def tables(self): """List available tables @@ -115,93 +146,80 @@ class DbConnector: def collections(self): return self._database.collection_names() + @check_active_table @auto_reconnect - def insert_one(self, item, session=None): + def insert_one(self, item, **options): assert isinstance(item, dict), "item must be of type " - return self._database[self.active_table].insert_one( - item, - session=session - ) + return self._database[self.active_table].insert_one(item, **options) + @check_active_table @auto_reconnect - def insert_many(self, items, ordered=True, session=None): + def insert_many(self, items, ordered=True, **options): # check if all items are valid assert isinstance(items, list), "`items` must be of type " for item in items: assert isinstance(item, dict), "`item` must be of type " - return self._database[self.active_table].insert_many( - items, - ordered=ordered, - session=session - ) + options["ordered"] = ordered + return self._database[self.active_table].insert_many(items, **options) + @check_active_table @auto_reconnect - def find(self, filter, projection=None, sort=None, session=None): - return self._database[self.active_table].find( - filter=filter, - projection=projection, - sort=sort, - session=session - ) + def find(self, filter, projection=None, sort=None, **options): + options["projection"] = projection + options["sort"] = sort + return self._database[self.active_table].find(filter, **options) + @check_active_table @auto_reconnect - def find_one(self, filter, projection=None, sort=None, session=None): + def find_one(self, filter, projection=None, sort=None, **options): assert isinstance(filter, dict), "filter must be " - return self._database[self.active_table].find_one( - filter=filter, - projection=projection, - sort=sort, - session=session - ) + options["projection"] = projection + options["sort"] = sort + return self._database[self.active_table].find_one(filter, **options) + @check_active_table @auto_reconnect - def replace_one(self, filter, replacement, session=None): + def replace_one(self, filter, replacement, **options): return self._database[self.active_table].replace_one( - filter, replacement, - session=session + filter, replacement, **options ) + @check_active_table @auto_reconnect - def update_one(self, filter, update, session=None): + def update_one(self, filter, update, **options): return self._database[self.active_table].update_one( - filter, update, - session=session + filter, update, **options ) + @check_active_table @auto_reconnect - def update_many(self, filter, update, session=None): + def update_many(self, filter, update, **options): return self._database[self.active_table].update_many( - filter, update, - session=session + filter, update, **options ) + @check_active_table @auto_reconnect def distinct(self, *args, **kwargs): - return self._database[self.active_table].distinct( - *args, **kwargs - ) + return self._database[self.active_table].distinct(*args, **kwargs) + @check_active_table @auto_reconnect - def drop_collection(self, name_or_collection, session=None): + def drop_collection(self, name_or_collection, **options): return self._database[self.active_table].drop( - name_or_collection, - session=session + name_or_collection, **options ) + @check_active_table @auto_reconnect - def delete_one(filter, collation=None, session=None): - return self._database[self.active_table].delete_one( - filter, - collation=collation, - session=session - ) + def delete_one(self, filter, collation=None, **options): + options["collation"] = collation + return self._database[self.active_table].delete_one(filter, **options) + @check_active_table @auto_reconnect - def delete_many(filter, collation=None, session=None): - return self._database[self.active_table].delete_many( - filter, - collation=collation, - session=session - ) + def delete_many(self, filter, collation=None, **options): + options["collation"] = collation + return self._database[self.active_table].delete_many(filter, **options) diff --git a/pype/ftrack/lib/ftrack_base_handler.py b/pype/ftrack/lib/ftrack_base_handler.py index 9eda74f0f3..13e1cae9a9 100644 --- a/pype/ftrack/lib/ftrack_base_handler.py +++ b/pype/ftrack/lib/ftrack_base_handler.py @@ -3,6 +3,7 @@ import time from pypeapp import Logger from pype.vendor import ftrack_api from pype.vendor.ftrack_api import session as fa_session +from pype.ftrack.ftrack_server import session_processor class MissingPermision(Exception): @@ -31,8 +32,21 @@ class BaseHandler(object): def __init__(self, session, plugins_presets={}): '''Expects a ftrack_api.Session instance''' - self._session = session self.log = Logger().get_logger(self.__class__.__name__) + if not( + isinstance(session, ftrack_api.session.Session) or + isinstance(session, session_processor.ProcessSession) + ): + raise Exception(( + "Session object entered with args is instance of \"{}\"" + " but expected instances are \"{}\" and \"{}\"" + ).format( + str(type(session)), + str(ftrack_api.session.Session), + str(session_processor.ProcessSession) + )) + + self._session = session # Using decorator self.register = self.register_decorator(self.register)