mirror of
https://github.com/ynput/ayon-core.git
synced 2025-12-25 13:24:54 +01:00
Merged in feature/ftrack_server_cleanup (pull request #413)
Feature/ftrack server cleanup Approved-by: Milan Kolar <milan@orbi.tools>
This commit is contained in:
commit
1d56d8adb9
8 changed files with 360 additions and 591 deletions
|
|
@ -1,10 +1,32 @@
|
|||
import os
|
||||
import sys
|
||||
import logging
|
||||
import getpass
|
||||
import atexit
|
||||
import tempfile
|
||||
import threading
|
||||
import datetime
|
||||
import time
|
||||
import queue
|
||||
import pymongo
|
||||
|
||||
import requests
|
||||
import ftrack_api
|
||||
import ftrack_api.session
|
||||
import ftrack_api.cache
|
||||
import ftrack_api.operation
|
||||
import ftrack_api._centralized_storage_scenario
|
||||
import ftrack_api.event
|
||||
from ftrack_api.logging import LazyLogMessage as L
|
||||
try:
|
||||
from urllib.parse import urlparse, parse_qs
|
||||
except ImportError:
|
||||
from urlparse import urlparse, parse_qs
|
||||
|
||||
from pypeapp import Logger
|
||||
|
||||
from pype.ftrack.lib.custom_db_connector import DbConnector
|
||||
|
||||
|
||||
def ftrack_events_mongo_settings():
|
||||
host = None
|
||||
|
|
@ -49,7 +71,9 @@ def ftrack_events_mongo_settings():
|
|||
|
||||
|
||||
def get_ftrack_event_mongo_info():
|
||||
host, port, database, username, password, collection, auth_db = ftrack_events_mongo_settings()
|
||||
host, port, database, username, password, collection, auth_db = (
|
||||
ftrack_events_mongo_settings()
|
||||
)
|
||||
user_pass = ""
|
||||
if username and password:
|
||||
user_pass = "{}:{}@".format(username, password)
|
||||
|
|
@ -97,3 +121,303 @@ def check_ftrack_url(url, log_errors=True):
|
|||
print('DEBUG: Ftrack server {} is accessible.'.format(url))
|
||||
|
||||
return url
|
||||
|
||||
|
||||
class StorerEventHub(ftrack_api.event.hub.EventHub):
|
||||
def __init__(self, *args, **kwargs):
|
||||
self.sock = kwargs.pop("sock")
|
||||
super(StorerEventHub, self).__init__(*args, **kwargs)
|
||||
|
||||
def _handle_packet(self, code, packet_identifier, path, data):
|
||||
"""Override `_handle_packet` which extend heartbeat"""
|
||||
code_name = self._code_name_mapping[code]
|
||||
if code_name == "heartbeat":
|
||||
# Reply with heartbeat.
|
||||
self.sock.sendall(b"storer")
|
||||
return self._send_packet(self._code_name_mapping['heartbeat'])
|
||||
|
||||
elif code_name == "connect":
|
||||
event = ftrack_api.event.base.Event(
|
||||
topic="pype.storer.started",
|
||||
data={},
|
||||
source={
|
||||
"id": self.id,
|
||||
"user": {"username": self._api_user}
|
||||
}
|
||||
)
|
||||
self._event_queue.put(event)
|
||||
|
||||
return super(StorerEventHub, self)._handle_packet(
|
||||
code, packet_identifier, path, data
|
||||
)
|
||||
|
||||
|
||||
class ProcessEventHub(ftrack_api.event.hub.EventHub):
|
||||
url, database, table_name = get_ftrack_event_mongo_info()
|
||||
|
||||
is_table_created = False
|
||||
pypelog = Logger().get_logger("Session Processor")
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
self.dbcon = DbConnector(
|
||||
mongo_url=self.url,
|
||||
database_name=self.database,
|
||||
table_name=self.table_name
|
||||
)
|
||||
self.sock = kwargs.pop("sock")
|
||||
super(ProcessEventHub, self).__init__(*args, **kwargs)
|
||||
|
||||
def prepare_dbcon(self):
|
||||
try:
|
||||
self.dbcon.install()
|
||||
self.dbcon._database.list_collection_names()
|
||||
except pymongo.errors.AutoReconnect:
|
||||
self.pypelog.error(
|
||||
"Mongo server \"{}\" is not responding, exiting.".format(
|
||||
os.environ["AVALON_MONGO"]
|
||||
)
|
||||
)
|
||||
sys.exit(0)
|
||||
|
||||
except pymongo.errors.OperationFailure:
|
||||
self.pypelog.error((
|
||||
"Error with Mongo access, probably permissions."
|
||||
"Check if exist database with name \"{}\""
|
||||
" and collection \"{}\" inside."
|
||||
).format(self.database, self.table_name))
|
||||
self.sock.sendall(b"MongoError")
|
||||
sys.exit(0)
|
||||
|
||||
def wait(self, duration=None):
|
||||
"""Overriden wait
|
||||
|
||||
Event are loaded from Mongo DB when queue is empty. Handled event is
|
||||
set as processed in Mongo DB.
|
||||
"""
|
||||
started = time.time()
|
||||
self.prepare_dbcon()
|
||||
while True:
|
||||
try:
|
||||
event = self._event_queue.get(timeout=0.1)
|
||||
except queue.Empty:
|
||||
if not self.load_events():
|
||||
time.sleep(0.5)
|
||||
else:
|
||||
try:
|
||||
self._handle(event)
|
||||
self.dbcon.update_one(
|
||||
{"id": event["id"]},
|
||||
{"$set": {"pype_data.is_processed": True}}
|
||||
)
|
||||
except pymongo.errors.AutoReconnect:
|
||||
self.pypelog.error((
|
||||
"Mongo server \"{}\" is not responding, exiting."
|
||||
).format(os.environ["AVALON_MONGO"]))
|
||||
sys.exit(0)
|
||||
# Additional special processing of events.
|
||||
if event['topic'] == 'ftrack.meta.disconnected':
|
||||
break
|
||||
|
||||
if duration is not None:
|
||||
if (time.time() - started) > duration:
|
||||
break
|
||||
|
||||
def load_events(self):
|
||||
"""Load not processed events sorted by stored date"""
|
||||
ago_date = datetime.datetime.now() - datetime.timedelta(days=3)
|
||||
result = self.dbcon.delete_many({
|
||||
"pype_data.stored": {"$lte": ago_date},
|
||||
"pype_data.is_processed": True
|
||||
})
|
||||
|
||||
not_processed_events = self.dbcon.find(
|
||||
{"pype_data.is_processed": False}
|
||||
).sort(
|
||||
[("pype_data.stored", pymongo.ASCENDING)]
|
||||
)
|
||||
|
||||
found = False
|
||||
for event_data in not_processed_events:
|
||||
new_event_data = {
|
||||
k: v for k, v in event_data.items()
|
||||
if k not in ["_id", "pype_data"]
|
||||
}
|
||||
try:
|
||||
event = ftrack_api.event.base.Event(**new_event_data)
|
||||
except Exception:
|
||||
self.logger.exception(L(
|
||||
'Failed to convert payload into event: {0}',
|
||||
event_data
|
||||
))
|
||||
continue
|
||||
found = True
|
||||
self._event_queue.put(event)
|
||||
|
||||
return found
|
||||
|
||||
def _handle_packet(self, code, packet_identifier, path, data):
|
||||
"""Override `_handle_packet` which skip events and extend heartbeat"""
|
||||
code_name = self._code_name_mapping[code]
|
||||
if code_name == "event":
|
||||
return
|
||||
if code_name == "heartbeat":
|
||||
self.sock.sendall(b"processor")
|
||||
return self._send_packet(self._code_name_mapping["heartbeat"])
|
||||
|
||||
return super()._handle_packet(code, packet_identifier, path, data)
|
||||
class SocketSession(ftrack_api.session.Session):
|
||||
'''An isolated session for interaction with an ftrack server.'''
|
||||
def __init__(
|
||||
self, server_url=None, api_key=None, api_user=None, auto_populate=True,
|
||||
plugin_paths=None, cache=None, cache_key_maker=None,
|
||||
auto_connect_event_hub=None, schema_cache_path=None,
|
||||
plugin_arguments=None, sock=None, Eventhub=None
|
||||
):
|
||||
super(ftrack_api.session.Session, self).__init__()
|
||||
self.logger = logging.getLogger(
|
||||
__name__ + '.' + self.__class__.__name__
|
||||
)
|
||||
self._closed = False
|
||||
|
||||
if server_url is None:
|
||||
server_url = os.environ.get('FTRACK_SERVER')
|
||||
|
||||
if not server_url:
|
||||
raise TypeError(
|
||||
'Required "server_url" not specified. Pass as argument or set '
|
||||
'in environment variable FTRACK_SERVER.'
|
||||
)
|
||||
|
||||
self._server_url = server_url
|
||||
|
||||
if api_key is None:
|
||||
api_key = os.environ.get(
|
||||
'FTRACK_API_KEY',
|
||||
# Backwards compatibility
|
||||
os.environ.get('FTRACK_APIKEY')
|
||||
)
|
||||
|
||||
if not api_key:
|
||||
raise TypeError(
|
||||
'Required "api_key" not specified. Pass as argument or set in '
|
||||
'environment variable FTRACK_API_KEY.'
|
||||
)
|
||||
|
||||
self._api_key = api_key
|
||||
|
||||
if api_user is None:
|
||||
api_user = os.environ.get('FTRACK_API_USER')
|
||||
if not api_user:
|
||||
try:
|
||||
api_user = getpass.getuser()
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
if not api_user:
|
||||
raise TypeError(
|
||||
'Required "api_user" not specified. Pass as argument, set in '
|
||||
'environment variable FTRACK_API_USER or one of the standard '
|
||||
'environment variables used by Python\'s getpass module.'
|
||||
)
|
||||
|
||||
self._api_user = api_user
|
||||
|
||||
# Currently pending operations.
|
||||
self.recorded_operations = ftrack_api.operation.Operations()
|
||||
self.record_operations = True
|
||||
|
||||
self.cache_key_maker = cache_key_maker
|
||||
if self.cache_key_maker is None:
|
||||
self.cache_key_maker = ftrack_api.cache.StringKeyMaker()
|
||||
|
||||
# Enforce always having a memory cache at top level so that the same
|
||||
# in-memory instance is returned from session.
|
||||
self.cache = ftrack_api.cache.LayeredCache([
|
||||
ftrack_api.cache.MemoryCache()
|
||||
])
|
||||
|
||||
if cache is not None:
|
||||
if callable(cache):
|
||||
cache = cache(self)
|
||||
|
||||
if cache is not None:
|
||||
self.cache.caches.append(cache)
|
||||
|
||||
self._managed_request = None
|
||||
self._request = requests.Session()
|
||||
self._request.auth = ftrack_api.session.SessionAuthentication(
|
||||
self._api_key, self._api_user
|
||||
)
|
||||
|
||||
self.auto_populate = auto_populate
|
||||
|
||||
# Fetch server information and in doing so also check credentials.
|
||||
self._server_information = self._fetch_server_information()
|
||||
|
||||
# Now check compatibility of server based on retrieved information.
|
||||
self.check_server_compatibility()
|
||||
|
||||
# Construct event hub and load plugins.
|
||||
if Eventhub is None:
|
||||
Eventhub = ftrack_api.event.hub.EventHub
|
||||
self._event_hub = Eventhub(
|
||||
self._server_url,
|
||||
self._api_user,
|
||||
self._api_key,
|
||||
sock=sock
|
||||
)
|
||||
|
||||
self._auto_connect_event_hub_thread = None
|
||||
if auto_connect_event_hub in (None, True):
|
||||
# Connect to event hub in background thread so as not to block main
|
||||
# session usage waiting for event hub connection.
|
||||
self._auto_connect_event_hub_thread = threading.Thread(
|
||||
target=self._event_hub.connect
|
||||
)
|
||||
self._auto_connect_event_hub_thread.daemon = True
|
||||
self._auto_connect_event_hub_thread.start()
|
||||
|
||||
# To help with migration from auto_connect_event_hub default changing
|
||||
# from True to False.
|
||||
self._event_hub._deprecation_warning_auto_connect = (
|
||||
auto_connect_event_hub is None
|
||||
)
|
||||
|
||||
# Register to auto-close session on exit.
|
||||
atexit.register(self.close)
|
||||
|
||||
self._plugin_paths = plugin_paths
|
||||
if self._plugin_paths is None:
|
||||
self._plugin_paths = os.environ.get(
|
||||
'FTRACK_EVENT_PLUGIN_PATH', ''
|
||||
).split(os.pathsep)
|
||||
|
||||
self._discover_plugins(plugin_arguments=plugin_arguments)
|
||||
|
||||
# TODO: Make schemas read-only and non-mutable (or at least without
|
||||
# rebuilding types)?
|
||||
if schema_cache_path is not False:
|
||||
if schema_cache_path is None:
|
||||
schema_cache_path = os.environ.get(
|
||||
'FTRACK_API_SCHEMA_CACHE_PATH', tempfile.gettempdir()
|
||||
)
|
||||
|
||||
schema_cache_path = os.path.join(
|
||||
schema_cache_path, 'ftrack_api_schema_cache.json'
|
||||
)
|
||||
|
||||
self.schemas = self._load_schemas(schema_cache_path)
|
||||
self.types = self._build_entity_type_classes(self.schemas)
|
||||
|
||||
ftrack_api._centralized_storage_scenario.register(self)
|
||||
|
||||
self._configure_locations()
|
||||
self.event_hub.publish(
|
||||
ftrack_api.event.base.Event(
|
||||
topic='ftrack.api.session.ready',
|
||||
data=dict(
|
||||
session=self
|
||||
)
|
||||
),
|
||||
synchronous=True
|
||||
)
|
||||
|
|
|
|||
|
|
@ -1,292 +0,0 @@
|
|||
import logging
|
||||
import os
|
||||
import atexit
|
||||
import datetime
|
||||
import tempfile
|
||||
import threading
|
||||
import time
|
||||
import requests
|
||||
import queue
|
||||
import pymongo
|
||||
|
||||
import ftrack_api
|
||||
import ftrack_api.session
|
||||
import ftrack_api.cache
|
||||
import ftrack_api.operation
|
||||
import ftrack_api._centralized_storage_scenario
|
||||
import ftrack_api.event
|
||||
from ftrack_api.logging import LazyLogMessage as L
|
||||
|
||||
from pype.ftrack.lib.custom_db_connector import DbConnector
|
||||
from pype.ftrack.ftrack_server.lib import get_ftrack_event_mongo_info
|
||||
from pypeapp import Logger
|
||||
|
||||
log = Logger().get_logger("Session processor")
|
||||
|
||||
|
||||
class ProcessEventHub(ftrack_api.event.hub.EventHub):
|
||||
url, database, table_name = get_ftrack_event_mongo_info()
|
||||
|
||||
is_table_created = False
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
self.dbcon = DbConnector(
|
||||
mongo_url=self.url,
|
||||
database_name=self.database,
|
||||
table_name=self.table_name
|
||||
)
|
||||
self.sock = kwargs.pop("sock")
|
||||
super(ProcessEventHub, self).__init__(*args, **kwargs)
|
||||
|
||||
def prepare_dbcon(self):
|
||||
try:
|
||||
self.dbcon.install()
|
||||
self.dbcon._database.list_collection_names()
|
||||
except pymongo.errors.AutoReconnect:
|
||||
log.error("Mongo server \"{}\" is not responding, exiting.".format(
|
||||
os.environ["AVALON_MONGO"]
|
||||
))
|
||||
sys.exit(0)
|
||||
|
||||
except pymongo.errors.OperationFailure:
|
||||
log.error((
|
||||
"Error with Mongo access, probably permissions."
|
||||
"Check if exist database with name \"{}\""
|
||||
" and collection \"{}\" inside."
|
||||
).format(self.database, self.table_name))
|
||||
self.sock.sendall(b"MongoError")
|
||||
sys.exit(0)
|
||||
|
||||
def wait(self, duration=None):
|
||||
"""Overriden wait
|
||||
|
||||
Event are loaded from Mongo DB when queue is empty. Handled event is
|
||||
set as processed in Mongo DB.
|
||||
"""
|
||||
started = time.time()
|
||||
self.prepare_dbcon()
|
||||
while True:
|
||||
try:
|
||||
event = self._event_queue.get(timeout=0.1)
|
||||
except queue.Empty:
|
||||
if not self.load_events():
|
||||
time.sleep(0.5)
|
||||
else:
|
||||
try:
|
||||
self._handle(event)
|
||||
self.dbcon.update_one(
|
||||
{"id": event["id"]},
|
||||
{"$set": {"pype_data.is_processed": True}}
|
||||
)
|
||||
except pymongo.errors.AutoReconnect:
|
||||
log.error((
|
||||
"Mongo server \"{}\" is not responding, exiting."
|
||||
).format(os.environ["AVALON_MONGO"]))
|
||||
sys.exit(0)
|
||||
# Additional special processing of events.
|
||||
if event['topic'] == 'ftrack.meta.disconnected':
|
||||
break
|
||||
|
||||
if duration is not None:
|
||||
if (time.time() - started) > duration:
|
||||
break
|
||||
|
||||
def load_events(self):
|
||||
"""Load not processed events sorted by stored date"""
|
||||
ago_date = datetime.datetime.now() - datetime.timedelta(days=3)
|
||||
result = self.dbcon.delete_many({
|
||||
"pype_data.stored": {"$lte": ago_date},
|
||||
"pype_data.is_processed": True
|
||||
})
|
||||
|
||||
not_processed_events = self.dbcon.find(
|
||||
{"pype_data.is_processed": False}
|
||||
).sort(
|
||||
[("pype_data.stored", pymongo.ASCENDING)]
|
||||
)
|
||||
|
||||
found = False
|
||||
for event_data in not_processed_events:
|
||||
new_event_data = {
|
||||
k: v for k, v in event_data.items()
|
||||
if k not in ["_id", "pype_data"]
|
||||
}
|
||||
try:
|
||||
event = ftrack_api.event.base.Event(**new_event_data)
|
||||
except Exception:
|
||||
self.logger.exception(L(
|
||||
'Failed to convert payload into event: {0}',
|
||||
event_data
|
||||
))
|
||||
continue
|
||||
found = True
|
||||
self._event_queue.put(event)
|
||||
|
||||
return found
|
||||
|
||||
def _handle_packet(self, code, packet_identifier, path, data):
|
||||
"""Override `_handle_packet` which skip events and extend heartbeat"""
|
||||
code_name = self._code_name_mapping[code]
|
||||
if code_name == "event":
|
||||
return
|
||||
if code_name == "heartbeat":
|
||||
self.sock.sendall(b"processor")
|
||||
return self._send_packet(self._code_name_mapping["heartbeat"])
|
||||
|
||||
return super()._handle_packet(code, packet_identifier, path, data)
|
||||
|
||||
|
||||
class ProcessSession(ftrack_api.session.Session):
|
||||
'''An isolated session for interaction with an ftrack server.'''
|
||||
def __init__(
|
||||
self, server_url=None, api_key=None, api_user=None, auto_populate=True,
|
||||
plugin_paths=None, cache=None, cache_key_maker=None,
|
||||
auto_connect_event_hub=None, schema_cache_path=None,
|
||||
plugin_arguments=None, sock=None
|
||||
):
|
||||
super(ftrack_api.session.Session, self).__init__()
|
||||
self.logger = logging.getLogger(
|
||||
__name__ + '.' + self.__class__.__name__
|
||||
)
|
||||
self._closed = False
|
||||
|
||||
if server_url is None:
|
||||
server_url = os.environ.get('FTRACK_SERVER')
|
||||
|
||||
if not server_url:
|
||||
raise TypeError(
|
||||
'Required "server_url" not specified. Pass as argument or set '
|
||||
'in environment variable FTRACK_SERVER.'
|
||||
)
|
||||
|
||||
self._server_url = server_url
|
||||
|
||||
if api_key is None:
|
||||
api_key = os.environ.get(
|
||||
'FTRACK_API_KEY',
|
||||
# Backwards compatibility
|
||||
os.environ.get('FTRACK_APIKEY')
|
||||
)
|
||||
|
||||
if not api_key:
|
||||
raise TypeError(
|
||||
'Required "api_key" not specified. Pass as argument or set in '
|
||||
'environment variable FTRACK_API_KEY.'
|
||||
)
|
||||
|
||||
self._api_key = api_key
|
||||
|
||||
if api_user is None:
|
||||
api_user = os.environ.get('FTRACK_API_USER')
|
||||
if not api_user:
|
||||
try:
|
||||
api_user = getpass.getuser()
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
if not api_user:
|
||||
raise TypeError(
|
||||
'Required "api_user" not specified. Pass as argument, set in '
|
||||
'environment variable FTRACK_API_USER or one of the standard '
|
||||
'environment variables used by Python\'s getpass module.'
|
||||
)
|
||||
|
||||
self._api_user = api_user
|
||||
|
||||
# Currently pending operations.
|
||||
self.recorded_operations = ftrack_api.operation.Operations()
|
||||
self.record_operations = True
|
||||
|
||||
self.cache_key_maker = cache_key_maker
|
||||
if self.cache_key_maker is None:
|
||||
self.cache_key_maker = ftrack_api.cache.StringKeyMaker()
|
||||
|
||||
# Enforce always having a memory cache at top level so that the same
|
||||
# in-memory instance is returned from session.
|
||||
self.cache = ftrack_api.cache.LayeredCache([
|
||||
ftrack_api.cache.MemoryCache()
|
||||
])
|
||||
|
||||
if cache is not None:
|
||||
if callable(cache):
|
||||
cache = cache(self)
|
||||
|
||||
if cache is not None:
|
||||
self.cache.caches.append(cache)
|
||||
|
||||
self._managed_request = None
|
||||
self._request = requests.Session()
|
||||
self._request.auth = ftrack_api.session.SessionAuthentication(
|
||||
self._api_key, self._api_user
|
||||
)
|
||||
|
||||
self.auto_populate = auto_populate
|
||||
|
||||
# Fetch server information and in doing so also check credentials.
|
||||
self._server_information = self._fetch_server_information()
|
||||
|
||||
# Now check compatibility of server based on retrieved information.
|
||||
self.check_server_compatibility()
|
||||
|
||||
# Construct event hub and load plugins.
|
||||
self._event_hub = ProcessEventHub(
|
||||
self._server_url,
|
||||
self._api_user,
|
||||
self._api_key,
|
||||
sock=sock
|
||||
)
|
||||
|
||||
self._auto_connect_event_hub_thread = None
|
||||
if auto_connect_event_hub in (None, True):
|
||||
# Connect to event hub in background thread so as not to block main
|
||||
# session usage waiting for event hub connection.
|
||||
self._auto_connect_event_hub_thread = threading.Thread(
|
||||
target=self._event_hub.connect
|
||||
)
|
||||
self._auto_connect_event_hub_thread.daemon = True
|
||||
self._auto_connect_event_hub_thread.start()
|
||||
|
||||
# To help with migration from auto_connect_event_hub default changing
|
||||
# from True to False.
|
||||
self._event_hub._deprecation_warning_auto_connect = (
|
||||
auto_connect_event_hub is None
|
||||
)
|
||||
|
||||
# Register to auto-close session on exit.
|
||||
atexit.register(self.close)
|
||||
|
||||
self._plugin_paths = plugin_paths
|
||||
if self._plugin_paths is None:
|
||||
self._plugin_paths = os.environ.get(
|
||||
'FTRACK_EVENT_PLUGIN_PATH', ''
|
||||
).split(os.pathsep)
|
||||
|
||||
self._discover_plugins(plugin_arguments=plugin_arguments)
|
||||
|
||||
# TODO: Make schemas read-only and non-mutable (or at least without
|
||||
# rebuilding types)?
|
||||
if schema_cache_path is not False:
|
||||
if schema_cache_path is None:
|
||||
schema_cache_path = os.environ.get(
|
||||
'FTRACK_API_SCHEMA_CACHE_PATH', tempfile.gettempdir()
|
||||
)
|
||||
|
||||
schema_cache_path = os.path.join(
|
||||
schema_cache_path, 'ftrack_api_schema_cache.json'
|
||||
)
|
||||
|
||||
self.schemas = self._load_schemas(schema_cache_path)
|
||||
self.types = self._build_entity_type_classes(self.schemas)
|
||||
|
||||
ftrack_api._centralized_storage_scenario.register(self)
|
||||
|
||||
self._configure_locations()
|
||||
self.event_hub.publish(
|
||||
ftrack_api.event.base.Event(
|
||||
topic='ftrack.api.session.ready',
|
||||
data=dict(
|
||||
session=self
|
||||
)
|
||||
),
|
||||
synchronous=True
|
||||
)
|
||||
|
|
@ -1,269 +0,0 @@
|
|||
import logging
|
||||
import os
|
||||
import atexit
|
||||
import tempfile
|
||||
import threading
|
||||
import requests
|
||||
|
||||
import ftrack_api
|
||||
import ftrack_api.session
|
||||
import ftrack_api.cache
|
||||
import ftrack_api.operation
|
||||
import ftrack_api._centralized_storage_scenario
|
||||
import ftrack_api.event
|
||||
from ftrack_api.logging import LazyLogMessage as L
|
||||
|
||||
|
||||
class StorerEventHub(ftrack_api.event.hub.EventHub):
|
||||
def __init__(self, *args, **kwargs):
|
||||
self.sock = kwargs.pop("sock")
|
||||
super(StorerEventHub, self).__init__(*args, **kwargs)
|
||||
|
||||
def _handle_packet(self, code, packet_identifier, path, data):
|
||||
"""Override `_handle_packet` which extend heartbeat"""
|
||||
code_name = self._code_name_mapping[code]
|
||||
if code_name == "heartbeat":
|
||||
# Reply with heartbeat.
|
||||
self.sock.sendall(b"storer")
|
||||
return self._send_packet(self._code_name_mapping['heartbeat'])
|
||||
|
||||
elif code_name == "connect":
|
||||
event = ftrack_api.event.base.Event(
|
||||
topic="pype.storer.started",
|
||||
data={},
|
||||
source={
|
||||
"id": self.id,
|
||||
"user": {"username": self._api_user}
|
||||
}
|
||||
)
|
||||
self._event_queue.put(event)
|
||||
|
||||
return super(StorerEventHub, self)._handle_packet(
|
||||
code, packet_identifier, path, data
|
||||
)
|
||||
|
||||
|
||||
class StorerSession(ftrack_api.session.Session):
|
||||
'''An isolated session for interaction with an ftrack server.'''
|
||||
def __init__(
|
||||
self, server_url=None, api_key=None, api_user=None, auto_populate=True,
|
||||
plugin_paths=None, cache=None, cache_key_maker=None,
|
||||
auto_connect_event_hub=None, schema_cache_path=None,
|
||||
plugin_arguments=None, sock=None
|
||||
):
|
||||
'''Initialise session.
|
||||
|
||||
*server_url* should be the URL of the ftrack server to connect to
|
||||
including any port number. If not specified attempt to look up from
|
||||
:envvar:`FTRACK_SERVER`.
|
||||
|
||||
*api_key* should be the API key to use for authentication whilst
|
||||
*api_user* should be the username of the user in ftrack to record
|
||||
operations against. If not specified, *api_key* should be retrieved
|
||||
from :envvar:`FTRACK_API_KEY` and *api_user* from
|
||||
:envvar:`FTRACK_API_USER`.
|
||||
|
||||
If *auto_populate* is True (the default), then accessing entity
|
||||
attributes will cause them to be automatically fetched from the server
|
||||
if they are not already. This flag can be changed on the session
|
||||
directly at any time.
|
||||
|
||||
*plugin_paths* should be a list of paths to search for plugins. If not
|
||||
specified, default to looking up :envvar:`FTRACK_EVENT_PLUGIN_PATH`.
|
||||
|
||||
*cache* should be an instance of a cache that fulfils the
|
||||
:class:`ftrack_api.cache.Cache` interface and will be used as the cache
|
||||
for the session. It can also be a callable that will be called with the
|
||||
session instance as sole argument. The callable should return ``None``
|
||||
if a suitable cache could not be configured, but session instantiation
|
||||
can continue safely.
|
||||
|
||||
.. note::
|
||||
|
||||
The session will add the specified cache to a pre-configured layered
|
||||
cache that specifies the top level cache as a
|
||||
:class:`ftrack_api.cache.MemoryCache`. Therefore, it is unnecessary
|
||||
to construct a separate memory cache for typical behaviour. Working
|
||||
around this behaviour or removing the memory cache can lead to
|
||||
unexpected behaviour.
|
||||
|
||||
*cache_key_maker* should be an instance of a key maker that fulfils the
|
||||
:class:`ftrack_api.cache.KeyMaker` interface and will be used to
|
||||
generate keys for objects being stored in the *cache*. If not specified,
|
||||
a :class:`~ftrack_api.cache.StringKeyMaker` will be used.
|
||||
|
||||
If *auto_connect_event_hub* is True then embedded event hub will be
|
||||
automatically connected to the event server and allow for publishing and
|
||||
subscribing to **non-local** events. If False, then only publishing and
|
||||
subscribing to **local** events will be possible until the hub is
|
||||
manually connected using :meth:`EventHub.connect
|
||||
<ftrack_api.event.hub.EventHub.connect>`.
|
||||
|
||||
.. note::
|
||||
|
||||
The event hub connection is performed in a background thread to
|
||||
improve session startup time. If a registered plugin requires a
|
||||
connected event hub then it should check the event hub connection
|
||||
status explicitly. Subscribing to events does *not* require a
|
||||
connected event hub.
|
||||
|
||||
Enable schema caching by setting *schema_cache_path* to a folder path.
|
||||
If not set, :envvar:`FTRACK_API_SCHEMA_CACHE_PATH` will be used to
|
||||
determine the path to store cache in. If the environment variable is
|
||||
also not specified then a temporary directory will be used. Set to
|
||||
`False` to disable schema caching entirely.
|
||||
|
||||
*plugin_arguments* should be an optional mapping (dict) of keyword
|
||||
arguments to pass to plugin register functions upon discovery. If a
|
||||
discovered plugin has a signature that is incompatible with the passed
|
||||
arguments, the discovery mechanism will attempt to reduce the passed
|
||||
arguments to only those that the plugin accepts. Note that a warning
|
||||
will be logged in this case.
|
||||
|
||||
'''
|
||||
super(ftrack_api.session.Session, self).__init__()
|
||||
self.logger = logging.getLogger(
|
||||
__name__ + '.' + self.__class__.__name__
|
||||
)
|
||||
self._closed = False
|
||||
|
||||
if server_url is None:
|
||||
server_url = os.environ.get('FTRACK_SERVER')
|
||||
|
||||
if not server_url:
|
||||
raise TypeError(
|
||||
'Required "server_url" not specified. Pass as argument or set '
|
||||
'in environment variable FTRACK_SERVER.'
|
||||
)
|
||||
|
||||
self._server_url = server_url
|
||||
|
||||
if api_key is None:
|
||||
api_key = os.environ.get(
|
||||
'FTRACK_API_KEY',
|
||||
# Backwards compatibility
|
||||
os.environ.get('FTRACK_APIKEY')
|
||||
)
|
||||
|
||||
if not api_key:
|
||||
raise TypeError(
|
||||
'Required "api_key" not specified. Pass as argument or set in '
|
||||
'environment variable FTRACK_API_KEY.'
|
||||
)
|
||||
|
||||
self._api_key = api_key
|
||||
|
||||
if api_user is None:
|
||||
api_user = os.environ.get('FTRACK_API_USER')
|
||||
if not api_user:
|
||||
try:
|
||||
api_user = getpass.getuser()
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
if not api_user:
|
||||
raise TypeError(
|
||||
'Required "api_user" not specified. Pass as argument, set in '
|
||||
'environment variable FTRACK_API_USER or one of the standard '
|
||||
'environment variables used by Python\'s getpass module.'
|
||||
)
|
||||
|
||||
self._api_user = api_user
|
||||
|
||||
# Currently pending operations.
|
||||
self.recorded_operations = ftrack_api.operation.Operations()
|
||||
self.record_operations = True
|
||||
|
||||
self.cache_key_maker = cache_key_maker
|
||||
if self.cache_key_maker is None:
|
||||
self.cache_key_maker = ftrack_api.cache.StringKeyMaker()
|
||||
|
||||
# Enforce always having a memory cache at top level so that the same
|
||||
# in-memory instance is returned from session.
|
||||
self.cache = ftrack_api.cache.LayeredCache([
|
||||
ftrack_api.cache.MemoryCache()
|
||||
])
|
||||
|
||||
if cache is not None:
|
||||
if callable(cache):
|
||||
cache = cache(self)
|
||||
|
||||
if cache is not None:
|
||||
self.cache.caches.append(cache)
|
||||
|
||||
self._managed_request = None
|
||||
self._request = requests.Session()
|
||||
self._request.auth = ftrack_api.session.SessionAuthentication(
|
||||
self._api_key, self._api_user
|
||||
)
|
||||
|
||||
self.auto_populate = auto_populate
|
||||
|
||||
# Fetch server information and in doing so also check credentials.
|
||||
self._server_information = self._fetch_server_information()
|
||||
|
||||
# Now check compatibility of server based on retrieved information.
|
||||
self.check_server_compatibility()
|
||||
|
||||
# Construct event hub and load plugins.
|
||||
self._event_hub = StorerEventHub(
|
||||
self._server_url,
|
||||
self._api_user,
|
||||
self._api_key,
|
||||
sock=sock
|
||||
)
|
||||
|
||||
self._auto_connect_event_hub_thread = None
|
||||
if auto_connect_event_hub in (None, True):
|
||||
# Connect to event hub in background thread so as not to block main
|
||||
# session usage waiting for event hub connection.
|
||||
self._auto_connect_event_hub_thread = threading.Thread(
|
||||
target=self._event_hub.connect
|
||||
)
|
||||
self._auto_connect_event_hub_thread.daemon = True
|
||||
self._auto_connect_event_hub_thread.start()
|
||||
|
||||
# To help with migration from auto_connect_event_hub default changing
|
||||
# from True to False.
|
||||
self._event_hub._deprecation_warning_auto_connect = (
|
||||
auto_connect_event_hub is None
|
||||
)
|
||||
|
||||
# Register to auto-close session on exit.
|
||||
atexit.register(self.close)
|
||||
|
||||
self._plugin_paths = plugin_paths
|
||||
if self._plugin_paths is None:
|
||||
self._plugin_paths = os.environ.get(
|
||||
'FTRACK_EVENT_PLUGIN_PATH', ''
|
||||
).split(os.pathsep)
|
||||
|
||||
self._discover_plugins(plugin_arguments=plugin_arguments)
|
||||
|
||||
# TODO: Make schemas read-only and non-mutable (or at least without
|
||||
# rebuilding types)?
|
||||
if schema_cache_path is not False:
|
||||
if schema_cache_path is None:
|
||||
schema_cache_path = os.environ.get(
|
||||
'FTRACK_API_SCHEMA_CACHE_PATH', tempfile.gettempdir()
|
||||
)
|
||||
|
||||
schema_cache_path = os.path.join(
|
||||
schema_cache_path, 'ftrack_api_schema_cache.json'
|
||||
)
|
||||
|
||||
self.schemas = self._load_schemas(schema_cache_path)
|
||||
self.types = self._build_entity_type_classes(self.schemas)
|
||||
|
||||
ftrack_api._centralized_storage_scenario.register(self)
|
||||
|
||||
self._configure_locations()
|
||||
self.event_hub.publish(
|
||||
ftrack_api.event.base.Event(
|
||||
topic='ftrack.api.session.ready',
|
||||
data=dict(
|
||||
session=self
|
||||
)
|
||||
),
|
||||
synchronous=True
|
||||
)
|
||||
|
|
@ -1,7 +1,5 @@
|
|||
import os
|
||||
import sys
|
||||
import time
|
||||
import signal
|
||||
import socket
|
||||
import threading
|
||||
import subprocess
|
||||
|
|
@ -10,7 +8,9 @@ from pypeapp import Logger
|
|||
|
||||
class SocketThread(threading.Thread):
|
||||
"""Thread that checks suprocess of storer of processor of events"""
|
||||
|
||||
MAX_TIMEOUT = 35
|
||||
|
||||
def __init__(self, name, port, filepath):
|
||||
super(SocketThread, self).__init__()
|
||||
self.log = Logger().get_logger("SocketThread", "Event Thread")
|
||||
|
|
|
|||
|
|
@ -1,12 +1,9 @@
|
|||
import os
|
||||
import sys
|
||||
import datetime
|
||||
import signal
|
||||
import socket
|
||||
import pymongo
|
||||
|
||||
from ftrack_server import FtrackServer
|
||||
from pype.ftrack.ftrack_server.session_processor import ProcessSession
|
||||
from pype.ftrack.ftrack_server.lib import SocketSession, ProcessEventHub
|
||||
from pypeapp import Logger
|
||||
|
||||
log = Logger().get_logger("Event processor")
|
||||
|
|
@ -24,12 +21,14 @@ def main(args):
|
|||
|
||||
sock.sendall(b"CreatedProcess")
|
||||
try:
|
||||
session = ProcessSession(auto_connect_event_hub=True, sock=sock)
|
||||
server = FtrackServer('event')
|
||||
session = SocketSession(
|
||||
auto_connect_event_hub=True, sock=sock, Eventhub=ProcessEventHub
|
||||
)
|
||||
server = FtrackServer("event")
|
||||
log.debug("Launched Ftrack Event processor")
|
||||
server.run_server(session)
|
||||
|
||||
except Exception as exc:
|
||||
except Exception:
|
||||
log.error("Event server crashed. See traceback below", exc_info=True)
|
||||
|
||||
finally:
|
||||
|
|
|
|||
|
|
@ -7,22 +7,22 @@ import pymongo
|
|||
|
||||
import ftrack_api
|
||||
from ftrack_server import FtrackServer
|
||||
from pype.ftrack.ftrack_server.lib import get_ftrack_event_mongo_info
|
||||
from pype.ftrack.ftrack_server.lib import (
|
||||
get_ftrack_event_mongo_info,
|
||||
SocketSession,
|
||||
StorerEventHub
|
||||
)
|
||||
from pype.ftrack.lib.custom_db_connector import DbConnector
|
||||
from session_storer import StorerSession
|
||||
from pypeapp import Logger
|
||||
|
||||
log = Logger().get_logger("Event storer")
|
||||
|
||||
|
||||
class SessionFactory:
|
||||
session = None
|
||||
|
||||
|
||||
url, database, table_name = get_ftrack_event_mongo_info()
|
||||
|
||||
|
||||
class SessionClass:
|
||||
def __init__(self):
|
||||
self.session = None
|
||||
|
||||
|
||||
session_obj = SessionClass()
|
||||
dbcon = DbConnector(
|
||||
mongo_url=url,
|
||||
database_name=database,
|
||||
|
|
@ -75,7 +75,11 @@ def launch(event):
|
|||
|
||||
|
||||
def trigger_sync(event):
|
||||
session = session_obj.session
|
||||
session = SessionFactory.session
|
||||
source_id = event.get("source", {}).get("id")
|
||||
if not source_id or source_id != session.event_hub.id:
|
||||
return
|
||||
|
||||
if session is None:
|
||||
log.warning("Session is not set. Can't trigger Sync to avalon action.")
|
||||
return True
|
||||
|
|
@ -93,7 +97,7 @@ def trigger_sync(event):
|
|||
"$set": {"pype_data.is_processed": True}
|
||||
}
|
||||
dbcon.update_many(query, set_dict)
|
||||
|
||||
|
||||
selections = []
|
||||
for project in projects:
|
||||
if project["status"] != "active":
|
||||
|
|
@ -154,8 +158,10 @@ def main(args):
|
|||
sock.sendall(b"CreatedStore")
|
||||
|
||||
try:
|
||||
session = StorerSession(auto_connect_event_hub=True, sock=sock)
|
||||
session_obj.session = session
|
||||
session = SocketSession(
|
||||
auto_connect_event_hub=True, sock=sock, Eventhub=StorerEventHub
|
||||
)
|
||||
SessionFactory.session = session
|
||||
register(session)
|
||||
server = FtrackServer("event")
|
||||
log.debug("Launched Ftrack Event storer")
|
||||
|
|
|
|||
|
|
@ -1,4 +1,3 @@
|
|||
import os
|
||||
import sys
|
||||
import time
|
||||
import datetime
|
||||
|
|
@ -7,7 +6,6 @@ import threading
|
|||
|
||||
from ftrack_server import FtrackServer
|
||||
import ftrack_api
|
||||
from ftrack_api.event.hub import EventHub
|
||||
from pypeapp import Logger
|
||||
|
||||
log = Logger().get_logger("Event Server Legacy")
|
||||
|
|
@ -37,7 +35,10 @@ class TimerChecker(threading.Thread):
|
|||
|
||||
if not self.session.event_hub.connected:
|
||||
if not connected:
|
||||
if (datetime.datetime.now() - start).seconds > self.max_time_out:
|
||||
if (
|
||||
(datetime.datetime.now() - start).seconds >
|
||||
self.max_time_out
|
||||
):
|
||||
log.error((
|
||||
"Exiting event server. Session was not connected"
|
||||
" to ftrack server in {} seconds."
|
||||
|
|
@ -61,7 +62,7 @@ class TimerChecker(threading.Thread):
|
|||
def main(args):
|
||||
check_thread = None
|
||||
try:
|
||||
server = FtrackServer('event')
|
||||
server = FtrackServer("event")
|
||||
session = ftrack_api.Session(auto_connect_event_hub=True)
|
||||
|
||||
check_thread = TimerChecker(server, session)
|
||||
|
|
|
|||
|
|
@ -2,7 +2,7 @@ import functools
|
|||
import time
|
||||
from pypeapp import Logger
|
||||
import ftrack_api
|
||||
from pype.ftrack.ftrack_server import session_processor
|
||||
from pype.ftrack.ftrack_server.lib import SocketSession
|
||||
|
||||
|
||||
class MissingPermision(Exception):
|
||||
|
|
@ -41,7 +41,7 @@ class BaseHandler(object):
|
|||
self.log = Logger().get_logger(self.__class__.__name__)
|
||||
if not(
|
||||
isinstance(session, ftrack_api.session.Session) or
|
||||
isinstance(session, session_processor.ProcessSession)
|
||||
isinstance(session, SocketSession)
|
||||
):
|
||||
raise Exception((
|
||||
"Session object entered with args is instance of \"{}\""
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue