Merged in feature/PYPE-349_parallel_event_server (pull request #330)

Feature/PYPE-349 parallel event server

Approved-by: Milan Kolar <milan@orbi.tools>
This commit is contained in:
Jakub Trllo 2019-10-31 18:06:54 +00:00 committed by Milan Kolar
commit 78519aa066
46 changed files with 1452 additions and 220 deletions

View file

@ -1,2 +1,2 @@
from .lib import *
from .ftrack_server import *
from .ftrack_server import FtrackServer

View file

@ -281,7 +281,4 @@ class AttributesRemapper(BaseAction):
def register(session, plugins_presets={}):
'''Register plugin. Called when used as a plugin.'''
if not isinstance(session, ftrack_api.session.Session):
return
AttributesRemapper(session, plugins_presets).register()
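The same simplification repeats across the action and event handlers in this commit: the isinstance guard against old or incompatible ftrack APIs now lives in the handler base class (see the BaseHandler diff at the end of this commit), so each module-level register collapses to a one-liner. A minimal sketch of the resulting pattern, with a hypothetical handler class:

def register(session, plugins_presets={}):
    '''Register plugin. Called when used as a plugin.'''
    # Validation of `session` now happens inside the handler base class,
    # so the plugin is simply instantiated and registered.
    MyAction(session, plugins_presets).register()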

View file

@ -55,11 +55,8 @@ class ClientReviewSort(BaseAction):
def register(session, plugins_presets={}):
'''Register action. Called when used as an event plugin.'''
if not isinstance(session, ftrack_api.session.Session):
return
action_handler = ClientReviewSort(session, plugins_presets)
action_handler.register()
ClientReviewSort(session, plugins_presets).register()
def main(arguments=None):

View file

@ -68,12 +68,6 @@ class ComponentOpen(BaseAction):
def register(session, plugins_presets={}):
'''Register action. Called when used as an event plugin.'''
# Validate that session is an instance of ftrack_api.Session. If not,
# assume that register is being called from an old or incompatible API and
# return without doing anything.
if not isinstance(session, ftrack_api.session.Session):
return
ComponentOpen(session, plugins_presets).register()

View file

@ -572,12 +572,6 @@ class CustomAttributes(BaseAction):
def register(session, plugins_presets={}):
'''Register plugin. Called when used as a plugin.'''
# Validate that session is an instance of ftrack_api.Session. If not,
# assume that register is being called from an old or incompatible API and
# return without doing anything.
if not isinstance(session, ftrack_api.session.Session):
return
CustomAttributes(session, plugins_presets).register()

View file

@ -327,9 +327,6 @@ class PartialDict(dict):
def register(session, plugins_presets={}):
'''Register plugin. Called when used as a plugin.'''
if not isinstance(session, ftrack_api.session.Session):
return
CreateFolders(session, plugins_presets).register()

View file

@ -198,9 +198,6 @@ class CreateProjectFolders(BaseAction):
def register(session, plugins_presets={}):
'''Register plugin. Called when used as a plugin.'''
if not isinstance(session, ftrack_api.session.Session):
return
CreateProjectFolders(session, plugins_presets).register()

View file

@ -9,7 +9,7 @@ from pype.ftrack import BaseAction
class CustomAttributeDoctor(BaseAction):
ignore_me = True
#: Action identifier.
identifier = 'custom.attributes.doctor'
@ -294,9 +294,6 @@ class CustomAttributeDoctor(BaseAction):
def register(session, plugins_presets={}):
'''Register plugin. Called when used as a plugin.'''
if not isinstance(session, ftrack_api.session.Session):
return
CustomAttributeDoctor(session, plugins_presets).register()

View file

@ -85,7 +85,7 @@ class DeleteAsset(BaseAction):
'type': 'asset',
'name': entity['name']
})
if av_entity is None:
return {
'success': False,
@ -314,12 +314,6 @@ class DeleteAsset(BaseAction):
def register(session, plugins_presets={}):
'''Register plugin. Called when used as a plugin.'''
# Validate that session is an instance of ftrack_api.Session. If not,
# assume that register is being called from an old or incompatible API and
# return without doing anything.
if not isinstance(session, ftrack_api.session.Session):
return
DeleteAsset(session, plugins_presets).register()

View file

@ -135,12 +135,6 @@ class AssetsRemover(BaseAction):
def register(session, plugins_presets={}):
'''Register plugin. Called when used as a plugin.'''
# Validate that session is an instance of ftrack_api.Session. If not,
# assume that register is being called from an old or incompatible API and
# return without doing anything.
if not isinstance(session, ftrack_api.session.Session):
return
AssetsRemover(session, plugins_presets).register()

View file

@ -220,8 +220,6 @@ class DJVViewAction(BaseAction):
def register(session, plugins_presets={}):
"""Register hooks."""
if not isinstance(session, ftrack_api.session.Session):
return
DJVViewAction(session, plugins_presets).register()

View file

@ -121,12 +121,6 @@ class JobKiller(BaseAction):
def register(session, plugins_presets={}):
'''Register plugin. Called when used as a plugin.'''
# Validate that session is an instance of ftrack_api.Session. If not,
# assume that register is being called from an old or incompatible API and
# return without doing anything.
if not isinstance(session, ftrack_api.session.Session):
return
JobKiller(session, plugins_presets).register()

View file

@ -115,9 +115,6 @@ class MultipleNotes(BaseAction):
def register(session, plugins_presets={}):
'''Register plugin. Called when used as a plugin.'''
if not isinstance(session, ftrack_api.session.Session):
return
MultipleNotes(session, plugins_presets).register()

View file

@ -372,7 +372,4 @@ class PrepareProject(BaseAction):
def register(session, plugins_presets={}):
'''Register plugin. Called when used as a plugin.'''
if not isinstance(session, ftrack_api.session.Session):
return
PrepareProject(session, plugins_presets).register()

View file

@ -328,8 +328,6 @@ class RVAction(BaseAction):
def register(session, plugins_presets={}):
"""Register hooks."""
if not isinstance(session, ftrack_api.session.Session):
return
RVAction(session, plugins_presets).register()

View file

@ -26,7 +26,7 @@ class StartTimer(BaseAction):
user.start_timer(entity, force=True)
self.session.commit()
self.log.info(
"Starting Ftrack timer for task: {}".format(entity['name'])
)
@ -37,7 +37,4 @@ class StartTimer(BaseAction):
def register(session, plugins_presets={}):
'''Register plugin. Called when used as a plugin.'''
if not isinstance(session, ftrack_api.session.Session):
return
StartTimer(session, plugins_presets).register()

View file

@ -309,9 +309,6 @@ class SyncHierarchicalAttrs(BaseAction):
def register(session, plugins_presets={}):
'''Register plugin. Called when used as a plugin.'''
if not isinstance(session, ftrack_api.session.Session):
return
SyncHierarchicalAttrs(session, plugins_presets).register()

View file

@ -263,11 +263,4 @@ class SyncToAvalon(BaseAction):
def register(session, plugins_presets={}):
'''Register plugin. Called when used as a plugin.'''
# Validate that session is an instance of ftrack_api.Session. If not,
# assume that register is being called from an old or incompatible API and
# return without doing anything.
if not isinstance(session, ftrack_api.session.Session):
return
SyncToAvalon(session, plugins_presets).register()

View file

@ -43,9 +43,6 @@ class TestAction(BaseAction):
def register(session, plugins_presets={}):
'''Register plugin. Called when used as a plugin.'''
if not isinstance(session, ftrack_api.session.Session):
return
TestAction(session, plugins_presets).register()

View file

@ -68,8 +68,6 @@ class ThumbToChildren(BaseAction):
def register(session, plugins_presets={}):
'''Register action. Called when used as an event plugin.'''
if not isinstance(session, ftrack_api.session.Session):
return
ThumbToChildren(session, plugins_presets).register()

View file

@ -90,8 +90,6 @@ class ThumbToParent(BaseAction):
def register(session, plugins_presets={}):
'''Register action. Called when used as an event plugin.'''
if not isinstance(session, ftrack_api.session.Session):
return
ThumbToParent(session, plugins_presets).register()

View file

@ -40,7 +40,4 @@ class ActionAskWhereIRun(BaseAction):
def register(session, plugins_presets={}):
'''Register plugin. Called when used as a plugin.'''
if not isinstance(session, ftrack_api.session.Session):
return
ActionAskWhereIRun(session, plugins_presets).register()

View file

@ -80,7 +80,4 @@ class ActionShowWhereIRun(BaseAction):
def register(session, plugins_presets={}):
'''Register plugin. Called when used as a plugin.'''
if not isinstance(session, ftrack_api.session.Session):
return
ActionShowWhereIRun(session, plugins_presets).register()

View file

@ -220,7 +220,7 @@ class SyncHierarchicalAttrs(BaseAction):
if job['status'] in ('queued', 'running'):
job['status'] = 'failed'
session.commit()
if self.interface_messages:
self.show_interface_from_dict(
messages=self.interface_messages,
@ -341,9 +341,6 @@ class SyncHierarchicalAttrs(BaseAction):
def register(session, plugins_presets):
'''Register plugin. Called when used as a plugin.'''
if not isinstance(session, ftrack_api.session.Session):
return
SyncHierarchicalAttrs(session, plugins_presets).register()

View file

@ -296,9 +296,6 @@ def register(session, plugins_presets):
# Validate that session is an instance of ftrack_api.Session. If not,
# assume that register is being called from an old or incompatible API and
# return without doing anything.
if not isinstance(session, ftrack_api.session.Session):
return
SyncToAvalon(session, plugins_presets).register()

View file

@ -53,7 +53,5 @@ class DelAvalonIdFromNew(BaseEvent):
def register(session, plugins_presets):
'''Register plugin. Called when used as a plugin.'''
if not isinstance(session, ftrack_api.session.Session):
return
DelAvalonIdFromNew(session, plugins_presets).register()

View file

@ -88,7 +88,5 @@ class NextTaskUpdate(BaseEvent):
def register(session, plugins_presets):
'''Register plugin. Called when used as a plugin.'''
if not isinstance(session, ftrack_api.session.Session):
return
NextTaskUpdate(session, plugins_presets).register()

View file

@ -36,7 +36,5 @@ class Radio_buttons(BaseEvent):
def register(session, plugins_presets):
'''Register plugin. Called when used as a plugin.'''
if not isinstance(session, ftrack_api.session.Session):
return
Radio_buttons(session, plugins_presets).register()

View file

@ -209,7 +209,5 @@ class SyncHierarchicalAttrs(BaseEvent):
def register(session, plugins_presets):
'''Register plugin. Called when used as a plugin.'''
if not isinstance(session, ftrack_api.session.Session):
return
SyncHierarchicalAttrs(session, plugins_presets).register()

View file

@ -122,8 +122,4 @@ class Sync_to_Avalon(BaseEvent):
def register(session, plugins_presets):
'''Register plugin. Called when used as a plugin.'''
if not isinstance(session, ftrack_api.session.Session):
return
Sync_to_Avalon(session, plugins_presets).register()

View file

@ -8,7 +8,7 @@ from pype.ftrack import BaseEvent
class Test_Event(BaseEvent):
ignore_me = True
priority = 10000
def launch(self, session, event):
@ -22,7 +22,5 @@ class Test_Event(BaseEvent):
def register(session, plugins_presets):
'''Register plugin. Called when used as a plugin.'''
if not isinstance(session, ftrack_api.session.Session):
return
Test_Event(session, plugins_presets).register()

View file

@ -47,7 +47,5 @@ class ThumbnailEvents(BaseEvent):
def register(session, plugins_presets):
'''Register plugin. Called when used as a plugin.'''
if not isinstance(session, ftrack_api.session.Session):
return
ThumbnailEvents(session, plugins_presets).register()

View file

@ -233,7 +233,5 @@ def register(session, plugins_presets):
"""
Register plugin. Called when used as a plugin.
"""
if not isinstance(session, ftrack_api.session.Session):
return
UserAssigmentEvent(session, plugins_presets).register()

View file

@ -71,7 +71,5 @@ class VersionToTaskStatus(BaseEvent):
def register(session, plugins_presets):
'''Register plugin. Called when used as a plugin.'''
if not isinstance(session, ftrack_api.session.Session):
return
VersionToTaskStatus(session, plugins_presets).register()

View file

@ -1,7 +1 @@
from .ftrack_server import FtrackServer
from . import event_server_cli
__all__ = [
'event_server_cli',
'FtrackServer'
]

View file

@ -1,18 +1,34 @@
import os
import sys
import signal
import datetime
import subprocess
import socket
import argparse
import atexit
import time
from urllib.parse import urlparse
import requests
from pype.vendor import ftrack_api
from pype.ftrack import credentials
from pype.ftrack.lib import credentials
from pype.ftrack.ftrack_server import FtrackServer
from pypeapp import Logger
log = Logger().get_logger('Ftrack event server', "ftrack-event-server-cli")
from pype.ftrack.ftrack_server.lib import ftrack_events_mongo_settings
import socket_thread
def check_url(url):
class MongoPermissionsError(Exception):
"""Is used when is created multiple objects of same RestApi class."""
def __init__(self, message=None):
if not message:
message = "Exiting because have issue with acces to MongoDB"
super().__init__(message)
def check_ftrack_url(url, log_errors=True):
"""Checks if Ftrack server is responding"""
if not url:
log.error('Ftrack URL is not set!')
print('ERROR: Ftrack URL is not set!')
return None
url = url.strip('/ ')
@ -25,24 +41,47 @@ def check_url(url):
try:
result = requests.get(url, allow_redirects=False)
except requests.exceptions.RequestException:
log.error('Entered Ftrack URL is not accessible!')
return None
if log_errors:
print('ERROR: Entered Ftrack URL is not accessible!')
return False
if (result.status_code != 200 or 'FTRACK_VERSION' not in result.headers):
log.error('Entered Ftrack URL is not accessible!')
return None
if log_errors:
print('ERROR: Entered Ftrack URL is not accessible!')
return False
log.debug('Ftrack server {} is accessible.'.format(url))
print('DEBUG: Ftrack server {} is accessible.'.format(url))
return url
def check_mongo_url(host, port, log_error=False):
"""Checks if mongo server is responding"""
sock = None
try:
sock = socket.create_connection(
(host, port),
timeout=1
)
return True
except socket.error as err:
if log_error:
print("Can't connect to MongoDB at {}:{} because: {}".format(
host, port, err
))
return False
finally:
if sock is not None:
sock.close()
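# A minimal usage sketch (hypothetical helper, not part of this module),
# assuming the two checks above: poll both until Ftrack and MongoDB answer.
# The caller supplies the URL, host and port; `time` is imported at the top
# of this module.
def wait_until_reachable(ftrack_url, mongo_host, mongo_port, interval=1):
    """Block until both Ftrack and MongoDB respond, rechecking every `interval` seconds."""
    while True:
        ftrack_ok = bool(check_ftrack_url(ftrack_url, log_errors=False))
        mongo_ok = check_mongo_url(mongo_host, mongo_port)
        if ftrack_ok and mongo_ok:
            return
        time.sleep(interval)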
def validate_credentials(url, user, api):
first_validation = True
if not user:
log.error('Ftrack Username is not set! Exiting.')
print('ERROR: Ftrack Username is not set! Exiting.')
first_validation = False
if not api:
log.error('Ftrack API key is not set! Exiting.')
print('ERROR: Ftrack API key is not set! Exiting.')
first_validation = False
if not first_validation:
return False
@ -55,21 +94,21 @@ def validate_credentials(url, user, api):
)
session.close()
except Exception as e:
log.error(
'Can\'t log into Ftrack with the given credentials:'
print(
'ERROR: Can\'t log into Ftrack with the given credentials:'
' Ftrack server: "{}" // Username: {} // API key: {}'.format(
url, user, api
))
return False
log.debug('Credentials Username: "{}", API key: "{}" are valid.'.format(
print('DEBUG: Credentials Username: "{}", API key: "{}" are valid.'.format(
user, api
))
return True
def process_event_paths(event_paths):
log.debug('Processing event paths: {}.'.format(str(event_paths)))
print('DEBUG: Processing event paths: {}.'.format(str(event_paths)))
return_paths = []
not_found = []
if not event_paths:
@ -87,14 +126,249 @@ def process_event_paths(event_paths):
return os.pathsep.join(return_paths), not_found
def run_event_server(ftrack_url, username, api_key, event_paths):
os.environ['FTRACK_SERVER'] = ftrack_url
os.environ['FTRACK_API_USER'] = username
os.environ['FTRACK_API_KEY'] = api_key
os.environ['FTRACK_EVENTS_PATH'] = event_paths
def old_way_server(ftrack_url):
# Current file
file_path = os.path.dirname(os.path.realpath(__file__))
min_fail_seconds = 5
max_fail_count = 3
wait_time_after_max_fail = 10
subproc = None
subproc_path = "{}/sub_old_way.py".format(file_path)
subproc_last_failed = datetime.datetime.now()
subproc_failed_count = 0
ftrack_accessible = False
printed_ftrack_error = False
while True:
if not ftrack_accessible:
ftrack_accessible = check_ftrack_url(ftrack_url)
# Run threads only if Ftrack is accessible
if not ftrack_accessible and not printed_ftrack_error:
print("Can't access Ftrack {} <{}>".format(
ftrack_url, str(datetime.datetime.now())
))
if subproc is not None:
if subproc.poll() is None:
subproc.terminate()
subproc = None
printed_ftrack_error = True
time.sleep(1)
continue
printed_ftrack_error = False
if subproc is None:
if subproc_failed_count < max_fail_count:
subproc = subprocess.Popen(
["python", subproc_path],
stdout=subprocess.PIPE
)
elif subproc_failed_count == max_fail_count:
print((
"Storer failed {}times I'll try to run again {}s later"
).format(str(max_fail_count), str(wait_time_after_max_fail)))
subproc_failed_count += 1
elif ((
datetime.datetime.now() - subproc_last_failed
).seconds > wait_time_after_max_fail):
subproc_failed_count = 0
# If the subprocess failed, re-test the Ftrack connection
elif subproc.poll() is not None:
subproc = None
ftrack_accessible = False
_subproc_last_failed = datetime.datetime.now()
delta_time = (_subproc_last_failed - subproc_last_failed).seconds
if delta_time < min_fail_seconds:
subproc_failed_count += 1
else:
subproc_failed_count = 0
subproc_last_failed = _subproc_last_failed
time.sleep(1)
def main_loop(ftrack_url):
""" This is main loop of event handling.
Loop is handling threads which handles subprocesses of event storer and
processor. When one of threads is stopped it is tested to connect to
ftrack and mongo server. Threads are not started when ftrack or mongo
server is not accessible. When threads are started it is checked for socket
signals as heartbeat. Heartbeat must become at least once per 30sec
otherwise thread will be killed.
"""
# Get mongo hostname and port for testing mongo connection
mongo_list = ftrack_events_mongo_settings()
mongo_hostname = mongo_list[0]
mongo_port = mongo_list[1]
# Current file
file_path = os.path.dirname(os.path.realpath(__file__))
min_fail_seconds = 5
max_fail_count = 3
wait_time_after_max_fail = 10
# Threads data
storer_name = "StorerThread"
storer_port = 10001
storer_path = "{}/sub_event_storer.py".format(file_path)
storer_thread = None
storer_last_failed = datetime.datetime.now()
storer_failed_count = 0
processor_name = "ProcessorThread"
processor_port = 10011
processor_path = "{}/sub_event_processor.py".format(file_path)
processor_thread = None
processor_last_failed = datetime.datetime.now()
processor_failed_count = 0
ftrack_accessible = False
mongo_accessible = False
printed_ftrack_error = False
printed_mongo_error = False
# Stop threads on exit
# TODO: check that this works and that the args still reference the thread objects!
def on_exit(processor_thread, storer_thread):
if processor_thread is not None:
processor_thread.stop()
processor_thread.join()
processor_thread = None
if storer_thread is not None:
storer_thread.stop()
storer_thread.join()
storer_thread = None
atexit.register(
on_exit, processor_thread=processor_thread, storer_thread=storer_thread
)
# Main loop
while True:
# Check whether the Ftrack and Mongo URLs are accessible
if not ftrack_accessible:
ftrack_accessible = check_ftrack_url(ftrack_url)
if not mongo_accessible:
mongo_accessible = check_mongo_url(mongo_hostname, mongo_port)
# Run threads only if both Ftrack and Mongo are accessible
if not ftrack_accessible or not mongo_accessible:
if not mongo_accessible and not printed_mongo_error:
print("Can't access Mongo at {}:{}".format(mongo_hostname, mongo_port))
if not ftrack_accessible and not printed_ftrack_error:
print("Can't access Ftrack {}".format(ftrack_url))
if storer_thread is not None:
storer_thread.stop()
storer_thread.join()
storer_thread = None
if processor_thread is not None:
processor_thread.stop()
processor_thread.join()
processor_thread = None
printed_ftrack_error = True
printed_mongo_error = True
time.sleep(1)
continue
printed_ftrack_error = False
printed_mongo_error = False
# Run backup thread which does not require mongo to work
if storer_thread is None:
if storer_failed_count < max_fail_count:
storer_thread = socket_thread.SocketThread(
storer_name, storer_port, storer_path
)
storer_thread.start()
elif storer_failed_count == max_fail_count:
print((
"Storer failed {}times I'll try to run again {}s later"
).format(str(max_fail_count), str(wait_time_after_max_fail)))
storer_failed_count += 1
elif ((
datetime.datetime.now() - storer_last_failed
).seconds > wait_time_after_max_fail):
storer_failed_count = 0
# If the thread failed, re-test the Ftrack and Mongo connections
elif not storer_thread.isAlive():
if storer_thread.mongo_error:
raise MongoPermissionsError()
storer_thread.join()
storer_thread = None
ftrack_accessible = False
mongo_accessible = False
_storer_last_failed = datetime.datetime.now()
delta_time = (_storer_last_failed - storer_last_failed).seconds
if delta_time < min_fail_seconds:
storer_failed_count += 1
else:
storer_failed_count = 0
storer_last_failed = _storer_last_failed
if processor_thread is None:
if processor_failed_count < max_fail_count:
processor_thread = socket_thread.SocketThread(
processor_name, processor_port, processor_path
)
processor_thread.start()
elif processor_failed_count == max_fail_count:
print((
"Processor failed {}times in row"
" I'll try to run again {}s later"
).format(str(max_fail_count), str(wait_time_after_max_fail)))
processor_failed_count += 1
elif ((
datetime.datetime.now() - processor_last_failed
).seconds > wait_time_after_max_fail):
processor_failed_count = 0
# If the thread failed, re-test the Ftrack and Mongo connections
elif not processor_thread.isAlive():
if processor_thread.mongo_error:
raise Exception(
"Exiting because of an issue with access to MongoDB"
)
processor_thread.join()
processor_thread = None
ftrack_accessible = False
mongo_accessible = False
_processor_last_failed = datetime.datetime.now()
delta_time = (
_processor_last_failed - processor_last_failed
).seconds
if delta_time < min_fail_seconds:
processor_failed_count += 1
else:
processor_failed_count = 0
processor_last_failed = _processor_last_failed
time.sleep(1)
server = FtrackServer('event')
server.run_server()
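# A compact sketch of the restart/backoff bookkeeping that the subprocess and
# thread branches above keep inline; the helper name is hypothetical and only
# illustrates the policy (min_fail_seconds mirrors the constant used above).
def update_fail_counter(last_failed, failed_count, min_fail_seconds=5):
    """Return updated (last_failed, failed_count) after a thread or subprocess death.
    Deaths closer together than `min_fail_seconds` count as a failure streak,
    otherwise the counter resets.
    """
    now = datetime.datetime.now()
    if (now - last_failed).seconds < min_fail_seconds:
        failed_count += 1
    else:
        failed_count = 0
    return now, failed_count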
def main(argv):
'''
@ -184,7 +458,11 @@ def main(argv):
help="Load creadentials from apps dir",
action="store_true"
)
parser.add_argument(
'-oldway',
help="Load creadentials from apps dir",
action="store_true"
)
ftrack_url = os.environ.get('FTRACK_SERVER')
username = os.environ.get('FTRACK_API_USER')
api_key = os.environ.get('FTRACK_API_KEY')
@ -209,8 +487,9 @@ def main(argv):
if kwargs.ftrackapikey:
api_key = kwargs.ftrackapikey
oldway = kwargs.oldway
# Check url regex and accessibility
ftrack_url = check_url(ftrack_url)
ftrack_url = check_ftrack_url(ftrack_url)
if not ftrack_url:
return 1
@ -221,21 +500,40 @@ def main(argv):
# Process events path
event_paths, not_found = process_event_paths(event_paths)
if not_found:
log.warning(
'These paths were not found: {}'.format(str(not_found))
print(
'WARNING: These paths were not found: {}'.format(str(not_found))
)
if not event_paths:
if not_found:
log.error('None of the entered paths are valid or accessible.')
print('ERROR: None of the entered paths are valid or accessible.')
else:
log.error('Paths to events are not set. Exiting.')
print('ERROR: Paths to events are not set. Exiting.')
return 1
if kwargs.storecred:
credentials._save_credentials(username, api_key, True)
run_event_server(ftrack_url, username, api_key, event_paths)
# Set Ftrack environments
os.environ["FTRACK_SERVER"] = ftrack_url
os.environ["FTRACK_API_USER"] = username
os.environ["FTRACK_API_KEY"] = api_key
os.environ["FTRACK_EVENTS_PATH"] = event_paths
if oldway:
return old_way_server(ftrack_url)
return main_loop(ftrack_url)
if (__name__ == ('__main__')):
if __name__ == "__main__":
# Register interrupt signal
def signal_handler(sig, frame):
print("You pressed Ctrl+C. Process ended.")
sys.exit(0)
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)
if hasattr(signal, "SIGKILL"):
signal.signal(signal.SIGKILL, signal_handler)
sys.exit(main(sys.argv))

View file

@ -126,23 +126,27 @@ class FtrackServer:
msg = '"{}" - register was not successful ({})'.format(
function_dict['name'], str(exc)
)
log.warning(msg)
log.warning(msg, exc_info=True)
def run_server(self):
self.session = ftrack_api.Session(auto_connect_event_hub=True,)
def run_server(self, session=None, load_files=True):
if not session:
session = ftrack_api.Session(auto_connect_event_hub=True)
paths_str = os.environ.get(self.env_key)
if paths_str is None:
log.error((
"Env var \"{}\" is not set, \"{}\" server won\'t launch"
).format(self.env_key, self.server_type))
return
self.session = session
paths = paths_str.split(os.pathsep)
self.set_files(paths)
if load_files:
paths_str = os.environ.get(self.env_key)
if paths_str is None:
log.error((
"Env var \"{}\" is not set, \"{}\" server won\'t launch"
).format(self.env_key, self.server_type))
return
log.info(60*"*")
log.info('Registration of actions/events has finished!')
paths = paths_str.split(os.pathsep)
self.set_files(paths)
log.info(60*"*")
log.info('Registration of actions/events has finished!')
# keep event_hub on session running
self.session.event_hub.wait()
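With this change run_server can reuse an externally prepared session and optionally skip loading event plugin files, which is what the storer and processor subprocesses rely on. A hedged usage sketch of the new signature (the environment variables and session construction are the caller's responsibility):

from pype.vendor import ftrack_api
from pype.ftrack.ftrack_server import FtrackServer

# The caller builds the session itself; load_files=False skips reading
# FTRACK_EVENTS_PATH, e.g. when the caller registers its own subscriptions.
session = ftrack_api.Session(auto_connect_event_hub=True)
server = FtrackServer("event")
server.run_server(session=session, load_files=False)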

View file

@ -0,0 +1,68 @@
import os
try:
from urllib.parse import urlparse, parse_qs
except ImportError:
from urlparse import urlparse, parse_qs
def ftrack_events_mongo_settings():
host = None
port = None
username = None
password = None
collection = None
database = None
auth_db = ""
if os.environ.get('FTRACK_EVENTS_MONGO_URL'):
result = urlparse(os.environ['FTRACK_EVENTS_MONGO_URL'])
host = result.hostname
try:
port = result.port
except ValueError:
raise RuntimeError("invalid port specified")
username = result.username
password = result.password
try:
database = result.path.lstrip("/").split("/")[0]
collection = result.path.lstrip("/").split("/")[1]
except IndexError:
if not database:
raise RuntimeError("missing database name for logging")
try:
auth_db = parse_qs(result.query)['authSource'][0]
except KeyError:
# no auth db provided, mongo will use the one we are connecting to
pass
else:
host = os.environ.get('FTRACK_EVENTS_MONGO_HOST')
port = int(os.environ.get('FTRACK_EVENTS_MONGO_PORT', "0"))
database = os.environ.get('FTRACK_EVENTS_MONGO_DB')
username = os.environ.get('FTRACK_EVENTS_MONGO_USER')
password = os.environ.get('FTRACK_EVENTS_MONGO_PASSWORD')
collection = os.environ.get('FTRACK_EVENTS_MONGO_COL')
auth_db = os.environ.get('FTRACK_EVENTS_MONGO_AUTH_DB', 'avalon')
return host, port, database, username, password, collection, auth_db
def get_ftrack_event_mongo_info():
host, port, database, username, password, collection, auth_db = ftrack_events_mongo_settings()
user_pass = ""
if username and password:
user_pass = "{}:{}@".format(username, password)
socket_path = "{}:{}".format(host, port)
dab = ""
if database:
dab = "/{}".format(database)
auth = ""
if auth_db:
auth = "?authSource={}".format(auth_db)
url = "mongodb://{}{}{}{}".format(user_pass, socket_path, dab, auth)
return url, database, collection
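For illustration, a sketch of how the helpers resolve a single FTRACK_EVENTS_MONGO_URL into the pieces the event server needs; the connection string below is a placeholder:

import os
from pype.ftrack.ftrack_server.lib import get_ftrack_event_mongo_info

# Placeholder URL: user "pype", database "pype", collection "ftrack_events",
# auth database "avalon".
os.environ["FTRACK_EVENTS_MONGO_URL"] = (
    "mongodb://pype:secret@mongo.local:27017/pype/ftrack_events?authSource=avalon"
)
url, database, collection = get_ftrack_event_mongo_info()
# url        -> "mongodb://pype:secret@mongo.local:27017/pype?authSource=avalon"
# database   -> "pype"
# collection -> "ftrack_events"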

View file

@ -0,0 +1,292 @@
import logging
import os
import sys
import getpass
import atexit
import datetime
import tempfile
import threading
import time
import requests
import queue
import pymongo
import ftrack_api
import ftrack_api.session
import ftrack_api.cache
import ftrack_api.operation
import ftrack_api._centralized_storage_scenario
import ftrack_api.event
from ftrack_api.logging import LazyLogMessage as L
from pype.ftrack.lib.custom_db_connector import DbConnector
from pype.ftrack.ftrack_server.lib import get_ftrack_event_mongo_info
from pypeapp import Logger
log = Logger().get_logger("Session processor")
class ProcessEventHub(ftrack_api.event.hub.EventHub):
url, database, table_name = get_ftrack_event_mongo_info()
is_table_created = False
def __init__(self, *args, **kwargs):
self.dbcon = DbConnector(
mongo_url=self.url,
database_name=self.database,
table_name=self.table_name
)
self.sock = kwargs.pop("sock")
super(ProcessEventHub, self).__init__(*args, **kwargs)
def prepare_dbcon(self):
try:
self.dbcon.install()
self.dbcon._database.collection_names()
except pymongo.errors.AutoReconnect:
log.error("Mongo server \"{}\" is not responding, exiting.".format(
os.environ["AVALON_MONGO"]
))
sys.exit(0)
except pymongo.errors.OperationFailure:
log.error((
"Error with Mongo access, probably permissions."
"Check if exist database with name \"{}\""
" and collection \"{}\" inside."
).format(self.database, self.table_name))
self.sock.sendall(b"MongoError")
sys.exit(0)
def wait(self, duration=None):
"""Overriden wait
Event are loaded from Mongo DB when queue is empty. Handled event is
set as processed in Mongo DB.
"""
started = time.time()
self.prepare_dbcon()
while True:
try:
event = self._event_queue.get(timeout=0.1)
except queue.Empty:
if not self.load_events():
time.sleep(0.5)
else:
try:
self._handle(event)
self.dbcon.update_one(
{"id": event["id"]},
{"$set": {"pype_data.is_processed": True}}
)
except pymongo.errors.AutoReconnect:
log.error((
"Mongo server \"{}\" is not responding, exiting."
).format(os.environ["AVALON_MONGO"]))
sys.exit(0)
# Additional special processing of events.
if event['topic'] == 'ftrack.meta.disconnected':
break
if duration is not None:
if (time.time() - started) > duration:
break
def load_events(self):
"""Load not processed events sorted by stored date"""
ago_date = datetime.datetime.now() - datetime.timedelta(days=3)
result = self.dbcon.delete_many({
"pype_data.stored": {"$lte": ago_date},
"pype_data.is_processed": True
})
not_processed_events = self.dbcon.find(
{"pype_data.is_processed": False}
).sort(
[("pype_data.stored", pymongo.ASCENDING)]
)
found = False
for event_data in not_processed_events:
new_event_data = {
k: v for k, v in event_data.items()
if k not in ["_id", "pype_data"]
}
try:
event = ftrack_api.event.base.Event(**new_event_data)
except Exception:
self.logger.exception(L(
'Failed to convert payload into event: {0}',
event_data
))
continue
found = True
self._event_queue.put(event)
return found
def _handle_packet(self, code, packet_identifier, path, data):
"""Override `_handle_packet` which skip events and extend heartbeat"""
code_name = self._code_name_mapping[code]
if code_name == "event":
return
if code_name == "heartbeat":
self.sock.sendall(b"processor")
return self._send_packet(self._code_name_mapping["heartbeat"])
return super()._handle_packet(code, packet_identifier, path, data)
class ProcessSession(ftrack_api.session.Session):
'''An isolated session for interaction with an ftrack server.'''
def __init__(
self, server_url=None, api_key=None, api_user=None, auto_populate=True,
plugin_paths=None, cache=None, cache_key_maker=None,
auto_connect_event_hub=None, schema_cache_path=None,
plugin_arguments=None, sock=None
):
super(ftrack_api.session.Session, self).__init__()
self.logger = logging.getLogger(
__name__ + '.' + self.__class__.__name__
)
self._closed = False
if server_url is None:
server_url = os.environ.get('FTRACK_SERVER')
if not server_url:
raise TypeError(
'Required "server_url" not specified. Pass as argument or set '
'in environment variable FTRACK_SERVER.'
)
self._server_url = server_url
if api_key is None:
api_key = os.environ.get(
'FTRACK_API_KEY',
# Backwards compatibility
os.environ.get('FTRACK_APIKEY')
)
if not api_key:
raise TypeError(
'Required "api_key" not specified. Pass as argument or set in '
'environment variable FTRACK_API_KEY.'
)
self._api_key = api_key
if api_user is None:
api_user = os.environ.get('FTRACK_API_USER')
if not api_user:
try:
api_user = getpass.getuser()
except Exception:
pass
if not api_user:
raise TypeError(
'Required "api_user" not specified. Pass as argument, set in '
'environment variable FTRACK_API_USER or one of the standard '
'environment variables used by Python\'s getpass module.'
)
self._api_user = api_user
# Currently pending operations.
self.recorded_operations = ftrack_api.operation.Operations()
self.record_operations = True
self.cache_key_maker = cache_key_maker
if self.cache_key_maker is None:
self.cache_key_maker = ftrack_api.cache.StringKeyMaker()
# Enforce always having a memory cache at top level so that the same
# in-memory instance is returned from session.
self.cache = ftrack_api.cache.LayeredCache([
ftrack_api.cache.MemoryCache()
])
if cache is not None:
if callable(cache):
cache = cache(self)
if cache is not None:
self.cache.caches.append(cache)
self._managed_request = None
self._request = requests.Session()
self._request.auth = ftrack_api.session.SessionAuthentication(
self._api_key, self._api_user
)
self.auto_populate = auto_populate
# Fetch server information and in doing so also check credentials.
self._server_information = self._fetch_server_information()
# Now check compatibility of server based on retrieved information.
self.check_server_compatibility()
# Construct event hub and load plugins.
self._event_hub = ProcessEventHub(
self._server_url,
self._api_user,
self._api_key,
sock=sock
)
self._auto_connect_event_hub_thread = None
if auto_connect_event_hub in (None, True):
# Connect to event hub in background thread so as not to block main
# session usage waiting for event hub connection.
self._auto_connect_event_hub_thread = threading.Thread(
target=self._event_hub.connect
)
self._auto_connect_event_hub_thread.daemon = True
self._auto_connect_event_hub_thread.start()
# To help with migration from auto_connect_event_hub default changing
# from True to False.
self._event_hub._deprecation_warning_auto_connect = (
auto_connect_event_hub is None
)
# Register to auto-close session on exit.
atexit.register(self.close)
self._plugin_paths = plugin_paths
if self._plugin_paths is None:
self._plugin_paths = os.environ.get(
'FTRACK_EVENT_PLUGIN_PATH', ''
).split(os.pathsep)
self._discover_plugins(plugin_arguments=plugin_arguments)
# TODO: Make schemas read-only and non-mutable (or at least without
# rebuilding types)?
if schema_cache_path is not False:
if schema_cache_path is None:
schema_cache_path = os.environ.get(
'FTRACK_API_SCHEMA_CACHE_PATH', tempfile.gettempdir()
)
schema_cache_path = os.path.join(
schema_cache_path, 'ftrack_api_schema_cache.json'
)
self.schemas = self._load_schemas(schema_cache_path)
self.types = self._build_entity_type_classes(self.schemas)
ftrack_api._centralized_storage_scenario.register(self)
self._configure_locations()
self.event_hub.publish(
ftrack_api.event.base.Event(
topic='ftrack.api.session.ready',
data=dict(
session=self
)
),
synchronous=True
)
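The Mongo side of ProcessEventHub.load_events can be read as a small standalone query; a rough equivalent for reference, assuming `collection` is the pymongo collection resolved from get_ftrack_event_mongo_info():

import datetime
import pymongo

def fetch_backlog(collection):
    """Rough equivalent of ProcessEventHub.load_events()' Mongo access."""
    # Purge events that were processed more than three days ago.
    three_days_ago = datetime.datetime.now() - datetime.timedelta(days=3)
    collection.delete_many({
        "pype_data.stored": {"$lte": three_days_ago},
        "pype_data.is_processed": True,
    })
    # Return the unprocessed backlog, oldest first.
    return collection.find(
        {"pype_data.is_processed": False}
    ).sort([("pype_data.stored", pymongo.ASCENDING)])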

View file

@ -0,0 +1,257 @@
import logging
import os
import getpass
import atexit
import tempfile
import threading
import requests
import ftrack_api
import ftrack_api.session
import ftrack_api.cache
import ftrack_api.operation
import ftrack_api._centralized_storage_scenario
import ftrack_api.event
from ftrack_api.logging import LazyLogMessage as L
class StorerEventHub(ftrack_api.event.hub.EventHub):
def __init__(self, *args, **kwargs):
self.sock = kwargs.pop("sock")
super(StorerEventHub, self).__init__(*args, **kwargs)
def _handle_packet(self, code, packet_identifier, path, data):
"""Override `_handle_packet` which extend heartbeat"""
if self._code_name_mapping[code] == "heartbeat":
# Reply with heartbeat.
self.sock.sendall(b"storer")
return self._send_packet(self._code_name_mapping['heartbeat'])
return super(StorerEventHub, self)._handle_packet(
code, packet_identifier, path, data
)
class StorerSession(ftrack_api.session.Session):
'''An isolated session for interaction with an ftrack server.'''
def __init__(
self, server_url=None, api_key=None, api_user=None, auto_populate=True,
plugin_paths=None, cache=None, cache_key_maker=None,
auto_connect_event_hub=None, schema_cache_path=None,
plugin_arguments=None, sock=None
):
'''Initialise session.
*server_url* should be the URL of the ftrack server to connect to
including any port number. If not specified attempt to look up from
:envvar:`FTRACK_SERVER`.
*api_key* should be the API key to use for authentication whilst
*api_user* should be the username of the user in ftrack to record
operations against. If not specified, *api_key* should be retrieved
from :envvar:`FTRACK_API_KEY` and *api_user* from
:envvar:`FTRACK_API_USER`.
If *auto_populate* is True (the default), then accessing entity
attributes will cause them to be automatically fetched from the server
if they are not already. This flag can be changed on the session
directly at any time.
*plugin_paths* should be a list of paths to search for plugins. If not
specified, default to looking up :envvar:`FTRACK_EVENT_PLUGIN_PATH`.
*cache* should be an instance of a cache that fulfils the
:class:`ftrack_api.cache.Cache` interface and will be used as the cache
for the session. It can also be a callable that will be called with the
session instance as sole argument. The callable should return ``None``
if a suitable cache could not be configured, but session instantiation
can continue safely.
.. note::
The session will add the specified cache to a pre-configured layered
cache that specifies the top level cache as a
:class:`ftrack_api.cache.MemoryCache`. Therefore, it is unnecessary
to construct a separate memory cache for typical behaviour. Working
around this behaviour or removing the memory cache can lead to
unexpected behaviour.
*cache_key_maker* should be an instance of a key maker that fulfils the
:class:`ftrack_api.cache.KeyMaker` interface and will be used to
generate keys for objects being stored in the *cache*. If not specified,
a :class:`~ftrack_api.cache.StringKeyMaker` will be used.
If *auto_connect_event_hub* is True then embedded event hub will be
automatically connected to the event server and allow for publishing and
subscribing to **non-local** events. If False, then only publishing and
subscribing to **local** events will be possible until the hub is
manually connected using :meth:`EventHub.connect
<ftrack_api.event.hub.EventHub.connect>`.
.. note::
The event hub connection is performed in a background thread to
improve session startup time. If a registered plugin requires a
connected event hub then it should check the event hub connection
status explicitly. Subscribing to events does *not* require a
connected event hub.
Enable schema caching by setting *schema_cache_path* to a folder path.
If not set, :envvar:`FTRACK_API_SCHEMA_CACHE_PATH` will be used to
determine the path to store cache in. If the environment variable is
also not specified then a temporary directory will be used. Set to
`False` to disable schema caching entirely.
*plugin_arguments* should be an optional mapping (dict) of keyword
arguments to pass to plugin register functions upon discovery. If a
discovered plugin has a signature that is incompatible with the passed
arguments, the discovery mechanism will attempt to reduce the passed
arguments to only those that the plugin accepts. Note that a warning
will be logged in this case.
'''
super(ftrack_api.session.Session, self).__init__()
self.logger = logging.getLogger(
__name__ + '.' + self.__class__.__name__
)
self._closed = False
if server_url is None:
server_url = os.environ.get('FTRACK_SERVER')
if not server_url:
raise TypeError(
'Required "server_url" not specified. Pass as argument or set '
'in environment variable FTRACK_SERVER.'
)
self._server_url = server_url
if api_key is None:
api_key = os.environ.get(
'FTRACK_API_KEY',
# Backwards compatibility
os.environ.get('FTRACK_APIKEY')
)
if not api_key:
raise TypeError(
'Required "api_key" not specified. Pass as argument or set in '
'environment variable FTRACK_API_KEY.'
)
self._api_key = api_key
if api_user is None:
api_user = os.environ.get('FTRACK_API_USER')
if not api_user:
try:
api_user = getpass.getuser()
except Exception:
pass
if not api_user:
raise TypeError(
'Required "api_user" not specified. Pass as argument, set in '
'environment variable FTRACK_API_USER or one of the standard '
'environment variables used by Python\'s getpass module.'
)
self._api_user = api_user
# Currently pending operations.
self.recorded_operations = ftrack_api.operation.Operations()
self.record_operations = True
self.cache_key_maker = cache_key_maker
if self.cache_key_maker is None:
self.cache_key_maker = ftrack_api.cache.StringKeyMaker()
# Enforce always having a memory cache at top level so that the same
# in-memory instance is returned from session.
self.cache = ftrack_api.cache.LayeredCache([
ftrack_api.cache.MemoryCache()
])
if cache is not None:
if callable(cache):
cache = cache(self)
if cache is not None:
self.cache.caches.append(cache)
self._managed_request = None
self._request = requests.Session()
self._request.auth = ftrack_api.session.SessionAuthentication(
self._api_key, self._api_user
)
self.auto_populate = auto_populate
# Fetch server information and in doing so also check credentials.
self._server_information = self._fetch_server_information()
# Now check compatibility of server based on retrieved information.
self.check_server_compatibility()
# Construct event hub and load plugins.
self._event_hub = StorerEventHub(
self._server_url,
self._api_user,
self._api_key,
sock=sock
)
self._auto_connect_event_hub_thread = None
if auto_connect_event_hub in (None, True):
# Connect to event hub in background thread so as not to block main
# session usage waiting for event hub connection.
self._auto_connect_event_hub_thread = threading.Thread(
target=self._event_hub.connect
)
self._auto_connect_event_hub_thread.daemon = True
self._auto_connect_event_hub_thread.start()
# To help with migration from auto_connect_event_hub default changing
# from True to False.
self._event_hub._deprecation_warning_auto_connect = (
auto_connect_event_hub is None
)
# Register to auto-close session on exit.
atexit.register(self.close)
self._plugin_paths = plugin_paths
if self._plugin_paths is None:
self._plugin_paths = os.environ.get(
'FTRACK_EVENT_PLUGIN_PATH', ''
).split(os.pathsep)
self._discover_plugins(plugin_arguments=plugin_arguments)
# TODO: Make schemas read-only and non-mutable (or at least without
# rebuilding types)?
if schema_cache_path is not False:
if schema_cache_path is None:
schema_cache_path = os.environ.get(
'FTRACK_API_SCHEMA_CACHE_PATH', tempfile.gettempdir()
)
schema_cache_path = os.path.join(
schema_cache_path, 'ftrack_api_schema_cache.json'
)
self.schemas = self._load_schemas(schema_cache_path)
self.types = self._build_entity_type_classes(self.schemas)
ftrack_api._centralized_storage_scenario.register(self)
self._configure_locations()
self.event_hub.publish(
ftrack_api.event.base.Event(
topic='ftrack.api.session.ready',
data=dict(
session=self
)
),
synchronous=True
)

View file

@ -0,0 +1,123 @@
import os
import sys
import time
import signal
import socket
import threading
import subprocess
from pypeapp import Logger
class SocketThread(threading.Thread):
"""Thread that checks suprocess of storer of processor of events"""
MAX_TIMEOUT = 35
def __init__(self, name, port, filepath):
super(SocketThread, self).__init__()
self.log = Logger().get_logger("SocketThread", "Event Thread")
self.setName(name)
self.name = name
self.port = port
self.filepath = filepath
self.sock = None
self.subproc = None
self.connection = None
self._is_running = False
self.finished = False
self.mongo_error = False
def stop(self):
self._is_running = False
def run(self):
self._is_running = True
time_socket = time.time()
# Create a TCP/IP socket
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.sock = sock
# Bind the socket to the port - skip already used ports
while True:
try:
server_address = ("localhost", self.port)
sock.bind(server_address)
break
except OSError:
self.port += 1
self.log.debug(
"Running Socked thread on {}:{}".format(*server_address)
)
self.subproc = subprocess.Popen(
["python", self.filepath, "-port", str(self.port)],
stdout=subprocess.PIPE
)
# Listen for incoming connections
sock.listen(1)
sock.settimeout(1.0)
while True:
if not self._is_running:
break
try:
connection, client_address = sock.accept()
time_socket = time.time()
connection.settimeout(1.0)
self.connection = connection
except socket.timeout:
if (time.time() - time_socket) > self.MAX_TIMEOUT:
self.log.error("Connection timeout passed. Terminating.")
self._is_running = False
self.subproc.terminate()
break
continue
try:
time_con = time.time()
# Receive the data in small chunks and retransmit it
while True:
try:
if not self._is_running:
break
try:
data = connection.recv(16)
time_con = time.time()
except socket.timeout:
if (time.time() - time_con) > self.MAX_TIMEOUT:
self.log.error(
"Connection timeout passed. Terminating."
)
self._is_running = False
self.subproc.terminate()
break
continue
except ConnectionResetError:
self._is_running = False
break
if data:
if data == b"MongoError":
self.mongo_error = True
connection.sendall(data)
except Exception as exc:
self.log.error(
"Event server process failed", exc_info=True
)
finally:
# Clean up the connection
connection.close()
if self.subproc.poll() is None:
self.subproc.terminate()
lines = self.subproc.stdout.readlines()
if lines:
print("*** Socked Thread stdout ***")
for line in lines:
os.write(1, line)
self.finished = True
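A minimal usage sketch of the thread's contract as consumed by the main loop; the script path is a placeholder and the real caller also applies the fail counters shown earlier:

import time

thread = SocketThread("StorerThread", 10001, "/path/to/sub_event_storer.py")
thread.start()
try:
    while thread.is_alive():
        if thread.mongo_error:
            raise RuntimeError("Subprocess reported a MongoDB access problem")
        time.sleep(1)
finally:
    thread.stop()
    thread.join()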

View file

@ -0,0 +1,53 @@
import os
import sys
import datetime
import signal
import socket
import pymongo
from ftrack_server import FtrackServer
from pype.ftrack.ftrack_server.session_processor import ProcessSession
from pypeapp import Logger
log = Logger().get_logger("Event processor")
def main(args):
port = int(args[-1])
# Create a TCP/IP socket
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# Connect the socket to the port where the server is listening
server_address = ("localhost", port)
log.debug("Processor connected to {} port {}".format(*server_address))
sock.connect(server_address)
sock.sendall(b"CreatedProcess")
try:
session = ProcessSession(auto_connect_event_hub=True, sock=sock)
server = FtrackServer('event')
log.debug("Launched Ftrack Event processor")
server.run_server(session)
except Exception as exc:
import traceback
traceback.print_tb(exc.__traceback__)
finally:
log.debug("First closing socket")
sock.close()
return 1
if __name__ == "__main__":
# Register interrupt signal
def signal_handler(sig, frame):
print("You pressed Ctrl+C. Process ended.")
sys.exit(0)
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)
if hasattr(signal, "SIGKILL"):
signal.signal(signal.SIGKILL, signal_handler)
sys.exit(main(sys.argv))

View file

@ -0,0 +1,118 @@
import os
import sys
import datetime
import signal
import socket
import pymongo
from ftrack_server import FtrackServer
from pype.ftrack.ftrack_server.lib import get_ftrack_event_mongo_info
from pype.ftrack.lib.custom_db_connector import DbConnector
from session_storer import StorerSession
from pypeapp import Logger
log = Logger().get_logger("Event storer")
url, database, table_name = get_ftrack_event_mongo_info()
dbcon = DbConnector(
mongo_url=url,
database_name=database,
table_name=table_name
)
# ignore_topics = ["ftrack.meta.connected"]
ignore_topics = []
def install_db():
try:
dbcon.install()
dbcon._database.collection_names()
except pymongo.errors.AutoReconnect:
log.error("Mongo server \"{}\" is not responding, exiting.".format(
os.environ["AVALON_MONGO"]
))
sys.exit(0)
def launch(event):
if event.get("topic") in ignore_topics:
return
event_data = event._data
event_id = event["id"]
event_data["pype_data"] = {
"stored": datetime.datetime.utcnow(),
"is_processed": False
}
try:
# dbcon.insert_one(event_data)
dbcon.update({"id": event_id}, event_data, upsert=True)
log.debug("Event: {} stored".format(event_id))
except pymongo.errors.AutoReconnect:
log.error("Mongo server \"{}\" is not responding, exiting.".format(
os.environ["AVALON_MONGO"]
))
sys.exit(0)
except Exception as exc:
log.error(
"Event: {} failed to store".format(event_id),
exc_info=True
)
def register(session):
'''Register the storer by subscribing to all event topics.'''
install_db()
session.event_hub.subscribe("topic=*", launch)
def main(args):
port = int(args[-1])
# Create a TCP/IP socket
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# Connect the socket to the port where the server is listening
server_address = ("localhost", port)
log.debug("Storer connected to {} port {}".format(*server_address))
sock.connect(server_address)
sock.sendall(b"CreatedStore")
try:
session = StorerSession(auto_connect_event_hub=True, sock=sock)
register(session)
server = FtrackServer("event")
log.debug("Launched Ftrack Event storer")
server.run_server(session, load_files=False)
except pymongo.errors.OperationFailure:
log.error((
"Error with Mongo access, probably permissions."
"Check if exist database with name \"{}\""
" and collection \"{}\" inside."
).format(database, table_name))
sock.sendall(b"MongoError")
finally:
log.debug("First closing socket")
sock.close()
return 1
if __name__ == "__main__":
# Register interrupt signal
def signal_handler(sig, frame):
print("You pressed Ctrl+C. Process ended.")
sys.exit(0)
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)
if hasattr(signal, "SIGKILL"):
signal.signal(signal.SIGKILL, signal_handler)
sys.exit(main(sys.argv))
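For reference, a sketch of the document shape that launch() upserts per event; the values are illustrative placeholders, only the field names match the code above:

stored_event = {
    "id": "6f1c9d3e-0000-0000-0000-000000000000",  # ftrack event id, upsert key
    "topic": "ftrack.update",                      # original ftrack payload fields
    "data": {"entities": []},
    "pype_data": {
        "stored": datetime.datetime.utcnow(),      # when the storer saw it
        "is_processed": False,                     # flipped to True by the processor
    },
}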

View file

@ -0,0 +1,100 @@
import os
import sys
import time
import datetime
import signal
import threading
from ftrack_server import FtrackServer
from pype.vendor import ftrack_api
from pype.vendor.ftrack_api.event.hub import EventHub
from pypeapp import Logger
log = Logger().get_logger("Event Server Old")
class TimerChecker(threading.Thread):
max_time_out = 35
def __init__(self, server, session):
self.server = server
self.session = session
self.is_running = False
self.failed = False
super().__init__()
def stop(self):
self.is_running = False
def run(self):
start = datetime.datetime.now()
self.is_running = True
connected = False
while True:
if not self.is_running:
break
if not self.session.event_hub.connected:
if not connected:
if (datetime.datetime.now() - start).seconds > self.max_time_out:
log.error((
"Exiting event server. Session was not connected"
" to ftrack server in {} seconds."
).format(self.max_time_out))
self.failed = True
break
else:
log.error(
"Exiting event server. Event Hub is not connected."
)
self.server.stop_session()
self.failed = True
break
else:
if not connected:
connected = True
time.sleep(1)
def main(args):
check_thread = None
try:
server = FtrackServer('event')
session = ftrack_api.Session(auto_connect_event_hub=True)
check_thread = TimerChecker(server, session)
check_thread.start()
log.debug("Launching Ftrack Event Old Way Server")
server.run_server(session)
except Exception as exc:
import traceback
traceback.print_tb(exc.__traceback__)
finally:
log_info = True
if check_thread is not None:
check_thread.stop()
check_thread.join()
if check_thread.failed:
log_info = False
if log_info:
log.info("Exiting Event server subprocess")
return 1
if __name__ == "__main__":
# Register interrupt signal
def signal_handler(sig, frame):
print("You pressed Ctrl+C. Process ended.")
sys.exit(0)
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)
if hasattr(signal, "SIGKILL"):
signal.signal(signal.SIGKILL, signal_handler)
sys.exit(main(sys.argv))

View file

@ -13,6 +13,7 @@ import logging
import tempfile
import functools
import contextlib
import atexit
import requests
@ -20,6 +21,9 @@ import requests
import pymongo
from pymongo.client_session import ClientSession
class NotActiveTable(Exception):
pass
def auto_reconnect(func):
"""Handling auto reconnect in 3 retry times"""
@functools.wraps(func)
@ -37,12 +41,23 @@ def auto_reconnect(func):
return decorated
def check_active_table(func):
"""Handling auto reconnect in 3 retry times"""
@functools.wraps(func)
def decorated(obj, *args, **kwargs):
if not obj.active_table:
raise NotActiveTable("Active table is not set. (This is bug)")
return func(obj, *args, **kwargs)
return decorated
class DbConnector:
log = logging.getLogger(__name__)
timeout = 1000
def __init__(self, mongo_url, database_name, table_name):
def __init__(self, mongo_url, database_name, table_name=None):
self._mongo_client = None
self._sentry_client = None
self._sentry_logging_handler = None
@ -53,11 +68,17 @@ class DbConnector:
self.active_table = table_name
def __getattribute__(self, attr):
try:
return super().__getattribute__(attr)
except AttributeError:
return self._database[self.active_table].__getattribute__(attr)
def install(self):
"""Establish a persistent connection to the database"""
if self._is_installed:
return
atexit.register(self.uninstall)
logging.basicConfig()
self._mongo_client = pymongo.MongoClient(
@ -99,6 +120,16 @@ class DbConnector:
self._mongo_client = None
self._database = None
self._is_installed = False
atexit.unregister(self.uninstall)
def create_table(self, name, **options):
if self.exist_table(name):
return
return self._database.create_collection(name, **options)
def exist_table(self, table_name):
return table_name in self.tables()
def tables(self):
"""List available tables
@ -115,93 +146,80 @@ class DbConnector:
def collections(self):
return self._database.collection_names()
@check_active_table
@auto_reconnect
def insert_one(self, item, session=None):
def insert_one(self, item, **options):
assert isinstance(item, dict), "item must be of type <dict>"
return self._database[self.active_table].insert_one(
item,
session=session
)
return self._database[self.active_table].insert_one(item, **options)
@check_active_table
@auto_reconnect
def insert_many(self, items, ordered=True, session=None):
def insert_many(self, items, ordered=True, **options):
# check if all items are valid
assert isinstance(items, list), "`items` must be of type <list>"
for item in items:
assert isinstance(item, dict), "`item` must be of type <dict>"
return self._database[self.active_table].insert_many(
items,
ordered=ordered,
session=session
)
options["ordered"] = ordered
return self._database[self.active_table].insert_many(items, **options)
@check_active_table
@auto_reconnect
def find(self, filter, projection=None, sort=None, session=None):
return self._database[self.active_table].find(
filter=filter,
projection=projection,
sort=sort,
session=session
)
def find(self, filter, projection=None, sort=None, **options):
options["projection"] = projection
options["sort"] = sort
return self._database[self.active_table].find(filter, **options)
@check_active_table
@auto_reconnect
def find_one(self, filter, projection=None, sort=None, session=None):
def find_one(self, filter, projection=None, sort=None, **options):
assert isinstance(filter, dict), "filter must be <dict>"
return self._database[self.active_table].find_one(
filter=filter,
projection=projection,
sort=sort,
session=session
)
options["projection"] = projection
options["sort"] = sort
return self._database[self.active_table].find_one(filter, **options)
@check_active_table
@auto_reconnect
def replace_one(self, filter, replacement, session=None):
def replace_one(self, filter, replacement, **options):
return self._database[self.active_table].replace_one(
filter, replacement,
session=session
filter, replacement, **options
)
@check_active_table
@auto_reconnect
def update_one(self, filter, update, session=None):
def update_one(self, filter, update, **options):
return self._database[self.active_table].update_one(
filter, update,
session=session
filter, update, **options
)
@check_active_table
@auto_reconnect
def update_many(self, filter, update, session=None):
def update_many(self, filter, update, **options):
return self._database[self.active_table].update_many(
filter, update,
session=session
filter, update, **options
)
@check_active_table
@auto_reconnect
def distinct(self, *args, **kwargs):
return self._database[self.active_table].distinct(
*args, **kwargs
)
return self._database[self.active_table].distinct(*args, **kwargs)
@check_active_table
@auto_reconnect
def drop_collection(self, name_or_collection, session=None):
def drop_collection(self, name_or_collection, **options):
return self._database[self.active_table].drop(
name_or_collection,
session=session
name_or_collection, **options
)
@check_active_table
@auto_reconnect
def delete_one(filter, collation=None, session=None):
return self._database[self.active_table].delete_one(
filter,
collation=collation,
session=session
)
def delete_one(self, filter, collation=None, **options):
options["collation"] = collation
return self._database[self.active_table].delete_one(filter, **options)
@check_active_table
@auto_reconnect
def delete_many(filter, collation=None, session=None):
return self._database[self.active_table].delete_many(
filter,
collation=collation,
session=session
)
def delete_many(self, filter, collation=None, **options):
options["collation"] = collation
return self._database[self.active_table].delete_many(filter, **options)
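A hedged usage sketch of the reworked connector; the connection values are placeholders and NotActiveTable is the new guard raised by the check_active_table decorator:

from pype.ftrack.lib.custom_db_connector import DbConnector

dbcon = DbConnector(
    mongo_url="mongodb://localhost:27017",
    database_name="pype",
    table_name="ftrack_events",  # table_name may now be omitted and set later
)
dbcon.install()
# Every query helper is wrapped by @check_active_table, so calling e.g.
# find_one() without an active table raises NotActiveTable instead of failing
# deeper inside pymongo; extra pymongo options pass straight through **options.
doc = dbcon.find_one({"pype_data.is_processed": False})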

View file

@ -3,6 +3,7 @@ import time
from pypeapp import Logger
from pype.vendor import ftrack_api
from pype.vendor.ftrack_api import session as fa_session
from pype.ftrack.ftrack_server import session_processor
class MissingPermision(Exception):
@ -31,8 +32,21 @@ class BaseHandler(object):
def __init__(self, session, plugins_presets={}):
'''Expects a ftrack_api.Session instance'''
self._session = session
self.log = Logger().get_logger(self.__class__.__name__)
if not(
isinstance(session, ftrack_api.session.Session) or
isinstance(session, session_processor.ProcessSession)
):
raise Exception((
"Session object entered with args is instance of \"{}\""
" but expected instances are \"{}\" and \"{}\""
).format(
str(type(session)),
str(ftrack_api.session.Session),
str(session_processor.ProcessSession)
))
self._session = session
# Using decorator
self.register = self.register_decorator(self.register)