Merged in feature/PYPE-349_parallel_event_server (pull request #330)

Feature/PYPE-349 parallel event server

Approved-by: Milan Kolar <milan@orbi.tools>
This commit is contained in:
Jakub Trllo 2019-10-31 18:06:54 +00:00 committed by Milan Kolar
commit 78519aa066
46 changed files with 1452 additions and 220 deletions

View file

@ -1,2 +1,2 @@
from .lib import * from .lib import *
from .ftrack_server import * from .ftrack_server import FtrackServer

View file

@ -281,7 +281,4 @@ class AttributesRemapper(BaseAction):
def register(session, plugins_presets={}): def register(session, plugins_presets={}):
'''Register plugin. Called when used as an plugin.''' '''Register plugin. Called when used as an plugin.'''
if not isinstance(session, ftrack_api.session.Session):
return
AttributesRemapper(session, plugins_presets).register() AttributesRemapper(session, plugins_presets).register()

View file

@ -55,11 +55,8 @@ class ClientReviewSort(BaseAction):
def register(session, plugins_presets={}): def register(session, plugins_presets={}):
'''Register action. Called when used as an event plugin.''' '''Register action. Called when used as an event plugin.'''
if not isinstance(session, ftrack_api.session.Session):
return
action_handler = ClientReviewSort(session, plugins_presets) ClientReviewSort(session, plugins_presets).register()
action_handler.register()
def main(arguments=None): def main(arguments=None):

View file

@ -68,12 +68,6 @@ class ComponentOpen(BaseAction):
def register(session, plugins_presets={}): def register(session, plugins_presets={}):
'''Register action. Called when used as an event plugin.''' '''Register action. Called when used as an event plugin.'''
# Validate that session is an instance of ftrack_api.Session. If not,
# assume that register is being called from an old or incompatible API and
# return without doing anything.
if not isinstance(session, ftrack_api.session.Session):
return
ComponentOpen(session, plugins_presets).register() ComponentOpen(session, plugins_presets).register()

View file

@ -572,12 +572,6 @@ class CustomAttributes(BaseAction):
def register(session, plugins_presets={}): def register(session, plugins_presets={}):
'''Register plugin. Called when used as an plugin.''' '''Register plugin. Called when used as an plugin.'''
# Validate that session is an instance of ftrack_api.Session. If not,
# assume that register is being called from an old or incompatible API and
# return without doing anything.
if not isinstance(session, ftrack_api.session.Session):
return
CustomAttributes(session, plugins_presets).register() CustomAttributes(session, plugins_presets).register()

View file

@ -327,9 +327,6 @@ class PartialDict(dict):
def register(session, plugins_presets={}): def register(session, plugins_presets={}):
'''Register plugin. Called when used as an plugin.''' '''Register plugin. Called when used as an plugin.'''
if not isinstance(session, ftrack_api.session.Session):
return
CreateFolders(session, plugins_presets).register() CreateFolders(session, plugins_presets).register()

View file

@ -198,9 +198,6 @@ class CreateProjectFolders(BaseAction):
def register(session, plugins_presets={}): def register(session, plugins_presets={}):
'''Register plugin. Called when used as an plugin.''' '''Register plugin. Called when used as an plugin.'''
if not isinstance(session, ftrack_api.session.Session):
return
CreateProjectFolders(session, plugins_presets).register() CreateProjectFolders(session, plugins_presets).register()

View file

@ -9,7 +9,7 @@ from pype.ftrack import BaseAction
class CustomAttributeDoctor(BaseAction): class CustomAttributeDoctor(BaseAction):
ignore_me = True ignore_me = True
#: Action identifier. #: Action identifier.
identifier = 'custom.attributes.doctor' identifier = 'custom.attributes.doctor'
@ -294,9 +294,6 @@ class CustomAttributeDoctor(BaseAction):
def register(session, plugins_presets={}): def register(session, plugins_presets={}):
'''Register plugin. Called when used as an plugin.''' '''Register plugin. Called when used as an plugin.'''
if not isinstance(session, ftrack_api.session.Session):
return
CustomAttributeDoctor(session, plugins_presets).register() CustomAttributeDoctor(session, plugins_presets).register()

View file

@ -85,7 +85,7 @@ class DeleteAsset(BaseAction):
'type': 'asset', 'type': 'asset',
'name': entity['name'] 'name': entity['name']
}) })
if av_entity is None: if av_entity is None:
return { return {
'success': False, 'success': False,
@ -314,12 +314,6 @@ class DeleteAsset(BaseAction):
def register(session, plugins_presets={}): def register(session, plugins_presets={}):
'''Register plugin. Called when used as an plugin.''' '''Register plugin. Called when used as an plugin.'''
# Validate that session is an instance of ftrack_api.Session. If not,
# assume that register is being called from an old or incompatible API and
# return without doing anything.
if not isinstance(session, ftrack_api.session.Session):
return
DeleteAsset(session, plugins_presets).register() DeleteAsset(session, plugins_presets).register()

View file

@ -135,12 +135,6 @@ class AssetsRemover(BaseAction):
def register(session, plugins_presets={}): def register(session, plugins_presets={}):
'''Register plugin. Called when used as an plugin.''' '''Register plugin. Called when used as an plugin.'''
# Validate that session is an instance of ftrack_api.Session. If not,
# assume that register is being called from an old or incompatible API and
# return without doing anything.
if not isinstance(session, ftrack_api.session.Session):
return
AssetsRemover(session, plugins_presets).register() AssetsRemover(session, plugins_presets).register()

View file

@ -220,8 +220,6 @@ class DJVViewAction(BaseAction):
def register(session, plugins_presets={}): def register(session, plugins_presets={}):
"""Register hooks.""" """Register hooks."""
if not isinstance(session, ftrack_api.session.Session):
return
DJVViewAction(session, plugins_presets).register() DJVViewAction(session, plugins_presets).register()

View file

@ -121,12 +121,6 @@ class JobKiller(BaseAction):
def register(session, plugins_presets={}): def register(session, plugins_presets={}):
'''Register plugin. Called when used as an plugin.''' '''Register plugin. Called when used as an plugin.'''
# Validate that session is an instance of ftrack_api.Session. If not,
# assume that register is being called from an old or incompatible API and
# return without doing anything.
if not isinstance(session, ftrack_api.session.Session):
return
JobKiller(session, plugins_presets).register() JobKiller(session, plugins_presets).register()

View file

@ -115,9 +115,6 @@ class MultipleNotes(BaseAction):
def register(session, plugins_presets={}): def register(session, plugins_presets={}):
'''Register plugin. Called when used as an plugin.''' '''Register plugin. Called when used as an plugin.'''
if not isinstance(session, ftrack_api.session.Session):
return
MultipleNotes(session, plugins_presets).register() MultipleNotes(session, plugins_presets).register()

View file

@ -372,7 +372,4 @@ class PrepareProject(BaseAction):
def register(session, plugins_presets={}): def register(session, plugins_presets={}):
'''Register plugin. Called when used as an plugin.''' '''Register plugin. Called when used as an plugin.'''
if not isinstance(session, ftrack_api.session.Session):
return
PrepareProject(session, plugins_presets).register() PrepareProject(session, plugins_presets).register()

View file

@ -328,8 +328,6 @@ class RVAction(BaseAction):
def register(session, plugins_presets={}): def register(session, plugins_presets={}):
"""Register hooks.""" """Register hooks."""
if not isinstance(session, ftrack_api.session.Session):
return
RVAction(session, plugins_presets).register() RVAction(session, plugins_presets).register()

View file

@ -26,7 +26,7 @@ class StartTimer(BaseAction):
user.start_timer(entity, force=True) user.start_timer(entity, force=True)
self.session.commit() self.session.commit()
self.log.info( self.log.info(
"Starting Ftrack timer for task: {}".format(entity['name']) "Starting Ftrack timer for task: {}".format(entity['name'])
) )
@ -37,7 +37,4 @@ class StartTimer(BaseAction):
def register(session, plugins_presets={}): def register(session, plugins_presets={}):
'''Register plugin. Called when used as an plugin.''' '''Register plugin. Called when used as an plugin.'''
if not isinstance(session, ftrack_api.session.Session):
return
StartTimer(session, plugins_presets).register() StartTimer(session, plugins_presets).register()

View file

@ -309,9 +309,6 @@ class SyncHierarchicalAttrs(BaseAction):
def register(session, plugins_presets={}): def register(session, plugins_presets={}):
'''Register plugin. Called when used as an plugin.''' '''Register plugin. Called when used as an plugin.'''
if not isinstance(session, ftrack_api.session.Session):
return
SyncHierarchicalAttrs(session, plugins_presets).register() SyncHierarchicalAttrs(session, plugins_presets).register()

View file

@ -263,11 +263,4 @@ class SyncToAvalon(BaseAction):
def register(session, plugins_presets={}): def register(session, plugins_presets={}):
'''Register plugin. Called when used as an plugin.''' '''Register plugin. Called when used as an plugin.'''
# Validate that session is an instance of ftrack_api.Session. If not,
# assume that register is being called from an old or incompatible API and
# return without doing anything.
if not isinstance(session, ftrack_api.session.Session):
return
SyncToAvalon(session, plugins_presets).register() SyncToAvalon(session, plugins_presets).register()

View file

@ -43,9 +43,6 @@ class TestAction(BaseAction):
def register(session, plugins_presets={}): def register(session, plugins_presets={}):
'''Register plugin. Called when used as an plugin.''' '''Register plugin. Called when used as an plugin.'''
if not isinstance(session, ftrack_api.session.Session):
return
TestAction(session, plugins_presets).register() TestAction(session, plugins_presets).register()

View file

@ -68,8 +68,6 @@ class ThumbToChildren(BaseAction):
def register(session, plugins_presets={}): def register(session, plugins_presets={}):
'''Register action. Called when used as an event plugin.''' '''Register action. Called when used as an event plugin.'''
if not isinstance(session, ftrack_api.session.Session):
return
ThumbToChildren(session, plugins_presets).register() ThumbToChildren(session, plugins_presets).register()

View file

@ -90,8 +90,6 @@ class ThumbToParent(BaseAction):
def register(session, plugins_presets={}): def register(session, plugins_presets={}):
'''Register action. Called when used as an event plugin.''' '''Register action. Called when used as an event plugin.'''
if not isinstance(session, ftrack_api.session.Session):
return
ThumbToParent(session, plugins_presets).register() ThumbToParent(session, plugins_presets).register()

View file

@ -40,7 +40,4 @@ class ActionAskWhereIRun(BaseAction):
def register(session, plugins_presets={}): def register(session, plugins_presets={}):
'''Register plugin. Called when used as an plugin.''' '''Register plugin. Called when used as an plugin.'''
if not isinstance(session, ftrack_api.session.Session):
return
ActionAskWhereIRun(session, plugins_presets).register() ActionAskWhereIRun(session, plugins_presets).register()

View file

@ -80,7 +80,4 @@ class ActionShowWhereIRun(BaseAction):
def register(session, plugins_presets={}): def register(session, plugins_presets={}):
'''Register plugin. Called when used as an plugin.''' '''Register plugin. Called when used as an plugin.'''
if not isinstance(session, ftrack_api.session.Session):
return
ActionShowWhereIRun(session, plugins_presets).register() ActionShowWhereIRun(session, plugins_presets).register()

View file

@ -220,7 +220,7 @@ class SyncHierarchicalAttrs(BaseAction):
if job['status'] in ('queued', 'running'): if job['status'] in ('queued', 'running'):
job['status'] = 'failed' job['status'] = 'failed'
session.commit() session.commit()
if self.interface_messages: if self.interface_messages:
self.show_interface_from_dict( self.show_interface_from_dict(
messages=self.interface_messages, messages=self.interface_messages,
@ -341,9 +341,6 @@ class SyncHierarchicalAttrs(BaseAction):
def register(session, plugins_presets): def register(session, plugins_presets):
'''Register plugin. Called when used as an plugin.''' '''Register plugin. Called when used as an plugin.'''
if not isinstance(session, ftrack_api.session.Session):
return
SyncHierarchicalAttrs(session, plugins_presets).register() SyncHierarchicalAttrs(session, plugins_presets).register()

View file

@ -296,9 +296,6 @@ def register(session, plugins_presets):
# Validate that session is an instance of ftrack_api.Session. If not, # Validate that session is an instance of ftrack_api.Session. If not,
# assume that register is being called from an old or incompatible API and # assume that register is being called from an old or incompatible API and
# return without doing anything. # return without doing anything.
if not isinstance(session, ftrack_api.session.Session):
return
SyncToAvalon(session, plugins_presets).register() SyncToAvalon(session, plugins_presets).register()

View file

@ -53,7 +53,5 @@ class DelAvalonIdFromNew(BaseEvent):
def register(session, plugins_presets): def register(session, plugins_presets):
'''Register plugin. Called when used as an plugin.''' '''Register plugin. Called when used as an plugin.'''
if not isinstance(session, ftrack_api.session.Session):
return
DelAvalonIdFromNew(session, plugins_presets).register() DelAvalonIdFromNew(session, plugins_presets).register()

View file

@ -88,7 +88,5 @@ class NextTaskUpdate(BaseEvent):
def register(session, plugins_presets): def register(session, plugins_presets):
'''Register plugin. Called when used as an plugin.''' '''Register plugin. Called when used as an plugin.'''
if not isinstance(session, ftrack_api.session.Session):
return
NextTaskUpdate(session, plugins_presets).register() NextTaskUpdate(session, plugins_presets).register()

View file

@ -36,7 +36,5 @@ class Radio_buttons(BaseEvent):
def register(session, plugins_presets): def register(session, plugins_presets):
'''Register plugin. Called when used as an plugin.''' '''Register plugin. Called when used as an plugin.'''
if not isinstance(session, ftrack_api.session.Session):
return
Radio_buttons(session, plugins_presets).register() Radio_buttons(session, plugins_presets).register()

View file

@ -209,7 +209,5 @@ class SyncHierarchicalAttrs(BaseEvent):
def register(session, plugins_presets): def register(session, plugins_presets):
'''Register plugin. Called when used as an plugin.''' '''Register plugin. Called when used as an plugin.'''
if not isinstance(session, ftrack_api.session.Session):
return
SyncHierarchicalAttrs(session, plugins_presets).register() SyncHierarchicalAttrs(session, plugins_presets).register()

View file

@ -122,8 +122,4 @@ class Sync_to_Avalon(BaseEvent):
def register(session, plugins_presets): def register(session, plugins_presets):
'''Register plugin. Called when used as an plugin.''' '''Register plugin. Called when used as an plugin.'''
if not isinstance(session, ftrack_api.session.Session):
return
Sync_to_Avalon(session, plugins_presets).register() Sync_to_Avalon(session, plugins_presets).register()

View file

@ -8,7 +8,7 @@ from pype.ftrack import BaseEvent
class Test_Event(BaseEvent): class Test_Event(BaseEvent):
ignore_me = True ignore_me = True
priority = 10000 priority = 10000
def launch(self, session, event): def launch(self, session, event):
@ -22,7 +22,5 @@ class Test_Event(BaseEvent):
def register(session, plugins_presets): def register(session, plugins_presets):
'''Register plugin. Called when used as an plugin.''' '''Register plugin. Called when used as an plugin.'''
if not isinstance(session, ftrack_api.session.Session):
return
Test_Event(session, plugins_presets).register() Test_Event(session, plugins_presets).register()

View file

@ -47,7 +47,5 @@ class ThumbnailEvents(BaseEvent):
def register(session, plugins_presets): def register(session, plugins_presets):
'''Register plugin. Called when used as an plugin.''' '''Register plugin. Called when used as an plugin.'''
if not isinstance(session, ftrack_api.session.Session):
return
ThumbnailEvents(session, plugins_presets).register() ThumbnailEvents(session, plugins_presets).register()

View file

@ -233,7 +233,5 @@ def register(session, plugins_presets):
""" """
Register plugin. Called when used as an plugin. Register plugin. Called when used as an plugin.
""" """
if not isinstance(session, ftrack_api.session.Session):
return
UserAssigmentEvent(session, plugins_presets).register() UserAssigmentEvent(session, plugins_presets).register()

View file

@ -71,7 +71,5 @@ class VersionToTaskStatus(BaseEvent):
def register(session, plugins_presets): def register(session, plugins_presets):
'''Register plugin. Called when used as an plugin.''' '''Register plugin. Called when used as an plugin.'''
if not isinstance(session, ftrack_api.session.Session):
return
VersionToTaskStatus(session, plugins_presets).register() VersionToTaskStatus(session, plugins_presets).register()

View file

@ -1,7 +1 @@
from .ftrack_server import FtrackServer from .ftrack_server import FtrackServer
from . import event_server_cli
__all__ = [
'event_server_cli',
'FtrackServer'
]

View file

@ -1,18 +1,34 @@
import os import os
import sys import sys
import signal
import datetime
import subprocess
import socket
import argparse import argparse
import atexit
import time
from urllib.parse import urlparse
import requests import requests
from pype.vendor import ftrack_api from pype.vendor import ftrack_api
from pype.ftrack import credentials from pype.ftrack.lib import credentials
from pype.ftrack.ftrack_server import FtrackServer from pype.ftrack.ftrack_server import FtrackServer
from pypeapp import Logger from pype.ftrack.ftrack_server.lib import ftrack_events_mongo_settings
import socket_thread
log = Logger().get_logger('Ftrack event server', "ftrack-event-server-cli")
def check_url(url): class MongoPermissionsError(Exception):
"""Is used when is created multiple objects of same RestApi class."""
def __init__(self, message=None):
if not message:
message = "Exiting because have issue with acces to MongoDB"
super().__init__(message)
def check_ftrack_url(url, log_errors=True):
"""Checks if Ftrack server is responding"""
if not url: if not url:
log.error('Ftrack URL is not set!') print('ERROR: Ftrack URL is not set!')
return None return None
url = url.strip('/ ') url = url.strip('/ ')
@ -25,24 +41,47 @@ def check_url(url):
try: try:
result = requests.get(url, allow_redirects=False) result = requests.get(url, allow_redirects=False)
except requests.exceptions.RequestException: except requests.exceptions.RequestException:
log.error('Entered Ftrack URL is not accesible!') if log_errors:
return None print('ERROR: Entered Ftrack URL is not accesible!')
return False
if (result.status_code != 200 or 'FTRACK_VERSION' not in result.headers): if (result.status_code != 200 or 'FTRACK_VERSION' not in result.headers):
log.error('Entered Ftrack URL is not accesible!') if log_errors:
return None print('ERROR: Entered Ftrack URL is not accesible!')
return False
log.debug('Ftrack server {} is accessible.'.format(url)) print('DEBUG: Ftrack server {} is accessible.'.format(url))
return url return url
def check_mongo_url(host, port, log_error=False):
"""Checks if mongo server is responding"""
sock = None
try:
sock = socket.create_connection(
(host, port),
timeout=1
)
return True
except socket.error as err:
if log_error:
print("Can't connect to MongoDB at {}:{} because: {}".format(
host, port, err
))
return False
finally:
if sock is not None:
sock.close()
def validate_credentials(url, user, api): def validate_credentials(url, user, api):
first_validation = True first_validation = True
if not user: if not user:
log.error('Ftrack Username is not set! Exiting.') print('ERROR: Ftrack Username is not set! Exiting.')
first_validation = False first_validation = False
if not api: if not api:
log.error('Ftrack API key is not set! Exiting.') print('ERROR: Ftrack API key is not set! Exiting.')
first_validation = False first_validation = False
if not first_validation: if not first_validation:
return False return False
@ -55,21 +94,21 @@ def validate_credentials(url, user, api):
) )
session.close() session.close()
except Exception as e: except Exception as e:
log.error( print(
'Can\'t log into Ftrack with used credentials:' 'ERROR: Can\'t log into Ftrack with used credentials:'
' Ftrack server: "{}" // Username: {} // API key: {}'.format( ' Ftrack server: "{}" // Username: {} // API key: {}'.format(
url, user, api url, user, api
)) ))
return False return False
log.debug('Credentials Username: "{}", API key: "{}" are valid.'.format( print('DEBUG: Credentials Username: "{}", API key: "{}" are valid.'.format(
user, api user, api
)) ))
return True return True
def process_event_paths(event_paths): def process_event_paths(event_paths):
log.debug('Processing event paths: {}.'.format(str(event_paths))) print('DEBUG: Processing event paths: {}.'.format(str(event_paths)))
return_paths = [] return_paths = []
not_found = [] not_found = []
if not event_paths: if not event_paths:
@ -87,14 +126,249 @@ def process_event_paths(event_paths):
return os.pathsep.join(return_paths), not_found return os.pathsep.join(return_paths), not_found
def run_event_server(ftrack_url, username, api_key, event_paths): def old_way_server(ftrack_url):
os.environ['FTRACK_SERVER'] = ftrack_url # Current file
os.environ['FTRACK_API_USER'] = username file_path = os.path.dirname(os.path.realpath(__file__))
os.environ['FTRACK_API_KEY'] = api_key
os.environ['FTRACK_EVENTS_PATH'] = event_paths min_fail_seconds = 5
max_fail_count = 3
wait_time_after_max_fail = 10
subproc = None
subproc_path = "{}/sub_old_way.py".format(file_path)
subproc_last_failed = datetime.datetime.now()
subproc_failed_count = 0
ftrack_accessible = False
printed_ftrack_error = False
while True:
if not ftrack_accessible:
ftrack_accessible = check_ftrack_url(ftrack_url)
# Run threads only if Ftrack is accessible
if not ftrack_accessible and not printed_ftrack_error:
print("Can't access Ftrack {} <{}>".format(
ftrack_url, str(datetime.datetime.now())
))
if subproc is not None:
if subproc.poll() is None:
subproc.terminate()
subproc = None
printed_ftrack_error = True
time.sleep(1)
continue
printed_ftrack_error = False
if subproc is None:
if subproc_failed_count < max_fail_count:
subproc = subprocess.Popen(
["python", subproc_path],
stdout=subprocess.PIPE
)
elif subproc_failed_count == max_fail_count:
print((
"Storer failed {}times I'll try to run again {}s later"
).format(str(max_fail_count), str(wait_time_after_max_fail)))
subproc_failed_count += 1
elif ((
datetime.datetime.now() - subproc_last_failed
).seconds > wait_time_after_max_fail):
subproc_failed_count = 0
# If thread failed test Ftrack and Mongo connection
elif subproc.poll() is not None:
subproc = None
ftrack_accessible = False
_subproc_last_failed = datetime.datetime.now()
delta_time = (_subproc_last_failed - subproc_last_failed).seconds
if delta_time < min_fail_seconds:
subproc_failed_count += 1
else:
subproc_failed_count = 0
subproc_last_failed = _subproc_last_failed
time.sleep(1)
def main_loop(ftrack_url):
""" This is main loop of event handling.
Loop is handling threads which handles subprocesses of event storer and
processor. When one of threads is stopped it is tested to connect to
ftrack and mongo server. Threads are not started when ftrack or mongo
server is not accessible. When threads are started it is checked for socket
signals as heartbeat. Heartbeat must become at least once per 30sec
otherwise thread will be killed.
"""
# Get mongo hostname and port for testing mongo connection
mongo_list = ftrack_events_mongo_settings()
mongo_hostname = mongo_list[0]
mongo_port = mongo_list[1]
# Current file
file_path = os.path.dirname(os.path.realpath(__file__))
min_fail_seconds = 5
max_fail_count = 3
wait_time_after_max_fail = 10
# Threads data
storer_name = "StorerThread"
storer_port = 10001
storer_path = "{}/sub_event_storer.py".format(file_path)
storer_thread = None
storer_last_failed = datetime.datetime.now()
storer_failed_count = 0
processor_name = "ProcessorThread"
processor_port = 10011
processor_path = "{}/sub_event_processor.py".format(file_path)
processor_thread = None
processor_last_failed = datetime.datetime.now()
processor_failed_count = 0
ftrack_accessible = False
mongo_accessible = False
printed_ftrack_error = False
printed_mongo_error = False
# stop threads on exit
# TODO check if works and args have thread objects!
def on_exit(processor_thread, storer_thread):
if processor_thread is not None:
processor_thread.stop()
processor_thread.join()
processor_thread = None
if storer_thread is not None:
storer_thread.stop()
storer_thread.join()
storer_thread = None
atexit.register(
on_exit, processor_thread=processor_thread, storer_thread=storer_thread
)
# Main loop
while True:
# Check if accessible Ftrack and Mongo url
if not ftrack_accessible:
ftrack_accessible = check_ftrack_url(ftrack_url)
if not mongo_accessible:
mongo_accessible = check_mongo_url(mongo_hostname, mongo_port)
# Run threads only if Ftrack is accessible
if not ftrack_accessible or not mongo_accessible:
if not mongo_accessible and not printed_mongo_error:
print("Can't access Mongo {}".format(mongo_url))
if not ftrack_accessible and not printed_ftrack_error:
print("Can't access Ftrack {}".format(ftrack_url))
if storer_thread is not None:
storer_thread.stop()
storer_thread.join()
storer_thread = None
if processor_thread is not None:
processor_thread.stop()
processor_thread.join()
processor_thread = None
printed_ftrack_error = True
printed_mongo_error = True
time.sleep(1)
continue
printed_ftrack_error = False
printed_mongo_error = False
# Run backup thread which does not requeire mongo to work
if storer_thread is None:
if storer_failed_count < max_fail_count:
storer_thread = socket_thread.SocketThread(
storer_name, storer_port, storer_path
)
storer_thread.start()
elif storer_failed_count == max_fail_count:
print((
"Storer failed {}times I'll try to run again {}s later"
).format(str(max_fail_count), str(wait_time_after_max_fail)))
storer_failed_count += 1
elif ((
datetime.datetime.now() - storer_last_failed
).seconds > wait_time_after_max_fail):
storer_failed_count = 0
# If thread failed test Ftrack and Mongo connection
elif not storer_thread.isAlive():
if storer_thread.mongo_error:
raise MongoPermissionsError()
storer_thread.join()
storer_thread = None
ftrack_accessible = False
mongo_accessible = False
_storer_last_failed = datetime.datetime.now()
delta_time = (_storer_last_failed - storer_last_failed).seconds
if delta_time < min_fail_seconds:
storer_failed_count += 1
else:
storer_failed_count = 0
storer_last_failed = _storer_last_failed
if processor_thread is None:
if processor_failed_count < max_fail_count:
processor_thread = socket_thread.SocketThread(
processor_name, processor_port, processor_path
)
processor_thread.start()
elif processor_failed_count == max_fail_count:
print((
"Processor failed {}times in row"
" I'll try to run again {}s later"
).format(str(max_fail_count), str(wait_time_after_max_fail)))
processor_failed_count += 1
elif ((
datetime.datetime.now() - processor_last_failed
).seconds > wait_time_after_max_fail):
processor_failed_count = 0
# If thread failed test Ftrack and Mongo connection
elif not processor_thread.isAlive():
if storer_thread.mongo_error:
raise Exception(
"Exiting because have issue with acces to MongoDB"
)
processor_thread.join()
processor_thread = None
ftrack_accessible = False
mongo_accessible = False
_processor_last_failed = datetime.datetime.now()
delta_time = (
_processor_last_failed - processor_last_failed
).seconds
if delta_time < min_fail_seconds:
processor_failed_count += 1
else:
processor_failed_count = 0
processor_last_failed = _processor_last_failed
time.sleep(1)
server = FtrackServer('event')
server.run_server()
def main(argv): def main(argv):
''' '''
@ -184,7 +458,11 @@ def main(argv):
help="Load creadentials from apps dir", help="Load creadentials from apps dir",
action="store_true" action="store_true"
) )
parser.add_argument(
'-oldway',
help="Load creadentials from apps dir",
action="store_true"
)
ftrack_url = os.environ.get('FTRACK_SERVER') ftrack_url = os.environ.get('FTRACK_SERVER')
username = os.environ.get('FTRACK_API_USER') username = os.environ.get('FTRACK_API_USER')
api_key = os.environ.get('FTRACK_API_KEY') api_key = os.environ.get('FTRACK_API_KEY')
@ -209,8 +487,9 @@ def main(argv):
if kwargs.ftrackapikey: if kwargs.ftrackapikey:
api_key = kwargs.ftrackapikey api_key = kwargs.ftrackapikey
oldway = kwargs.oldway
# Check url regex and accessibility # Check url regex and accessibility
ftrack_url = check_url(ftrack_url) ftrack_url = check_ftrack_url(ftrack_url)
if not ftrack_url: if not ftrack_url:
return 1 return 1
@ -221,21 +500,40 @@ def main(argv):
# Process events path # Process events path
event_paths, not_found = process_event_paths(event_paths) event_paths, not_found = process_event_paths(event_paths)
if not_found: if not_found:
log.warning( print(
'These paths were not found: {}'.format(str(not_found)) 'WARNING: These paths were not found: {}'.format(str(not_found))
) )
if not event_paths: if not event_paths:
if not_found: if not_found:
log.error('Any of entered paths is valid or can be accesible.') print('ERROR: Any of entered paths is valid or can be accesible.')
else: else:
log.error('Paths to events are not set. Exiting.') print('ERROR: Paths to events are not set. Exiting.')
return 1 return 1
if kwargs.storecred: if kwargs.storecred:
credentials._save_credentials(username, api_key, True) credentials._save_credentials(username, api_key, True)
run_event_server(ftrack_url, username, api_key, event_paths) # Set Ftrack environments
os.environ["FTRACK_SERVER"] = ftrack_url
os.environ["FTRACK_API_USER"] = username
os.environ["FTRACK_API_KEY"] = api_key
os.environ["FTRACK_EVENTS_PATH"] = event_paths
if oldway:
return old_way_server(ftrack_url)
return main_loop(ftrack_url)
if (__name__ == ('__main__')): if __name__ == "__main__":
# Register interupt signal
def signal_handler(sig, frame):
print("You pressed Ctrl+C. Process ended.")
sys.exit(0)
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)
if hasattr(signal, "SIGKILL"):
signal.signal(signal.SIGKILL, signal_handler)
sys.exit(main(sys.argv)) sys.exit(main(sys.argv))

View file

@ -126,23 +126,27 @@ class FtrackServer:
msg = '"{}" - register was not successful ({})'.format( msg = '"{}" - register was not successful ({})'.format(
function_dict['name'], str(exc) function_dict['name'], str(exc)
) )
log.warning(msg) log.warning(msg, exc_info=True)
def run_server(self): def run_server(self, session=None, load_files=True):
self.session = ftrack_api.Session(auto_connect_event_hub=True,) if not session:
session = ftrack_api.Session(auto_connect_event_hub=True)
paths_str = os.environ.get(self.env_key) self.session = session
if paths_str is None:
log.error((
"Env var \"{}\" is not set, \"{}\" server won\'t launch"
).format(self.env_key, self.server_type))
return
paths = paths_str.split(os.pathsep) if load_files:
self.set_files(paths) paths_str = os.environ.get(self.env_key)
if paths_str is None:
log.error((
"Env var \"{}\" is not set, \"{}\" server won\'t launch"
).format(self.env_key, self.server_type))
return
log.info(60*"*") paths = paths_str.split(os.pathsep)
log.info('Registration of actions/events has finished!') self.set_files(paths)
log.info(60*"*")
log.info('Registration of actions/events has finished!')
# keep event_hub on session running # keep event_hub on session running
self.session.event_hub.wait() self.session.event_hub.wait()

View file

@ -0,0 +1,68 @@
import os
try:
from urllib.parse import urlparse, parse_qs
except ImportError:
from urlparse import urlparse, parse_qs
def ftrack_events_mongo_settings():
host = None
port = None
username = None
password = None
collection = None
database = None
auth_db = ""
if os.environ.get('FTRACK_EVENTS_MONGO_URL'):
result = urlparse(os.environ['FTRACK_EVENTS_MONGO_URL'])
host = result.hostname
try:
port = result.port
except ValueError:
raise RuntimeError("invalid port specified")
username = result.username
password = result.password
try:
database = result.path.lstrip("/").split("/")[0]
collection = result.path.lstrip("/").split("/")[1]
except IndexError:
if not database:
raise RuntimeError("missing database name for logging")
try:
auth_db = parse_qs(result.query)['authSource'][0]
except KeyError:
# no auth db provided, mongo will use the one we are connecting to
pass
else:
host = os.environ.get('FTRACK_EVENTS_MONGO_HOST')
port = int(os.environ.get('FTRACK_EVENTS_MONGO_PORT', "0"))
database = os.environ.get('FTRACK_EVENTS_MONGO_DB')
username = os.environ.get('FTRACK_EVENTS_MONGO_USER')
password = os.environ.get('FTRACK_EVENTS_MONGO_PASSWORD')
collection = os.environ.get('FTRACK_EVENTS_MONGO_COL')
auth_db = os.environ.get('FTRACK_EVENTS_MONGO_AUTH_DB', 'avalon')
return host, port, database, username, password, collection, auth_db
def get_ftrack_event_mongo_info():
host, port, database, username, password, collection, auth_db = ftrack_events_mongo_settings()
user_pass = ""
if username and password:
user_pass = "{}:{}@".format(username, password)
socket_path = "{}:{}".format(host, port)
dab = ""
if database:
dab = "/{}".format(database)
auth = ""
if auth_db:
auth = "?authSource={}".format(auth_db)
url = "mongodb://{}{}{}{}".format(user_pass, socket_path, dab, auth)
return url, database, collection

View file

@ -0,0 +1,292 @@
import logging
import os
import atexit
import datetime
import tempfile
import threading
import time
import requests
import queue
import pymongo
import ftrack_api
import ftrack_api.session
import ftrack_api.cache
import ftrack_api.operation
import ftrack_api._centralized_storage_scenario
import ftrack_api.event
from ftrack_api.logging import LazyLogMessage as L
from pype.ftrack.lib.custom_db_connector import DbConnector
from pype.ftrack.ftrack_server.lib import get_ftrack_event_mongo_info
from pypeapp import Logger
log = Logger().get_logger("Session processor")
class ProcessEventHub(ftrack_api.event.hub.EventHub):
url, database, table_name = get_ftrack_event_mongo_info()
is_table_created = False
def __init__(self, *args, **kwargs):
self.dbcon = DbConnector(
mongo_url=self.url,
database_name=self.database,
table_name=self.table_name
)
self.sock = kwargs.pop("sock")
super(ProcessEventHub, self).__init__(*args, **kwargs)
def prepare_dbcon(self):
try:
self.dbcon.install()
dbcon._database.collection_names()
except pymongo.errors.AutoReconnect:
log.error("Mongo server \"{}\" is not responding, exiting.".format(
os.environ["AVALON_MONGO"]
))
sys.exit(0)
except pymongo.errors.OperationFailure:
log.error((
"Error with Mongo access, probably permissions."
"Check if exist database with name \"{}\""
" and collection \"{}\" inside."
).format(self.database, self.table_name))
self.sock.sendall(b"MongoError")
sys.exit(0)
def wait(self, duration=None):
"""Overriden wait
Event are loaded from Mongo DB when queue is empty. Handled event is
set as processed in Mongo DB.
"""
started = time.time()
self.prepare_dbcon()
while True:
try:
event = self._event_queue.get(timeout=0.1)
except queue.Empty:
if not self.load_events():
time.sleep(0.5)
else:
try:
self._handle(event)
self.dbcon.update_one(
{"id": event["id"]},
{"$set": {"pype_data.is_processed": True}}
)
except pymongo.errors.AutoReconnect:
log.error((
"Mongo server \"{}\" is not responding, exiting."
).format(os.environ["AVALON_MONGO"]))
sys.exit(0)
# Additional special processing of events.
if event['topic'] == 'ftrack.meta.disconnected':
break
if duration is not None:
if (time.time() - started) > duration:
break
def load_events(self):
"""Load not processed events sorted by stored date"""
ago_date = datetime.datetime.now() - datetime.timedelta(days=3)
result = self.dbcon.delete_many({
"pype_data.stored": {"$lte": ago_date},
"pype_data.is_processed": True
})
not_processed_events = self.dbcon.find(
{"pype_data.is_processed": False}
).sort(
[("pype_data.stored", pymongo.ASCENDING)]
)
found = False
for event_data in not_processed_events:
new_event_data = {
k: v for k, v in event_data.items()
if k not in ["_id", "pype_data"]
}
try:
event = ftrack_api.event.base.Event(**new_event_data)
except Exception:
self.logger.exception(L(
'Failed to convert payload into event: {0}',
event_data
))
continue
found = True
self._event_queue.put(event)
return found
def _handle_packet(self, code, packet_identifier, path, data):
"""Override `_handle_packet` which skip events and extend heartbeat"""
code_name = self._code_name_mapping[code]
if code_name == "event":
return
if code_name == "heartbeat":
self.sock.sendall(b"processor")
return self._send_packet(self._code_name_mapping["heartbeat"])
return super()._handle_packet(code, packet_identifier, path, data)
class ProcessSession(ftrack_api.session.Session):
'''An isolated session for interaction with an ftrack server.'''
def __init__(
self, server_url=None, api_key=None, api_user=None, auto_populate=True,
plugin_paths=None, cache=None, cache_key_maker=None,
auto_connect_event_hub=None, schema_cache_path=None,
plugin_arguments=None, sock=None
):
super(ftrack_api.session.Session, self).__init__()
self.logger = logging.getLogger(
__name__ + '.' + self.__class__.__name__
)
self._closed = False
if server_url is None:
server_url = os.environ.get('FTRACK_SERVER')
if not server_url:
raise TypeError(
'Required "server_url" not specified. Pass as argument or set '
'in environment variable FTRACK_SERVER.'
)
self._server_url = server_url
if api_key is None:
api_key = os.environ.get(
'FTRACK_API_KEY',
# Backwards compatibility
os.environ.get('FTRACK_APIKEY')
)
if not api_key:
raise TypeError(
'Required "api_key" not specified. Pass as argument or set in '
'environment variable FTRACK_API_KEY.'
)
self._api_key = api_key
if api_user is None:
api_user = os.environ.get('FTRACK_API_USER')
if not api_user:
try:
api_user = getpass.getuser()
except Exception:
pass
if not api_user:
raise TypeError(
'Required "api_user" not specified. Pass as argument, set in '
'environment variable FTRACK_API_USER or one of the standard '
'environment variables used by Python\'s getpass module.'
)
self._api_user = api_user
# Currently pending operations.
self.recorded_operations = ftrack_api.operation.Operations()
self.record_operations = True
self.cache_key_maker = cache_key_maker
if self.cache_key_maker is None:
self.cache_key_maker = ftrack_api.cache.StringKeyMaker()
# Enforce always having a memory cache at top level so that the same
# in-memory instance is returned from session.
self.cache = ftrack_api.cache.LayeredCache([
ftrack_api.cache.MemoryCache()
])
if cache is not None:
if callable(cache):
cache = cache(self)
if cache is not None:
self.cache.caches.append(cache)
self._managed_request = None
self._request = requests.Session()
self._request.auth = ftrack_api.session.SessionAuthentication(
self._api_key, self._api_user
)
self.auto_populate = auto_populate
# Fetch server information and in doing so also check credentials.
self._server_information = self._fetch_server_information()
# Now check compatibility of server based on retrieved information.
self.check_server_compatibility()
# Construct event hub and load plugins.
self._event_hub = ProcessEventHub(
self._server_url,
self._api_user,
self._api_key,
sock=sock
)
self._auto_connect_event_hub_thread = None
if auto_connect_event_hub in (None, True):
# Connect to event hub in background thread so as not to block main
# session usage waiting for event hub connection.
self._auto_connect_event_hub_thread = threading.Thread(
target=self._event_hub.connect
)
self._auto_connect_event_hub_thread.daemon = True
self._auto_connect_event_hub_thread.start()
# To help with migration from auto_connect_event_hub default changing
# from True to False.
self._event_hub._deprecation_warning_auto_connect = (
auto_connect_event_hub is None
)
# Register to auto-close session on exit.
atexit.register(self.close)
self._plugin_paths = plugin_paths
if self._plugin_paths is None:
self._plugin_paths = os.environ.get(
'FTRACK_EVENT_PLUGIN_PATH', ''
).split(os.pathsep)
self._discover_plugins(plugin_arguments=plugin_arguments)
# TODO: Make schemas read-only and non-mutable (or at least without
# rebuilding types)?
if schema_cache_path is not False:
if schema_cache_path is None:
schema_cache_path = os.environ.get(
'FTRACK_API_SCHEMA_CACHE_PATH', tempfile.gettempdir()
)
schema_cache_path = os.path.join(
schema_cache_path, 'ftrack_api_schema_cache.json'
)
self.schemas = self._load_schemas(schema_cache_path)
self.types = self._build_entity_type_classes(self.schemas)
ftrack_api._centralized_storage_scenario.register(self)
self._configure_locations()
self.event_hub.publish(
ftrack_api.event.base.Event(
topic='ftrack.api.session.ready',
data=dict(
session=self
)
),
synchronous=True
)

View file

@ -0,0 +1,257 @@
import logging
import os
import atexit
import tempfile
import threading
import requests
import ftrack_api
import ftrack_api.session
import ftrack_api.cache
import ftrack_api.operation
import ftrack_api._centralized_storage_scenario
import ftrack_api.event
from ftrack_api.logging import LazyLogMessage as L
class StorerEventHub(ftrack_api.event.hub.EventHub):
def __init__(self, *args, **kwargs):
self.sock = kwargs.pop("sock")
super(StorerEventHub, self).__init__(*args, **kwargs)
def _handle_packet(self, code, packet_identifier, path, data):
"""Override `_handle_packet` which extend heartbeat"""
if self._code_name_mapping[code] == "heartbeat":
# Reply with heartbeat.
self.sock.sendall(b"storer")
return self._send_packet(self._code_name_mapping['heartbeat'])
return super(StorerEventHub, self)._handle_packet(
code, packet_identifier, path, data
)
class StorerSession(ftrack_api.session.Session):
'''An isolated session for interaction with an ftrack server.'''
def __init__(
self, server_url=None, api_key=None, api_user=None, auto_populate=True,
plugin_paths=None, cache=None, cache_key_maker=None,
auto_connect_event_hub=None, schema_cache_path=None,
plugin_arguments=None, sock=None
):
'''Initialise session.
*server_url* should be the URL of the ftrack server to connect to
including any port number. If not specified attempt to look up from
:envvar:`FTRACK_SERVER`.
*api_key* should be the API key to use for authentication whilst
*api_user* should be the username of the user in ftrack to record
operations against. If not specified, *api_key* should be retrieved
from :envvar:`FTRACK_API_KEY` and *api_user* from
:envvar:`FTRACK_API_USER`.
If *auto_populate* is True (the default), then accessing entity
attributes will cause them to be automatically fetched from the server
if they are not already. This flag can be changed on the session
directly at any time.
*plugin_paths* should be a list of paths to search for plugins. If not
specified, default to looking up :envvar:`FTRACK_EVENT_PLUGIN_PATH`.
*cache* should be an instance of a cache that fulfils the
:class:`ftrack_api.cache.Cache` interface and will be used as the cache
for the session. It can also be a callable that will be called with the
session instance as sole argument. The callable should return ``None``
if a suitable cache could not be configured, but session instantiation
can continue safely.
.. note::
The session will add the specified cache to a pre-configured layered
cache that specifies the top level cache as a
:class:`ftrack_api.cache.MemoryCache`. Therefore, it is unnecessary
to construct a separate memory cache for typical behaviour. Working
around this behaviour or removing the memory cache can lead to
unexpected behaviour.
*cache_key_maker* should be an instance of a key maker that fulfils the
:class:`ftrack_api.cache.KeyMaker` interface and will be used to
generate keys for objects being stored in the *cache*. If not specified,
a :class:`~ftrack_api.cache.StringKeyMaker` will be used.
If *auto_connect_event_hub* is True then embedded event hub will be
automatically connected to the event server and allow for publishing and
subscribing to **non-local** events. If False, then only publishing and
subscribing to **local** events will be possible until the hub is
manually connected using :meth:`EventHub.connect
<ftrack_api.event.hub.EventHub.connect>`.
.. note::
The event hub connection is performed in a background thread to
improve session startup time. If a registered plugin requires a
connected event hub then it should check the event hub connection
status explicitly. Subscribing to events does *not* require a
connected event hub.
Enable schema caching by setting *schema_cache_path* to a folder path.
If not set, :envvar:`FTRACK_API_SCHEMA_CACHE_PATH` will be used to
determine the path to store cache in. If the environment variable is
also not specified then a temporary directory will be used. Set to
`False` to disable schema caching entirely.
*plugin_arguments* should be an optional mapping (dict) of keyword
arguments to pass to plugin register functions upon discovery. If a
discovered plugin has a signature that is incompatible with the passed
arguments, the discovery mechanism will attempt to reduce the passed
arguments to only those that the plugin accepts. Note that a warning
will be logged in this case.
'''
super(ftrack_api.session.Session, self).__init__()
self.logger = logging.getLogger(
__name__ + '.' + self.__class__.__name__
)
self._closed = False
if server_url is None:
server_url = os.environ.get('FTRACK_SERVER')
if not server_url:
raise TypeError(
'Required "server_url" not specified. Pass as argument or set '
'in environment variable FTRACK_SERVER.'
)
self._server_url = server_url
if api_key is None:
api_key = os.environ.get(
'FTRACK_API_KEY',
# Backwards compatibility
os.environ.get('FTRACK_APIKEY')
)
if not api_key:
raise TypeError(
'Required "api_key" not specified. Pass as argument or set in '
'environment variable FTRACK_API_KEY.'
)
self._api_key = api_key
if api_user is None:
api_user = os.environ.get('FTRACK_API_USER')
if not api_user:
try:
api_user = getpass.getuser()
except Exception:
pass
if not api_user:
raise TypeError(
'Required "api_user" not specified. Pass as argument, set in '
'environment variable FTRACK_API_USER or one of the standard '
'environment variables used by Python\'s getpass module.'
)
self._api_user = api_user
# Currently pending operations.
self.recorded_operations = ftrack_api.operation.Operations()
self.record_operations = True
self.cache_key_maker = cache_key_maker
if self.cache_key_maker is None:
self.cache_key_maker = ftrack_api.cache.StringKeyMaker()
# Enforce always having a memory cache at top level so that the same
# in-memory instance is returned from session.
self.cache = ftrack_api.cache.LayeredCache([
ftrack_api.cache.MemoryCache()
])
if cache is not None:
if callable(cache):
cache = cache(self)
if cache is not None:
self.cache.caches.append(cache)
self._managed_request = None
self._request = requests.Session()
self._request.auth = ftrack_api.session.SessionAuthentication(
self._api_key, self._api_user
)
self.auto_populate = auto_populate
# Fetch server information and in doing so also check credentials.
self._server_information = self._fetch_server_information()
# Now check compatibility of server based on retrieved information.
self.check_server_compatibility()
# Construct event hub and load plugins.
self._event_hub = StorerEventHub(
self._server_url,
self._api_user,
self._api_key,
sock=sock
)
self._auto_connect_event_hub_thread = None
if auto_connect_event_hub in (None, True):
# Connect to event hub in background thread so as not to block main
# session usage waiting for event hub connection.
self._auto_connect_event_hub_thread = threading.Thread(
target=self._event_hub.connect
)
self._auto_connect_event_hub_thread.daemon = True
self._auto_connect_event_hub_thread.start()
# To help with migration from auto_connect_event_hub default changing
# from True to False.
self._event_hub._deprecation_warning_auto_connect = (
auto_connect_event_hub is None
)
# Register to auto-close session on exit.
atexit.register(self.close)
self._plugin_paths = plugin_paths
if self._plugin_paths is None:
self._plugin_paths = os.environ.get(
'FTRACK_EVENT_PLUGIN_PATH', ''
).split(os.pathsep)
self._discover_plugins(plugin_arguments=plugin_arguments)
# TODO: Make schemas read-only and non-mutable (or at least without
# rebuilding types)?
if schema_cache_path is not False:
if schema_cache_path is None:
schema_cache_path = os.environ.get(
'FTRACK_API_SCHEMA_CACHE_PATH', tempfile.gettempdir()
)
schema_cache_path = os.path.join(
schema_cache_path, 'ftrack_api_schema_cache.json'
)
self.schemas = self._load_schemas(schema_cache_path)
self.types = self._build_entity_type_classes(self.schemas)
ftrack_api._centralized_storage_scenario.register(self)
self._configure_locations()
self.event_hub.publish(
ftrack_api.event.base.Event(
topic='ftrack.api.session.ready',
data=dict(
session=self
)
),
synchronous=True
)

View file

@ -0,0 +1,123 @@
import os
import sys
import time
import signal
import socket
import threading
import subprocess
from pypeapp import Logger
class SocketThread(threading.Thread):
"""Thread that checks suprocess of storer of processor of events"""
MAX_TIMEOUT = 35
def __init__(self, name, port, filepath):
super(SocketThread, self).__init__()
self.log = Logger().get_logger("SocketThread", "Event Thread")
self.setName(name)
self.name = name
self.port = port
self.filepath = filepath
self.sock = None
self.subproc = None
self.connection = None
self._is_running = False
self.finished = False
self.mongo_error = False
def stop(self):
self._is_running = False
def run(self):
self._is_running = True
time_socket = time.time()
# Create a TCP/IP socket
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.sock = sock
# Bind the socket to the port - skip already used ports
while True:
try:
server_address = ("localhost", self.port)
sock.bind(server_address)
break
except OSError:
self.port += 1
self.log.debug(
"Running Socked thread on {}:{}".format(*server_address)
)
self.subproc = subprocess.Popen(
["python", self.filepath, "-port", str(self.port)],
stdout=subprocess.PIPE
)
# Listen for incoming connections
sock.listen(1)
sock.settimeout(1.0)
while True:
if not self._is_running:
break
try:
connection, client_address = sock.accept()
time_socket = time.time()
connection.settimeout(1.0)
self.connection = connection
except socket.timeout:
if (time.time() - time_socket) > self.MAX_TIMEOUT:
self.log.error("Connection timeout passed. Terminating.")
self._is_running = False
self.subproc.terminate()
break
continue
try:
time_con = time.time()
# Receive the data in small chunks and retransmit it
while True:
try:
if not self._is_running:
break
try:
data = connection.recv(16)
time_con = time.time()
except socket.timeout:
if (time.time() - time_con) > self.MAX_TIMEOUT:
self.log.error(
"Connection timeout passed. Terminating."
)
self._is_running = False
self.subproc.terminate()
break
continue
except ConnectionResetError:
self._is_running = False
break
if data:
if data == b"MongoError":
self.mongo_error = True
connection.sendall(data)
except Exception as exc:
self.log.error(
"Event server process failed", exc_info=True
)
finally:
# Clean up the connection
connection.close()
if self.subproc.poll() is None:
self.subproc.terminate()
lines = self.subproc.stdout.readlines()
if lines:
print("*** Socked Thread stdout ***")
for line in lines:
os.write(1, line)
self.finished = True

View file

@ -0,0 +1,53 @@
import os
import sys
import datetime
import signal
import socket
import pymongo
from ftrack_server import FtrackServer
from pype.ftrack.ftrack_server.session_processor import ProcessSession
from pypeapp import Logger
log = Logger().get_logger("Event processor")
def main(args):
port = int(args[-1])
# Create a TCP/IP socket
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# Connect the socket to the port where the server is listening
server_address = ("localhost", port)
log.debug("Processor connected to {} port {}".format(*server_address))
sock.connect(server_address)
sock.sendall(b"CreatedProcess")
try:
session = ProcessSession(auto_connect_event_hub=True, sock=sock)
server = FtrackServer('event')
log.debug("Launched Ftrack Event processor")
server.run_server(session)
except Exception as exc:
import traceback
traceback.print_tb(exc.__traceback__)
finally:
log.debug("First closing socket")
sock.close()
return 1
if __name__ == "__main__":
# Register interupt signal
def signal_handler(sig, frame):
print("You pressed Ctrl+C. Process ended.")
sys.exit(0)
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)
if hasattr(signal, "SIGKILL"):
signal.signal(signal.SIGKILL, signal_handler)
sys.exit(main(sys.argv))

View file

@ -0,0 +1,118 @@
import os
import sys
import datetime
import signal
import socket
import pymongo
from ftrack_server import FtrackServer
from pype.ftrack.ftrack_server.lib import get_ftrack_event_mongo_info
from pype.ftrack.lib.custom_db_connector import DbConnector
from session_storer import StorerSession
from pypeapp import Logger
log = Logger().get_logger("Event storer")
url, database, table_name = get_ftrack_event_mongo_info()
dbcon = DbConnector(
mongo_url=url,
database_name=database,
table_name=table_name
)
# ignore_topics = ["ftrack.meta.connected"]
ignore_topics = []
def install_db():
try:
dbcon.install()
dbcon._database.collection_names()
except pymongo.errors.AutoReconnect:
log.error("Mongo server \"{}\" is not responding, exiting.".format(
os.environ["AVALON_MONGO"]
))
sys.exit(0)
def launch(event):
if event.get("topic") in ignore_topics:
return
event_data = event._data
event_id = event["id"]
event_data["pype_data"] = {
"stored": datetime.datetime.utcnow(),
"is_processed": False
}
try:
# dbcon.insert_one(event_data)
dbcon.update({"id": event_id}, event_data, upsert=True)
log.debug("Event: {} stored".format(event_id))
except pymongo.errors.AutoReconnect:
log.error("Mongo server \"{}\" is not responding, exiting.".format(
os.environ["AVALON_MONGO"]
))
sys.exit(0)
except Exception as exc:
log.error(
"Event: {} failed to store".format(event_id),
exc_info=True
)
def register(session):
'''Registers the event, subscribing the discover and launch topics.'''
install_db()
session.event_hub.subscribe("topic=*", launch)
def main(args):
port = int(args[-1])
# Create a TCP/IP socket
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# Connect the socket to the port where the server is listening
server_address = ("localhost", port)
log.debug("Storer connected to {} port {}".format(*server_address))
sock.connect(server_address)
sock.sendall(b"CreatedStore")
try:
session = StorerSession(auto_connect_event_hub=True, sock=sock)
register(session)
server = FtrackServer("event")
log.debug("Launched Ftrack Event storer")
server.run_server(session, load_files=False)
except pymongo.errors.OperationFailure:
log.error((
"Error with Mongo access, probably permissions."
"Check if exist database with name \"{}\""
" and collection \"{}\" inside."
).format(database, table_name))
sock.sendall(b"MongoError")
finally:
log.debug("First closing socket")
sock.close()
return 1
if __name__ == "__main__":
# Register interupt signal
def signal_handler(sig, frame):
print("You pressed Ctrl+C. Process ended.")
sys.exit(0)
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)
if hasattr(signal, "SIGKILL"):
signal.signal(signal.SIGKILL, signal_handler)
sys.exit(main(sys.argv))

View file

@ -0,0 +1,100 @@
import os
import sys
import time
import datetime
import signal
import threading
from ftrack_server import FtrackServer
from pype.vendor import ftrack_api
from pype.vendor.ftrack_api.event.hub import EventHub
from pypeapp import Logger
log = Logger().get_logger("Event Server Old")
class TimerChecker(threading.Thread):
max_time_out = 35
def __init__(self, server, session):
self.server = server
self.session = session
self.is_running = False
self.failed = False
super().__init__()
def stop(self):
self.is_running = False
def run(self):
start = datetime.datetime.now()
self.is_running = True
connected = False
while True:
if not self.is_running:
break
if not self.session.event_hub.connected:
if not connected:
if (datetime.datetime.now() - start).seconds > self.max_time_out:
log.error((
"Exiting event server. Session was not connected"
" to ftrack server in {} seconds."
).format(self.max_time_out))
self.failed = True
break
else:
log.error(
"Exiting event server. Event Hub is not connected."
)
self.server.stop_session()
self.failed = True
break
else:
if not connected:
connected = True
time.sleep(1)
def main(args):
check_thread = None
try:
server = FtrackServer('event')
session = ftrack_api.Session(auto_connect_event_hub=True)
check_thread = TimerChecker(server, session)
check_thread.start()
log.debug("Launching Ftrack Event Old Way Server")
server.run_server(session)
except Exception as exc:
import traceback
traceback.print_tb(exc.__traceback__)
finally:
log_info = True
if check_thread is not None:
check_thread.stop()
check_thread.join()
if check_thread.failed:
log_info = False
if log_info:
log.info("Exiting Event server subprocess")
return 1
if __name__ == "__main__":
# Register interupt signal
def signal_handler(sig, frame):
print("You pressed Ctrl+C. Process ended.")
sys.exit(0)
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)
if hasattr(signal, "SIGKILL"):
signal.signal(signal.SIGKILL, signal_handler)
sys.exit(main(sys.argv))

View file

@ -13,6 +13,7 @@ import logging
import tempfile import tempfile
import functools import functools
import contextlib import contextlib
import atexit
import requests import requests
@ -20,6 +21,9 @@ import requests
import pymongo import pymongo
from pymongo.client_session import ClientSession from pymongo.client_session import ClientSession
class NotActiveTable(Exception):
pass
def auto_reconnect(func): def auto_reconnect(func):
"""Handling auto reconnect in 3 retry times""" """Handling auto reconnect in 3 retry times"""
@functools.wraps(func) @functools.wraps(func)
@ -37,12 +41,23 @@ def auto_reconnect(func):
return decorated return decorated
def check_active_table(func):
"""Handling auto reconnect in 3 retry times"""
@functools.wraps(func)
def decorated(obj, *args, **kwargs):
if not obj.active_table:
raise NotActiveTable("Active table is not set. (This is bug)")
return func(obj, *args, **kwargs)
return decorated
class DbConnector: class DbConnector:
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
timeout = 1000 timeout = 1000
def __init__(self, mongo_url, database_name, table_name): def __init__(self, mongo_url, database_name, table_name=None):
self._mongo_client = None self._mongo_client = None
self._sentry_client = None self._sentry_client = None
self._sentry_logging_handler = None self._sentry_logging_handler = None
@ -53,11 +68,17 @@ class DbConnector:
self.active_table = table_name self.active_table = table_name
def __getattribute__(self, attr):
try:
return super().__getattribute__(attr)
except AttributeError:
return self._database[self.active_table].__getattribute__(attr)
def install(self): def install(self):
"""Establish a persistent connection to the database""" """Establish a persistent connection to the database"""
if self._is_installed: if self._is_installed:
return return
atexit.register(self.uninstall)
logging.basicConfig() logging.basicConfig()
self._mongo_client = pymongo.MongoClient( self._mongo_client = pymongo.MongoClient(
@ -99,6 +120,16 @@ class DbConnector:
self._mongo_client = None self._mongo_client = None
self._database = None self._database = None
self._is_installed = False self._is_installed = False
atexit.unregister(self.uninstall)
def create_table(self, name, **options):
if self.exist_table(name):
return
return self._database.create_collection(name, **options)
def exist_table(self, table_name):
return table_name in self.tables()
def tables(self): def tables(self):
"""List available tables """List available tables
@ -115,93 +146,80 @@ class DbConnector:
def collections(self): def collections(self):
return self._database.collection_names() return self._database.collection_names()
@check_active_table
@auto_reconnect @auto_reconnect
def insert_one(self, item, session=None): def insert_one(self, item, **options):
assert isinstance(item, dict), "item must be of type <dict>" assert isinstance(item, dict), "item must be of type <dict>"
return self._database[self.active_table].insert_one( return self._database[self.active_table].insert_one(item, **options)
item,
session=session
)
@check_active_table
@auto_reconnect @auto_reconnect
def insert_many(self, items, ordered=True, session=None): def insert_many(self, items, ordered=True, **options):
# check if all items are valid # check if all items are valid
assert isinstance(items, list), "`items` must be of type <list>" assert isinstance(items, list), "`items` must be of type <list>"
for item in items: for item in items:
assert isinstance(item, dict), "`item` must be of type <dict>" assert isinstance(item, dict), "`item` must be of type <dict>"
return self._database[self.active_table].insert_many( options["ordered"] = ordered
items, return self._database[self.active_table].insert_many(items, **options)
ordered=ordered,
session=session
)
@check_active_table
@auto_reconnect @auto_reconnect
def find(self, filter, projection=None, sort=None, session=None): def find(self, filter, projection=None, sort=None, **options):
return self._database[self.active_table].find( options["projection"] = projection
filter=filter, options["sort"] = sort
projection=projection, return self._database[self.active_table].find(filter, **options)
sort=sort,
session=session
)
@check_active_table
@auto_reconnect @auto_reconnect
def find_one(self, filter, projection=None, sort=None, session=None): def find_one(self, filter, projection=None, sort=None, **options):
assert isinstance(filter, dict), "filter must be <dict>" assert isinstance(filter, dict), "filter must be <dict>"
return self._database[self.active_table].find_one( options["projection"] = projection
filter=filter, options["sort"] = sort
projection=projection, return self._database[self.active_table].find_one(filter, **options)
sort=sort,
session=session
)
@check_active_table
@auto_reconnect @auto_reconnect
def replace_one(self, filter, replacement, session=None): def replace_one(self, filter, replacement, **options):
return self._database[self.active_table].replace_one( return self._database[self.active_table].replace_one(
filter, replacement, filter, replacement, **options
session=session
) )
@check_active_table
@auto_reconnect @auto_reconnect
def update_one(self, filter, update, session=None): def update_one(self, filter, update, **options):
return self._database[self.active_table].update_one( return self._database[self.active_table].update_one(
filter, update, filter, update, **options
session=session
) )
@check_active_table
@auto_reconnect @auto_reconnect
def update_many(self, filter, update, session=None): def update_many(self, filter, update, **options):
return self._database[self.active_table].update_many( return self._database[self.active_table].update_many(
filter, update, filter, update, **options
session=session
) )
@check_active_table
@auto_reconnect @auto_reconnect
def distinct(self, *args, **kwargs): def distinct(self, *args, **kwargs):
return self._database[self.active_table].distinct( return self._database[self.active_table].distinct(*args, **kwargs)
*args, **kwargs
)
@check_active_table
@auto_reconnect @auto_reconnect
def drop_collection(self, name_or_collection, session=None): def drop_collection(self, name_or_collection, **options):
return self._database[self.active_table].drop( return self._database[self.active_table].drop(
name_or_collection, name_or_collection, **options
session=session
) )
@check_active_table
@auto_reconnect @auto_reconnect
def delete_one(filter, collation=None, session=None): def delete_one(self, filter, collation=None, **options):
return self._database[self.active_table].delete_one( options["collation"] = collation
filter, return self._database[self.active_table].delete_one(filter, **options)
collation=collation,
session=session
)
@check_active_table
@auto_reconnect @auto_reconnect
def delete_many(filter, collation=None, session=None): def delete_many(self, filter, collation=None, **options):
return self._database[self.active_table].delete_many( options["collation"] = collation
filter, return self._database[self.active_table].delete_many(filter, **options)
collation=collation,
session=session
)

View file

@ -3,6 +3,7 @@ import time
from pypeapp import Logger from pypeapp import Logger
from pype.vendor import ftrack_api from pype.vendor import ftrack_api
from pype.vendor.ftrack_api import session as fa_session from pype.vendor.ftrack_api import session as fa_session
from pype.ftrack.ftrack_server import session_processor
class MissingPermision(Exception): class MissingPermision(Exception):
@ -31,8 +32,21 @@ class BaseHandler(object):
def __init__(self, session, plugins_presets={}): def __init__(self, session, plugins_presets={}):
'''Expects a ftrack_api.Session instance''' '''Expects a ftrack_api.Session instance'''
self._session = session
self.log = Logger().get_logger(self.__class__.__name__) self.log = Logger().get_logger(self.__class__.__name__)
if not(
isinstance(session, ftrack_api.session.Session) or
isinstance(session, session_processor.ProcessSession)
):
raise Exception((
"Session object entered with args is instance of \"{}\""
" but expected instances are \"{}\" and \"{}\""
).format(
str(type(session)),
str(ftrack_api.session.Session),
str(session_processor.ProcessSession)
))
self._session = session
# Using decorator # Using decorator
self.register = self.register_decorator(self.register) self.register = self.register_decorator(self.register)