From 1e76ec6fd48d0e0b303140eb5d335921014a6f91 Mon Sep 17 00:00:00 2001 From: Jakub Jezek Date: Mon, 9 Dec 2019 14:57:10 +0100 Subject: [PATCH 01/20] fix(global): letter box was not applying on 1929x1080 --- pype/plugins/global/publish/extract_review.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pype/plugins/global/publish/extract_review.py b/pype/plugins/global/publish/extract_review.py index bf4682b26e..11b7b6ee8a 100644 --- a/pype/plugins/global/publish/extract_review.py +++ b/pype/plugins/global/publish/extract_review.py @@ -156,7 +156,7 @@ class ExtractReview(pyblish.api.InstancePlugin): lb = profile.get('letter_box', None) if lb: output_args.append( - "-filter:v drawbox=0:0:iw:round((ih-(iw*(1/{0})))/2):t=fill:c=black,drawbox=0:ih-round((ih-(iw*(1/{0})))/2):iw:round((ih-(iw*(1/{0})))/2):t=fill:c=black".format(lb)) + "-filter:v scale=1920x1080:flags=lanczos,setsar=1,drawbox=0:0:iw:round((ih-(iw*(1/{0})))/2):t=fill:c=black,drawbox=0:ih-round((ih-(iw*(1/{0})))/2):iw:round((ih-(iw*(1/{0})))/2):t=fill:c=black".format(lb)) # In case audio is longer than video. output_args.append("-shortest") From 2dd940078ebd23631fcf1eb662754a394f230c7d Mon Sep 17 00:00:00 2001 From: Jakub Jezek Date: Mon, 9 Dec 2019 14:57:34 +0100 Subject: [PATCH 02/20] fix(global): missing coma --- pype/plugins/global/publish/integrate_new.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pype/plugins/global/publish/integrate_new.py b/pype/plugins/global/publish/integrate_new.py index 9021a3f997..faade613f2 100644 --- a/pype/plugins/global/publish/integrate_new.py +++ b/pype/plugins/global/publish/integrate_new.py @@ -71,7 +71,7 @@ class IntegrateAssetNew(pyblish.api.InstancePlugin): "yetiRig", "yeticache", "nukenodes", - "gizmo" + "gizmo", "source", "matchmove", "image" @@ -414,7 +414,7 @@ class IntegrateAssetNew(pyblish.api.InstancePlugin): } if sequence_repre and repre.get("frameStart"): - representation['context']['frame'] = src_padding_exp % repre.get("frameStart") + representation['context']['frame'] = src_padding_exp % int(repre.get("frameStart")) self.log.debug("__ representation: {}".format(representation)) destination_list.append(dst) From 399d9e8b7531b3202dab5c884af7118d57ce94b3 Mon Sep 17 00:00:00 2001 From: Jakub Jezek Date: Mon, 9 Dec 2019 14:58:14 +0100 Subject: [PATCH 03/20] fix(nuke): instance didnt detect other then `write` family --- pype/plugins/nuke/publish/collect_instances.py | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/pype/plugins/nuke/publish/collect_instances.py b/pype/plugins/nuke/publish/collect_instances.py index fbff28b282..74cb0a8226 100644 --- a/pype/plugins/nuke/publish/collect_instances.py +++ b/pype/plugins/nuke/publish/collect_instances.py @@ -30,7 +30,7 @@ class CollectNukeInstances(pyblish.api.ContextPlugin): continue except Exception as E: self.log.warning(E) - continue + # get data from avalon knob self.log.debug("node[name]: {}".format(node['name'].value())) @@ -84,10 +84,16 @@ class CollectNukeInstances(pyblish.api.ContextPlugin): node.end() family = avalon_knob_data["family"] - families = [avalon_knob_data["families"]] + families = avalon_knob_data.get("families") + if families: + families = [families] + else: + families = [family] if node.Class() not in "Read": - if node["render"].value(): + if "render" not in node.knobs().keys(): + families.insert(0, family) + elif node["render"].value(): self.log.info("flagged for render") add_family = "render.local" # dealing with local/farm rendering From 5b89eca421f543e04bd9fcb518327fbe7251fe39 Mon Sep 17 00:00:00 2001 From: Jakub Jezek Date: Mon, 9 Dec 2019 14:58:38 +0100 Subject: [PATCH 04/20] fix(nuke): families mishmash --- pype/plugins/nuke/publish/collect_writes.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/pype/plugins/nuke/publish/collect_writes.py b/pype/plugins/nuke/publish/collect_writes.py index ba8a0534b1..cd3dd67bef 100644 --- a/pype/plugins/nuke/publish/collect_writes.py +++ b/pype/plugins/nuke/publish/collect_writes.py @@ -11,7 +11,7 @@ class CollectNukeWrites(pyblish.api.InstancePlugin): order = pyblish.api.CollectorOrder + 0.1 label = "Collect Writes" hosts = ["nuke", "nukeassist"] - families = ["render", "render.local", "render.farm"] + families = ["write"] def process(self, instance): @@ -95,7 +95,7 @@ class CollectNukeWrites(pyblish.api.InstancePlugin): "frameEnd": last_frame - handle_end, "version": int(version), "colorspace": node["colorspace"].value(), - "families": [instance.data["family"]] + instance.data["families"], + "families": [instance.data["family"]], "subset": instance.data["subset"], "fps": instance.context.data["fps"] } @@ -109,6 +109,7 @@ class CollectNukeWrites(pyblish.api.InstancePlugin): if "deadlinePriority" in group_node.knobs(): deadlinePriority = group_node["deadlinePriority"].value() + families = [f for f in instance.data["families"] if "write" not in f] instance.data.update({ "versionData": version_data, "path": path, @@ -119,10 +120,13 @@ class CollectNukeWrites(pyblish.api.InstancePlugin): "frameStart": first_frame, "frameEnd": last_frame, "outputType": output_type, + "family": "write", + "families": families, "colorspace": node["colorspace"].value(), "deadlineChunkSize": deadlineChunkSize, "deadlinePriority": deadlinePriority, "subsetGroup": "renders" }) + self.log.debug("instance.data: {}".format(instance.data)) From ac1fab9bdd42242dc77374fc82ca61b20088b01a Mon Sep 17 00:00:00 2001 From: Jakub Jezek Date: Mon, 9 Dec 2019 14:59:00 +0100 Subject: [PATCH 05/20] fix(nuke): path with hashes error --- pype/plugins/nuke/publish/extract_review_data.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/pype/plugins/nuke/publish/extract_review_data.py b/pype/plugins/nuke/publish/extract_review_data.py index 791b9d7969..f63ca4d426 100644 --- a/pype/plugins/nuke/publish/extract_review_data.py +++ b/pype/plugins/nuke/publish/extract_review_data.py @@ -69,6 +69,9 @@ class ExtractReviewData(pype.api.Extractor): first_frame = instance.data.get("frameStart", None) last_frame = instance.data.get("frameEnd", None) + if "#" in fhead: + fhead = fhead.replace("#", "")[:-1] + rnode = nuke.createNode("Read") rnode["file"].setValue( From a60e02e7c0dea9372acd7d0032af051f41587681 Mon Sep 17 00:00:00 2001 From: Jakub Jezek Date: Mon, 9 Dec 2019 14:59:22 +0100 Subject: [PATCH 06/20] fix(nuke): family mishmash --- pype/plugins/nuke/publish/validate_rendered_frames.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pype/plugins/nuke/publish/validate_rendered_frames.py b/pype/plugins/nuke/publish/validate_rendered_frames.py index e244a9b4b6..3887b5d5b7 100644 --- a/pype/plugins/nuke/publish/validate_rendered_frames.py +++ b/pype/plugins/nuke/publish/validate_rendered_frames.py @@ -28,7 +28,7 @@ class ValidateRenderedFrames(pyblish.api.InstancePlugin): """ Validates file output. """ order = pyblish.api.ValidatorOrder + 0.1 - families = ["render.no"] + families = ["render"] label = "Validate rendered frame" hosts = ["nuke", "nukestudio"] From 482addd6439265ed71e7c3efb4e8d46470af77e3 Mon Sep 17 00:00:00 2001 From: Ondrej Samohel Date: Mon, 9 Dec 2019 16:52:15 +0100 Subject: [PATCH 07/20] (maya) fixed correct colorspace for linearized textures during look extraction --- pype/plugins/maya/publish/extract_look.py | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/pype/plugins/maya/publish/extract_look.py b/pype/plugins/maya/publish/extract_look.py index 5f3c1b33f3..ad43e02d21 100644 --- a/pype/plugins/maya/publish/extract_look.py +++ b/pype/plugins/maya/publish/extract_look.py @@ -231,11 +231,12 @@ class ExtractLook(pype.api.Extractor): # ensure after context it's still the original value. color_space_attr = resource["node"] + ".colorSpace" color_space = cmds.getAttr(color_space_attr) - + if files_metadata[source]["color_space"] == "raw": + # set colorpsace to raw if we linearized it + color_space = "Raw" # Remap file node filename to destination attr = resource["attribute"] remap[attr] = destinations[source] - remap[color_space_attr] = color_space self.log.info("Finished remapping destinations ...") @@ -310,6 +311,12 @@ class ExtractLook(pype.api.Extractor): # Source hash for the textures instance.data["sourceHashes"] = hashes + """ + self.log.info("Returning colorspaces to their original values ...") + for attr, value in remap.items(): + self.log.info(" - {}: {}".format(attr, value)) + cmds.setAttr(attr, value, type="string") + """ self.log.info("Extracted instance '%s' to: %s" % (instance.name, maya_path)) From 3469becf6e298744a4b035022699cc81a7a34da8 Mon Sep 17 00:00:00 2001 From: iLLiCiTiT Date: Mon, 9 Dec 2019 17:46:32 +0100 Subject: [PATCH 08/20] remove `delete_asset_by_name` action which may cause issues on duplicated names or not syncrhonized entities --- .../actions/action_delete_asset_byname.py | 175 ------------------ 1 file changed, 175 deletions(-) delete mode 100644 pype/ftrack/actions/action_delete_asset_byname.py diff --git a/pype/ftrack/actions/action_delete_asset_byname.py b/pype/ftrack/actions/action_delete_asset_byname.py deleted file mode 100644 index c05c135991..0000000000 --- a/pype/ftrack/actions/action_delete_asset_byname.py +++ /dev/null @@ -1,175 +0,0 @@ -import os -import sys -import logging -import argparse -import ftrack_api -from pype.ftrack import BaseAction -from pype.ftrack.lib.io_nonsingleton import DbConnector - - -class AssetsRemover(BaseAction): - '''Edit meta data action.''' - - #: Action identifier. - identifier = 'remove.assets' - #: Action label. - label = "Pype Admin" - variant = '- Delete Assets by Name' - #: Action description. - description = 'Removes assets from Ftrack and Avalon db with all childs' - #: roles that are allowed to register this action - role_list = ['Pypeclub', 'Administrator'] - icon = '{}/ftrack/action_icons/PypeAdmin.svg'.format( - os.environ.get('PYPE_STATICS_SERVER', '') - ) - #: Db - db = DbConnector() - - def discover(self, session, entities, event): - ''' Validation ''' - if len(entities) != 1: - return False - - valid = ["show", "task"] - entityType = event["data"]["selection"][0].get("entityType", "") - if entityType.lower() not in valid: - return False - - return True - - def interface(self, session, entities, event): - if not event['data'].get('values', {}): - title = 'Enter Asset names to delete' - - items = [] - for i in range(15): - - item = { - 'label': 'Asset {}'.format(i+1), - 'name': 'asset_{}'.format(i+1), - 'type': 'text', - 'value': '' - } - items.append(item) - - return { - 'items': items, - 'title': title - } - - def launch(self, session, entities, event): - entity = entities[0] - if entity.entity_type.lower() != 'Project': - project = entity['project'] - else: - project = entity - - if 'values' not in event['data']: - return - - values = event['data']['values'] - if len(values) <= 0: - return { - 'success': True, - 'message': 'No Assets to delete!' - } - - asset_names = [] - - for k, v in values.items(): - if v.replace(' ', '') != '': - asset_names.append(v) - - self.db.install() - self.db.Session['AVALON_PROJECT'] = project["full_name"] - - assets = self.find_assets(asset_names) - - all_ids = [] - for asset in assets: - all_ids.append(asset['_id']) - all_ids.extend(self.find_child(asset)) - - if len(all_ids) == 0: - self.db.uninstall() - return { - 'success': True, - 'message': 'None of assets' - } - - delete_query = {'_id': {'$in': all_ids}} - self.db.delete_many(delete_query) - - self.db.uninstall() - return { - 'success': True, - 'message': 'All assets were deleted!' - } - - def find_child(self, entity): - output = [] - id = entity['_id'] - visuals = [x for x in self.db.find({'data.visualParent': id})] - assert len(visuals) == 0, 'This asset has another asset as child' - childs = self.db.find({'parent': id}) - for child in childs: - output.append(child['_id']) - output.extend(self.find_child(child)) - return output - - def find_assets(self, asset_names): - assets = [] - for name in asset_names: - entity = self.db.find_one({ - 'type': 'asset', - 'name': name - }) - if entity is not None and entity not in assets: - assets.append(entity) - return assets - - -def register(session, plugins_presets={}): - '''Register plugin. Called when used as an plugin.''' - - AssetsRemover(session, plugins_presets).register() - - -def main(arguments=None): - '''Set up logging and register action.''' - if arguments is None: - arguments = [] - - parser = argparse.ArgumentParser() - # Allow setting of logging level from arguments. - loggingLevels = {} - for level in ( - logging.NOTSET, logging.DEBUG, logging.INFO, logging.WARNING, - logging.ERROR, logging.CRITICAL - ): - loggingLevels[logging.getLevelName(level).lower()] = level - - parser.add_argument( - '-v', '--verbosity', - help='Set the logging output verbosity.', - choices=loggingLevels.keys(), - default='info' - ) - namespace = parser.parse_args(arguments) - - # Set up basic logging - logging.basicConfig(level=loggingLevels[namespace.verbosity]) - - session = ftrack_api.Session() - - register(session) - - # Wait for events - logging.info( - 'Registered actions and listening for events. Use Ctrl-C to abort.' - ) - session.event_hub.wait() - - -if __name__ == '__main__': - raise SystemExit(main(sys.argv[1:])) From 5df3afd0bc7ec0b1b25d440e552bd8a95afdd64b Mon Sep 17 00:00:00 2001 From: iLLiCiTiT Date: Mon, 9 Dec 2019 17:48:57 +0100 Subject: [PATCH 09/20] delete asset is totally rewritten --- pype/ftrack/actions/action_delete_asset.py | 794 ++++++++++++++------- 1 file changed, 523 insertions(+), 271 deletions(-) diff --git a/pype/ftrack/actions/action_delete_asset.py b/pype/ftrack/actions/action_delete_asset.py index df760f7c21..7eb9126fca 100644 --- a/pype/ftrack/actions/action_delete_asset.py +++ b/pype/ftrack/actions/action_delete_asset.py @@ -1,354 +1,606 @@ import os -import sys -import logging +import collections +import uuid +from datetime import datetime +from queue import Queue + from bson.objectid import ObjectId -import argparse -import ftrack_api from pype.ftrack import BaseAction from pype.ftrack.lib.io_nonsingleton import DbConnector -class DeleteAsset(BaseAction): +class DeleteAssetSubset(BaseAction): '''Edit meta data action.''' #: Action identifier. - identifier = 'delete.asset' + identifier = "delete.asset.subset" #: Action label. - label = 'Delete Asset/Subsets' + label = "Delete Asset/Subsets" #: Action description. - description = 'Removes from Avalon with all childs and asset from Ftrack' - icon = '{}/ftrack/action_icons/DeleteAsset.svg'.format( - os.environ.get('PYPE_STATICS_SERVER', '') + description = "Removes from Avalon with all childs and asset from Ftrack" + icon = "{}/ftrack/action_icons/DeleteAsset.svg".format( + os.environ.get("PYPE_STATICS_SERVER", "") ) #: roles that are allowed to register this action - role_list = ['Pypeclub', 'Administrator'] - #: Db - db = DbConnector() + role_list = ["Pypeclub", "Administrator", "Project Manager"] + #: Db connection + dbcon = DbConnector() - value = None + splitter = {"type": "label", "value": "---"} + action_data_by_id = {} + asset_prefix = "asset:" + subset_prefix = "subset:" def discover(self, session, entities, event): - ''' Validation ''' - if len(entities) != 1: - return False + """ Validation """ + task_ids = [] + for ent_info in event["data"]["selection"]: + entType = ent_info.get("entityType", "") + if entType == "task": + task_ids.append(ent_info["entityId"]) - valid = ["task"] - entityType = event["data"]["selection"][0].get("entityType", "") - if entityType.lower() not in valid: - return False - - return True + for entity in entities: + ftrack_id = entity["id"] + if ftrack_id not in task_ids: + continue + if entity.entity_type.lower() != "task": + return True + return False def _launch(self, event): - self.reset_session() try: - self.db.install() args = self._translate_event( self.session, event ) + if "values" not in event["data"]: + self.dbcon.install() + return self._interface(self.session, *args) - interface = self._interface( - self.session, *args - ) - - confirmation = self.confirm_delete( - True, *args - ) - - if interface: - return interface - + confirmation = self.confirm_delete(*args) if confirmation: return confirmation + self.dbcon.install() response = self.launch( self.session, *args ) finally: - self.db.uninstall() + self.dbcon.uninstall() return self._handle_result( self.session, response, *args ) def interface(self, session, entities, event): - if not event['data'].get('values', {}): - self.attempt = 1 - items = [] - entity = entities[0] - title = 'Choose items to delete from "{}"'.format(entity['name']) - project = entity['project'] + self.show_message(event, "Preparing data...", True) + items = [] + title = "Choose items to delete" - self.db.Session['AVALON_PROJECT'] = project["full_name"] + # Filter selection and get ftrack ids + selection = event["data"].get("selection") or [] + ftrack_ids = [] + project_in_selection = False + for entity in selection: + entity_type = (entity.get("entityType") or "").lower() + if entity_type != "task": + if entity_type == "show": + project_in_selection = True + continue - av_entity = self.db.find_one({ - 'type': 'asset', - 'name': entity['name'] + ftrack_id = entity.get("entityId") + if not ftrack_id: + continue + + ftrack_ids.append(ftrack_id) + + if project_in_selection: + msg = "It is not possible to use this action on project entity." + self.show_message(event, msg, True) + + # Filter event even more (skip task entities) + # - task entities are not relevant for avalon + for entity in entities: + ftrack_id = entity["id"] + if ftrack_id not in ftrack_ids: + continue + + if entity.entity_type.lower() == "task": + ftrack_ids.remove(ftrack_id) + + if not ftrack_ids: + # It is bug if this happens! + return { + "success": False, + "message": "Invalid selection for this action (Bug)" + } + + if entities[0].entity_type.lower() == "project": + project = entities[0] + else: + project = entities[0]["project"] + + project_name = project["full_name"] + self.dbcon.Session["AVALON_PROJECT"] = project_name + + selected_av_entities = self.dbcon.find({ + "type": "asset", + "data.ftrackId": {"$in": ftrack_ids} + }) + selected_av_entities = [ent for ent in selected_av_entities] + if not selected_av_entities: + return { + "success": False, + "message": "Didn't found entities in avalon" + } + + # Remove cached action older than 2 minutes + old_action_ids = [] + for id, data in self.action_data_by_id.items(): + created_at = data.get("created_at") + if not created_at: + old_action_ids.append(id) + continue + cur_time = datetime.now() + existing_in_sec = (created_at - cur_time).total_seconds() + if existing_in_sec > 60 * 2: + old_action_ids.append(id) + + for id in old_action_ids: + self.action_data_by_id.pop(id, None) + + # Store data for action id + action_id = str(uuid.uuid1()) + self.action_data_by_id[action_id] = { + "attempt": 1, + "created_at": datetime.now(), + "project_name": project_name, + "subset_ids_by_name": {}, + "subset_ids_by_parent": {} + } + + id_item = { + "type": "hidden", + "name": "action_id", + "value": action_id + } + + items.append(id_item) + asset_ids = [ent["_id"] for ent in selected_av_entities] + subsets_for_selection = self.dbcon.find({ + "type": "subset", + "parent": {"$in": asset_ids} + }) + + asset_ending = "" + if len(selected_av_entities) > 1: + asset_ending = "s" + + asset_title = { + "type": "label", + "value": "# Delete asset{}:".format(asset_ending) + } + asset_note = { + "type": "label", + "value": ( + "

NOTE: Action will delete checked entities" + " in Ftrack and Avalon with all children entities and" + " published content.

" + ) + } + + items.append(asset_title) + items.append(asset_note) + + asset_items = collections.defaultdict(list) + for asset in selected_av_entities: + ent_path_items = [project_name] + ent_path_items.extend(asset.get("data", {}).get("parents") or []) + ent_path_to_parent = "/".join(ent_path_items) + "/" + asset_items[ent_path_to_parent].append(asset) + + for asset_parent_path, assets in sorted(asset_items.items()): + items.append({ + "type": "label", + "value": "## - {}".format(asset_parent_path) }) - - if av_entity is None: - return { - 'success': False, - 'message': 'Didn\'t found assets in avalon' - } - - asset_label = { - 'type': 'label', - 'value': '## Delete whole asset: ##' - } - asset_item = { - 'label': av_entity['name'], - 'name': 'whole_asset', - 'type': 'boolean', - 'value': False - } - splitter = { - 'type': 'label', - 'value': '{}'.format(200*"-") - } - subset_label = { - 'type': 'label', - 'value': '## Subsets: ##' - } - if av_entity is not None: - items.append(asset_label) - items.append(asset_item) - items.append(splitter) - - all_subsets = self.db.find({ - 'type': 'subset', - 'parent': av_entity['_id'] + for asset in assets: + items.append({ + "label": asset["name"], + "name": "{}{}".format( + self.asset_prefix, str(asset["_id"]) + ), + "type": 'boolean', + "value": False }) - subset_items = [] - for subset in all_subsets: - item = { - 'label': subset['name'], - 'name': str(subset['_id']), - 'type': 'boolean', - 'value': False - } - subset_items.append(item) - if len(subset_items) > 0: - items.append(subset_label) - items.extend(subset_items) - else: - return { - 'success': False, - 'message': 'Didn\'t found assets in avalon' - } + subset_ids_by_name = collections.defaultdict(list) + subset_ids_by_parent = collections.defaultdict(list) + for subset in subsets_for_selection: + subset_id = subset["_id"] + name = subset["name"] + parent_id = subset["parent"] + subset_ids_by_name[name].append(subset_id) + subset_ids_by_parent[parent_id].append(subset_id) + if not subset_ids_by_name: return { - 'items': items, - 'title': title + "items": items, + "title": title } - def confirm_delete(self, first_attempt, entities, event): - if first_attempt is True: - if 'values' not in event['data']: - return + subset_ending = "" + if len(subset_ids_by_name.keys()) > 1: + subset_ending = "s" - values = event['data']['values'] + subset_title = { + "type": "label", + "value": "# Subset{} to delete:".format(subset_ending) + } + subset_note = { + "type": "label", + "value": ( + "

WARNING: Subset{} will be removed" + " for all selected entities.

" + ).format(subset_ending) + } - if len(values) <= 0: - return - if 'whole_asset' not in values: - return - else: - values = self.values + items.append(self.splitter) + items.append(subset_title) + items.append(subset_note) - title = 'Confirmation of deleting {}' - if values['whole_asset'] is True: - title = title.format( - 'whole asset {}'.format( - entities[0]['name'] - ) - ) - else: - subsets = [] - for key, value in values.items(): - if value is True: - subsets.append(key) - len_subsets = len(subsets) - if len_subsets == 0: + for name in subset_ids_by_name: + items.append({ + "label": "{}".format(name), + "name": "{}{}".format(self.subset_prefix, name), + "type": "boolean", + "value": False + }) + + self.action_data_by_id[action_id]["subset_ids_by_parent"] = ( + subset_ids_by_parent + ) + self.action_data_by_id[action_id]["subset_ids_by_name"] = ( + subset_ids_by_name + ) + + return { + "items": items, + "title": title + } + + def confirm_delete(self, entities, event): + values = event["data"]["values"] + action_id = values.get("action_id") + spec_data = self.action_data_by_id.get(action_id) + if not spec_data: + # it is a bug if this happens! + return { + "success": False, + "message": "Something bad has happened. Please try again." + } + + # Process Delete confirmation + delete_key = values.get("delete_key") + if delete_key: + delete_key = delete_key.lower().strip() + # Go to launch part if user entered `delete` + if delete_key == "delete": + return + # Skip whole process if user didn't enter any text + elif delete_key == "": + self.action_data_by_id.pop(action_id, None) return { - 'success': True, - 'message': 'Nothing was selected to delete' + "success": True, + "message": "Deleting cancelled (delete entry was empty)" } - elif len_subsets == 1: - title = title.format( - '{} subset'.format(len_subsets) - ) - else: - title = title.format( - '{} subsets'.format(len_subsets) - ) + # Get data to show again + to_delete = spec_data["to_delete"] + + else: + to_delete = collections.defaultdict(list) + for key, value in values.items(): + if not value: + continue + if key.startswith(self.asset_prefix): + _key = key.replace(self.asset_prefix, "") + to_delete["assets"].append(_key) + + elif key.startswith(self.subset_prefix): + _key = key.replace(self.subset_prefix, "") + to_delete["subsets"].append(_key) + + self.action_data_by_id[action_id]["to_delete"] = to_delete + + asset_to_delete = len(to_delete.get("assets") or []) > 0 + subset_to_delete = len(to_delete.get("subsets") or []) > 0 + + if not asset_to_delete and not subset_to_delete: + self.action_data_by_id.pop(action_id, None) + return { + "success": True, + "message": "Nothing was selected to delete" + } + + attempt = spec_data["attempt"] + if attempt > 3: + self.action_data_by_id.pop(action_id, None) + return { + "success": False, + "message": "You didn't enter \"DELETE\" properly 3 times!" + } + + self.action_data_by_id[action_id]["attempt"] += 1 + + title = "Confirmation of deleting" + + if asset_to_delete: + asset_len = len(to_delete["assets"]) + asset_ending = "" + if asset_len > 1: + asset_ending = "s" + title += " {} Asset{}".format(asset_len, asset_ending) + if subset_to_delete: + title += " and" + + if subset_to_delete: + sub_len = len(to_delete["subsets"]) + type_ending = "" + sub_ending = "" + if sub_len == 1: + subset_ids_by_name = spec_data["subset_ids_by_name"] + if len(subset_ids_by_name[to_delete["subsets"][0]]) > 1: + sub_ending = "s" + + elif sub_len > 1: + type_ending = "s" + sub_ending = "s" + + title += " {} type{} of subset{}".format( + sub_len, type_ending, sub_ending + ) - self.values = values items = [] + id_item = {"type": "hidden", "name": "action_id", "value": action_id} delete_label = { 'type': 'label', 'value': '# Please enter "DELETE" to confirm #' } - delete_item = { - 'name': 'delete_key', - 'type': 'text', - 'value': '', - 'empty_text': 'Type Delete here...' + "name": "delete_key", + "type": "text", + "value": "", + "empty_text": "Type Delete here..." } + + items.append(id_item) items.append(delete_label) items.append(delete_item) return { - 'items': items, - 'title': title + "items": items, + "title": title } def launch(self, session, entities, event): - if 'values' not in event['data']: - return - - values = event['data']['values'] - if len(values) <= 0: - return - if 'delete_key' not in values: - return - - if values['delete_key'].lower() != 'delete': - if values['delete_key'].lower() == '': - return { - 'success': False, - 'message': 'Deleting cancelled' - } - if self.attempt < 3: - self.attempt += 1 - return_dict = self.confirm_delete(False, entities, event) - return_dict['title'] = '{} ({} attempt)'.format( - return_dict['title'], self.attempt - ) - return return_dict + self.show_message(event, "Processing...", True) + values = event["data"]["values"] + action_id = values.get("action_id") + spec_data = self.action_data_by_id.get(action_id) + if not spec_data: + # it is a bug if this happens! return { - 'success': False, - 'message': 'You didn\'t enter "DELETE" properly 3 times!' + "success": False, + "message": "Something bad has happened. Please try again." } - entity = entities[0] - project = entity['project'] + report_messages = collections.defaultdict(list) - self.db.Session['AVALON_PROJECT'] = project["full_name"] + project_name = spec_data["project_name"] + to_delete = spec_data["to_delete"] + self.dbcon.Session["AVALON_PROJECT"] = project_name - all_ids = [] - if self.values.get('whole_asset', False) is True: - av_entity = self.db.find_one({ - 'type': 'asset', - 'name': entity['name'] + assets_to_delete = to_delete.get("assets") or [] + subsets_to_delete = to_delete.get("subsets") or [] + + # Convert asset ids to ObjectId obj + assets_to_delete = [ObjectId(id) for id in assets_to_delete if id] + + subset_ids_by_parent = spec_data["subset_ids_by_parent"] + subset_ids_by_name = spec_data["subset_ids_by_name"] + + subset_ids_to_archive = [] + asset_ids_to_archive = [] + ftrack_ids_to_delete = [] + if len(assets_to_delete) > 0: + # Prepare data when deleting whole avalon asset + avalon_assets = self.dbcon.find({"type": "asset"}) + avalon_assets_by_parent = collections.defaultdict(list) + for asset in avalon_assets: + parent_id = asset["data"]["visualParent"] + avalon_assets_by_parent[parent_id].append(asset) + if asset["_id"] in assets_to_delete: + ftrack_id = asset["data"]["ftrackId"] + ftrack_ids_to_delete.append(ftrack_id) + + children_queue = Queue() + for mongo_id in assets_to_delete: + children_queue.put(mongo_id) + + while not children_queue.empty(): + mongo_id = children_queue.get() + if mongo_id in asset_ids_to_archive: + continue + + asset_ids_to_archive.append(mongo_id) + for subset_id in subset_ids_by_parent.get(mongo_id, []): + if subset_id not in subset_ids_to_archive: + subset_ids_to_archive.append(subset_id) + + children = avalon_assets_by_parent.get(mongo_id) + if not children: + continue + + for child in children: + child_id = child["_id"] + if child_id not in asset_ids_to_archive: + children_queue.put(child_id) + + # Prepare names of assets in ftrack and ids of subsets in mongo + asset_names_to_delete = [] + if len(subsets_to_delete) > 0: + for name in subsets_to_delete: + asset_names_to_delete.append(name) + for subset_id in subset_ids_by_name[name]: + if subset_id in subset_ids_to_archive: + continue + subset_ids_to_archive.append(subset_id) + + # Get ftrack ids of entities where will be delete only asset + not_deleted_entities_id = [] + ftrack_id_name_map = {} + if asset_names_to_delete: + for entity in entities: + ftrack_id = entity["id"] + ftrack_id_name_map[ftrack_id] = entity["name"] + if ftrack_id in ftrack_ids_to_delete: + continue + not_deleted_entities_id.append(ftrack_id) + + mongo_proc_txt = "MongoProcessing: " + ftrack_proc_txt = "Ftrack processing: " + if asset_ids_to_archive: + self.log.debug("{}Archivation of assets <{}>".format( + mongo_proc_txt, + ", ".join([str(id) for id in asset_ids_to_archive]) + )) + self.dbcon.update_many( + { + "_id": {"$in": asset_ids_to_archive}, + "type": "asset" + }, + {"$set": {"type": "archived_asset"}} + ) + + if subset_ids_to_archive: + self.log.debug("{}Archivation of subsets <{}>".format( + mongo_proc_txt, + ", ".join([str(id) for id in subset_ids_to_archive]) + )) + self.dbcon.update_many( + { + "_id": {"$in": subset_ids_to_archive}, + "type": "subset" + }, + {"$set": {"type": "archived_subset"}} + ) + + if ftrack_ids_to_delete: + self.log.debug("{}Deleting Ftrack Entities <{}>".format( + ftrack_proc_txt, ", ".join(ftrack_ids_to_delete) + )) + + joined_ids_to_delete = ", ".join( + ["\"{}\"".format(id) for id in ftrack_ids_to_delete] + ) + ftrack_ents_to_delete = self.session.query( + "select id, link from TypedContext where id in ({})".format( + joined_ids_to_delete + ) + ).all() + for entity in ftrack_ents_to_delete: + self.session.delete(entity) + try: + self.session.commit() + except Exception: + ent_path = "/".join( + [ent["name"] for ent in entity["link"]] + ) + msg = "Failed to delete entity" + report_messages[msg].append(ent_path) + self.session.rollback() + self.log.warning( + "{} <{}>".format(msg, ent_path), + exc_info=True + ) + + if not_deleted_entities_id: + joined_not_deleted = ", ".join([ + "\"{}\"".format(ftrack_id) + for ftrack_id in not_deleted_entities_id + ]) + joined_asset_names = ", ".join([ + "\"{}\"".format(name) + for name in asset_names_to_delete + ]) + # Find assets of selected entities with names of checked subsets + assets = self.session.query(( + "select id from Asset where" + " context_id in ({}) and name in ({})" + ).format(joined_not_deleted, joined_asset_names)).all() + + self.log.debug("{}Deleting Ftrack Assets <{}>".format( + ftrack_proc_txt, + ", ".join([asset["id"] for asset in assets]) + )) + for asset in assets: + self.session.delete(asset) + try: + self.session.commit() + except Exception: + self.session.rollback() + msg = "Failed to delete asset" + report_messages[msg].append(asset["id"]) + self.log.warning( + "{} <{}>".format(asset["id"]), + exc_info=True + ) + + return self.report_handle(report_messages, project_name, event) + + def report_handle(self, report_messages, project_name, event): + if not report_messages: + return { + "success": True, + "message": "Deletion was successful!" + } + + title = "Delete report ({}):".format(project_name) + items = [] + items.append({ + "type": "label", + "value": "# Deleting was not completely successful" + }) + items.append({ + "type": "label", + "value": "

Check logs for more information

" + }) + for msg, _items in report_messages.items(): + if not _items or not msg: + continue + + items.append({ + "type": "label", + "value": "# {}".format(msg) }) - if av_entity is not None: - all_ids.append(av_entity['_id']) - all_ids.extend(self.find_child(av_entity)) + if isinstance(_items, str): + _items = [_items] + items.append({ + "type": "label", + "value": '

{}

'.format("
".join(_items)) + }) + items.append(self.splitter) - session.delete(entity) - session.commit() - else: - subset_names = [] - for key, value in self.values.items(): - if key == 'delete_key' or value is False: - continue - - entity_id = ObjectId(key) - av_entity = self.db.find_one({'_id': entity_id}) - subset_names.append(av_entity['name']) - if av_entity is None: - continue - all_ids.append(entity_id) - all_ids.extend(self.find_child(av_entity)) - - for ft_asset in entity['assets']: - if ft_asset['name'] in subset_names: - session.delete(ft_asset) - session.commit() - - if len(all_ids) == 0: - return { - 'success': True, - 'message': 'No entities to delete in avalon' - } - - delete_query = {'_id': {'$in': all_ids}} - self.db.delete_many(delete_query) + self.show_interface(items, title, event) return { - 'success': True, - 'message': 'All assets were deleted!' + "success": False, + "message": "Deleting finished. Read report messages." } - def find_child(self, entity): - output = [] - id = entity['_id'] - visuals = [x for x in self.db.find({'data.visualParent': id})] - assert len(visuals) == 0, 'This asset has another asset as child' - childs = self.db.find({'parent': id}) - for child in childs: - output.append(child['_id']) - output.extend(self.find_child(child)) - return output - - def find_assets(self, asset_names): - assets = [] - for name in asset_names: - entity = self.db.find_one({ - 'type': 'asset', - 'name': name - }) - if entity is not None and entity not in assets: - assets.append(entity) - return assets - def register(session, plugins_presets={}): '''Register plugin. Called when used as an plugin.''' - DeleteAsset(session, plugins_presets).register() - - -def main(arguments=None): - '''Set up logging and register action.''' - if arguments is None: - arguments = [] - - parser = argparse.ArgumentParser() - # Allow setting of logging level from arguments. - loggingLevels = {} - for level in ( - logging.NOTSET, logging.DEBUG, logging.INFO, logging.WARNING, - logging.ERROR, logging.CRITICAL - ): - loggingLevels[logging.getLevelName(level).lower()] = level - - parser.add_argument( - '-v', '--verbosity', - help='Set the logging output verbosity.', - choices=loggingLevels.keys(), - default='info' - ) - namespace = parser.parse_args(arguments) - - # Set up basic logging - logging.basicConfig(level=loggingLevels[namespace.verbosity]) - - session = ftrack_api.Session() - - register(session) - - # Wait for events - logging.info( - 'Registered actions and listening for events. Use Ctrl-C to abort.' - ) - session.event_hub.wait() - - -if __name__ == '__main__': - raise SystemExit(main(sys.argv[1:])) + DeleteAssetSubset(session, plugins_presets).register() From f9035a20d6ab16bfababc0e80b79e64d59cd41e0 Mon Sep 17 00:00:00 2001 From: Jakub Jezek Date: Tue, 10 Dec 2019 20:51:24 +0100 Subject: [PATCH 10/20] fix(nuke): selection of nodes for write render creator --- pype/plugins/nuke/create/create_write.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/pype/plugins/nuke/create/create_write.py b/pype/plugins/nuke/create/create_write.py index 8a1f958f9e..042826d4d9 100644 --- a/pype/plugins/nuke/create/create_write.py +++ b/pype/plugins/nuke/create/create_write.py @@ -34,6 +34,7 @@ class CreateWriteRender(plugin.PypeCreator): data.update({k: v}) self.data = data + self.nodes = nuke.selectedNodes() self.log.info("self.data: '{}'".format(self.data)) def process(self): @@ -46,9 +47,9 @@ class CreateWriteRender(plugin.PypeCreator): # use selection if (self.options or {}).get("useSelection"): - nodes = nuke.selectedNodes() + nodes = self.nodes - assert len(nodes) == 1, self.log.error("Select only one node. The node you want to connect to, or tick off `Use selection`") + assert len(nodes) < 2, self.log.error("Select only one node. The node you want to connect to, or tick off `Use selection`") selected_node = nodes[0] inputs = [selected_node] From 8a14e5d544ade37068cda522628eaff068b411a5 Mon Sep 17 00:00:00 2001 From: Milan Kolar Date: Wed, 11 Dec 2019 00:05:52 +0100 Subject: [PATCH 11/20] make sure we process texture when force copy and maketx is enabled --- pype/plugins/maya/publish/collect_look.py | 4 ---- pype/plugins/maya/publish/extract_look.py | 17 +++++++++++++---- 2 files changed, 13 insertions(+), 8 deletions(-) diff --git a/pype/plugins/maya/publish/collect_look.py b/pype/plugins/maya/publish/collect_look.py index 17f8180fdf..7a5fea776c 100644 --- a/pype/plugins/maya/publish/collect_look.py +++ b/pype/plugins/maya/publish/collect_look.py @@ -219,10 +219,6 @@ class CollectLook(pyblish.api.InstancePlugin): with lib.renderlayer(instance.data["renderlayer"]): self.collect(instance) - # make ftrack publishable - self.maketx = instance.data.get('maketx', True) - instance.data['maketx'] = self.maketx - self.log.info('maketx: {}'.format(self.maketx)) def collect(self, instance): diff --git a/pype/plugins/maya/publish/extract_look.py b/pype/plugins/maya/publish/extract_look.py index ad43e02d21..5226f80f7a 100644 --- a/pype/plugins/maya/publish/extract_look.py +++ b/pype/plugins/maya/publish/extract_look.py @@ -74,6 +74,8 @@ def maketx(source, destination, *args): cmd.extend(args) cmd.extend(["-o", destination, source]) + cmd = " ".join(cmd) + CREATE_NO_WINDOW = 0x08000000 kwargs = dict(args=cmd, stderr=subprocess.STDOUT) @@ -183,6 +185,7 @@ class ExtractLook(pype.api.Extractor): transfers = list() hardlinks = list() hashes = dict() + forceCopy = instance.data.get("forceCopy", False) self.log.info(files) for filepath in files_metadata: @@ -195,20 +198,26 @@ class ExtractLook(pype.api.Extractor): files_metadata[filepath]["color_space"] = "raw" source, mode, hash = self._process_texture( - filepath, do_maketx, staging=dir_path, linearise=linearise + filepath, + do_maketx, + staging=dir_path, + linearise=linearise, + force=forceCopy ) destination = self.resource_destination(instance, source, do_maketx) # Force copy is specified. - if instance.data.get("forceCopy", False): + if forceCopy: mode = COPY if mode == COPY: transfers.append((source, destination)) + self.log.info('copying') elif mode == HARDLINK: hardlinks.append((source, destination)) + self.log.info('hardlinking') # Store the hashes from hash to destination to include in the # database @@ -337,7 +346,7 @@ class ExtractLook(pype.api.Extractor): instance.data["assumedDestination"], "resources", basename + ext ) - def _process_texture(self, filepath, do_maketx, staging, linearise): + def _process_texture(self, filepath, do_maketx, staging, linearise, force): """Process a single texture file on disk for publishing. This will: 1. Check whether it's already published, if so it will do hardlink @@ -359,7 +368,7 @@ class ExtractLook(pype.api.Extractor): # If source has been published before with the same settings, # then don't reprocess but hardlink from the original existing = find_paths_by_hash(texture_hash) - if existing: + if existing and not force: self.log.info("Found hash in database, preparing hardlink..") source = next((p for p in existing if os.path.exists(p)), None) if filepath: From 7b09e1c41e846c0ba1a0686653cdf5c3101c0c17 Mon Sep 17 00:00:00 2001 From: iLLiCiTiT Date: Wed, 11 Dec 2019 12:24:06 +0100 Subject: [PATCH 12/20] formatting --- pype/ftrack/ftrack_server/lib.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/pype/ftrack/ftrack_server/lib.py b/pype/ftrack/ftrack_server/lib.py index 748937c7bd..fd4c1fe7b9 100644 --- a/pype/ftrack/ftrack_server/lib.py +++ b/pype/ftrack/ftrack_server/lib.py @@ -49,7 +49,9 @@ def ftrack_events_mongo_settings(): def get_ftrack_event_mongo_info(): - host, port, database, username, password, collection, auth_db = ftrack_events_mongo_settings() + host, port, database, username, password, collection, auth_db = ( + ftrack_events_mongo_settings() + ) user_pass = "" if username and password: user_pass = "{}:{}@".format(username, password) From 4497a89f53198adfeb60d6ea663fd9e266f0ef5e Mon Sep 17 00:00:00 2001 From: iLLiCiTiT Date: Wed, 11 Dec 2019 12:24:36 +0100 Subject: [PATCH 13/20] Storer EventHub moved to lib --- pype/ftrack/ftrack_server/lib.py | 29 +++++++++++++++++++++ pype/ftrack/ftrack_server/session_storer.py | 27 ------------------- 2 files changed, 29 insertions(+), 27 deletions(-) diff --git a/pype/ftrack/ftrack_server/lib.py b/pype/ftrack/ftrack_server/lib.py index fd4c1fe7b9..80c147d400 100644 --- a/pype/ftrack/ftrack_server/lib.py +++ b/pype/ftrack/ftrack_server/lib.py @@ -99,3 +99,32 @@ def check_ftrack_url(url, log_errors=True): print('DEBUG: Ftrack server {} is accessible.'.format(url)) return url + + +class StorerEventHub(ftrack_api.event.hub.EventHub): + def __init__(self, *args, **kwargs): + self.sock = kwargs.pop("sock") + super(StorerEventHub, self).__init__(*args, **kwargs) + + def _handle_packet(self, code, packet_identifier, path, data): + """Override `_handle_packet` which extend heartbeat""" + code_name = self._code_name_mapping[code] + if code_name == "heartbeat": + # Reply with heartbeat. + self.sock.sendall(b"storer") + return self._send_packet(self._code_name_mapping['heartbeat']) + + elif code_name == "connect": + event = ftrack_api.event.base.Event( + topic="pype.storer.started", + data={}, + source={ + "id": self.id, + "user": {"username": self._api_user} + } + ) + self._event_queue.put(event) + + return super(StorerEventHub, self)._handle_packet( + code, packet_identifier, path, data + ) diff --git a/pype/ftrack/ftrack_server/session_storer.py b/pype/ftrack/ftrack_server/session_storer.py index 0b44d7d3a1..29abf329f0 100644 --- a/pype/ftrack/ftrack_server/session_storer.py +++ b/pype/ftrack/ftrack_server/session_storer.py @@ -14,33 +14,6 @@ import ftrack_api.event from ftrack_api.logging import LazyLogMessage as L -class StorerEventHub(ftrack_api.event.hub.EventHub): - def __init__(self, *args, **kwargs): - self.sock = kwargs.pop("sock") - super(StorerEventHub, self).__init__(*args, **kwargs) - - def _handle_packet(self, code, packet_identifier, path, data): - """Override `_handle_packet` which extend heartbeat""" - code_name = self._code_name_mapping[code] - if code_name == "heartbeat": - # Reply with heartbeat. - self.sock.sendall(b"storer") - return self._send_packet(self._code_name_mapping['heartbeat']) - - elif code_name == "connect": - event = ftrack_api.event.base.Event( - topic="pype.storer.started", - data={}, - source={ - "id": self.id, - "user": {"username": self._api_user} - } - ) - self._event_queue.put(event) - - return super(StorerEventHub, self)._handle_packet( - code, packet_identifier, path, data - ) class StorerSession(ftrack_api.session.Session): From d714f5fb780c7c7db13e4610be385590e373778a Mon Sep 17 00:00:00 2001 From: iLLiCiTiT Date: Wed, 11 Dec 2019 12:25:25 +0100 Subject: [PATCH 14/20] Processor Eventub moved to lib --- pype/ftrack/ftrack_server/lib.py | 115 ++++++++++++++++++ .../ftrack/ftrack_server/session_processor.py | 110 ----------------- 2 files changed, 115 insertions(+), 110 deletions(-) diff --git a/pype/ftrack/ftrack_server/lib.py b/pype/ftrack/ftrack_server/lib.py index 80c147d400..091df72a98 100644 --- a/pype/ftrack/ftrack_server/lib.py +++ b/pype/ftrack/ftrack_server/lib.py @@ -128,3 +128,118 @@ class StorerEventHub(ftrack_api.event.hub.EventHub): return super(StorerEventHub, self)._handle_packet( code, packet_identifier, path, data ) + + +class ProcessEventHub(ftrack_api.event.hub.EventHub): + url, database, table_name = get_ftrack_event_mongo_info() + + is_table_created = False + pypelog = Logger().get_logger("Session Processor") + + def __init__(self, *args, **kwargs): + self.dbcon = DbConnector( + mongo_url=self.url, + database_name=self.database, + table_name=self.table_name + ) + self.sock = kwargs.pop("sock") + super(ProcessEventHub, self).__init__(*args, **kwargs) + + def prepare_dbcon(self): + try: + self.dbcon.install() + self.dbcon._database.list_collection_names() + except pymongo.errors.AutoReconnect: + self.pypelog.error( + "Mongo server \"{}\" is not responding, exiting.".format( + os.environ["AVALON_MONGO"] + ) + ) + sys.exit(0) + + except pymongo.errors.OperationFailure: + self.pypelog.error(( + "Error with Mongo access, probably permissions." + "Check if exist database with name \"{}\"" + " and collection \"{}\" inside." + ).format(self.database, self.table_name)) + self.sock.sendall(b"MongoError") + sys.exit(0) + + def wait(self, duration=None): + """Overriden wait + + Event are loaded from Mongo DB when queue is empty. Handled event is + set as processed in Mongo DB. + """ + started = time.time() + self.prepare_dbcon() + while True: + try: + event = self._event_queue.get(timeout=0.1) + except queue.Empty: + if not self.load_events(): + time.sleep(0.5) + else: + try: + self._handle(event) + self.dbcon.update_one( + {"id": event["id"]}, + {"$set": {"pype_data.is_processed": True}} + ) + except pymongo.errors.AutoReconnect: + self.pypelog.error(( + "Mongo server \"{}\" is not responding, exiting." + ).format(os.environ["AVALON_MONGO"])) + sys.exit(0) + # Additional special processing of events. + if event['topic'] == 'ftrack.meta.disconnected': + break + + if duration is not None: + if (time.time() - started) > duration: + break + + def load_events(self): + """Load not processed events sorted by stored date""" + ago_date = datetime.datetime.now() - datetime.timedelta(days=3) + result = self.dbcon.delete_many({ + "pype_data.stored": {"$lte": ago_date}, + "pype_data.is_processed": True + }) + + not_processed_events = self.dbcon.find( + {"pype_data.is_processed": False} + ).sort( + [("pype_data.stored", pymongo.ASCENDING)] + ) + + found = False + for event_data in not_processed_events: + new_event_data = { + k: v for k, v in event_data.items() + if k not in ["_id", "pype_data"] + } + try: + event = ftrack_api.event.base.Event(**new_event_data) + except Exception: + self.logger.exception(L( + 'Failed to convert payload into event: {0}', + event_data + )) + continue + found = True + self._event_queue.put(event) + + return found + + def _handle_packet(self, code, packet_identifier, path, data): + """Override `_handle_packet` which skip events and extend heartbeat""" + code_name = self._code_name_mapping[code] + if code_name == "event": + return + if code_name == "heartbeat": + self.sock.sendall(b"processor") + return self._send_packet(self._code_name_mapping["heartbeat"]) + + return super()._handle_packet(code, packet_identifier, path, data) diff --git a/pype/ftrack/ftrack_server/session_processor.py b/pype/ftrack/ftrack_server/session_processor.py index 133719bab4..a17f919969 100644 --- a/pype/ftrack/ftrack_server/session_processor.py +++ b/pype/ftrack/ftrack_server/session_processor.py @@ -24,116 +24,6 @@ from pypeapp import Logger log = Logger().get_logger("Session processor") -class ProcessEventHub(ftrack_api.event.hub.EventHub): - url, database, table_name = get_ftrack_event_mongo_info() - - is_table_created = False - - def __init__(self, *args, **kwargs): - self.dbcon = DbConnector( - mongo_url=self.url, - database_name=self.database, - table_name=self.table_name - ) - self.sock = kwargs.pop("sock") - super(ProcessEventHub, self).__init__(*args, **kwargs) - - def prepare_dbcon(self): - try: - self.dbcon.install() - self.dbcon._database.list_collection_names() - except pymongo.errors.AutoReconnect: - log.error("Mongo server \"{}\" is not responding, exiting.".format( - os.environ["AVALON_MONGO"] - )) - sys.exit(0) - - except pymongo.errors.OperationFailure: - log.error(( - "Error with Mongo access, probably permissions." - "Check if exist database with name \"{}\"" - " and collection \"{}\" inside." - ).format(self.database, self.table_name)) - self.sock.sendall(b"MongoError") - sys.exit(0) - - def wait(self, duration=None): - """Overriden wait - - Event are loaded from Mongo DB when queue is empty. Handled event is - set as processed in Mongo DB. - """ - started = time.time() - self.prepare_dbcon() - while True: - try: - event = self._event_queue.get(timeout=0.1) - except queue.Empty: - if not self.load_events(): - time.sleep(0.5) - else: - try: - self._handle(event) - self.dbcon.update_one( - {"id": event["id"]}, - {"$set": {"pype_data.is_processed": True}} - ) - except pymongo.errors.AutoReconnect: - log.error(( - "Mongo server \"{}\" is not responding, exiting." - ).format(os.environ["AVALON_MONGO"])) - sys.exit(0) - # Additional special processing of events. - if event['topic'] == 'ftrack.meta.disconnected': - break - - if duration is not None: - if (time.time() - started) > duration: - break - - def load_events(self): - """Load not processed events sorted by stored date""" - ago_date = datetime.datetime.now() - datetime.timedelta(days=3) - result = self.dbcon.delete_many({ - "pype_data.stored": {"$lte": ago_date}, - "pype_data.is_processed": True - }) - - not_processed_events = self.dbcon.find( - {"pype_data.is_processed": False} - ).sort( - [("pype_data.stored", pymongo.ASCENDING)] - ) - - found = False - for event_data in not_processed_events: - new_event_data = { - k: v for k, v in event_data.items() - if k not in ["_id", "pype_data"] - } - try: - event = ftrack_api.event.base.Event(**new_event_data) - except Exception: - self.logger.exception(L( - 'Failed to convert payload into event: {0}', - event_data - )) - continue - found = True - self._event_queue.put(event) - - return found - - def _handle_packet(self, code, packet_identifier, path, data): - """Override `_handle_packet` which skip events and extend heartbeat""" - code_name = self._code_name_mapping[code] - if code_name == "event": - return - if code_name == "heartbeat": - self.sock.sendall(b"processor") - return self._send_packet(self._code_name_mapping["heartbeat"]) - - return super()._handle_packet(code, packet_identifier, path, data) class ProcessSession(ftrack_api.session.Session): From edacecf04152ee36e2e6047747fbc69ba4dfc75f Mon Sep 17 00:00:00 2001 From: iLLiCiTiT Date: Wed, 11 Dec 2019 12:27:59 +0100 Subject: [PATCH 15/20] sessions from processor and storer moved to lib and added possiblity to set EventHub class in init --- pype/ftrack/ftrack_server/lib.py | 178 +++++++++++++ .../ftrack/ftrack_server/session_processor.py | 182 ------------- pype/ftrack/ftrack_server/session_storer.py | 242 ------------------ 3 files changed, 178 insertions(+), 424 deletions(-) delete mode 100644 pype/ftrack/ftrack_server/session_processor.py delete mode 100644 pype/ftrack/ftrack_server/session_storer.py diff --git a/pype/ftrack/ftrack_server/lib.py b/pype/ftrack/ftrack_server/lib.py index 091df72a98..edd3cee09b 100644 --- a/pype/ftrack/ftrack_server/lib.py +++ b/pype/ftrack/ftrack_server/lib.py @@ -1,10 +1,32 @@ import os +import sys +import logging +import getpass +import atexit +import tempfile +import threading +import datetime +import time +import queue +import pymongo + import requests +import ftrack_api +import ftrack_api.session +import ftrack_api.cache +import ftrack_api.operation +import ftrack_api._centralized_storage_scenario +import ftrack_api.event +from ftrack_api.logging import LazyLogMessage as L try: from urllib.parse import urlparse, parse_qs except ImportError: from urlparse import urlparse, parse_qs +from pypeapp import Logger + +from pype.ftrack.lib.custom_db_connector import DbConnector + def ftrack_events_mongo_settings(): host = None @@ -243,3 +265,159 @@ class ProcessEventHub(ftrack_api.event.hub.EventHub): return self._send_packet(self._code_name_mapping["heartbeat"]) return super()._handle_packet(code, packet_identifier, path, data) +class SocketSession(ftrack_api.session.Session): + '''An isolated session for interaction with an ftrack server.''' + def __init__( + self, server_url=None, api_key=None, api_user=None, auto_populate=True, + plugin_paths=None, cache=None, cache_key_maker=None, + auto_connect_event_hub=None, schema_cache_path=None, + plugin_arguments=None, sock=None, Eventhub=None + ): + super(ftrack_api.session.Session, self).__init__() + self.logger = logging.getLogger( + __name__ + '.' + self.__class__.__name__ + ) + self._closed = False + + if server_url is None: + server_url = os.environ.get('FTRACK_SERVER') + + if not server_url: + raise TypeError( + 'Required "server_url" not specified. Pass as argument or set ' + 'in environment variable FTRACK_SERVER.' + ) + + self._server_url = server_url + + if api_key is None: + api_key = os.environ.get( + 'FTRACK_API_KEY', + # Backwards compatibility + os.environ.get('FTRACK_APIKEY') + ) + + if not api_key: + raise TypeError( + 'Required "api_key" not specified. Pass as argument or set in ' + 'environment variable FTRACK_API_KEY.' + ) + + self._api_key = api_key + + if api_user is None: + api_user = os.environ.get('FTRACK_API_USER') + if not api_user: + try: + api_user = getpass.getuser() + except Exception: + pass + + if not api_user: + raise TypeError( + 'Required "api_user" not specified. Pass as argument, set in ' + 'environment variable FTRACK_API_USER or one of the standard ' + 'environment variables used by Python\'s getpass module.' + ) + + self._api_user = api_user + + # Currently pending operations. + self.recorded_operations = ftrack_api.operation.Operations() + self.record_operations = True + + self.cache_key_maker = cache_key_maker + if self.cache_key_maker is None: + self.cache_key_maker = ftrack_api.cache.StringKeyMaker() + + # Enforce always having a memory cache at top level so that the same + # in-memory instance is returned from session. + self.cache = ftrack_api.cache.LayeredCache([ + ftrack_api.cache.MemoryCache() + ]) + + if cache is not None: + if callable(cache): + cache = cache(self) + + if cache is not None: + self.cache.caches.append(cache) + + self._managed_request = None + self._request = requests.Session() + self._request.auth = ftrack_api.session.SessionAuthentication( + self._api_key, self._api_user + ) + + self.auto_populate = auto_populate + + # Fetch server information and in doing so also check credentials. + self._server_information = self._fetch_server_information() + + # Now check compatibility of server based on retrieved information. + self.check_server_compatibility() + + # Construct event hub and load plugins. + if Eventhub is None: + Eventhub = ftrack_api.event.hub.EventHub + self._event_hub = Eventhub( + self._server_url, + self._api_user, + self._api_key, + sock=sock + ) + + self._auto_connect_event_hub_thread = None + if auto_connect_event_hub in (None, True): + # Connect to event hub in background thread so as not to block main + # session usage waiting for event hub connection. + self._auto_connect_event_hub_thread = threading.Thread( + target=self._event_hub.connect + ) + self._auto_connect_event_hub_thread.daemon = True + self._auto_connect_event_hub_thread.start() + + # To help with migration from auto_connect_event_hub default changing + # from True to False. + self._event_hub._deprecation_warning_auto_connect = ( + auto_connect_event_hub is None + ) + + # Register to auto-close session on exit. + atexit.register(self.close) + + self._plugin_paths = plugin_paths + if self._plugin_paths is None: + self._plugin_paths = os.environ.get( + 'FTRACK_EVENT_PLUGIN_PATH', '' + ).split(os.pathsep) + + self._discover_plugins(plugin_arguments=plugin_arguments) + + # TODO: Make schemas read-only and non-mutable (or at least without + # rebuilding types)? + if schema_cache_path is not False: + if schema_cache_path is None: + schema_cache_path = os.environ.get( + 'FTRACK_API_SCHEMA_CACHE_PATH', tempfile.gettempdir() + ) + + schema_cache_path = os.path.join( + schema_cache_path, 'ftrack_api_schema_cache.json' + ) + + self.schemas = self._load_schemas(schema_cache_path) + self.types = self._build_entity_type_classes(self.schemas) + + ftrack_api._centralized_storage_scenario.register(self) + + self._configure_locations() + self.event_hub.publish( + ftrack_api.event.base.Event( + topic='ftrack.api.session.ready', + data=dict( + session=self + ) + ), + synchronous=True + ) diff --git a/pype/ftrack/ftrack_server/session_processor.py b/pype/ftrack/ftrack_server/session_processor.py deleted file mode 100644 index a17f919969..0000000000 --- a/pype/ftrack/ftrack_server/session_processor.py +++ /dev/null @@ -1,182 +0,0 @@ -import logging -import os -import atexit -import datetime -import tempfile -import threading -import time -import requests -import queue -import pymongo - -import ftrack_api -import ftrack_api.session -import ftrack_api.cache -import ftrack_api.operation -import ftrack_api._centralized_storage_scenario -import ftrack_api.event -from ftrack_api.logging import LazyLogMessage as L - -from pype.ftrack.lib.custom_db_connector import DbConnector -from pype.ftrack.ftrack_server.lib import get_ftrack_event_mongo_info -from pypeapp import Logger - -log = Logger().get_logger("Session processor") - - - - -class ProcessSession(ftrack_api.session.Session): - '''An isolated session for interaction with an ftrack server.''' - def __init__( - self, server_url=None, api_key=None, api_user=None, auto_populate=True, - plugin_paths=None, cache=None, cache_key_maker=None, - auto_connect_event_hub=None, schema_cache_path=None, - plugin_arguments=None, sock=None - ): - super(ftrack_api.session.Session, self).__init__() - self.logger = logging.getLogger( - __name__ + '.' + self.__class__.__name__ - ) - self._closed = False - - if server_url is None: - server_url = os.environ.get('FTRACK_SERVER') - - if not server_url: - raise TypeError( - 'Required "server_url" not specified. Pass as argument or set ' - 'in environment variable FTRACK_SERVER.' - ) - - self._server_url = server_url - - if api_key is None: - api_key = os.environ.get( - 'FTRACK_API_KEY', - # Backwards compatibility - os.environ.get('FTRACK_APIKEY') - ) - - if not api_key: - raise TypeError( - 'Required "api_key" not specified. Pass as argument or set in ' - 'environment variable FTRACK_API_KEY.' - ) - - self._api_key = api_key - - if api_user is None: - api_user = os.environ.get('FTRACK_API_USER') - if not api_user: - try: - api_user = getpass.getuser() - except Exception: - pass - - if not api_user: - raise TypeError( - 'Required "api_user" not specified. Pass as argument, set in ' - 'environment variable FTRACK_API_USER or one of the standard ' - 'environment variables used by Python\'s getpass module.' - ) - - self._api_user = api_user - - # Currently pending operations. - self.recorded_operations = ftrack_api.operation.Operations() - self.record_operations = True - - self.cache_key_maker = cache_key_maker - if self.cache_key_maker is None: - self.cache_key_maker = ftrack_api.cache.StringKeyMaker() - - # Enforce always having a memory cache at top level so that the same - # in-memory instance is returned from session. - self.cache = ftrack_api.cache.LayeredCache([ - ftrack_api.cache.MemoryCache() - ]) - - if cache is not None: - if callable(cache): - cache = cache(self) - - if cache is not None: - self.cache.caches.append(cache) - - self._managed_request = None - self._request = requests.Session() - self._request.auth = ftrack_api.session.SessionAuthentication( - self._api_key, self._api_user - ) - - self.auto_populate = auto_populate - - # Fetch server information and in doing so also check credentials. - self._server_information = self._fetch_server_information() - - # Now check compatibility of server based on retrieved information. - self.check_server_compatibility() - - # Construct event hub and load plugins. - self._event_hub = ProcessEventHub( - self._server_url, - self._api_user, - self._api_key, - sock=sock - ) - - self._auto_connect_event_hub_thread = None - if auto_connect_event_hub in (None, True): - # Connect to event hub in background thread so as not to block main - # session usage waiting for event hub connection. - self._auto_connect_event_hub_thread = threading.Thread( - target=self._event_hub.connect - ) - self._auto_connect_event_hub_thread.daemon = True - self._auto_connect_event_hub_thread.start() - - # To help with migration from auto_connect_event_hub default changing - # from True to False. - self._event_hub._deprecation_warning_auto_connect = ( - auto_connect_event_hub is None - ) - - # Register to auto-close session on exit. - atexit.register(self.close) - - self._plugin_paths = plugin_paths - if self._plugin_paths is None: - self._plugin_paths = os.environ.get( - 'FTRACK_EVENT_PLUGIN_PATH', '' - ).split(os.pathsep) - - self._discover_plugins(plugin_arguments=plugin_arguments) - - # TODO: Make schemas read-only and non-mutable (or at least without - # rebuilding types)? - if schema_cache_path is not False: - if schema_cache_path is None: - schema_cache_path = os.environ.get( - 'FTRACK_API_SCHEMA_CACHE_PATH', tempfile.gettempdir() - ) - - schema_cache_path = os.path.join( - schema_cache_path, 'ftrack_api_schema_cache.json' - ) - - self.schemas = self._load_schemas(schema_cache_path) - self.types = self._build_entity_type_classes(self.schemas) - - ftrack_api._centralized_storage_scenario.register(self) - - self._configure_locations() - self.event_hub.publish( - ftrack_api.event.base.Event( - topic='ftrack.api.session.ready', - data=dict( - session=self - ) - ), - synchronous=True - ) diff --git a/pype/ftrack/ftrack_server/session_storer.py b/pype/ftrack/ftrack_server/session_storer.py deleted file mode 100644 index 29abf329f0..0000000000 --- a/pype/ftrack/ftrack_server/session_storer.py +++ /dev/null @@ -1,242 +0,0 @@ -import logging -import os -import atexit -import tempfile -import threading -import requests - -import ftrack_api -import ftrack_api.session -import ftrack_api.cache -import ftrack_api.operation -import ftrack_api._centralized_storage_scenario -import ftrack_api.event -from ftrack_api.logging import LazyLogMessage as L - - - - -class StorerSession(ftrack_api.session.Session): - '''An isolated session for interaction with an ftrack server.''' - def __init__( - self, server_url=None, api_key=None, api_user=None, auto_populate=True, - plugin_paths=None, cache=None, cache_key_maker=None, - auto_connect_event_hub=None, schema_cache_path=None, - plugin_arguments=None, sock=None - ): - '''Initialise session. - - *server_url* should be the URL of the ftrack server to connect to - including any port number. If not specified attempt to look up from - :envvar:`FTRACK_SERVER`. - - *api_key* should be the API key to use for authentication whilst - *api_user* should be the username of the user in ftrack to record - operations against. If not specified, *api_key* should be retrieved - from :envvar:`FTRACK_API_KEY` and *api_user* from - :envvar:`FTRACK_API_USER`. - - If *auto_populate* is True (the default), then accessing entity - attributes will cause them to be automatically fetched from the server - if they are not already. This flag can be changed on the session - directly at any time. - - *plugin_paths* should be a list of paths to search for plugins. If not - specified, default to looking up :envvar:`FTRACK_EVENT_PLUGIN_PATH`. - - *cache* should be an instance of a cache that fulfils the - :class:`ftrack_api.cache.Cache` interface and will be used as the cache - for the session. It can also be a callable that will be called with the - session instance as sole argument. The callable should return ``None`` - if a suitable cache could not be configured, but session instantiation - can continue safely. - - .. note:: - - The session will add the specified cache to a pre-configured layered - cache that specifies the top level cache as a - :class:`ftrack_api.cache.MemoryCache`. Therefore, it is unnecessary - to construct a separate memory cache for typical behaviour. Working - around this behaviour or removing the memory cache can lead to - unexpected behaviour. - - *cache_key_maker* should be an instance of a key maker that fulfils the - :class:`ftrack_api.cache.KeyMaker` interface and will be used to - generate keys for objects being stored in the *cache*. If not specified, - a :class:`~ftrack_api.cache.StringKeyMaker` will be used. - - If *auto_connect_event_hub* is True then embedded event hub will be - automatically connected to the event server and allow for publishing and - subscribing to **non-local** events. If False, then only publishing and - subscribing to **local** events will be possible until the hub is - manually connected using :meth:`EventHub.connect - `. - - .. note:: - - The event hub connection is performed in a background thread to - improve session startup time. If a registered plugin requires a - connected event hub then it should check the event hub connection - status explicitly. Subscribing to events does *not* require a - connected event hub. - - Enable schema caching by setting *schema_cache_path* to a folder path. - If not set, :envvar:`FTRACK_API_SCHEMA_CACHE_PATH` will be used to - determine the path to store cache in. If the environment variable is - also not specified then a temporary directory will be used. Set to - `False` to disable schema caching entirely. - - *plugin_arguments* should be an optional mapping (dict) of keyword - arguments to pass to plugin register functions upon discovery. If a - discovered plugin has a signature that is incompatible with the passed - arguments, the discovery mechanism will attempt to reduce the passed - arguments to only those that the plugin accepts. Note that a warning - will be logged in this case. - - ''' - super(ftrack_api.session.Session, self).__init__() - self.logger = logging.getLogger( - __name__ + '.' + self.__class__.__name__ - ) - self._closed = False - - if server_url is None: - server_url = os.environ.get('FTRACK_SERVER') - - if not server_url: - raise TypeError( - 'Required "server_url" not specified. Pass as argument or set ' - 'in environment variable FTRACK_SERVER.' - ) - - self._server_url = server_url - - if api_key is None: - api_key = os.environ.get( - 'FTRACK_API_KEY', - # Backwards compatibility - os.environ.get('FTRACK_APIKEY') - ) - - if not api_key: - raise TypeError( - 'Required "api_key" not specified. Pass as argument or set in ' - 'environment variable FTRACK_API_KEY.' - ) - - self._api_key = api_key - - if api_user is None: - api_user = os.environ.get('FTRACK_API_USER') - if not api_user: - try: - api_user = getpass.getuser() - except Exception: - pass - - if not api_user: - raise TypeError( - 'Required "api_user" not specified. Pass as argument, set in ' - 'environment variable FTRACK_API_USER or one of the standard ' - 'environment variables used by Python\'s getpass module.' - ) - - self._api_user = api_user - - # Currently pending operations. - self.recorded_operations = ftrack_api.operation.Operations() - self.record_operations = True - - self.cache_key_maker = cache_key_maker - if self.cache_key_maker is None: - self.cache_key_maker = ftrack_api.cache.StringKeyMaker() - - # Enforce always having a memory cache at top level so that the same - # in-memory instance is returned from session. - self.cache = ftrack_api.cache.LayeredCache([ - ftrack_api.cache.MemoryCache() - ]) - - if cache is not None: - if callable(cache): - cache = cache(self) - - if cache is not None: - self.cache.caches.append(cache) - - self._managed_request = None - self._request = requests.Session() - self._request.auth = ftrack_api.session.SessionAuthentication( - self._api_key, self._api_user - ) - - self.auto_populate = auto_populate - - # Fetch server information and in doing so also check credentials. - self._server_information = self._fetch_server_information() - - # Now check compatibility of server based on retrieved information. - self.check_server_compatibility() - - # Construct event hub and load plugins. - self._event_hub = StorerEventHub( - self._server_url, - self._api_user, - self._api_key, - sock=sock - ) - - self._auto_connect_event_hub_thread = None - if auto_connect_event_hub in (None, True): - # Connect to event hub in background thread so as not to block main - # session usage waiting for event hub connection. - self._auto_connect_event_hub_thread = threading.Thread( - target=self._event_hub.connect - ) - self._auto_connect_event_hub_thread.daemon = True - self._auto_connect_event_hub_thread.start() - - # To help with migration from auto_connect_event_hub default changing - # from True to False. - self._event_hub._deprecation_warning_auto_connect = ( - auto_connect_event_hub is None - ) - - # Register to auto-close session on exit. - atexit.register(self.close) - - self._plugin_paths = plugin_paths - if self._plugin_paths is None: - self._plugin_paths = os.environ.get( - 'FTRACK_EVENT_PLUGIN_PATH', '' - ).split(os.pathsep) - - self._discover_plugins(plugin_arguments=plugin_arguments) - - # TODO: Make schemas read-only and non-mutable (or at least without - # rebuilding types)? - if schema_cache_path is not False: - if schema_cache_path is None: - schema_cache_path = os.environ.get( - 'FTRACK_API_SCHEMA_CACHE_PATH', tempfile.gettempdir() - ) - - schema_cache_path = os.path.join( - schema_cache_path, 'ftrack_api_schema_cache.json' - ) - - self.schemas = self._load_schemas(schema_cache_path) - self.types = self._build_entity_type_classes(self.schemas) - - ftrack_api._centralized_storage_scenario.register(self) - - self._configure_locations() - self.event_hub.publish( - ftrack_api.event.base.Event( - topic='ftrack.api.session.ready', - data=dict( - session=self - ) - ), - synchronous=True - ) From dd52e6594db97395f4ace8c39c0d873518f54d09 Mon Sep 17 00:00:00 2001 From: iLLiCiTiT Date: Wed, 11 Dec 2019 12:29:28 +0100 Subject: [PATCH 16/20] used moved session and event hubs in subprocesses --- .../ftrack_server/sub_event_processor.py | 13 ++++---- pype/ftrack/ftrack_server/sub_event_storer.py | 30 ++++++++++--------- 2 files changed, 22 insertions(+), 21 deletions(-) diff --git a/pype/ftrack/ftrack_server/sub_event_processor.py b/pype/ftrack/ftrack_server/sub_event_processor.py index 6ada787223..9c971ca916 100644 --- a/pype/ftrack/ftrack_server/sub_event_processor.py +++ b/pype/ftrack/ftrack_server/sub_event_processor.py @@ -1,12 +1,9 @@ -import os import sys -import datetime import signal import socket -import pymongo from ftrack_server import FtrackServer -from pype.ftrack.ftrack_server.session_processor import ProcessSession +from pype.ftrack.ftrack_server.lib import SocketSession, ProcessEventHub from pypeapp import Logger log = Logger().get_logger("Event processor") @@ -24,12 +21,14 @@ def main(args): sock.sendall(b"CreatedProcess") try: - session = ProcessSession(auto_connect_event_hub=True, sock=sock) - server = FtrackServer('event') + session = SocketSession( + auto_connect_event_hub=True, sock=sock, Eventhub=ProcessEventHub + ) + server = FtrackServer("event") log.debug("Launched Ftrack Event processor") server.run_server(session) - except Exception as exc: + except Exception: log.error("Event server crashed. See traceback below", exc_info=True) finally: diff --git a/pype/ftrack/ftrack_server/sub_event_storer.py b/pype/ftrack/ftrack_server/sub_event_storer.py index 4828b10bfa..11cda0e487 100644 --- a/pype/ftrack/ftrack_server/sub_event_storer.py +++ b/pype/ftrack/ftrack_server/sub_event_storer.py @@ -7,22 +7,22 @@ import pymongo import ftrack_api from ftrack_server import FtrackServer -from pype.ftrack.ftrack_server.lib import get_ftrack_event_mongo_info +from pype.ftrack.ftrack_server.lib import ( + get_ftrack_event_mongo_info, + SocketSession, + StorerEventHub +) from pype.ftrack.lib.custom_db_connector import DbConnector -from session_storer import StorerSession from pypeapp import Logger log = Logger().get_logger("Event storer") + +class SessionFactory: + session = None + + url, database, table_name = get_ftrack_event_mongo_info() - - -class SessionClass: - def __init__(self): - self.session = None - - -session_obj = SessionClass() dbcon = DbConnector( mongo_url=url, database_name=database, @@ -75,7 +75,7 @@ def launch(event): def trigger_sync(event): - session = session_obj.session + session = SessionFactory.session if session is None: log.warning("Session is not set. Can't trigger Sync to avalon action.") return True @@ -93,7 +93,7 @@ def trigger_sync(event): "$set": {"pype_data.is_processed": True} } dbcon.update_many(query, set_dict) - + selections = [] for project in projects: if project["status"] != "active": @@ -154,8 +154,10 @@ def main(args): sock.sendall(b"CreatedStore") try: - session = StorerSession(auto_connect_event_hub=True, sock=sock) - session_obj.session = session + session = SocketSession( + auto_connect_event_hub=True, sock=sock, Eventhub=StorerEventHub + ) + SessionFactory.session = session register(session) server = FtrackServer("event") log.debug("Launched Ftrack Event storer") From 8d7f29c1bac8fc1b6cf66137e07771b42b582c1e Mon Sep 17 00:00:00 2001 From: iLLiCiTiT Date: Wed, 11 Dec 2019 12:32:03 +0100 Subject: [PATCH 17/20] fix in trigger sync method, now check event_hub id to not trigger sync on every connect event --- pype/ftrack/ftrack_server/sub_event_storer.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/pype/ftrack/ftrack_server/sub_event_storer.py b/pype/ftrack/ftrack_server/sub_event_storer.py index 11cda0e487..dfe8e21654 100644 --- a/pype/ftrack/ftrack_server/sub_event_storer.py +++ b/pype/ftrack/ftrack_server/sub_event_storer.py @@ -76,6 +76,10 @@ def launch(event): def trigger_sync(event): session = SessionFactory.session + source_id = event.get("source", {}).get("id") + if not source_id or source_id != session.event_hub.id: + return + if session is None: log.warning("Session is not set. Can't trigger Sync to avalon action.") return True From f5c326aa568a3660c4a8e612e1704b613403577e Mon Sep 17 00:00:00 2001 From: iLLiCiTiT Date: Wed, 11 Dec 2019 12:33:22 +0100 Subject: [PATCH 18/20] formatting changes --- pype/ftrack/ftrack_server/socket_thread.py | 4 ++-- pype/ftrack/ftrack_server/sub_legacy_server.py | 9 +++++---- 2 files changed, 7 insertions(+), 6 deletions(-) diff --git a/pype/ftrack/ftrack_server/socket_thread.py b/pype/ftrack/ftrack_server/socket_thread.py index d0a2868743..3309f75cd7 100644 --- a/pype/ftrack/ftrack_server/socket_thread.py +++ b/pype/ftrack/ftrack_server/socket_thread.py @@ -1,7 +1,5 @@ import os -import sys import time -import signal import socket import threading import subprocess @@ -10,7 +8,9 @@ from pypeapp import Logger class SocketThread(threading.Thread): """Thread that checks suprocess of storer of processor of events""" + MAX_TIMEOUT = 35 + def __init__(self, name, port, filepath): super(SocketThread, self).__init__() self.log = Logger().get_logger("SocketThread", "Event Thread") diff --git a/pype/ftrack/ftrack_server/sub_legacy_server.py b/pype/ftrack/ftrack_server/sub_legacy_server.py index 31f38d0404..8b7bab5e2e 100644 --- a/pype/ftrack/ftrack_server/sub_legacy_server.py +++ b/pype/ftrack/ftrack_server/sub_legacy_server.py @@ -1,4 +1,3 @@ -import os import sys import time import datetime @@ -7,7 +6,6 @@ import threading from ftrack_server import FtrackServer import ftrack_api -from ftrack_api.event.hub import EventHub from pypeapp import Logger log = Logger().get_logger("Event Server Legacy") @@ -37,7 +35,10 @@ class TimerChecker(threading.Thread): if not self.session.event_hub.connected: if not connected: - if (datetime.datetime.now() - start).seconds > self.max_time_out: + if ( + (datetime.datetime.now() - start).seconds > + self.max_time_out + ): log.error(( "Exiting event server. Session was not connected" " to ftrack server in {} seconds." @@ -61,7 +62,7 @@ class TimerChecker(threading.Thread): def main(args): check_thread = None try: - server = FtrackServer('event') + server = FtrackServer("event") session = ftrack_api.Session(auto_connect_event_hub=True) check_thread = TimerChecker(server, session) From 6e2ea0c05bc63fbcb4272e11922c202b55d39b0f Mon Sep 17 00:00:00 2001 From: iLLiCiTiT Date: Wed, 11 Dec 2019 12:34:36 +0100 Subject: [PATCH 19/20] change session check in base event handler --- pype/ftrack/lib/ftrack_base_handler.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pype/ftrack/lib/ftrack_base_handler.py b/pype/ftrack/lib/ftrack_base_handler.py index 4b57452961..8329505ffb 100644 --- a/pype/ftrack/lib/ftrack_base_handler.py +++ b/pype/ftrack/lib/ftrack_base_handler.py @@ -2,7 +2,7 @@ import functools import time from pypeapp import Logger import ftrack_api -from pype.ftrack.ftrack_server import session_processor +from pype.ftrack.ftrack_server.lib import SocketSession class MissingPermision(Exception): @@ -41,7 +41,7 @@ class BaseHandler(object): self.log = Logger().get_logger(self.__class__.__name__) if not( isinstance(session, ftrack_api.session.Session) or - isinstance(session, session_processor.ProcessSession) + isinstance(session, SocketSession) ): raise Exception(( "Session object entered with args is instance of \"{}\"" From df34ed8705bf62e611b0b7d9e1e3f725df178457 Mon Sep 17 00:00:00 2001 From: iLLiCiTiT Date: Thu, 12 Dec 2019 17:44:40 +0100 Subject: [PATCH 20/20] refactored verion to task event and added feature with more statuses to set --- .../events/event_version_to_task_statuses.py | 170 ++++++++++++------ 1 file changed, 113 insertions(+), 57 deletions(-) diff --git a/pype/ftrack/events/event_version_to_task_statuses.py b/pype/ftrack/events/event_version_to_task_statuses.py index cd83b819bc..1f5f1514d7 100644 --- a/pype/ftrack/events/event_version_to_task_statuses.py +++ b/pype/ftrack/events/event_version_to_task_statuses.py @@ -4,6 +4,7 @@ from pypeapp import config class VersionToTaskStatus(BaseEvent): + # Presets usage default_status_mapping = {} def launch(self, session, event): @@ -11,69 +12,124 @@ class VersionToTaskStatus(BaseEvent): # start of event procedure ---------------------------------- for entity in event['data'].get('entities', []): - # Filter non-assetversions - if ( - entity['entityType'] == 'assetversion' and - 'statusid' in (entity.get('keys') or []) - ): + # Filter AssetVersions + if entity["entityType"] != "assetversion": + continue - version = session.get('AssetVersion', entity['entityId']) - try: - version_status = session.get( - 'Status', entity['changes']['statusid']['new'] - ) - except Exception: + # Skip if statusid not in keys (in changes) + keys = entity.get("keys") + if not keys or "statusid" not in keys: + continue + + # Get new version task name + version_status_id = ( + entity + .get("changes", {}) + .get("statusid", {}) + .get("new", {}) + ) + + # Just check that `new` is set to any value + if not version_status_id: + continue + + try: + version_status = session.get("Status", version_status_id) + except Exception: + self.log.warning( + "Troubles with query status id [ {} ]".format( + version_status_id + ), + exc_info=True + ) + + if not version_status: + continue + + version_status_orig = version_status["name"] + + # Load status mapping from presets + status_mapping = ( + config.get_presets() + .get("ftrack", {}) + .get("ftrack_config", {}) + .get("status_version_to_task") + ) or self.default_status_mapping + + # Skip if mapping is empty + if not status_mapping: + continue + + # Lower version status name and check if has mapping + version_status = version_status_orig.lower() + new_status_names = status_mapping.get(version_status) + if not new_status_names: + continue + + self.log.debug( + "Processing AssetVersion status change: [ {} ]".format( + version_status_orig + ) + ) + + # Backwards compatibility (convert string to list) + if isinstance(new_status_names, str): + new_status_names = [new_status_names] + + # Lower all names from presets + new_status_names = [name.lower() for name in new_status_names] + + # Get entities necessary for processing + version = session.get("AssetVersion", entity["entityId"]) + task = version.get("task") + if not task: + continue + + project_schema = task["project"]["project_schema"] + # Get all available statuses for Task + statuses = project_schema.get_statuses("Task", task["type_id"]) + # map lowered status name with it's object + stat_names_low = { + status["name"].lower(): status for status in statuses + } + + new_status = None + for status_name in new_status_names: + if status_name not in stat_names_low: continue - task_status = version_status - task = version['task'] - self.log.info('>>> version status: [ {} ]'.format( - version_status['name'])) - version_name_low = version_status['name'].lower() + # store object of found status + new_status = stat_names_low[status_name] + self.log.debug("Status to set: [ {} ]".format( + new_status["name"] + )) + break - status_mapping = ( - config.get_presets() - .get("ftrack", {}) - .get("ftrack_config", {}) - .get("status_version_to_task") - ) or self.default_status_mapping + # Skip if status names were not found for paticulat entity + if not new_status: + self.log.warning( + "Any of statuses from presets can be set: {}".format( + str(new_status_names) + ) + ) + continue - status_to_set = status_mapping.get(version_name_low) + # Get full path to task for logging + ent_path = "/".join([ent["name"] for ent in task["link"]]) - self.log.info( - '>>> status to set: [ {} ]'.format(status_to_set)) - - if status_to_set is not None: - query = 'Status where name is "{}"'.format(status_to_set) - try: - task_status = session.query(query).one() - except Exception: - self.log.info( - '!!! status was not found in Ftrack [ {} ]'.format( - status_to_set - ) - ) - continue - - # Proceed if the task status was set - if task_status is not None: - # Get path to task - path = task['name'] - for p in task['ancestors']: - path = p['name'] + '/' + path - - # Setting task status - try: - task['status'] = task_status - session.commit() - except Exception as e: - session.rollback() - self.log.warning('!!! [ {} ] status couldnt be set:\ - [ {} ]'.format(path, e)) - session.rollback() - else: - self.log.info('>>> [ {} ] updated to [ {} ]'.format( - path, task_status['name'])) + # Setting task status + try: + task["status"] = new_status + session.commit() + self.log.debug("[ {} ] Status updated to [ {} ]".format( + ent_path, new_status['name'] + )) + except Exception: + session.rollback() + self.log.warning( + "[ {} ]Status couldn't be set".format(ent_path), + exc_info=True + ) def register(session, plugins_presets):