From a88b808207c2b75e5b16e612f42822547cfbb244 Mon Sep 17 00:00:00 2001 From: Jakub Jezek Date: Fri, 3 Apr 2020 14:05:59 +0200 Subject: [PATCH] feat(service): adding adobe restapi service --- .../adobe_communicator/adobe_comunicator.py | 40 ++ .../adobe_communicator/lib/__init__.py | 1 + .../adobe_communicator/lib/io_nonsingleton.py | 446 ++++++++++++++++++ pype/services/adobe_communicator/lib/lib.py | 102 ++++ .../premiere_communicator/__init__.py | 5 - .../premiere_comunicator.py | 20 - 6 files changed, 589 insertions(+), 25 deletions(-) create mode 100644 pype/services/adobe_communicator/adobe_comunicator.py create mode 100644 pype/services/adobe_communicator/lib/__init__.py create mode 100644 pype/services/adobe_communicator/lib/io_nonsingleton.py create mode 100644 pype/services/adobe_communicator/lib/lib.py delete mode 100644 pype/services/premiere_communicator/__init__.py delete mode 100644 pype/services/premiere_communicator/premiere_comunicator.py diff --git a/pype/services/adobe_communicator/adobe_comunicator.py b/pype/services/adobe_communicator/adobe_comunicator.py new file mode 100644 index 0000000000..a2741270fc --- /dev/null +++ b/pype/services/adobe_communicator/adobe_comunicator.py @@ -0,0 +1,40 @@ +import os +from pypeapp import config, Logger + +log = Logger().get_logger("AdobeCommunicator") + + +class AdobeCommunicator: + rest_api_obj = None + + def __init__(self): + try: + self.presets = config.get_presets( + )["services"]["adobe_commuticator"] + except Exception: + self.presets = {"statics": {}, "rest_api": False} + log.debug(( + "There are not set presets for AdobeCommunicator." + " Using defaults \"{}\"" + ).format(str(self.presets))) + + def tray_start(self): + return + + def process_modules(self, modules): + rest_api_module = modules.get("RestApiServer") + if rest_api_module: + self.rest_api_registration(rest_api_module) + + def rest_api_registration(self, module): + for prefix, static_path in self.presets["statics"].items(): + static_path = static_path.format( + **dict(os.environ)).replace("\\", "/") + module.register_statics(prefix, static_path) + log.info((f"Adobe Communicator Registering static" + f"> `{prefix}` at `{static_path}`")) + + if all((self.presets["rest_api"], + not bool(self.rest_api_obj))): + from .lib import AdobeRestApi + self.rest_api_obj = AdobeRestApi() diff --git a/pype/services/adobe_communicator/lib/__init__.py b/pype/services/adobe_communicator/lib/__init__.py new file mode 100644 index 0000000000..76ed63c3fc --- /dev/null +++ b/pype/services/adobe_communicator/lib/__init__.py @@ -0,0 +1 @@ +from .lib import AdobeRestApi diff --git a/pype/services/adobe_communicator/lib/io_nonsingleton.py b/pype/services/adobe_communicator/lib/io_nonsingleton.py new file mode 100644 index 0000000000..6380e4eb23 --- /dev/null +++ b/pype/services/adobe_communicator/lib/io_nonsingleton.py @@ -0,0 +1,446 @@ +""" +Wrapper around interactions with the database + +Copy of io module in avalon-core. + - In this case not working as singleton with api.Session! +""" + +import os +import time +import errno +import shutil +import logging +import tempfile +import functools +import contextlib + +from avalon import schema +from avalon.vendor import requests + +# Third-party dependencies +import pymongo + + +def auto_reconnect(func): + """Handling auto reconnect in 3 retry times""" + @functools.wraps(func) + def decorated(*args, **kwargs): + object = args[0] + for retry in range(3): + try: + return func(*args, **kwargs) + except pymongo.errors.AutoReconnect: + object.log.error("Reconnecting..") + time.sleep(0.1) + else: + raise + + return decorated + + +class DbConnector(object): + + log = logging.getLogger(__name__) + + def __init__(self): + self.Session = {} + self._mongo_client = None + self._sentry_client = None + self._sentry_logging_handler = None + self._database = None + self._is_installed = False + + def __getitem__(self, key): + # gives direct access to collection withou setting `active_table` + return self._database[key] + + def __getattribute__(self, attr): + # not all methods of PyMongo database are implemented with this it is + # possible to use them too + try: + return super(DbConnector, self).__getattribute__(attr) + except AttributeError: + cur_proj = self.Session["AVALON_PROJECT"] + return self._database[cur_proj].__getattribute__(attr) + + def install(self): + """Establish a persistent connection to the database""" + if self._is_installed: + return + + logging.basicConfig() + self.Session.update(self._from_environment()) + + timeout = int(self.Session["AVALON_TIMEOUT"]) + self._mongo_client = pymongo.MongoClient( + self.Session["AVALON_MONGO"], serverSelectionTimeoutMS=timeout) + + for retry in range(3): + try: + t1 = time.time() + self._mongo_client.server_info() + + except Exception: + self.log.error("Retrying..") + time.sleep(1) + timeout *= 1.5 + + else: + break + + else: + raise IOError( + "ERROR: Couldn't connect to %s in " + "less than %.3f ms" % (self.Session["AVALON_MONGO"], timeout)) + + self.log.info("Connected to %s, delay %.3f s" % ( + self.Session["AVALON_MONGO"], time.time() - t1)) + + self._install_sentry() + + self._database = self._mongo_client[self.Session["AVALON_DB"]] + self._is_installed = True + + def _install_sentry(self): + if "AVALON_SENTRY" not in self.Session: + return + + try: + from raven import Client + from raven.handlers.logging import SentryHandler + from raven.conf import setup_logging + except ImportError: + # Note: There was a Sentry address in this Session + return self.log.warning("Sentry disabled, raven not installed") + + client = Client(self.Session["AVALON_SENTRY"]) + + # Transmit log messages to Sentry + handler = SentryHandler(client) + handler.setLevel(logging.WARNING) + + setup_logging(handler) + + self._sentry_client = client + self._sentry_logging_handler = handler + self.log.info( + "Connected to Sentry @ %s" % self.Session["AVALON_SENTRY"] + ) + + def _from_environment(self): + Session = { + item[0]: os.getenv(item[0], item[1]) + for item in ( + # Root directory of projects on disk + ("AVALON_PROJECTS", None), + + # Name of current Project + ("AVALON_PROJECT", ""), + + # Name of current Asset + ("AVALON_ASSET", ""), + + # Name of current silo + ("AVALON_SILO", ""), + + # Name of current task + ("AVALON_TASK", None), + + # Name of current app + ("AVALON_APP", None), + + # Path to working directory + ("AVALON_WORKDIR", None), + + # Name of current Config + # TODO(marcus): Establish a suitable default config + ("AVALON_CONFIG", "no_config"), + + # Name of Avalon in graphical user interfaces + # Use this to customise the visual appearance of Avalon + # to better integrate with your surrounding pipeline + ("AVALON_LABEL", "Avalon"), + + # Used during any connections to the outside world + ("AVALON_TIMEOUT", "1000"), + + # Address to Asset Database + ("AVALON_MONGO", "mongodb://localhost:27017"), + + # Name of database used in MongoDB + ("AVALON_DB", "avalon"), + + # Address to Sentry + ("AVALON_SENTRY", None), + + # Address to Deadline Web Service + # E.g. http://192.167.0.1:8082 + ("AVALON_DEADLINE", None), + + # Enable features not necessarily stable. The user's own risk + ("AVALON_EARLY_ADOPTER", None), + + # Address of central asset repository, contains + # the following interface: + # /upload + # /download + # /manager (optional) + ("AVALON_LOCATION", "http://127.0.0.1"), + + # Boolean of whether to upload published material + # to central asset repository + ("AVALON_UPLOAD", None), + + # Generic username and password + ("AVALON_USERNAME", "avalon"), + ("AVALON_PASSWORD", "secret"), + + # Unique identifier for instances in working files + ("AVALON_INSTANCE_ID", "avalon.instance"), + ("AVALON_CONTAINER_ID", "avalon.container"), + + # Enable debugging + ("AVALON_DEBUG", None), + + ) if os.getenv(item[0], item[1]) is not None + } + + Session["schema"] = "avalon-core:session-2.0" + try: + schema.validate(Session) + except schema.ValidationError as e: + # TODO(marcus): Make this mandatory + self.log.warning(e) + + return Session + + def uninstall(self): + """Close any connection to the database""" + try: + self._mongo_client.close() + except AttributeError: + pass + + self._mongo_client = None + self._database = None + self._is_installed = False + + def active_project(self): + """Return the name of the active project""" + return self.Session["AVALON_PROJECT"] + + def activate_project(self, project_name): + self.Session["AVALON_PROJECT"] = project_name + + def projects(self): + """List available projects + + Returns: + list of project documents + + """ + + collection_names = self.collections() + for project in collection_names: + if project in ("system.indexes",): + continue + + # Each collection will have exactly one project document + document = self.find_project(project) + + if document is not None: + yield document + + def locate(self, path): + """Traverse a hierarchy from top-to-bottom + + Example: + representation = locate(["hulk", "Bruce", "modelDefault", 1, "ma"]) + + Returns: + representation (ObjectId) + + """ + + components = zip( + ("project", "asset", "subset", "version", "representation"), + path + ) + + parent = None + for type_, name in components: + latest = (type_ == "version") and name in (None, -1) + + try: + if latest: + parent = self.find_one( + filter={ + "type": type_, + "parent": parent + }, + projection={"_id": 1}, + sort=[("name", -1)] + )["_id"] + else: + parent = self.find_one( + filter={ + "type": type_, + "name": name, + "parent": parent + }, + projection={"_id": 1}, + )["_id"] + + except TypeError: + return None + + return parent + + @auto_reconnect + def collections(self): + return self._database.collection_names() + + @auto_reconnect + def find_project(self, project): + return self._database[project].find_one({"type": "project"}) + + @auto_reconnect + def insert_one(self, item): + assert isinstance(item, dict), "item must be of type " + schema.validate(item) + return self._database[self.Session["AVALON_PROJECT"]].insert_one(item) + + @auto_reconnect + def insert_many(self, items, ordered=True): + # check if all items are valid + assert isinstance(items, list), "`items` must be of type " + for item in items: + assert isinstance(item, dict), "`item` must be of type " + schema.validate(item) + + return self._database[self.Session["AVALON_PROJECT"]].insert_many( + items, + ordered=ordered) + + @auto_reconnect + def find(self, filter, projection=None, sort=None): + return self._database[self.Session["AVALON_PROJECT"]].find( + filter=filter, + projection=projection, + sort=sort + ) + + @auto_reconnect + def find_one(self, filter, projection=None, sort=None): + assert isinstance(filter, dict), "filter must be " + + return self._database[self.Session["AVALON_PROJECT"]].find_one( + filter=filter, + projection=projection, + sort=sort + ) + + @auto_reconnect + def save(self, *args, **kwargs): + return self._database[self.Session["AVALON_PROJECT"]].save( + *args, **kwargs) + + @auto_reconnect + def replace_one(self, filter, replacement): + return self._database[self.Session["AVALON_PROJECT"]].replace_one( + filter, replacement) + + @auto_reconnect + def update_many(self, filter, update): + return self._database[self.Session["AVALON_PROJECT"]].update_many( + filter, update) + + @auto_reconnect + def distinct(self, *args, **kwargs): + return self._database[self.Session["AVALON_PROJECT"]].distinct( + *args, **kwargs) + + @auto_reconnect + def drop(self, *args, **kwargs): + return self._database[self.Session["AVALON_PROJECT"]].drop( + *args, **kwargs) + + @auto_reconnect + def delete_many(self, *args, **kwargs): + return self._database[self.Session["AVALON_PROJECT"]].delete_many( + *args, **kwargs) + + def parenthood(self, document): + assert document is not None, "This is a bug" + + parents = list() + + while document.get("parent") is not None: + document = self.find_one({"_id": document["parent"]}) + + if document is None: + break + + parents.append(document) + + return parents + + @contextlib.contextmanager + def tempdir(self): + tempdir = tempfile.mkdtemp() + try: + yield tempdir + finally: + shutil.rmtree(tempdir) + + def download(self, src, dst): + """Download `src` to `dst` + + Arguments: + src (str): URL to source file + dst (str): Absolute path to destination file + + Yields tuple (progress, error): + progress (int): Between 0-100 + error (Exception): Any exception raised when first making connection + + """ + + try: + response = requests.get( + src, + stream=True, + auth=requests.auth.HTTPBasicAuth( + self.Session["AVALON_USERNAME"], + self.Session["AVALON_PASSWORD"] + ) + ) + except requests.ConnectionError as e: + yield None, e + return + + with self.tempdir() as dirname: + tmp = os.path.join(dirname, os.path.basename(src)) + + with open(tmp, "wb") as f: + total_length = response.headers.get("content-length") + + if total_length is None: # no content length header + f.write(response.content) + else: + downloaded = 0 + total_length = int(total_length) + for data in response.iter_content(chunk_size=4096): + downloaded += len(data) + f.write(data) + + yield int(100.0 * downloaded / total_length), None + + try: + os.makedirs(os.path.dirname(dst)) + except OSError as e: + # An already existing destination directory is fine. + if e.errno != errno.EEXIST: + raise + + shutil.copy(tmp, dst) diff --git a/pype/services/adobe_communicator/lib/lib.py b/pype/services/adobe_communicator/lib/lib.py new file mode 100644 index 0000000000..60635a4e52 --- /dev/null +++ b/pype/services/adobe_communicator/lib/lib.py @@ -0,0 +1,102 @@ +import os +import re +import json +import bson +import bson.json_util +from pype.services.rest_api import RestApi, abort, CallbackResult +from .io_nonsingleton import DbConnector + + +class AdobeRestApi(RestApi): + dbcon = DbConnector() + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.dbcon.install() + + @RestApi.route("/projects/", url_prefix="/adobe", methods="GET") + def get_project(self, request): + project_name = request.url_data["project_name"] + if not project_name: + output = {} + for project_name in self.dbcon.tables(): + project = self.dbcon[project_name].find_one({ + "type": "project" + }) + output[project_name] = project + + return CallbackResult(data=self.result_to_json(output)) + + project = self.dbcon[project_name].find_one({"type": "project"}) + + if project: + return CallbackResult(data=self.result_to_json(project)) + + abort(404, "Project \"{}\" was not found in database".format( + project_name + )) + + @RestApi.route("/projects//assets/", url_prefix="/adobe", methods="GET") + def get_assets(self, request): + _project_name = request.url_data["project_name"] + _asset = request.url_data["asset"] + + if not self.dbcon.exist_table(_project_name): + abort(404, "Project \"{}\" was not found in database".format( + _project_name + )) + + if not _asset: + assets = self.dbcon[_project_name].find({"type": "asset"}) + output = self.result_to_json(assets) + return CallbackResult(data=output) + + # identificator can be specified with url query (default is `name`) + identificator = request.query.get("identificator", "name") + + asset = self.dbcon[_project_name].find_one({ + "type": "asset", + identificator: _asset + }) + if asset: + id = asset["_id"] + asset["_id"] = str(id) + return asset + + abort(404, "Asset \"{}\" with {} was not found in project {}".format( + _asset, identificator, _project_name + )) + + @RestApi.route("/publish/", url_prefix="/adobe", methods="GET") + def publish(self, request): + """ + http://localhost:8021/premiere/publish/shot021?json_in=this/path/file_in.json&json_out=this/path/file_out.json + """ + asset_name = request.url_data["asset_name"] + query = request.query + data = request.request_data + + output = { + "message": "Got your data. Thanks.", + "your_data": data, + "your_query": query, + "your_asset_is": asset_name + } + return CallbackResult(data=self.result_to_json(output)) + + def result_to_json(self, result): + """ Converts result of MongoDB query to dict without $oid (ObjectId) + keys with help of regex matching. + + ..note: + This will convert object type entries similar to ObjectId. + """ + bson_json = bson.json_util.dumps(result) + # Replace "{$oid: "{entity id}"}" with "{entity id}" + regex1 = '(?P{\"\$oid\": \"[^\"]+\"})' + regex2 = '{\"\$oid\": (?P\"[^\"]+\")}' + for value in re.findall(regex1, bson_json): + for substr in re.findall(regex2, value): + bson_json = bson_json.replace(value, substr) + + return json.loads(bson_json) diff --git a/pype/services/premiere_communicator/__init__.py b/pype/services/premiere_communicator/__init__.py deleted file mode 100644 index f5ff1868f1..0000000000 --- a/pype/services/premiere_communicator/__init__.py +++ /dev/null @@ -1,5 +0,0 @@ -from .premiere_comunicator import PremiereCommunicator - - -def tray_init(tray_widget, main_widget): - return PremiereCommunicator() diff --git a/pype/services/premiere_communicator/premiere_comunicator.py b/pype/services/premiere_communicator/premiere_comunicator.py deleted file mode 100644 index a5036c6976..0000000000 --- a/pype/services/premiere_communicator/premiere_comunicator.py +++ /dev/null @@ -1,20 +0,0 @@ -import os - - -class PremiereCommunicator: - def tray_start(self): - return - - def process_modules(self, modules): - rest_api_module = modules.get("RestApiServer") - if rest_api_module: - self.rest_api_registration(rest_api_module) - - def rest_api_registration(self, module): - static_site_dir_path = os.path.join( - os.environ["PYPE_MODULE_ROOT"], - "pype", - "premiere", - "ppro" - ).replace("\\", "/") - module.register_statics("/ppro", static_site_dir_path)