mirror of
https://github.com/ynput/ayon-core.git
synced 2025-12-24 21:04:40 +01:00
added modified ftrack sessions, one for storing, second for processing events
This commit is contained in:
parent
1ee8ab0dab
commit
631dcb56ea
2 changed files with 516 additions and 0 deletions
|
|
@ -0,0 +1,259 @@
|
|||
import logging
|
||||
import os
|
||||
import atexit
|
||||
import tempfile
|
||||
import threading
|
||||
import time
|
||||
import requests
|
||||
import queue
|
||||
|
||||
import ftrack_api
|
||||
import ftrack_api.session
|
||||
import ftrack_api.cache
|
||||
import ftrack_api.operation
|
||||
import ftrack_api._centralized_storage_scenario
|
||||
import ftrack_api.event
|
||||
from ftrack_api.logging import LazyLogMessage as L
|
||||
|
||||
from pype.ftrack.lib.custom_db_connector import DbConnector
|
||||
|
||||
|
||||
class ProcessEventHub(ftrack_api.event.hub.EventHub):
|
||||
dbcon = DbConnector(
|
||||
mongo_url=os.environ["AVALON_MONGO"],
|
||||
database_name="pype"
|
||||
)
|
||||
table_name = "ftrack_events"
|
||||
is_table_created = False
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
self.sock = kwargs.pop("sock")
|
||||
super(ProcessEventHub, self).__init__(*args, **kwargs)
|
||||
|
||||
def prepare_dbcon(self):
|
||||
self.dbcon.install()
|
||||
if not self.is_table_created:
|
||||
self.dbcon.create_table(self.table_name, capped=False)
|
||||
self.dbcon.active_table = self.table_name
|
||||
self.is_table_created = True
|
||||
|
||||
def wait(self, duration=None):
|
||||
'''Wait for events and handle as they arrive.
|
||||
|
||||
If *duration* is specified, then only process events until duration is
|
||||
reached. *duration* is in seconds though float values can be used for
|
||||
smaller values.
|
||||
|
||||
'''
|
||||
started = time.time()
|
||||
|
||||
self.prepare_dbcon()
|
||||
while True:
|
||||
try:
|
||||
event = self._event_queue.get(timeout=0.1)
|
||||
except queue.Empty:
|
||||
if not self.load_events():
|
||||
time.sleep(0.5)
|
||||
else:
|
||||
self._handle(event)
|
||||
self.dbcon.update_one(
|
||||
{"id": event["id"]},
|
||||
{"$set": {"pype_data.is_processed": True}}
|
||||
)
|
||||
# Additional special processing of events.
|
||||
if event['topic'] == 'ftrack.meta.disconnected':
|
||||
break
|
||||
|
||||
if duration is not None:
|
||||
if (time.time() - started) > duration:
|
||||
break
|
||||
|
||||
def load_events(self):
|
||||
not_processed_events = self.dbcon.find({"pype_data.is_processed": False})
|
||||
found = False
|
||||
for event_data in not_processed_events:
|
||||
new_event_data = {
|
||||
k: v for k, v in event_data.items()
|
||||
if k not in ["_id", "pype_data"]
|
||||
}
|
||||
try:
|
||||
event = ftrack_api.event.base.Event(**new_event_data)
|
||||
print(event)
|
||||
except Exception:
|
||||
self.logger.exception(L(
|
||||
'Failed to convert payload into event: {0}',
|
||||
event_data
|
||||
))
|
||||
continue
|
||||
found = True
|
||||
self._event_queue.put(event)
|
||||
|
||||
return found
|
||||
|
||||
def _handle_packet(self, code, packet_identifier, path, data):
|
||||
'''Handle packet received from server.'''
|
||||
# if self.is_waiting:
|
||||
code_name = self._code_name_mapping[code]
|
||||
if code_name == "event":
|
||||
return
|
||||
if code_name == "heartbeat":
|
||||
self.sock.sendall(b"processor")
|
||||
return self._send_packet(self._code_name_mapping["heartbeat"])
|
||||
|
||||
return super()._handle_packet(code, packet_identifier, path, data)
|
||||
|
||||
|
||||
class ProcessSession(ftrack_api.session.Session):
|
||||
'''An isolated session for interaction with an ftrack server.'''
|
||||
def __init__(
|
||||
self, server_url=None, api_key=None, api_user=None, auto_populate=True,
|
||||
plugin_paths=None, cache=None, cache_key_maker=None,
|
||||
auto_connect_event_hub=None, schema_cache_path=None,
|
||||
plugin_arguments=None, sock=None
|
||||
):
|
||||
super(ftrack_api.session.Session, self).__init__()
|
||||
self.logger = logging.getLogger(
|
||||
__name__ + '.' + self.__class__.__name__
|
||||
)
|
||||
self._closed = False
|
||||
|
||||
if server_url is None:
|
||||
server_url = os.environ.get('FTRACK_SERVER')
|
||||
|
||||
if not server_url:
|
||||
raise TypeError(
|
||||
'Required "server_url" not specified. Pass as argument or set '
|
||||
'in environment variable FTRACK_SERVER.'
|
||||
)
|
||||
|
||||
self._server_url = server_url
|
||||
|
||||
if api_key is None:
|
||||
api_key = os.environ.get(
|
||||
'FTRACK_API_KEY',
|
||||
# Backwards compatibility
|
||||
os.environ.get('FTRACK_APIKEY')
|
||||
)
|
||||
|
||||
if not api_key:
|
||||
raise TypeError(
|
||||
'Required "api_key" not specified. Pass as argument or set in '
|
||||
'environment variable FTRACK_API_KEY.'
|
||||
)
|
||||
|
||||
self._api_key = api_key
|
||||
|
||||
if api_user is None:
|
||||
api_user = os.environ.get('FTRACK_API_USER')
|
||||
if not api_user:
|
||||
try:
|
||||
api_user = getpass.getuser()
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
if not api_user:
|
||||
raise TypeError(
|
||||
'Required "api_user" not specified. Pass as argument, set in '
|
||||
'environment variable FTRACK_API_USER or one of the standard '
|
||||
'environment variables used by Python\'s getpass module.'
|
||||
)
|
||||
|
||||
self._api_user = api_user
|
||||
|
||||
# Currently pending operations.
|
||||
self.recorded_operations = ftrack_api.operation.Operations()
|
||||
self.record_operations = True
|
||||
|
||||
self.cache_key_maker = cache_key_maker
|
||||
if self.cache_key_maker is None:
|
||||
self.cache_key_maker = ftrack_api.cache.StringKeyMaker()
|
||||
|
||||
# Enforce always having a memory cache at top level so that the same
|
||||
# in-memory instance is returned from session.
|
||||
self.cache = ftrack_api.cache.LayeredCache([
|
||||
ftrack_api.cache.MemoryCache()
|
||||
])
|
||||
|
||||
if cache is not None:
|
||||
if callable(cache):
|
||||
cache = cache(self)
|
||||
|
||||
if cache is not None:
|
||||
self.cache.caches.append(cache)
|
||||
|
||||
self._managed_request = None
|
||||
self._request = requests.Session()
|
||||
self._request.auth = ftrack_api.session.SessionAuthentication(
|
||||
self._api_key, self._api_user
|
||||
)
|
||||
|
||||
self.auto_populate = auto_populate
|
||||
|
||||
# Fetch server information and in doing so also check credentials.
|
||||
self._server_information = self._fetch_server_information()
|
||||
|
||||
# Now check compatibility of server based on retrieved information.
|
||||
self.check_server_compatibility()
|
||||
|
||||
# Construct event hub and load plugins.
|
||||
self._event_hub = ProcessEventHub(
|
||||
self._server_url,
|
||||
self._api_user,
|
||||
self._api_key,
|
||||
sock=sock
|
||||
)
|
||||
|
||||
self._auto_connect_event_hub_thread = None
|
||||
if auto_connect_event_hub in (None, True):
|
||||
# Connect to event hub in background thread so as not to block main
|
||||
# session usage waiting for event hub connection.
|
||||
self._auto_connect_event_hub_thread = threading.Thread(
|
||||
target=self._event_hub.connect
|
||||
)
|
||||
self._auto_connect_event_hub_thread.daemon = True
|
||||
self._auto_connect_event_hub_thread.start()
|
||||
|
||||
# To help with migration from auto_connect_event_hub default changing
|
||||
# from True to False.
|
||||
self._event_hub._deprecation_warning_auto_connect = (
|
||||
auto_connect_event_hub is None
|
||||
)
|
||||
|
||||
# Register to auto-close session on exit.
|
||||
atexit.register(self.close)
|
||||
|
||||
self._plugin_paths = plugin_paths
|
||||
if self._plugin_paths is None:
|
||||
self._plugin_paths = os.environ.get(
|
||||
'FTRACK_EVENT_PLUGIN_PATH', ''
|
||||
).split(os.pathsep)
|
||||
|
||||
self._discover_plugins(plugin_arguments=plugin_arguments)
|
||||
|
||||
# TODO: Make schemas read-only and non-mutable (or at least without
|
||||
# rebuilding types)?
|
||||
if schema_cache_path is not False:
|
||||
if schema_cache_path is None:
|
||||
schema_cache_path = os.environ.get(
|
||||
'FTRACK_API_SCHEMA_CACHE_PATH', tempfile.gettempdir()
|
||||
)
|
||||
|
||||
schema_cache_path = os.path.join(
|
||||
schema_cache_path, 'ftrack_api_schema_cache.json'
|
||||
)
|
||||
|
||||
self.schemas = self._load_schemas(schema_cache_path)
|
||||
self.types = self._build_entity_type_classes(self.schemas)
|
||||
|
||||
ftrack_api._centralized_storage_scenario.register(self)
|
||||
|
||||
self._configure_locations()
|
||||
self.event_hub.publish(
|
||||
ftrack_api.event.base.Event(
|
||||
topic='ftrack.api.session.ready',
|
||||
data=dict(
|
||||
session=self
|
||||
)
|
||||
),
|
||||
synchronous=True
|
||||
)
|
||||
|
|
@ -0,0 +1,257 @@
|
|||
import logging
|
||||
import os
|
||||
import atexit
|
||||
import tempfile
|
||||
import threading
|
||||
import requests
|
||||
|
||||
import ftrack_api
|
||||
import ftrack_api.session
|
||||
import ftrack_api.cache
|
||||
import ftrack_api.operation
|
||||
import ftrack_api._centralized_storage_scenario
|
||||
import ftrack_api.event
|
||||
from ftrack_api.logging import LazyLogMessage as L
|
||||
|
||||
|
||||
class StorerEventHub(ftrack_api.event.hub.EventHub):
|
||||
def __init__(self, *args, **kwargs):
|
||||
self.sock = kwargs.pop("sock")
|
||||
super(StorerEventHub, self).__init__(*args, **kwargs)
|
||||
|
||||
def _handle_packet(self, code, packet_identifier, path, data):
|
||||
'''Handle packet received from server.'''
|
||||
if self._code_name_mapping[code] == "heartbeat":
|
||||
# Reply with heartbeat.
|
||||
self.sock.sendall(b"storer")
|
||||
return self._send_packet(self._code_name_mapping['heartbeat'])
|
||||
|
||||
return super(StorerEventHub, self)._handle_packet(
|
||||
code, packet_identifier, path, data
|
||||
)
|
||||
|
||||
|
||||
class StorerSession(ftrack_api.session.Session):
|
||||
'''An isolated session for interaction with an ftrack server.'''
|
||||
def __init__(
|
||||
self, server_url=None, api_key=None, api_user=None, auto_populate=True,
|
||||
plugin_paths=None, cache=None, cache_key_maker=None,
|
||||
auto_connect_event_hub=None, schema_cache_path=None,
|
||||
plugin_arguments=None, sock=None
|
||||
):
|
||||
'''Initialise session.
|
||||
|
||||
*server_url* should be the URL of the ftrack server to connect to
|
||||
including any port number. If not specified attempt to look up from
|
||||
:envvar:`FTRACK_SERVER`.
|
||||
|
||||
*api_key* should be the API key to use for authentication whilst
|
||||
*api_user* should be the username of the user in ftrack to record
|
||||
operations against. If not specified, *api_key* should be retrieved
|
||||
from :envvar:`FTRACK_API_KEY` and *api_user* from
|
||||
:envvar:`FTRACK_API_USER`.
|
||||
|
||||
If *auto_populate* is True (the default), then accessing entity
|
||||
attributes will cause them to be automatically fetched from the server
|
||||
if they are not already. This flag can be changed on the session
|
||||
directly at any time.
|
||||
|
||||
*plugin_paths* should be a list of paths to search for plugins. If not
|
||||
specified, default to looking up :envvar:`FTRACK_EVENT_PLUGIN_PATH`.
|
||||
|
||||
*cache* should be an instance of a cache that fulfils the
|
||||
:class:`ftrack_api.cache.Cache` interface and will be used as the cache
|
||||
for the session. It can also be a callable that will be called with the
|
||||
session instance as sole argument. The callable should return ``None``
|
||||
if a suitable cache could not be configured, but session instantiation
|
||||
can continue safely.
|
||||
|
||||
.. note::
|
||||
|
||||
The session will add the specified cache to a pre-configured layered
|
||||
cache that specifies the top level cache as a
|
||||
:class:`ftrack_api.cache.MemoryCache`. Therefore, it is unnecessary
|
||||
to construct a separate memory cache for typical behaviour. Working
|
||||
around this behaviour or removing the memory cache can lead to
|
||||
unexpected behaviour.
|
||||
|
||||
*cache_key_maker* should be an instance of a key maker that fulfils the
|
||||
:class:`ftrack_api.cache.KeyMaker` interface and will be used to
|
||||
generate keys for objects being stored in the *cache*. If not specified,
|
||||
a :class:`~ftrack_api.cache.StringKeyMaker` will be used.
|
||||
|
||||
If *auto_connect_event_hub* is True then embedded event hub will be
|
||||
automatically connected to the event server and allow for publishing and
|
||||
subscribing to **non-local** events. If False, then only publishing and
|
||||
subscribing to **local** events will be possible until the hub is
|
||||
manually connected using :meth:`EventHub.connect
|
||||
<ftrack_api.event.hub.EventHub.connect>`.
|
||||
|
||||
.. note::
|
||||
|
||||
The event hub connection is performed in a background thread to
|
||||
improve session startup time. If a registered plugin requires a
|
||||
connected event hub then it should check the event hub connection
|
||||
status explicitly. Subscribing to events does *not* require a
|
||||
connected event hub.
|
||||
|
||||
Enable schema caching by setting *schema_cache_path* to a folder path.
|
||||
If not set, :envvar:`FTRACK_API_SCHEMA_CACHE_PATH` will be used to
|
||||
determine the path to store cache in. If the environment variable is
|
||||
also not specified then a temporary directory will be used. Set to
|
||||
`False` to disable schema caching entirely.
|
||||
|
||||
*plugin_arguments* should be an optional mapping (dict) of keyword
|
||||
arguments to pass to plugin register functions upon discovery. If a
|
||||
discovered plugin has a signature that is incompatible with the passed
|
||||
arguments, the discovery mechanism will attempt to reduce the passed
|
||||
arguments to only those that the plugin accepts. Note that a warning
|
||||
will be logged in this case.
|
||||
|
||||
'''
|
||||
super(ftrack_api.session.Session, self).__init__()
|
||||
self.logger = logging.getLogger(
|
||||
__name__ + '.' + self.__class__.__name__
|
||||
)
|
||||
self._closed = False
|
||||
|
||||
if server_url is None:
|
||||
server_url = os.environ.get('FTRACK_SERVER')
|
||||
|
||||
if not server_url:
|
||||
raise TypeError(
|
||||
'Required "server_url" not specified. Pass as argument or set '
|
||||
'in environment variable FTRACK_SERVER.'
|
||||
)
|
||||
|
||||
self._server_url = server_url
|
||||
|
||||
if api_key is None:
|
||||
api_key = os.environ.get(
|
||||
'FTRACK_API_KEY',
|
||||
# Backwards compatibility
|
||||
os.environ.get('FTRACK_APIKEY')
|
||||
)
|
||||
|
||||
if not api_key:
|
||||
raise TypeError(
|
||||
'Required "api_key" not specified. Pass as argument or set in '
|
||||
'environment variable FTRACK_API_KEY.'
|
||||
)
|
||||
|
||||
self._api_key = api_key
|
||||
|
||||
if api_user is None:
|
||||
api_user = os.environ.get('FTRACK_API_USER')
|
||||
if not api_user:
|
||||
try:
|
||||
api_user = getpass.getuser()
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
if not api_user:
|
||||
raise TypeError(
|
||||
'Required "api_user" not specified. Pass as argument, set in '
|
||||
'environment variable FTRACK_API_USER or one of the standard '
|
||||
'environment variables used by Python\'s getpass module.'
|
||||
)
|
||||
|
||||
self._api_user = api_user
|
||||
|
||||
# Currently pending operations.
|
||||
self.recorded_operations = ftrack_api.operation.Operations()
|
||||
self.record_operations = True
|
||||
|
||||
self.cache_key_maker = cache_key_maker
|
||||
if self.cache_key_maker is None:
|
||||
self.cache_key_maker = ftrack_api.cache.StringKeyMaker()
|
||||
|
||||
# Enforce always having a memory cache at top level so that the same
|
||||
# in-memory instance is returned from session.
|
||||
self.cache = ftrack_api.cache.LayeredCache([
|
||||
ftrack_api.cache.MemoryCache()
|
||||
])
|
||||
|
||||
if cache is not None:
|
||||
if callable(cache):
|
||||
cache = cache(self)
|
||||
|
||||
if cache is not None:
|
||||
self.cache.caches.append(cache)
|
||||
|
||||
self._managed_request = None
|
||||
self._request = requests.Session()
|
||||
self._request.auth = ftrack_api.session.SessionAuthentication(
|
||||
self._api_key, self._api_user
|
||||
)
|
||||
|
||||
self.auto_populate = auto_populate
|
||||
|
||||
# Fetch server information and in doing so also check credentials.
|
||||
self._server_information = self._fetch_server_information()
|
||||
|
||||
# Now check compatibility of server based on retrieved information.
|
||||
self.check_server_compatibility()
|
||||
|
||||
# Construct event hub and load plugins.
|
||||
self._event_hub = StorerEventHub(
|
||||
self._server_url,
|
||||
self._api_user,
|
||||
self._api_key,
|
||||
sock=sock
|
||||
)
|
||||
|
||||
self._auto_connect_event_hub_thread = None
|
||||
if auto_connect_event_hub in (None, True):
|
||||
# Connect to event hub in background thread so as not to block main
|
||||
# session usage waiting for event hub connection.
|
||||
self._auto_connect_event_hub_thread = threading.Thread(
|
||||
target=self._event_hub.connect
|
||||
)
|
||||
self._auto_connect_event_hub_thread.daemon = True
|
||||
self._auto_connect_event_hub_thread.start()
|
||||
|
||||
# To help with migration from auto_connect_event_hub default changing
|
||||
# from True to False.
|
||||
self._event_hub._deprecation_warning_auto_connect = (
|
||||
auto_connect_event_hub is None
|
||||
)
|
||||
|
||||
# Register to auto-close session on exit.
|
||||
atexit.register(self.close)
|
||||
|
||||
self._plugin_paths = plugin_paths
|
||||
if self._plugin_paths is None:
|
||||
self._plugin_paths = os.environ.get(
|
||||
'FTRACK_EVENT_PLUGIN_PATH', ''
|
||||
).split(os.pathsep)
|
||||
|
||||
self._discover_plugins(plugin_arguments=plugin_arguments)
|
||||
|
||||
# TODO: Make schemas read-only and non-mutable (or at least without
|
||||
# rebuilding types)?
|
||||
if schema_cache_path is not False:
|
||||
if schema_cache_path is None:
|
||||
schema_cache_path = os.environ.get(
|
||||
'FTRACK_API_SCHEMA_CACHE_PATH', tempfile.gettempdir()
|
||||
)
|
||||
|
||||
schema_cache_path = os.path.join(
|
||||
schema_cache_path, 'ftrack_api_schema_cache.json'
|
||||
)
|
||||
|
||||
self.schemas = self._load_schemas(schema_cache_path)
|
||||
self.types = self._build_entity_type_classes(self.schemas)
|
||||
|
||||
ftrack_api._centralized_storage_scenario.register(self)
|
||||
|
||||
self._configure_locations()
|
||||
self.event_hub.publish(
|
||||
ftrack_api.event.base.Event(
|
||||
topic='ftrack.api.session.ready',
|
||||
data=dict(
|
||||
session=self
|
||||
)
|
||||
),
|
||||
synchronous=True
|
||||
)
|
||||
Loading…
Add table
Add a link
Reference in a new issue