mirror of
https://github.com/ynput/ayon-core.git
synced 2025-12-26 05:42:15 +01:00
Merge branch 'release/2.4.0' into develop
This commit is contained in:
commit
62a7acc754
18 changed files with 1038 additions and 1113 deletions
|
|
@ -1,354 +1,606 @@
|
|||
import os
|
||||
import sys
|
||||
import logging
|
||||
import collections
|
||||
import uuid
|
||||
from datetime import datetime
|
||||
from queue import Queue
|
||||
|
||||
from bson.objectid import ObjectId
|
||||
import argparse
|
||||
import ftrack_api
|
||||
from pype.ftrack import BaseAction
|
||||
from pype.ftrack.lib.io_nonsingleton import DbConnector
|
||||
|
||||
|
||||
class DeleteAsset(BaseAction):
|
||||
class DeleteAssetSubset(BaseAction):
|
||||
'''Edit meta data action.'''
|
||||
|
||||
#: Action identifier.
|
||||
identifier = 'delete.asset'
|
||||
identifier = "delete.asset.subset"
|
||||
#: Action label.
|
||||
label = 'Delete Asset/Subsets'
|
||||
label = "Delete Asset/Subsets"
|
||||
#: Action description.
|
||||
description = 'Removes from Avalon with all childs and asset from Ftrack'
|
||||
icon = '{}/ftrack/action_icons/DeleteAsset.svg'.format(
|
||||
os.environ.get('PYPE_STATICS_SERVER', '')
|
||||
description = "Removes from Avalon with all childs and asset from Ftrack"
|
||||
icon = "{}/ftrack/action_icons/DeleteAsset.svg".format(
|
||||
os.environ.get("PYPE_STATICS_SERVER", "")
|
||||
)
|
||||
#: roles that are allowed to register this action
|
||||
role_list = ['Pypeclub', 'Administrator']
|
||||
#: Db
|
||||
db = DbConnector()
|
||||
role_list = ["Pypeclub", "Administrator", "Project Manager"]
|
||||
#: Db connection
|
||||
dbcon = DbConnector()
|
||||
|
||||
value = None
|
||||
splitter = {"type": "label", "value": "---"}
|
||||
action_data_by_id = {}
|
||||
asset_prefix = "asset:"
|
||||
subset_prefix = "subset:"
|
||||
|
||||
def discover(self, session, entities, event):
|
||||
''' Validation '''
|
||||
if len(entities) != 1:
|
||||
return False
|
||||
""" Validation """
|
||||
task_ids = []
|
||||
for ent_info in event["data"]["selection"]:
|
||||
entType = ent_info.get("entityType", "")
|
||||
if entType == "task":
|
||||
task_ids.append(ent_info["entityId"])
|
||||
|
||||
valid = ["task"]
|
||||
entityType = event["data"]["selection"][0].get("entityType", "")
|
||||
if entityType.lower() not in valid:
|
||||
return False
|
||||
|
||||
return True
|
||||
for entity in entities:
|
||||
ftrack_id = entity["id"]
|
||||
if ftrack_id not in task_ids:
|
||||
continue
|
||||
if entity.entity_type.lower() != "task":
|
||||
return True
|
||||
return False
|
||||
|
||||
def _launch(self, event):
|
||||
self.reset_session()
|
||||
try:
|
||||
self.db.install()
|
||||
args = self._translate_event(
|
||||
self.session, event
|
||||
)
|
||||
if "values" not in event["data"]:
|
||||
self.dbcon.install()
|
||||
return self._interface(self.session, *args)
|
||||
|
||||
interface = self._interface(
|
||||
self.session, *args
|
||||
)
|
||||
|
||||
confirmation = self.confirm_delete(
|
||||
True, *args
|
||||
)
|
||||
|
||||
if interface:
|
||||
return interface
|
||||
|
||||
confirmation = self.confirm_delete(*args)
|
||||
if confirmation:
|
||||
return confirmation
|
||||
|
||||
self.dbcon.install()
|
||||
response = self.launch(
|
||||
self.session, *args
|
||||
)
|
||||
finally:
|
||||
self.db.uninstall()
|
||||
self.dbcon.uninstall()
|
||||
|
||||
return self._handle_result(
|
||||
self.session, response, *args
|
||||
)
|
||||
|
||||
def interface(self, session, entities, event):
|
||||
if not event['data'].get('values', {}):
|
||||
self.attempt = 1
|
||||
items = []
|
||||
entity = entities[0]
|
||||
title = 'Choose items to delete from "{}"'.format(entity['name'])
|
||||
project = entity['project']
|
||||
self.show_message(event, "Preparing data...", True)
|
||||
items = []
|
||||
title = "Choose items to delete"
|
||||
|
||||
self.db.Session['AVALON_PROJECT'] = project["full_name"]
|
||||
# Filter selection and get ftrack ids
|
||||
selection = event["data"].get("selection") or []
|
||||
ftrack_ids = []
|
||||
project_in_selection = False
|
||||
for entity in selection:
|
||||
entity_type = (entity.get("entityType") or "").lower()
|
||||
if entity_type != "task":
|
||||
if entity_type == "show":
|
||||
project_in_selection = True
|
||||
continue
|
||||
|
||||
av_entity = self.db.find_one({
|
||||
'type': 'asset',
|
||||
'name': entity['name']
|
||||
ftrack_id = entity.get("entityId")
|
||||
if not ftrack_id:
|
||||
continue
|
||||
|
||||
ftrack_ids.append(ftrack_id)
|
||||
|
||||
if project_in_selection:
|
||||
msg = "It is not possible to use this action on project entity."
|
||||
self.show_message(event, msg, True)
|
||||
|
||||
# Filter event even more (skip task entities)
|
||||
# - task entities are not relevant for avalon
|
||||
for entity in entities:
|
||||
ftrack_id = entity["id"]
|
||||
if ftrack_id not in ftrack_ids:
|
||||
continue
|
||||
|
||||
if entity.entity_type.lower() == "task":
|
||||
ftrack_ids.remove(ftrack_id)
|
||||
|
||||
if not ftrack_ids:
|
||||
# It is bug if this happens!
|
||||
return {
|
||||
"success": False,
|
||||
"message": "Invalid selection for this action (Bug)"
|
||||
}
|
||||
|
||||
if entities[0].entity_type.lower() == "project":
|
||||
project = entities[0]
|
||||
else:
|
||||
project = entities[0]["project"]
|
||||
|
||||
project_name = project["full_name"]
|
||||
self.dbcon.Session["AVALON_PROJECT"] = project_name
|
||||
|
||||
selected_av_entities = self.dbcon.find({
|
||||
"type": "asset",
|
||||
"data.ftrackId": {"$in": ftrack_ids}
|
||||
})
|
||||
selected_av_entities = [ent for ent in selected_av_entities]
|
||||
if not selected_av_entities:
|
||||
return {
|
||||
"success": False,
|
||||
"message": "Didn't found entities in avalon"
|
||||
}
|
||||
|
||||
# Remove cached action older than 2 minutes
|
||||
old_action_ids = []
|
||||
for id, data in self.action_data_by_id.items():
|
||||
created_at = data.get("created_at")
|
||||
if not created_at:
|
||||
old_action_ids.append(id)
|
||||
continue
|
||||
cur_time = datetime.now()
|
||||
existing_in_sec = (created_at - cur_time).total_seconds()
|
||||
if existing_in_sec > 60 * 2:
|
||||
old_action_ids.append(id)
|
||||
|
||||
for id in old_action_ids:
|
||||
self.action_data_by_id.pop(id, None)
|
||||
|
||||
# Store data for action id
|
||||
action_id = str(uuid.uuid1())
|
||||
self.action_data_by_id[action_id] = {
|
||||
"attempt": 1,
|
||||
"created_at": datetime.now(),
|
||||
"project_name": project_name,
|
||||
"subset_ids_by_name": {},
|
||||
"subset_ids_by_parent": {}
|
||||
}
|
||||
|
||||
id_item = {
|
||||
"type": "hidden",
|
||||
"name": "action_id",
|
||||
"value": action_id
|
||||
}
|
||||
|
||||
items.append(id_item)
|
||||
asset_ids = [ent["_id"] for ent in selected_av_entities]
|
||||
subsets_for_selection = self.dbcon.find({
|
||||
"type": "subset",
|
||||
"parent": {"$in": asset_ids}
|
||||
})
|
||||
|
||||
asset_ending = ""
|
||||
if len(selected_av_entities) > 1:
|
||||
asset_ending = "s"
|
||||
|
||||
asset_title = {
|
||||
"type": "label",
|
||||
"value": "# Delete asset{}:".format(asset_ending)
|
||||
}
|
||||
asset_note = {
|
||||
"type": "label",
|
||||
"value": (
|
||||
"<p><i>NOTE: Action will delete checked entities"
|
||||
" in Ftrack and Avalon with all children entities and"
|
||||
" published content.</i></p>"
|
||||
)
|
||||
}
|
||||
|
||||
items.append(asset_title)
|
||||
items.append(asset_note)
|
||||
|
||||
asset_items = collections.defaultdict(list)
|
||||
for asset in selected_av_entities:
|
||||
ent_path_items = [project_name]
|
||||
ent_path_items.extend(asset.get("data", {}).get("parents") or [])
|
||||
ent_path_to_parent = "/".join(ent_path_items) + "/"
|
||||
asset_items[ent_path_to_parent].append(asset)
|
||||
|
||||
for asset_parent_path, assets in sorted(asset_items.items()):
|
||||
items.append({
|
||||
"type": "label",
|
||||
"value": "## <b>- {}</b>".format(asset_parent_path)
|
||||
})
|
||||
|
||||
if av_entity is None:
|
||||
return {
|
||||
'success': False,
|
||||
'message': 'Didn\'t found assets in avalon'
|
||||
}
|
||||
|
||||
asset_label = {
|
||||
'type': 'label',
|
||||
'value': '## Delete whole asset: ##'
|
||||
}
|
||||
asset_item = {
|
||||
'label': av_entity['name'],
|
||||
'name': 'whole_asset',
|
||||
'type': 'boolean',
|
||||
'value': False
|
||||
}
|
||||
splitter = {
|
||||
'type': 'label',
|
||||
'value': '{}'.format(200*"-")
|
||||
}
|
||||
subset_label = {
|
||||
'type': 'label',
|
||||
'value': '## Subsets: ##'
|
||||
}
|
||||
if av_entity is not None:
|
||||
items.append(asset_label)
|
||||
items.append(asset_item)
|
||||
items.append(splitter)
|
||||
|
||||
all_subsets = self.db.find({
|
||||
'type': 'subset',
|
||||
'parent': av_entity['_id']
|
||||
for asset in assets:
|
||||
items.append({
|
||||
"label": asset["name"],
|
||||
"name": "{}{}".format(
|
||||
self.asset_prefix, str(asset["_id"])
|
||||
),
|
||||
"type": 'boolean',
|
||||
"value": False
|
||||
})
|
||||
|
||||
subset_items = []
|
||||
for subset in all_subsets:
|
||||
item = {
|
||||
'label': subset['name'],
|
||||
'name': str(subset['_id']),
|
||||
'type': 'boolean',
|
||||
'value': False
|
||||
}
|
||||
subset_items.append(item)
|
||||
if len(subset_items) > 0:
|
||||
items.append(subset_label)
|
||||
items.extend(subset_items)
|
||||
else:
|
||||
return {
|
||||
'success': False,
|
||||
'message': 'Didn\'t found assets in avalon'
|
||||
}
|
||||
subset_ids_by_name = collections.defaultdict(list)
|
||||
subset_ids_by_parent = collections.defaultdict(list)
|
||||
for subset in subsets_for_selection:
|
||||
subset_id = subset["_id"]
|
||||
name = subset["name"]
|
||||
parent_id = subset["parent"]
|
||||
subset_ids_by_name[name].append(subset_id)
|
||||
subset_ids_by_parent[parent_id].append(subset_id)
|
||||
|
||||
if not subset_ids_by_name:
|
||||
return {
|
||||
'items': items,
|
||||
'title': title
|
||||
"items": items,
|
||||
"title": title
|
||||
}
|
||||
|
||||
def confirm_delete(self, first_attempt, entities, event):
|
||||
if first_attempt is True:
|
||||
if 'values' not in event['data']:
|
||||
return
|
||||
subset_ending = ""
|
||||
if len(subset_ids_by_name.keys()) > 1:
|
||||
subset_ending = "s"
|
||||
|
||||
values = event['data']['values']
|
||||
subset_title = {
|
||||
"type": "label",
|
||||
"value": "# Subset{} to delete:".format(subset_ending)
|
||||
}
|
||||
subset_note = {
|
||||
"type": "label",
|
||||
"value": (
|
||||
"<p><i>WARNING: Subset{} will be removed"
|
||||
" for all <b>selected</b> entities.</i></p>"
|
||||
).format(subset_ending)
|
||||
}
|
||||
|
||||
if len(values) <= 0:
|
||||
return
|
||||
if 'whole_asset' not in values:
|
||||
return
|
||||
else:
|
||||
values = self.values
|
||||
items.append(self.splitter)
|
||||
items.append(subset_title)
|
||||
items.append(subset_note)
|
||||
|
||||
title = 'Confirmation of deleting {}'
|
||||
if values['whole_asset'] is True:
|
||||
title = title.format(
|
||||
'whole asset {}'.format(
|
||||
entities[0]['name']
|
||||
)
|
||||
)
|
||||
else:
|
||||
subsets = []
|
||||
for key, value in values.items():
|
||||
if value is True:
|
||||
subsets.append(key)
|
||||
len_subsets = len(subsets)
|
||||
if len_subsets == 0:
|
||||
for name in subset_ids_by_name:
|
||||
items.append({
|
||||
"label": "<b>{}</b>".format(name),
|
||||
"name": "{}{}".format(self.subset_prefix, name),
|
||||
"type": "boolean",
|
||||
"value": False
|
||||
})
|
||||
|
||||
self.action_data_by_id[action_id]["subset_ids_by_parent"] = (
|
||||
subset_ids_by_parent
|
||||
)
|
||||
self.action_data_by_id[action_id]["subset_ids_by_name"] = (
|
||||
subset_ids_by_name
|
||||
)
|
||||
|
||||
return {
|
||||
"items": items,
|
||||
"title": title
|
||||
}
|
||||
|
||||
def confirm_delete(self, entities, event):
|
||||
values = event["data"]["values"]
|
||||
action_id = values.get("action_id")
|
||||
spec_data = self.action_data_by_id.get(action_id)
|
||||
if not spec_data:
|
||||
# it is a bug if this happens!
|
||||
return {
|
||||
"success": False,
|
||||
"message": "Something bad has happened. Please try again."
|
||||
}
|
||||
|
||||
# Process Delete confirmation
|
||||
delete_key = values.get("delete_key")
|
||||
if delete_key:
|
||||
delete_key = delete_key.lower().strip()
|
||||
# Go to launch part if user entered `delete`
|
||||
if delete_key == "delete":
|
||||
return
|
||||
# Skip whole process if user didn't enter any text
|
||||
elif delete_key == "":
|
||||
self.action_data_by_id.pop(action_id, None)
|
||||
return {
|
||||
'success': True,
|
||||
'message': 'Nothing was selected to delete'
|
||||
"success": True,
|
||||
"message": "Deleting cancelled (delete entry was empty)"
|
||||
}
|
||||
elif len_subsets == 1:
|
||||
title = title.format(
|
||||
'{} subset'.format(len_subsets)
|
||||
)
|
||||
else:
|
||||
title = title.format(
|
||||
'{} subsets'.format(len_subsets)
|
||||
)
|
||||
# Get data to show again
|
||||
to_delete = spec_data["to_delete"]
|
||||
|
||||
else:
|
||||
to_delete = collections.defaultdict(list)
|
||||
for key, value in values.items():
|
||||
if not value:
|
||||
continue
|
||||
if key.startswith(self.asset_prefix):
|
||||
_key = key.replace(self.asset_prefix, "")
|
||||
to_delete["assets"].append(_key)
|
||||
|
||||
elif key.startswith(self.subset_prefix):
|
||||
_key = key.replace(self.subset_prefix, "")
|
||||
to_delete["subsets"].append(_key)
|
||||
|
||||
self.action_data_by_id[action_id]["to_delete"] = to_delete
|
||||
|
||||
asset_to_delete = len(to_delete.get("assets") or []) > 0
|
||||
subset_to_delete = len(to_delete.get("subsets") or []) > 0
|
||||
|
||||
if not asset_to_delete and not subset_to_delete:
|
||||
self.action_data_by_id.pop(action_id, None)
|
||||
return {
|
||||
"success": True,
|
||||
"message": "Nothing was selected to delete"
|
||||
}
|
||||
|
||||
attempt = spec_data["attempt"]
|
||||
if attempt > 3:
|
||||
self.action_data_by_id.pop(action_id, None)
|
||||
return {
|
||||
"success": False,
|
||||
"message": "You didn't enter \"DELETE\" properly 3 times!"
|
||||
}
|
||||
|
||||
self.action_data_by_id[action_id]["attempt"] += 1
|
||||
|
||||
title = "Confirmation of deleting"
|
||||
|
||||
if asset_to_delete:
|
||||
asset_len = len(to_delete["assets"])
|
||||
asset_ending = ""
|
||||
if asset_len > 1:
|
||||
asset_ending = "s"
|
||||
title += " {} Asset{}".format(asset_len, asset_ending)
|
||||
if subset_to_delete:
|
||||
title += " and"
|
||||
|
||||
if subset_to_delete:
|
||||
sub_len = len(to_delete["subsets"])
|
||||
type_ending = ""
|
||||
sub_ending = ""
|
||||
if sub_len == 1:
|
||||
subset_ids_by_name = spec_data["subset_ids_by_name"]
|
||||
if len(subset_ids_by_name[to_delete["subsets"][0]]) > 1:
|
||||
sub_ending = "s"
|
||||
|
||||
elif sub_len > 1:
|
||||
type_ending = "s"
|
||||
sub_ending = "s"
|
||||
|
||||
title += " {} type{} of subset{}".format(
|
||||
sub_len, type_ending, sub_ending
|
||||
)
|
||||
|
||||
self.values = values
|
||||
items = []
|
||||
|
||||
id_item = {"type": "hidden", "name": "action_id", "value": action_id}
|
||||
delete_label = {
|
||||
'type': 'label',
|
||||
'value': '# Please enter "DELETE" to confirm #'
|
||||
}
|
||||
|
||||
delete_item = {
|
||||
'name': 'delete_key',
|
||||
'type': 'text',
|
||||
'value': '',
|
||||
'empty_text': 'Type Delete here...'
|
||||
"name": "delete_key",
|
||||
"type": "text",
|
||||
"value": "",
|
||||
"empty_text": "Type Delete here..."
|
||||
}
|
||||
|
||||
items.append(id_item)
|
||||
items.append(delete_label)
|
||||
items.append(delete_item)
|
||||
|
||||
return {
|
||||
'items': items,
|
||||
'title': title
|
||||
"items": items,
|
||||
"title": title
|
||||
}
|
||||
|
||||
def launch(self, session, entities, event):
|
||||
if 'values' not in event['data']:
|
||||
return
|
||||
|
||||
values = event['data']['values']
|
||||
if len(values) <= 0:
|
||||
return
|
||||
if 'delete_key' not in values:
|
||||
return
|
||||
|
||||
if values['delete_key'].lower() != 'delete':
|
||||
if values['delete_key'].lower() == '':
|
||||
return {
|
||||
'success': False,
|
||||
'message': 'Deleting cancelled'
|
||||
}
|
||||
if self.attempt < 3:
|
||||
self.attempt += 1
|
||||
return_dict = self.confirm_delete(False, entities, event)
|
||||
return_dict['title'] = '{} ({} attempt)'.format(
|
||||
return_dict['title'], self.attempt
|
||||
)
|
||||
return return_dict
|
||||
self.show_message(event, "Processing...", True)
|
||||
values = event["data"]["values"]
|
||||
action_id = values.get("action_id")
|
||||
spec_data = self.action_data_by_id.get(action_id)
|
||||
if not spec_data:
|
||||
# it is a bug if this happens!
|
||||
return {
|
||||
'success': False,
|
||||
'message': 'You didn\'t enter "DELETE" properly 3 times!'
|
||||
"success": False,
|
||||
"message": "Something bad has happened. Please try again."
|
||||
}
|
||||
|
||||
entity = entities[0]
|
||||
project = entity['project']
|
||||
report_messages = collections.defaultdict(list)
|
||||
|
||||
self.db.Session['AVALON_PROJECT'] = project["full_name"]
|
||||
project_name = spec_data["project_name"]
|
||||
to_delete = spec_data["to_delete"]
|
||||
self.dbcon.Session["AVALON_PROJECT"] = project_name
|
||||
|
||||
all_ids = []
|
||||
if self.values.get('whole_asset', False) is True:
|
||||
av_entity = self.db.find_one({
|
||||
'type': 'asset',
|
||||
'name': entity['name']
|
||||
assets_to_delete = to_delete.get("assets") or []
|
||||
subsets_to_delete = to_delete.get("subsets") or []
|
||||
|
||||
# Convert asset ids to ObjectId obj
|
||||
assets_to_delete = [ObjectId(id) for id in assets_to_delete if id]
|
||||
|
||||
subset_ids_by_parent = spec_data["subset_ids_by_parent"]
|
||||
subset_ids_by_name = spec_data["subset_ids_by_name"]
|
||||
|
||||
subset_ids_to_archive = []
|
||||
asset_ids_to_archive = []
|
||||
ftrack_ids_to_delete = []
|
||||
if len(assets_to_delete) > 0:
|
||||
# Prepare data when deleting whole avalon asset
|
||||
avalon_assets = self.dbcon.find({"type": "asset"})
|
||||
avalon_assets_by_parent = collections.defaultdict(list)
|
||||
for asset in avalon_assets:
|
||||
parent_id = asset["data"]["visualParent"]
|
||||
avalon_assets_by_parent[parent_id].append(asset)
|
||||
if asset["_id"] in assets_to_delete:
|
||||
ftrack_id = asset["data"]["ftrackId"]
|
||||
ftrack_ids_to_delete.append(ftrack_id)
|
||||
|
||||
children_queue = Queue()
|
||||
for mongo_id in assets_to_delete:
|
||||
children_queue.put(mongo_id)
|
||||
|
||||
while not children_queue.empty():
|
||||
mongo_id = children_queue.get()
|
||||
if mongo_id in asset_ids_to_archive:
|
||||
continue
|
||||
|
||||
asset_ids_to_archive.append(mongo_id)
|
||||
for subset_id in subset_ids_by_parent.get(mongo_id, []):
|
||||
if subset_id not in subset_ids_to_archive:
|
||||
subset_ids_to_archive.append(subset_id)
|
||||
|
||||
children = avalon_assets_by_parent.get(mongo_id)
|
||||
if not children:
|
||||
continue
|
||||
|
||||
for child in children:
|
||||
child_id = child["_id"]
|
||||
if child_id not in asset_ids_to_archive:
|
||||
children_queue.put(child_id)
|
||||
|
||||
# Prepare names of assets in ftrack and ids of subsets in mongo
|
||||
asset_names_to_delete = []
|
||||
if len(subsets_to_delete) > 0:
|
||||
for name in subsets_to_delete:
|
||||
asset_names_to_delete.append(name)
|
||||
for subset_id in subset_ids_by_name[name]:
|
||||
if subset_id in subset_ids_to_archive:
|
||||
continue
|
||||
subset_ids_to_archive.append(subset_id)
|
||||
|
||||
# Get ftrack ids of entities where will be delete only asset
|
||||
not_deleted_entities_id = []
|
||||
ftrack_id_name_map = {}
|
||||
if asset_names_to_delete:
|
||||
for entity in entities:
|
||||
ftrack_id = entity["id"]
|
||||
ftrack_id_name_map[ftrack_id] = entity["name"]
|
||||
if ftrack_id in ftrack_ids_to_delete:
|
||||
continue
|
||||
not_deleted_entities_id.append(ftrack_id)
|
||||
|
||||
mongo_proc_txt = "MongoProcessing: "
|
||||
ftrack_proc_txt = "Ftrack processing: "
|
||||
if asset_ids_to_archive:
|
||||
self.log.debug("{}Archivation of assets <{}>".format(
|
||||
mongo_proc_txt,
|
||||
", ".join([str(id) for id in asset_ids_to_archive])
|
||||
))
|
||||
self.dbcon.update_many(
|
||||
{
|
||||
"_id": {"$in": asset_ids_to_archive},
|
||||
"type": "asset"
|
||||
},
|
||||
{"$set": {"type": "archived_asset"}}
|
||||
)
|
||||
|
||||
if subset_ids_to_archive:
|
||||
self.log.debug("{}Archivation of subsets <{}>".format(
|
||||
mongo_proc_txt,
|
||||
", ".join([str(id) for id in subset_ids_to_archive])
|
||||
))
|
||||
self.dbcon.update_many(
|
||||
{
|
||||
"_id": {"$in": subset_ids_to_archive},
|
||||
"type": "subset"
|
||||
},
|
||||
{"$set": {"type": "archived_subset"}}
|
||||
)
|
||||
|
||||
if ftrack_ids_to_delete:
|
||||
self.log.debug("{}Deleting Ftrack Entities <{}>".format(
|
||||
ftrack_proc_txt, ", ".join(ftrack_ids_to_delete)
|
||||
))
|
||||
|
||||
joined_ids_to_delete = ", ".join(
|
||||
["\"{}\"".format(id) for id in ftrack_ids_to_delete]
|
||||
)
|
||||
ftrack_ents_to_delete = self.session.query(
|
||||
"select id, link from TypedContext where id in ({})".format(
|
||||
joined_ids_to_delete
|
||||
)
|
||||
).all()
|
||||
for entity in ftrack_ents_to_delete:
|
||||
self.session.delete(entity)
|
||||
try:
|
||||
self.session.commit()
|
||||
except Exception:
|
||||
ent_path = "/".join(
|
||||
[ent["name"] for ent in entity["link"]]
|
||||
)
|
||||
msg = "Failed to delete entity"
|
||||
report_messages[msg].append(ent_path)
|
||||
self.session.rollback()
|
||||
self.log.warning(
|
||||
"{} <{}>".format(msg, ent_path),
|
||||
exc_info=True
|
||||
)
|
||||
|
||||
if not_deleted_entities_id:
|
||||
joined_not_deleted = ", ".join([
|
||||
"\"{}\"".format(ftrack_id)
|
||||
for ftrack_id in not_deleted_entities_id
|
||||
])
|
||||
joined_asset_names = ", ".join([
|
||||
"\"{}\"".format(name)
|
||||
for name in asset_names_to_delete
|
||||
])
|
||||
# Find assets of selected entities with names of checked subsets
|
||||
assets = self.session.query((
|
||||
"select id from Asset where"
|
||||
" context_id in ({}) and name in ({})"
|
||||
).format(joined_not_deleted, joined_asset_names)).all()
|
||||
|
||||
self.log.debug("{}Deleting Ftrack Assets <{}>".format(
|
||||
ftrack_proc_txt,
|
||||
", ".join([asset["id"] for asset in assets])
|
||||
))
|
||||
for asset in assets:
|
||||
self.session.delete(asset)
|
||||
try:
|
||||
self.session.commit()
|
||||
except Exception:
|
||||
self.session.rollback()
|
||||
msg = "Failed to delete asset"
|
||||
report_messages[msg].append(asset["id"])
|
||||
self.log.warning(
|
||||
"{} <{}>".format(asset["id"]),
|
||||
exc_info=True
|
||||
)
|
||||
|
||||
return self.report_handle(report_messages, project_name, event)
|
||||
|
||||
def report_handle(self, report_messages, project_name, event):
|
||||
if not report_messages:
|
||||
return {
|
||||
"success": True,
|
||||
"message": "Deletion was successful!"
|
||||
}
|
||||
|
||||
title = "Delete report ({}):".format(project_name)
|
||||
items = []
|
||||
items.append({
|
||||
"type": "label",
|
||||
"value": "# Deleting was not completely successful"
|
||||
})
|
||||
items.append({
|
||||
"type": "label",
|
||||
"value": "<p><i>Check logs for more information</i></p>"
|
||||
})
|
||||
for msg, _items in report_messages.items():
|
||||
if not _items or not msg:
|
||||
continue
|
||||
|
||||
items.append({
|
||||
"type": "label",
|
||||
"value": "# {}".format(msg)
|
||||
})
|
||||
|
||||
if av_entity is not None:
|
||||
all_ids.append(av_entity['_id'])
|
||||
all_ids.extend(self.find_child(av_entity))
|
||||
if isinstance(_items, str):
|
||||
_items = [_items]
|
||||
items.append({
|
||||
"type": "label",
|
||||
"value": '<p>{}</p>'.format("<br>".join(_items))
|
||||
})
|
||||
items.append(self.splitter)
|
||||
|
||||
session.delete(entity)
|
||||
session.commit()
|
||||
else:
|
||||
subset_names = []
|
||||
for key, value in self.values.items():
|
||||
if key == 'delete_key' or value is False:
|
||||
continue
|
||||
|
||||
entity_id = ObjectId(key)
|
||||
av_entity = self.db.find_one({'_id': entity_id})
|
||||
subset_names.append(av_entity['name'])
|
||||
if av_entity is None:
|
||||
continue
|
||||
all_ids.append(entity_id)
|
||||
all_ids.extend(self.find_child(av_entity))
|
||||
|
||||
for ft_asset in entity['assets']:
|
||||
if ft_asset['name'] in subset_names:
|
||||
session.delete(ft_asset)
|
||||
session.commit()
|
||||
|
||||
if len(all_ids) == 0:
|
||||
return {
|
||||
'success': True,
|
||||
'message': 'No entities to delete in avalon'
|
||||
}
|
||||
|
||||
delete_query = {'_id': {'$in': all_ids}}
|
||||
self.db.delete_many(delete_query)
|
||||
self.show_interface(items, title, event)
|
||||
|
||||
return {
|
||||
'success': True,
|
||||
'message': 'All assets were deleted!'
|
||||
"success": False,
|
||||
"message": "Deleting finished. Read report messages."
|
||||
}
|
||||
|
||||
def find_child(self, entity):
|
||||
output = []
|
||||
id = entity['_id']
|
||||
visuals = [x for x in self.db.find({'data.visualParent': id})]
|
||||
assert len(visuals) == 0, 'This asset has another asset as child'
|
||||
childs = self.db.find({'parent': id})
|
||||
for child in childs:
|
||||
output.append(child['_id'])
|
||||
output.extend(self.find_child(child))
|
||||
return output
|
||||
|
||||
def find_assets(self, asset_names):
|
||||
assets = []
|
||||
for name in asset_names:
|
||||
entity = self.db.find_one({
|
||||
'type': 'asset',
|
||||
'name': name
|
||||
})
|
||||
if entity is not None and entity not in assets:
|
||||
assets.append(entity)
|
||||
return assets
|
||||
|
||||
|
||||
def register(session, plugins_presets={}):
|
||||
'''Register plugin. Called when used as an plugin.'''
|
||||
|
||||
DeleteAsset(session, plugins_presets).register()
|
||||
|
||||
|
||||
def main(arguments=None):
|
||||
'''Set up logging and register action.'''
|
||||
if arguments is None:
|
||||
arguments = []
|
||||
|
||||
parser = argparse.ArgumentParser()
|
||||
# Allow setting of logging level from arguments.
|
||||
loggingLevels = {}
|
||||
for level in (
|
||||
logging.NOTSET, logging.DEBUG, logging.INFO, logging.WARNING,
|
||||
logging.ERROR, logging.CRITICAL
|
||||
):
|
||||
loggingLevels[logging.getLevelName(level).lower()] = level
|
||||
|
||||
parser.add_argument(
|
||||
'-v', '--verbosity',
|
||||
help='Set the logging output verbosity.',
|
||||
choices=loggingLevels.keys(),
|
||||
default='info'
|
||||
)
|
||||
namespace = parser.parse_args(arguments)
|
||||
|
||||
# Set up basic logging
|
||||
logging.basicConfig(level=loggingLevels[namespace.verbosity])
|
||||
|
||||
session = ftrack_api.Session()
|
||||
|
||||
register(session)
|
||||
|
||||
# Wait for events
|
||||
logging.info(
|
||||
'Registered actions and listening for events. Use Ctrl-C to abort.'
|
||||
)
|
||||
session.event_hub.wait()
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
raise SystemExit(main(sys.argv[1:]))
|
||||
DeleteAssetSubset(session, plugins_presets).register()
|
||||
|
|
|
|||
|
|
@ -1,175 +0,0 @@
|
|||
import os
|
||||
import sys
|
||||
import logging
|
||||
import argparse
|
||||
import ftrack_api
|
||||
from pype.ftrack import BaseAction
|
||||
from pype.ftrack.lib.io_nonsingleton import DbConnector
|
||||
|
||||
|
||||
class AssetsRemover(BaseAction):
|
||||
'''Edit meta data action.'''
|
||||
|
||||
#: Action identifier.
|
||||
identifier = 'remove.assets'
|
||||
#: Action label.
|
||||
label = "Pype Admin"
|
||||
variant = '- Delete Assets by Name'
|
||||
#: Action description.
|
||||
description = 'Removes assets from Ftrack and Avalon db with all childs'
|
||||
#: roles that are allowed to register this action
|
||||
role_list = ['Pypeclub', 'Administrator']
|
||||
icon = '{}/ftrack/action_icons/PypeAdmin.svg'.format(
|
||||
os.environ.get('PYPE_STATICS_SERVER', '')
|
||||
)
|
||||
#: Db
|
||||
db = DbConnector()
|
||||
|
||||
def discover(self, session, entities, event):
|
||||
''' Validation '''
|
||||
if len(entities) != 1:
|
||||
return False
|
||||
|
||||
valid = ["show", "task"]
|
||||
entityType = event["data"]["selection"][0].get("entityType", "")
|
||||
if entityType.lower() not in valid:
|
||||
return False
|
||||
|
||||
return True
|
||||
|
||||
def interface(self, session, entities, event):
|
||||
if not event['data'].get('values', {}):
|
||||
title = 'Enter Asset names to delete'
|
||||
|
||||
items = []
|
||||
for i in range(15):
|
||||
|
||||
item = {
|
||||
'label': 'Asset {}'.format(i+1),
|
||||
'name': 'asset_{}'.format(i+1),
|
||||
'type': 'text',
|
||||
'value': ''
|
||||
}
|
||||
items.append(item)
|
||||
|
||||
return {
|
||||
'items': items,
|
||||
'title': title
|
||||
}
|
||||
|
||||
def launch(self, session, entities, event):
|
||||
entity = entities[0]
|
||||
if entity.entity_type.lower() != 'Project':
|
||||
project = entity['project']
|
||||
else:
|
||||
project = entity
|
||||
|
||||
if 'values' not in event['data']:
|
||||
return
|
||||
|
||||
values = event['data']['values']
|
||||
if len(values) <= 0:
|
||||
return {
|
||||
'success': True,
|
||||
'message': 'No Assets to delete!'
|
||||
}
|
||||
|
||||
asset_names = []
|
||||
|
||||
for k, v in values.items():
|
||||
if v.replace(' ', '') != '':
|
||||
asset_names.append(v)
|
||||
|
||||
self.db.install()
|
||||
self.db.Session['AVALON_PROJECT'] = project["full_name"]
|
||||
|
||||
assets = self.find_assets(asset_names)
|
||||
|
||||
all_ids = []
|
||||
for asset in assets:
|
||||
all_ids.append(asset['_id'])
|
||||
all_ids.extend(self.find_child(asset))
|
||||
|
||||
if len(all_ids) == 0:
|
||||
self.db.uninstall()
|
||||
return {
|
||||
'success': True,
|
||||
'message': 'None of assets'
|
||||
}
|
||||
|
||||
delete_query = {'_id': {'$in': all_ids}}
|
||||
self.db.delete_many(delete_query)
|
||||
|
||||
self.db.uninstall()
|
||||
return {
|
||||
'success': True,
|
||||
'message': 'All assets were deleted!'
|
||||
}
|
||||
|
||||
def find_child(self, entity):
|
||||
output = []
|
||||
id = entity['_id']
|
||||
visuals = [x for x in self.db.find({'data.visualParent': id})]
|
||||
assert len(visuals) == 0, 'This asset has another asset as child'
|
||||
childs = self.db.find({'parent': id})
|
||||
for child in childs:
|
||||
output.append(child['_id'])
|
||||
output.extend(self.find_child(child))
|
||||
return output
|
||||
|
||||
def find_assets(self, asset_names):
|
||||
assets = []
|
||||
for name in asset_names:
|
||||
entity = self.db.find_one({
|
||||
'type': 'asset',
|
||||
'name': name
|
||||
})
|
||||
if entity is not None and entity not in assets:
|
||||
assets.append(entity)
|
||||
return assets
|
||||
|
||||
|
||||
def register(session, plugins_presets={}):
|
||||
'''Register plugin. Called when used as an plugin.'''
|
||||
|
||||
AssetsRemover(session, plugins_presets).register()
|
||||
|
||||
|
||||
def main(arguments=None):
|
||||
'''Set up logging and register action.'''
|
||||
if arguments is None:
|
||||
arguments = []
|
||||
|
||||
parser = argparse.ArgumentParser()
|
||||
# Allow setting of logging level from arguments.
|
||||
loggingLevels = {}
|
||||
for level in (
|
||||
logging.NOTSET, logging.DEBUG, logging.INFO, logging.WARNING,
|
||||
logging.ERROR, logging.CRITICAL
|
||||
):
|
||||
loggingLevels[logging.getLevelName(level).lower()] = level
|
||||
|
||||
parser.add_argument(
|
||||
'-v', '--verbosity',
|
||||
help='Set the logging output verbosity.',
|
||||
choices=loggingLevels.keys(),
|
||||
default='info'
|
||||
)
|
||||
namespace = parser.parse_args(arguments)
|
||||
|
||||
# Set up basic logging
|
||||
logging.basicConfig(level=loggingLevels[namespace.verbosity])
|
||||
|
||||
session = ftrack_api.Session()
|
||||
|
||||
register(session)
|
||||
|
||||
# Wait for events
|
||||
logging.info(
|
||||
'Registered actions and listening for events. Use Ctrl-C to abort.'
|
||||
)
|
||||
session.event_hub.wait()
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
raise SystemExit(main(sys.argv[1:]))
|
||||
|
|
@ -4,6 +4,7 @@ from pypeapp import config
|
|||
|
||||
class VersionToTaskStatus(BaseEvent):
|
||||
|
||||
# Presets usage
|
||||
default_status_mapping = {}
|
||||
|
||||
def launch(self, session, event):
|
||||
|
|
@ -11,69 +12,124 @@ class VersionToTaskStatus(BaseEvent):
|
|||
|
||||
# start of event procedure ----------------------------------
|
||||
for entity in event['data'].get('entities', []):
|
||||
# Filter non-assetversions
|
||||
if (
|
||||
entity['entityType'] == 'assetversion' and
|
||||
'statusid' in (entity.get('keys') or [])
|
||||
):
|
||||
# Filter AssetVersions
|
||||
if entity["entityType"] != "assetversion":
|
||||
continue
|
||||
|
||||
version = session.get('AssetVersion', entity['entityId'])
|
||||
try:
|
||||
version_status = session.get(
|
||||
'Status', entity['changes']['statusid']['new']
|
||||
)
|
||||
except Exception:
|
||||
# Skip if statusid not in keys (in changes)
|
||||
keys = entity.get("keys")
|
||||
if not keys or "statusid" not in keys:
|
||||
continue
|
||||
|
||||
# Get new version task name
|
||||
version_status_id = (
|
||||
entity
|
||||
.get("changes", {})
|
||||
.get("statusid", {})
|
||||
.get("new", {})
|
||||
)
|
||||
|
||||
# Just check that `new` is set to any value
|
||||
if not version_status_id:
|
||||
continue
|
||||
|
||||
try:
|
||||
version_status = session.get("Status", version_status_id)
|
||||
except Exception:
|
||||
self.log.warning(
|
||||
"Troubles with query status id [ {} ]".format(
|
||||
version_status_id
|
||||
),
|
||||
exc_info=True
|
||||
)
|
||||
|
||||
if not version_status:
|
||||
continue
|
||||
|
||||
version_status_orig = version_status["name"]
|
||||
|
||||
# Load status mapping from presets
|
||||
status_mapping = (
|
||||
config.get_presets()
|
||||
.get("ftrack", {})
|
||||
.get("ftrack_config", {})
|
||||
.get("status_version_to_task")
|
||||
) or self.default_status_mapping
|
||||
|
||||
# Skip if mapping is empty
|
||||
if not status_mapping:
|
||||
continue
|
||||
|
||||
# Lower version status name and check if has mapping
|
||||
version_status = version_status_orig.lower()
|
||||
new_status_names = status_mapping.get(version_status)
|
||||
if not new_status_names:
|
||||
continue
|
||||
|
||||
self.log.debug(
|
||||
"Processing AssetVersion status change: [ {} ]".format(
|
||||
version_status_orig
|
||||
)
|
||||
)
|
||||
|
||||
# Backwards compatibility (convert string to list)
|
||||
if isinstance(new_status_names, str):
|
||||
new_status_names = [new_status_names]
|
||||
|
||||
# Lower all names from presets
|
||||
new_status_names = [name.lower() for name in new_status_names]
|
||||
|
||||
# Get entities necessary for processing
|
||||
version = session.get("AssetVersion", entity["entityId"])
|
||||
task = version.get("task")
|
||||
if not task:
|
||||
continue
|
||||
|
||||
project_schema = task["project"]["project_schema"]
|
||||
# Get all available statuses for Task
|
||||
statuses = project_schema.get_statuses("Task", task["type_id"])
|
||||
# map lowered status name with it's object
|
||||
stat_names_low = {
|
||||
status["name"].lower(): status for status in statuses
|
||||
}
|
||||
|
||||
new_status = None
|
||||
for status_name in new_status_names:
|
||||
if status_name not in stat_names_low:
|
||||
continue
|
||||
task_status = version_status
|
||||
task = version['task']
|
||||
self.log.info('>>> version status: [ {} ]'.format(
|
||||
version_status['name']))
|
||||
|
||||
version_name_low = version_status['name'].lower()
|
||||
# store object of found status
|
||||
new_status = stat_names_low[status_name]
|
||||
self.log.debug("Status to set: [ {} ]".format(
|
||||
new_status["name"]
|
||||
))
|
||||
break
|
||||
|
||||
status_mapping = (
|
||||
config.get_presets()
|
||||
.get("ftrack", {})
|
||||
.get("ftrack_config", {})
|
||||
.get("status_version_to_task")
|
||||
) or self.default_status_mapping
|
||||
# Skip if status names were not found for paticulat entity
|
||||
if not new_status:
|
||||
self.log.warning(
|
||||
"Any of statuses from presets can be set: {}".format(
|
||||
str(new_status_names)
|
||||
)
|
||||
)
|
||||
continue
|
||||
|
||||
status_to_set = status_mapping.get(version_name_low)
|
||||
# Get full path to task for logging
|
||||
ent_path = "/".join([ent["name"] for ent in task["link"]])
|
||||
|
||||
self.log.info(
|
||||
'>>> status to set: [ {} ]'.format(status_to_set))
|
||||
|
||||
if status_to_set is not None:
|
||||
query = 'Status where name is "{}"'.format(status_to_set)
|
||||
try:
|
||||
task_status = session.query(query).one()
|
||||
except Exception:
|
||||
self.log.info(
|
||||
'!!! status was not found in Ftrack [ {} ]'.format(
|
||||
status_to_set
|
||||
)
|
||||
)
|
||||
continue
|
||||
|
||||
# Proceed if the task status was set
|
||||
if task_status is not None:
|
||||
# Get path to task
|
||||
path = task['name']
|
||||
for p in task['ancestors']:
|
||||
path = p['name'] + '/' + path
|
||||
|
||||
# Setting task status
|
||||
try:
|
||||
task['status'] = task_status
|
||||
session.commit()
|
||||
except Exception as e:
|
||||
session.rollback()
|
||||
self.log.warning('!!! [ {} ] status couldnt be set:\
|
||||
[ {} ]'.format(path, e))
|
||||
session.rollback()
|
||||
else:
|
||||
self.log.info('>>> [ {} ] updated to [ {} ]'.format(
|
||||
path, task_status['name']))
|
||||
# Setting task status
|
||||
try:
|
||||
task["status"] = new_status
|
||||
session.commit()
|
||||
self.log.debug("[ {} ] Status updated to [ {} ]".format(
|
||||
ent_path, new_status['name']
|
||||
))
|
||||
except Exception:
|
||||
session.rollback()
|
||||
self.log.warning(
|
||||
"[ {} ]Status couldn't be set".format(ent_path),
|
||||
exc_info=True
|
||||
)
|
||||
|
||||
|
||||
def register(session, plugins_presets):
|
||||
|
|
|
|||
|
|
@ -1,10 +1,32 @@
|
|||
import os
|
||||
import sys
|
||||
import logging
|
||||
import getpass
|
||||
import atexit
|
||||
import tempfile
|
||||
import threading
|
||||
import datetime
|
||||
import time
|
||||
import queue
|
||||
import pymongo
|
||||
|
||||
import requests
|
||||
import ftrack_api
|
||||
import ftrack_api.session
|
||||
import ftrack_api.cache
|
||||
import ftrack_api.operation
|
||||
import ftrack_api._centralized_storage_scenario
|
||||
import ftrack_api.event
|
||||
from ftrack_api.logging import LazyLogMessage as L
|
||||
try:
|
||||
from urllib.parse import urlparse, parse_qs
|
||||
except ImportError:
|
||||
from urlparse import urlparse, parse_qs
|
||||
|
||||
from pypeapp import Logger
|
||||
|
||||
from pype.ftrack.lib.custom_db_connector import DbConnector
|
||||
|
||||
|
||||
def ftrack_events_mongo_settings():
|
||||
host = None
|
||||
|
|
@ -49,7 +71,9 @@ def ftrack_events_mongo_settings():
|
|||
|
||||
|
||||
def get_ftrack_event_mongo_info():
|
||||
host, port, database, username, password, collection, auth_db = ftrack_events_mongo_settings()
|
||||
host, port, database, username, password, collection, auth_db = (
|
||||
ftrack_events_mongo_settings()
|
||||
)
|
||||
user_pass = ""
|
||||
if username and password:
|
||||
user_pass = "{}:{}@".format(username, password)
|
||||
|
|
@ -97,3 +121,303 @@ def check_ftrack_url(url, log_errors=True):
|
|||
print('DEBUG: Ftrack server {} is accessible.'.format(url))
|
||||
|
||||
return url
|
||||
|
||||
|
||||
class StorerEventHub(ftrack_api.event.hub.EventHub):
|
||||
def __init__(self, *args, **kwargs):
|
||||
self.sock = kwargs.pop("sock")
|
||||
super(StorerEventHub, self).__init__(*args, **kwargs)
|
||||
|
||||
def _handle_packet(self, code, packet_identifier, path, data):
|
||||
"""Override `_handle_packet` which extend heartbeat"""
|
||||
code_name = self._code_name_mapping[code]
|
||||
if code_name == "heartbeat":
|
||||
# Reply with heartbeat.
|
||||
self.sock.sendall(b"storer")
|
||||
return self._send_packet(self._code_name_mapping['heartbeat'])
|
||||
|
||||
elif code_name == "connect":
|
||||
event = ftrack_api.event.base.Event(
|
||||
topic="pype.storer.started",
|
||||
data={},
|
||||
source={
|
||||
"id": self.id,
|
||||
"user": {"username": self._api_user}
|
||||
}
|
||||
)
|
||||
self._event_queue.put(event)
|
||||
|
||||
return super(StorerEventHub, self)._handle_packet(
|
||||
code, packet_identifier, path, data
|
||||
)
|
||||
|
||||
|
||||
class ProcessEventHub(ftrack_api.event.hub.EventHub):
|
||||
url, database, table_name = get_ftrack_event_mongo_info()
|
||||
|
||||
is_table_created = False
|
||||
pypelog = Logger().get_logger("Session Processor")
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
self.dbcon = DbConnector(
|
||||
mongo_url=self.url,
|
||||
database_name=self.database,
|
||||
table_name=self.table_name
|
||||
)
|
||||
self.sock = kwargs.pop("sock")
|
||||
super(ProcessEventHub, self).__init__(*args, **kwargs)
|
||||
|
||||
def prepare_dbcon(self):
|
||||
try:
|
||||
self.dbcon.install()
|
||||
self.dbcon._database.list_collection_names()
|
||||
except pymongo.errors.AutoReconnect:
|
||||
self.pypelog.error(
|
||||
"Mongo server \"{}\" is not responding, exiting.".format(
|
||||
os.environ["AVALON_MONGO"]
|
||||
)
|
||||
)
|
||||
sys.exit(0)
|
||||
|
||||
except pymongo.errors.OperationFailure:
|
||||
self.pypelog.error((
|
||||
"Error with Mongo access, probably permissions."
|
||||
"Check if exist database with name \"{}\""
|
||||
" and collection \"{}\" inside."
|
||||
).format(self.database, self.table_name))
|
||||
self.sock.sendall(b"MongoError")
|
||||
sys.exit(0)
|
||||
|
||||
def wait(self, duration=None):
|
||||
"""Overriden wait
|
||||
|
||||
Event are loaded from Mongo DB when queue is empty. Handled event is
|
||||
set as processed in Mongo DB.
|
||||
"""
|
||||
started = time.time()
|
||||
self.prepare_dbcon()
|
||||
while True:
|
||||
try:
|
||||
event = self._event_queue.get(timeout=0.1)
|
||||
except queue.Empty:
|
||||
if not self.load_events():
|
||||
time.sleep(0.5)
|
||||
else:
|
||||
try:
|
||||
self._handle(event)
|
||||
self.dbcon.update_one(
|
||||
{"id": event["id"]},
|
||||
{"$set": {"pype_data.is_processed": True}}
|
||||
)
|
||||
except pymongo.errors.AutoReconnect:
|
||||
self.pypelog.error((
|
||||
"Mongo server \"{}\" is not responding, exiting."
|
||||
).format(os.environ["AVALON_MONGO"]))
|
||||
sys.exit(0)
|
||||
# Additional special processing of events.
|
||||
if event['topic'] == 'ftrack.meta.disconnected':
|
||||
break
|
||||
|
||||
if duration is not None:
|
||||
if (time.time() - started) > duration:
|
||||
break
|
||||
|
||||
def load_events(self):
|
||||
"""Load not processed events sorted by stored date"""
|
||||
ago_date = datetime.datetime.now() - datetime.timedelta(days=3)
|
||||
result = self.dbcon.delete_many({
|
||||
"pype_data.stored": {"$lte": ago_date},
|
||||
"pype_data.is_processed": True
|
||||
})
|
||||
|
||||
not_processed_events = self.dbcon.find(
|
||||
{"pype_data.is_processed": False}
|
||||
).sort(
|
||||
[("pype_data.stored", pymongo.ASCENDING)]
|
||||
)
|
||||
|
||||
found = False
|
||||
for event_data in not_processed_events:
|
||||
new_event_data = {
|
||||
k: v for k, v in event_data.items()
|
||||
if k not in ["_id", "pype_data"]
|
||||
}
|
||||
try:
|
||||
event = ftrack_api.event.base.Event(**new_event_data)
|
||||
except Exception:
|
||||
self.logger.exception(L(
|
||||
'Failed to convert payload into event: {0}',
|
||||
event_data
|
||||
))
|
||||
continue
|
||||
found = True
|
||||
self._event_queue.put(event)
|
||||
|
||||
return found
|
||||
|
||||
def _handle_packet(self, code, packet_identifier, path, data):
|
||||
"""Override `_handle_packet` which skip events and extend heartbeat"""
|
||||
code_name = self._code_name_mapping[code]
|
||||
if code_name == "event":
|
||||
return
|
||||
if code_name == "heartbeat":
|
||||
self.sock.sendall(b"processor")
|
||||
return self._send_packet(self._code_name_mapping["heartbeat"])
|
||||
|
||||
return super()._handle_packet(code, packet_identifier, path, data)
|
||||
class SocketSession(ftrack_api.session.Session):
|
||||
'''An isolated session for interaction with an ftrack server.'''
|
||||
def __init__(
|
||||
self, server_url=None, api_key=None, api_user=None, auto_populate=True,
|
||||
plugin_paths=None, cache=None, cache_key_maker=None,
|
||||
auto_connect_event_hub=None, schema_cache_path=None,
|
||||
plugin_arguments=None, sock=None, Eventhub=None
|
||||
):
|
||||
super(ftrack_api.session.Session, self).__init__()
|
||||
self.logger = logging.getLogger(
|
||||
__name__ + '.' + self.__class__.__name__
|
||||
)
|
||||
self._closed = False
|
||||
|
||||
if server_url is None:
|
||||
server_url = os.environ.get('FTRACK_SERVER')
|
||||
|
||||
if not server_url:
|
||||
raise TypeError(
|
||||
'Required "server_url" not specified. Pass as argument or set '
|
||||
'in environment variable FTRACK_SERVER.'
|
||||
)
|
||||
|
||||
self._server_url = server_url
|
||||
|
||||
if api_key is None:
|
||||
api_key = os.environ.get(
|
||||
'FTRACK_API_KEY',
|
||||
# Backwards compatibility
|
||||
os.environ.get('FTRACK_APIKEY')
|
||||
)
|
||||
|
||||
if not api_key:
|
||||
raise TypeError(
|
||||
'Required "api_key" not specified. Pass as argument or set in '
|
||||
'environment variable FTRACK_API_KEY.'
|
||||
)
|
||||
|
||||
self._api_key = api_key
|
||||
|
||||
if api_user is None:
|
||||
api_user = os.environ.get('FTRACK_API_USER')
|
||||
if not api_user:
|
||||
try:
|
||||
api_user = getpass.getuser()
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
if not api_user:
|
||||
raise TypeError(
|
||||
'Required "api_user" not specified. Pass as argument, set in '
|
||||
'environment variable FTRACK_API_USER or one of the standard '
|
||||
'environment variables used by Python\'s getpass module.'
|
||||
)
|
||||
|
||||
self._api_user = api_user
|
||||
|
||||
# Currently pending operations.
|
||||
self.recorded_operations = ftrack_api.operation.Operations()
|
||||
self.record_operations = True
|
||||
|
||||
self.cache_key_maker = cache_key_maker
|
||||
if self.cache_key_maker is None:
|
||||
self.cache_key_maker = ftrack_api.cache.StringKeyMaker()
|
||||
|
||||
# Enforce always having a memory cache at top level so that the same
|
||||
# in-memory instance is returned from session.
|
||||
self.cache = ftrack_api.cache.LayeredCache([
|
||||
ftrack_api.cache.MemoryCache()
|
||||
])
|
||||
|
||||
if cache is not None:
|
||||
if callable(cache):
|
||||
cache = cache(self)
|
||||
|
||||
if cache is not None:
|
||||
self.cache.caches.append(cache)
|
||||
|
||||
self._managed_request = None
|
||||
self._request = requests.Session()
|
||||
self._request.auth = ftrack_api.session.SessionAuthentication(
|
||||
self._api_key, self._api_user
|
||||
)
|
||||
|
||||
self.auto_populate = auto_populate
|
||||
|
||||
# Fetch server information and in doing so also check credentials.
|
||||
self._server_information = self._fetch_server_information()
|
||||
|
||||
# Now check compatibility of server based on retrieved information.
|
||||
self.check_server_compatibility()
|
||||
|
||||
# Construct event hub and load plugins.
|
||||
if Eventhub is None:
|
||||
Eventhub = ftrack_api.event.hub.EventHub
|
||||
self._event_hub = Eventhub(
|
||||
self._server_url,
|
||||
self._api_user,
|
||||
self._api_key,
|
||||
sock=sock
|
||||
)
|
||||
|
||||
self._auto_connect_event_hub_thread = None
|
||||
if auto_connect_event_hub in (None, True):
|
||||
# Connect to event hub in background thread so as not to block main
|
||||
# session usage waiting for event hub connection.
|
||||
self._auto_connect_event_hub_thread = threading.Thread(
|
||||
target=self._event_hub.connect
|
||||
)
|
||||
self._auto_connect_event_hub_thread.daemon = True
|
||||
self._auto_connect_event_hub_thread.start()
|
||||
|
||||
# To help with migration from auto_connect_event_hub default changing
|
||||
# from True to False.
|
||||
self._event_hub._deprecation_warning_auto_connect = (
|
||||
auto_connect_event_hub is None
|
||||
)
|
||||
|
||||
# Register to auto-close session on exit.
|
||||
atexit.register(self.close)
|
||||
|
||||
self._plugin_paths = plugin_paths
|
||||
if self._plugin_paths is None:
|
||||
self._plugin_paths = os.environ.get(
|
||||
'FTRACK_EVENT_PLUGIN_PATH', ''
|
||||
).split(os.pathsep)
|
||||
|
||||
self._discover_plugins(plugin_arguments=plugin_arguments)
|
||||
|
||||
# TODO: Make schemas read-only and non-mutable (or at least without
|
||||
# rebuilding types)?
|
||||
if schema_cache_path is not False:
|
||||
if schema_cache_path is None:
|
||||
schema_cache_path = os.environ.get(
|
||||
'FTRACK_API_SCHEMA_CACHE_PATH', tempfile.gettempdir()
|
||||
)
|
||||
|
||||
schema_cache_path = os.path.join(
|
||||
schema_cache_path, 'ftrack_api_schema_cache.json'
|
||||
)
|
||||
|
||||
self.schemas = self._load_schemas(schema_cache_path)
|
||||
self.types = self._build_entity_type_classes(self.schemas)
|
||||
|
||||
ftrack_api._centralized_storage_scenario.register(self)
|
||||
|
||||
self._configure_locations()
|
||||
self.event_hub.publish(
|
||||
ftrack_api.event.base.Event(
|
||||
topic='ftrack.api.session.ready',
|
||||
data=dict(
|
||||
session=self
|
||||
)
|
||||
),
|
||||
synchronous=True
|
||||
)
|
||||
|
|
|
|||
|
|
@ -1,292 +0,0 @@
|
|||
import logging
|
||||
import os
|
||||
import atexit
|
||||
import datetime
|
||||
import tempfile
|
||||
import threading
|
||||
import time
|
||||
import requests
|
||||
import queue
|
||||
import pymongo
|
||||
|
||||
import ftrack_api
|
||||
import ftrack_api.session
|
||||
import ftrack_api.cache
|
||||
import ftrack_api.operation
|
||||
import ftrack_api._centralized_storage_scenario
|
||||
import ftrack_api.event
|
||||
from ftrack_api.logging import LazyLogMessage as L
|
||||
|
||||
from pype.ftrack.lib.custom_db_connector import DbConnector
|
||||
from pype.ftrack.ftrack_server.lib import get_ftrack_event_mongo_info
|
||||
from pypeapp import Logger
|
||||
|
||||
log = Logger().get_logger("Session processor")
|
||||
|
||||
|
||||
class ProcessEventHub(ftrack_api.event.hub.EventHub):
|
||||
url, database, table_name = get_ftrack_event_mongo_info()
|
||||
|
||||
is_table_created = False
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
self.dbcon = DbConnector(
|
||||
mongo_url=self.url,
|
||||
database_name=self.database,
|
||||
table_name=self.table_name
|
||||
)
|
||||
self.sock = kwargs.pop("sock")
|
||||
super(ProcessEventHub, self).__init__(*args, **kwargs)
|
||||
|
||||
def prepare_dbcon(self):
|
||||
try:
|
||||
self.dbcon.install()
|
||||
self.dbcon._database.list_collection_names()
|
||||
except pymongo.errors.AutoReconnect:
|
||||
log.error("Mongo server \"{}\" is not responding, exiting.".format(
|
||||
os.environ["AVALON_MONGO"]
|
||||
))
|
||||
sys.exit(0)
|
||||
|
||||
except pymongo.errors.OperationFailure:
|
||||
log.error((
|
||||
"Error with Mongo access, probably permissions."
|
||||
"Check if exist database with name \"{}\""
|
||||
" and collection \"{}\" inside."
|
||||
).format(self.database, self.table_name))
|
||||
self.sock.sendall(b"MongoError")
|
||||
sys.exit(0)
|
||||
|
||||
def wait(self, duration=None):
|
||||
"""Overriden wait
|
||||
|
||||
Event are loaded from Mongo DB when queue is empty. Handled event is
|
||||
set as processed in Mongo DB.
|
||||
"""
|
||||
started = time.time()
|
||||
self.prepare_dbcon()
|
||||
while True:
|
||||
try:
|
||||
event = self._event_queue.get(timeout=0.1)
|
||||
except queue.Empty:
|
||||
if not self.load_events():
|
||||
time.sleep(0.5)
|
||||
else:
|
||||
try:
|
||||
self._handle(event)
|
||||
self.dbcon.update_one(
|
||||
{"id": event["id"]},
|
||||
{"$set": {"pype_data.is_processed": True}}
|
||||
)
|
||||
except pymongo.errors.AutoReconnect:
|
||||
log.error((
|
||||
"Mongo server \"{}\" is not responding, exiting."
|
||||
).format(os.environ["AVALON_MONGO"]))
|
||||
sys.exit(0)
|
||||
# Additional special processing of events.
|
||||
if event['topic'] == 'ftrack.meta.disconnected':
|
||||
break
|
||||
|
||||
if duration is not None:
|
||||
if (time.time() - started) > duration:
|
||||
break
|
||||
|
||||
def load_events(self):
|
||||
"""Load not processed events sorted by stored date"""
|
||||
ago_date = datetime.datetime.now() - datetime.timedelta(days=3)
|
||||
result = self.dbcon.delete_many({
|
||||
"pype_data.stored": {"$lte": ago_date},
|
||||
"pype_data.is_processed": True
|
||||
})
|
||||
|
||||
not_processed_events = self.dbcon.find(
|
||||
{"pype_data.is_processed": False}
|
||||
).sort(
|
||||
[("pype_data.stored", pymongo.ASCENDING)]
|
||||
)
|
||||
|
||||
found = False
|
||||
for event_data in not_processed_events:
|
||||
new_event_data = {
|
||||
k: v for k, v in event_data.items()
|
||||
if k not in ["_id", "pype_data"]
|
||||
}
|
||||
try:
|
||||
event = ftrack_api.event.base.Event(**new_event_data)
|
||||
except Exception:
|
||||
self.logger.exception(L(
|
||||
'Failed to convert payload into event: {0}',
|
||||
event_data
|
||||
))
|
||||
continue
|
||||
found = True
|
||||
self._event_queue.put(event)
|
||||
|
||||
return found
|
||||
|
||||
def _handle_packet(self, code, packet_identifier, path, data):
|
||||
"""Override `_handle_packet` which skip events and extend heartbeat"""
|
||||
code_name = self._code_name_mapping[code]
|
||||
if code_name == "event":
|
||||
return
|
||||
if code_name == "heartbeat":
|
||||
self.sock.sendall(b"processor")
|
||||
return self._send_packet(self._code_name_mapping["heartbeat"])
|
||||
|
||||
return super()._handle_packet(code, packet_identifier, path, data)
|
||||
|
||||
|
||||
class ProcessSession(ftrack_api.session.Session):
|
||||
'''An isolated session for interaction with an ftrack server.'''
|
||||
def __init__(
|
||||
self, server_url=None, api_key=None, api_user=None, auto_populate=True,
|
||||
plugin_paths=None, cache=None, cache_key_maker=None,
|
||||
auto_connect_event_hub=None, schema_cache_path=None,
|
||||
plugin_arguments=None, sock=None
|
||||
):
|
||||
super(ftrack_api.session.Session, self).__init__()
|
||||
self.logger = logging.getLogger(
|
||||
__name__ + '.' + self.__class__.__name__
|
||||
)
|
||||
self._closed = False
|
||||
|
||||
if server_url is None:
|
||||
server_url = os.environ.get('FTRACK_SERVER')
|
||||
|
||||
if not server_url:
|
||||
raise TypeError(
|
||||
'Required "server_url" not specified. Pass as argument or set '
|
||||
'in environment variable FTRACK_SERVER.'
|
||||
)
|
||||
|
||||
self._server_url = server_url
|
||||
|
||||
if api_key is None:
|
||||
api_key = os.environ.get(
|
||||
'FTRACK_API_KEY',
|
||||
# Backwards compatibility
|
||||
os.environ.get('FTRACK_APIKEY')
|
||||
)
|
||||
|
||||
if not api_key:
|
||||
raise TypeError(
|
||||
'Required "api_key" not specified. Pass as argument or set in '
|
||||
'environment variable FTRACK_API_KEY.'
|
||||
)
|
||||
|
||||
self._api_key = api_key
|
||||
|
||||
if api_user is None:
|
||||
api_user = os.environ.get('FTRACK_API_USER')
|
||||
if not api_user:
|
||||
try:
|
||||
api_user = getpass.getuser()
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
if not api_user:
|
||||
raise TypeError(
|
||||
'Required "api_user" not specified. Pass as argument, set in '
|
||||
'environment variable FTRACK_API_USER or one of the standard '
|
||||
'environment variables used by Python\'s getpass module.'
|
||||
)
|
||||
|
||||
self._api_user = api_user
|
||||
|
||||
# Currently pending operations.
|
||||
self.recorded_operations = ftrack_api.operation.Operations()
|
||||
self.record_operations = True
|
||||
|
||||
self.cache_key_maker = cache_key_maker
|
||||
if self.cache_key_maker is None:
|
||||
self.cache_key_maker = ftrack_api.cache.StringKeyMaker()
|
||||
|
||||
# Enforce always having a memory cache at top level so that the same
|
||||
# in-memory instance is returned from session.
|
||||
self.cache = ftrack_api.cache.LayeredCache([
|
||||
ftrack_api.cache.MemoryCache()
|
||||
])
|
||||
|
||||
if cache is not None:
|
||||
if callable(cache):
|
||||
cache = cache(self)
|
||||
|
||||
if cache is not None:
|
||||
self.cache.caches.append(cache)
|
||||
|
||||
self._managed_request = None
|
||||
self._request = requests.Session()
|
||||
self._request.auth = ftrack_api.session.SessionAuthentication(
|
||||
self._api_key, self._api_user
|
||||
)
|
||||
|
||||
self.auto_populate = auto_populate
|
||||
|
||||
# Fetch server information and in doing so also check credentials.
|
||||
self._server_information = self._fetch_server_information()
|
||||
|
||||
# Now check compatibility of server based on retrieved information.
|
||||
self.check_server_compatibility()
|
||||
|
||||
# Construct event hub and load plugins.
|
||||
self._event_hub = ProcessEventHub(
|
||||
self._server_url,
|
||||
self._api_user,
|
||||
self._api_key,
|
||||
sock=sock
|
||||
)
|
||||
|
||||
self._auto_connect_event_hub_thread = None
|
||||
if auto_connect_event_hub in (None, True):
|
||||
# Connect to event hub in background thread so as not to block main
|
||||
# session usage waiting for event hub connection.
|
||||
self._auto_connect_event_hub_thread = threading.Thread(
|
||||
target=self._event_hub.connect
|
||||
)
|
||||
self._auto_connect_event_hub_thread.daemon = True
|
||||
self._auto_connect_event_hub_thread.start()
|
||||
|
||||
# To help with migration from auto_connect_event_hub default changing
|
||||
# from True to False.
|
||||
self._event_hub._deprecation_warning_auto_connect = (
|
||||
auto_connect_event_hub is None
|
||||
)
|
||||
|
||||
# Register to auto-close session on exit.
|
||||
atexit.register(self.close)
|
||||
|
||||
self._plugin_paths = plugin_paths
|
||||
if self._plugin_paths is None:
|
||||
self._plugin_paths = os.environ.get(
|
||||
'FTRACK_EVENT_PLUGIN_PATH', ''
|
||||
).split(os.pathsep)
|
||||
|
||||
self._discover_plugins(plugin_arguments=plugin_arguments)
|
||||
|
||||
# TODO: Make schemas read-only and non-mutable (or at least without
|
||||
# rebuilding types)?
|
||||
if schema_cache_path is not False:
|
||||
if schema_cache_path is None:
|
||||
schema_cache_path = os.environ.get(
|
||||
'FTRACK_API_SCHEMA_CACHE_PATH', tempfile.gettempdir()
|
||||
)
|
||||
|
||||
schema_cache_path = os.path.join(
|
||||
schema_cache_path, 'ftrack_api_schema_cache.json'
|
||||
)
|
||||
|
||||
self.schemas = self._load_schemas(schema_cache_path)
|
||||
self.types = self._build_entity_type_classes(self.schemas)
|
||||
|
||||
ftrack_api._centralized_storage_scenario.register(self)
|
||||
|
||||
self._configure_locations()
|
||||
self.event_hub.publish(
|
||||
ftrack_api.event.base.Event(
|
||||
topic='ftrack.api.session.ready',
|
||||
data=dict(
|
||||
session=self
|
||||
)
|
||||
),
|
||||
synchronous=True
|
||||
)
|
||||
|
|
@ -1,269 +0,0 @@
|
|||
import logging
|
||||
import os
|
||||
import atexit
|
||||
import tempfile
|
||||
import threading
|
||||
import requests
|
||||
|
||||
import ftrack_api
|
||||
import ftrack_api.session
|
||||
import ftrack_api.cache
|
||||
import ftrack_api.operation
|
||||
import ftrack_api._centralized_storage_scenario
|
||||
import ftrack_api.event
|
||||
from ftrack_api.logging import LazyLogMessage as L
|
||||
|
||||
|
||||
class StorerEventHub(ftrack_api.event.hub.EventHub):
|
||||
def __init__(self, *args, **kwargs):
|
||||
self.sock = kwargs.pop("sock")
|
||||
super(StorerEventHub, self).__init__(*args, **kwargs)
|
||||
|
||||
def _handle_packet(self, code, packet_identifier, path, data):
|
||||
"""Override `_handle_packet` which extend heartbeat"""
|
||||
code_name = self._code_name_mapping[code]
|
||||
if code_name == "heartbeat":
|
||||
# Reply with heartbeat.
|
||||
self.sock.sendall(b"storer")
|
||||
return self._send_packet(self._code_name_mapping['heartbeat'])
|
||||
|
||||
elif code_name == "connect":
|
||||
event = ftrack_api.event.base.Event(
|
||||
topic="pype.storer.started",
|
||||
data={},
|
||||
source={
|
||||
"id": self.id,
|
||||
"user": {"username": self._api_user}
|
||||
}
|
||||
)
|
||||
self._event_queue.put(event)
|
||||
|
||||
return super(StorerEventHub, self)._handle_packet(
|
||||
code, packet_identifier, path, data
|
||||
)
|
||||
|
||||
|
||||
class StorerSession(ftrack_api.session.Session):
|
||||
'''An isolated session for interaction with an ftrack server.'''
|
||||
def __init__(
|
||||
self, server_url=None, api_key=None, api_user=None, auto_populate=True,
|
||||
plugin_paths=None, cache=None, cache_key_maker=None,
|
||||
auto_connect_event_hub=None, schema_cache_path=None,
|
||||
plugin_arguments=None, sock=None
|
||||
):
|
||||
'''Initialise session.
|
||||
|
||||
*server_url* should be the URL of the ftrack server to connect to
|
||||
including any port number. If not specified attempt to look up from
|
||||
:envvar:`FTRACK_SERVER`.
|
||||
|
||||
*api_key* should be the API key to use for authentication whilst
|
||||
*api_user* should be the username of the user in ftrack to record
|
||||
operations against. If not specified, *api_key* should be retrieved
|
||||
from :envvar:`FTRACK_API_KEY` and *api_user* from
|
||||
:envvar:`FTRACK_API_USER`.
|
||||
|
||||
If *auto_populate* is True (the default), then accessing entity
|
||||
attributes will cause them to be automatically fetched from the server
|
||||
if they are not already. This flag can be changed on the session
|
||||
directly at any time.
|
||||
|
||||
*plugin_paths* should be a list of paths to search for plugins. If not
|
||||
specified, default to looking up :envvar:`FTRACK_EVENT_PLUGIN_PATH`.
|
||||
|
||||
*cache* should be an instance of a cache that fulfils the
|
||||
:class:`ftrack_api.cache.Cache` interface and will be used as the cache
|
||||
for the session. It can also be a callable that will be called with the
|
||||
session instance as sole argument. The callable should return ``None``
|
||||
if a suitable cache could not be configured, but session instantiation
|
||||
can continue safely.
|
||||
|
||||
.. note::
|
||||
|
||||
The session will add the specified cache to a pre-configured layered
|
||||
cache that specifies the top level cache as a
|
||||
:class:`ftrack_api.cache.MemoryCache`. Therefore, it is unnecessary
|
||||
to construct a separate memory cache for typical behaviour. Working
|
||||
around this behaviour or removing the memory cache can lead to
|
||||
unexpected behaviour.
|
||||
|
||||
*cache_key_maker* should be an instance of a key maker that fulfils the
|
||||
:class:`ftrack_api.cache.KeyMaker` interface and will be used to
|
||||
generate keys for objects being stored in the *cache*. If not specified,
|
||||
a :class:`~ftrack_api.cache.StringKeyMaker` will be used.
|
||||
|
||||
If *auto_connect_event_hub* is True then embedded event hub will be
|
||||
automatically connected to the event server and allow for publishing and
|
||||
subscribing to **non-local** events. If False, then only publishing and
|
||||
subscribing to **local** events will be possible until the hub is
|
||||
manually connected using :meth:`EventHub.connect
|
||||
<ftrack_api.event.hub.EventHub.connect>`.
|
||||
|
||||
.. note::
|
||||
|
||||
The event hub connection is performed in a background thread to
|
||||
improve session startup time. If a registered plugin requires a
|
||||
connected event hub then it should check the event hub connection
|
||||
status explicitly. Subscribing to events does *not* require a
|
||||
connected event hub.
|
||||
|
||||
Enable schema caching by setting *schema_cache_path* to a folder path.
|
||||
If not set, :envvar:`FTRACK_API_SCHEMA_CACHE_PATH` will be used to
|
||||
determine the path to store cache in. If the environment variable is
|
||||
also not specified then a temporary directory will be used. Set to
|
||||
`False` to disable schema caching entirely.
|
||||
|
||||
*plugin_arguments* should be an optional mapping (dict) of keyword
|
||||
arguments to pass to plugin register functions upon discovery. If a
|
||||
discovered plugin has a signature that is incompatible with the passed
|
||||
arguments, the discovery mechanism will attempt to reduce the passed
|
||||
arguments to only those that the plugin accepts. Note that a warning
|
||||
will be logged in this case.
|
||||
|
||||
'''
|
||||
super(ftrack_api.session.Session, self).__init__()
|
||||
self.logger = logging.getLogger(
|
||||
__name__ + '.' + self.__class__.__name__
|
||||
)
|
||||
self._closed = False
|
||||
|
||||
if server_url is None:
|
||||
server_url = os.environ.get('FTRACK_SERVER')
|
||||
|
||||
if not server_url:
|
||||
raise TypeError(
|
||||
'Required "server_url" not specified. Pass as argument or set '
|
||||
'in environment variable FTRACK_SERVER.'
|
||||
)
|
||||
|
||||
self._server_url = server_url
|
||||
|
||||
if api_key is None:
|
||||
api_key = os.environ.get(
|
||||
'FTRACK_API_KEY',
|
||||
# Backwards compatibility
|
||||
os.environ.get('FTRACK_APIKEY')
|
||||
)
|
||||
|
||||
if not api_key:
|
||||
raise TypeError(
|
||||
'Required "api_key" not specified. Pass as argument or set in '
|
||||
'environment variable FTRACK_API_KEY.'
|
||||
)
|
||||
|
||||
self._api_key = api_key
|
||||
|
||||
if api_user is None:
|
||||
api_user = os.environ.get('FTRACK_API_USER')
|
||||
if not api_user:
|
||||
try:
|
||||
api_user = getpass.getuser()
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
if not api_user:
|
||||
raise TypeError(
|
||||
'Required "api_user" not specified. Pass as argument, set in '
|
||||
'environment variable FTRACK_API_USER or one of the standard '
|
||||
'environment variables used by Python\'s getpass module.'
|
||||
)
|
||||
|
||||
self._api_user = api_user
|
||||
|
||||
# Currently pending operations.
|
||||
self.recorded_operations = ftrack_api.operation.Operations()
|
||||
self.record_operations = True
|
||||
|
||||
self.cache_key_maker = cache_key_maker
|
||||
if self.cache_key_maker is None:
|
||||
self.cache_key_maker = ftrack_api.cache.StringKeyMaker()
|
||||
|
||||
# Enforce always having a memory cache at top level so that the same
|
||||
# in-memory instance is returned from session.
|
||||
self.cache = ftrack_api.cache.LayeredCache([
|
||||
ftrack_api.cache.MemoryCache()
|
||||
])
|
||||
|
||||
if cache is not None:
|
||||
if callable(cache):
|
||||
cache = cache(self)
|
||||
|
||||
if cache is not None:
|
||||
self.cache.caches.append(cache)
|
||||
|
||||
self._managed_request = None
|
||||
self._request = requests.Session()
|
||||
self._request.auth = ftrack_api.session.SessionAuthentication(
|
||||
self._api_key, self._api_user
|
||||
)
|
||||
|
||||
self.auto_populate = auto_populate
|
||||
|
||||
# Fetch server information and in doing so also check credentials.
|
||||
self._server_information = self._fetch_server_information()
|
||||
|
||||
# Now check compatibility of server based on retrieved information.
|
||||
self.check_server_compatibility()
|
||||
|
||||
# Construct event hub and load plugins.
|
||||
self._event_hub = StorerEventHub(
|
||||
self._server_url,
|
||||
self._api_user,
|
||||
self._api_key,
|
||||
sock=sock
|
||||
)
|
||||
|
||||
self._auto_connect_event_hub_thread = None
|
||||
if auto_connect_event_hub in (None, True):
|
||||
# Connect to event hub in background thread so as not to block main
|
||||
# session usage waiting for event hub connection.
|
||||
self._auto_connect_event_hub_thread = threading.Thread(
|
||||
target=self._event_hub.connect
|
||||
)
|
||||
self._auto_connect_event_hub_thread.daemon = True
|
||||
self._auto_connect_event_hub_thread.start()
|
||||
|
||||
# To help with migration from auto_connect_event_hub default changing
|
||||
# from True to False.
|
||||
self._event_hub._deprecation_warning_auto_connect = (
|
||||
auto_connect_event_hub is None
|
||||
)
|
||||
|
||||
# Register to auto-close session on exit.
|
||||
atexit.register(self.close)
|
||||
|
||||
self._plugin_paths = plugin_paths
|
||||
if self._plugin_paths is None:
|
||||
self._plugin_paths = os.environ.get(
|
||||
'FTRACK_EVENT_PLUGIN_PATH', ''
|
||||
).split(os.pathsep)
|
||||
|
||||
self._discover_plugins(plugin_arguments=plugin_arguments)
|
||||
|
||||
# TODO: Make schemas read-only and non-mutable (or at least without
|
||||
# rebuilding types)?
|
||||
if schema_cache_path is not False:
|
||||
if schema_cache_path is None:
|
||||
schema_cache_path = os.environ.get(
|
||||
'FTRACK_API_SCHEMA_CACHE_PATH', tempfile.gettempdir()
|
||||
)
|
||||
|
||||
schema_cache_path = os.path.join(
|
||||
schema_cache_path, 'ftrack_api_schema_cache.json'
|
||||
)
|
||||
|
||||
self.schemas = self._load_schemas(schema_cache_path)
|
||||
self.types = self._build_entity_type_classes(self.schemas)
|
||||
|
||||
ftrack_api._centralized_storage_scenario.register(self)
|
||||
|
||||
self._configure_locations()
|
||||
self.event_hub.publish(
|
||||
ftrack_api.event.base.Event(
|
||||
topic='ftrack.api.session.ready',
|
||||
data=dict(
|
||||
session=self
|
||||
)
|
||||
),
|
||||
synchronous=True
|
||||
)
|
||||
|
|
@ -1,7 +1,5 @@
|
|||
import os
|
||||
import sys
|
||||
import time
|
||||
import signal
|
||||
import socket
|
||||
import threading
|
||||
import subprocess
|
||||
|
|
@ -10,7 +8,9 @@ from pypeapp import Logger
|
|||
|
||||
class SocketThread(threading.Thread):
|
||||
"""Thread that checks suprocess of storer of processor of events"""
|
||||
|
||||
MAX_TIMEOUT = 35
|
||||
|
||||
def __init__(self, name, port, filepath):
|
||||
super(SocketThread, self).__init__()
|
||||
self.log = Logger().get_logger("SocketThread", "Event Thread")
|
||||
|
|
|
|||
|
|
@ -1,12 +1,9 @@
|
|||
import os
|
||||
import sys
|
||||
import datetime
|
||||
import signal
|
||||
import socket
|
||||
import pymongo
|
||||
|
||||
from ftrack_server import FtrackServer
|
||||
from pype.ftrack.ftrack_server.session_processor import ProcessSession
|
||||
from pype.ftrack.ftrack_server.lib import SocketSession, ProcessEventHub
|
||||
from pypeapp import Logger
|
||||
|
||||
log = Logger().get_logger("Event processor")
|
||||
|
|
@ -24,12 +21,14 @@ def main(args):
|
|||
|
||||
sock.sendall(b"CreatedProcess")
|
||||
try:
|
||||
session = ProcessSession(auto_connect_event_hub=True, sock=sock)
|
||||
server = FtrackServer('event')
|
||||
session = SocketSession(
|
||||
auto_connect_event_hub=True, sock=sock, Eventhub=ProcessEventHub
|
||||
)
|
||||
server = FtrackServer("event")
|
||||
log.debug("Launched Ftrack Event processor")
|
||||
server.run_server(session)
|
||||
|
||||
except Exception as exc:
|
||||
except Exception:
|
||||
log.error("Event server crashed. See traceback below", exc_info=True)
|
||||
|
||||
finally:
|
||||
|
|
|
|||
|
|
@ -7,22 +7,22 @@ import pymongo
|
|||
|
||||
import ftrack_api
|
||||
from ftrack_server import FtrackServer
|
||||
from pype.ftrack.ftrack_server.lib import get_ftrack_event_mongo_info
|
||||
from pype.ftrack.ftrack_server.lib import (
|
||||
get_ftrack_event_mongo_info,
|
||||
SocketSession,
|
||||
StorerEventHub
|
||||
)
|
||||
from pype.ftrack.lib.custom_db_connector import DbConnector
|
||||
from session_storer import StorerSession
|
||||
from pypeapp import Logger
|
||||
|
||||
log = Logger().get_logger("Event storer")
|
||||
|
||||
|
||||
class SessionFactory:
|
||||
session = None
|
||||
|
||||
|
||||
url, database, table_name = get_ftrack_event_mongo_info()
|
||||
|
||||
|
||||
class SessionClass:
|
||||
def __init__(self):
|
||||
self.session = None
|
||||
|
||||
|
||||
session_obj = SessionClass()
|
||||
dbcon = DbConnector(
|
||||
mongo_url=url,
|
||||
database_name=database,
|
||||
|
|
@ -75,7 +75,11 @@ def launch(event):
|
|||
|
||||
|
||||
def trigger_sync(event):
|
||||
session = session_obj.session
|
||||
session = SessionFactory.session
|
||||
source_id = event.get("source", {}).get("id")
|
||||
if not source_id or source_id != session.event_hub.id:
|
||||
return
|
||||
|
||||
if session is None:
|
||||
log.warning("Session is not set. Can't trigger Sync to avalon action.")
|
||||
return True
|
||||
|
|
@ -93,7 +97,7 @@ def trigger_sync(event):
|
|||
"$set": {"pype_data.is_processed": True}
|
||||
}
|
||||
dbcon.update_many(query, set_dict)
|
||||
|
||||
|
||||
selections = []
|
||||
for project in projects:
|
||||
if project["status"] != "active":
|
||||
|
|
@ -154,8 +158,10 @@ def main(args):
|
|||
sock.sendall(b"CreatedStore")
|
||||
|
||||
try:
|
||||
session = StorerSession(auto_connect_event_hub=True, sock=sock)
|
||||
session_obj.session = session
|
||||
session = SocketSession(
|
||||
auto_connect_event_hub=True, sock=sock, Eventhub=StorerEventHub
|
||||
)
|
||||
SessionFactory.session = session
|
||||
register(session)
|
||||
server = FtrackServer("event")
|
||||
log.debug("Launched Ftrack Event storer")
|
||||
|
|
|
|||
|
|
@ -1,4 +1,3 @@
|
|||
import os
|
||||
import sys
|
||||
import time
|
||||
import datetime
|
||||
|
|
@ -7,7 +6,6 @@ import threading
|
|||
|
||||
from ftrack_server import FtrackServer
|
||||
import ftrack_api
|
||||
from ftrack_api.event.hub import EventHub
|
||||
from pypeapp import Logger
|
||||
|
||||
log = Logger().get_logger("Event Server Legacy")
|
||||
|
|
@ -37,7 +35,10 @@ class TimerChecker(threading.Thread):
|
|||
|
||||
if not self.session.event_hub.connected:
|
||||
if not connected:
|
||||
if (datetime.datetime.now() - start).seconds > self.max_time_out:
|
||||
if (
|
||||
(datetime.datetime.now() - start).seconds >
|
||||
self.max_time_out
|
||||
):
|
||||
log.error((
|
||||
"Exiting event server. Session was not connected"
|
||||
" to ftrack server in {} seconds."
|
||||
|
|
@ -61,7 +62,7 @@ class TimerChecker(threading.Thread):
|
|||
def main(args):
|
||||
check_thread = None
|
||||
try:
|
||||
server = FtrackServer('event')
|
||||
server = FtrackServer("event")
|
||||
session = ftrack_api.Session(auto_connect_event_hub=True)
|
||||
|
||||
check_thread = TimerChecker(server, session)
|
||||
|
|
|
|||
|
|
@ -2,7 +2,7 @@ import functools
|
|||
import time
|
||||
from pypeapp import Logger
|
||||
import ftrack_api
|
||||
from pype.ftrack.ftrack_server import session_processor
|
||||
from pype.ftrack.ftrack_server.lib import SocketSession
|
||||
|
||||
|
||||
class MissingPermision(Exception):
|
||||
|
|
@ -41,7 +41,7 @@ class BaseHandler(object):
|
|||
self.log = Logger().get_logger(self.__class__.__name__)
|
||||
if not(
|
||||
isinstance(session, ftrack_api.session.Session) or
|
||||
isinstance(session, session_processor.ProcessSession)
|
||||
isinstance(session, SocketSession)
|
||||
):
|
||||
raise Exception((
|
||||
"Session object entered with args is instance of \"{}\""
|
||||
|
|
|
|||
|
|
@ -161,7 +161,7 @@ class ExtractReview(pyblish.api.InstancePlugin):
|
|||
if "reformat" not in p_tags:
|
||||
lb /= pixel_aspect
|
||||
output_args.append(
|
||||
"-filter:v drawbox=0:0:iw:round((ih-(iw*(1/{0})))/2):t=fill:c=black,drawbox=0:ih-round((ih-(iw*(1/{0})))/2):iw:round((ih-(iw*(1/{0})))/2):t=fill:c=black".format(lb))
|
||||
"-filter:v scale=1920x1080:flags=lanczos,setsar=1,drawbox=0:0:iw:round((ih-(iw*(1/{0})))/2):t=fill:c=black,drawbox=0:ih-round((ih-(iw*(1/{0})))/2):iw:round((ih-(iw*(1/{0})))/2):t=fill:c=black".format(lb))
|
||||
|
||||
# In case audio is longer than video.
|
||||
output_args.append("-shortest")
|
||||
|
|
|
|||
|
|
@ -71,7 +71,7 @@ class IntegrateAssetNew(pyblish.api.InstancePlugin):
|
|||
"yetiRig",
|
||||
"yeticache",
|
||||
"nukenodes",
|
||||
"gizmo"
|
||||
"gizmo",
|
||||
"source",
|
||||
"matchmove",
|
||||
"image"
|
||||
|
|
|
|||
|
|
@ -219,10 +219,6 @@ class CollectLook(pyblish.api.InstancePlugin):
|
|||
with lib.renderlayer(instance.data["renderlayer"]):
|
||||
self.collect(instance)
|
||||
|
||||
# make ftrack publishable
|
||||
self.maketx = instance.data.get('maketx', True)
|
||||
instance.data['maketx'] = self.maketx
|
||||
self.log.info('maketx: {}'.format(self.maketx))
|
||||
|
||||
def collect(self, instance):
|
||||
|
||||
|
|
|
|||
|
|
@ -74,6 +74,8 @@ def maketx(source, destination, *args):
|
|||
cmd.extend(args)
|
||||
cmd.extend(["-o", destination, source])
|
||||
|
||||
cmd = " ".join(cmd)
|
||||
|
||||
CREATE_NO_WINDOW = 0x08000000
|
||||
kwargs = dict(args=cmd, stderr=subprocess.STDOUT)
|
||||
|
||||
|
|
@ -183,6 +185,7 @@ class ExtractLook(pype.api.Extractor):
|
|||
transfers = list()
|
||||
hardlinks = list()
|
||||
hashes = dict()
|
||||
forceCopy = instance.data.get("forceCopy", False)
|
||||
|
||||
self.log.info(files)
|
||||
for filepath in files_metadata:
|
||||
|
|
@ -195,20 +198,26 @@ class ExtractLook(pype.api.Extractor):
|
|||
files_metadata[filepath]["color_space"] = "raw"
|
||||
|
||||
source, mode, hash = self._process_texture(
|
||||
filepath, do_maketx, staging=dir_path, linearise=linearise
|
||||
filepath,
|
||||
do_maketx,
|
||||
staging=dir_path,
|
||||
linearise=linearise,
|
||||
force=forceCopy
|
||||
)
|
||||
destination = self.resource_destination(instance,
|
||||
source,
|
||||
do_maketx)
|
||||
|
||||
# Force copy is specified.
|
||||
if instance.data.get("forceCopy", False):
|
||||
if forceCopy:
|
||||
mode = COPY
|
||||
|
||||
if mode == COPY:
|
||||
transfers.append((source, destination))
|
||||
self.log.info('copying')
|
||||
elif mode == HARDLINK:
|
||||
hardlinks.append((source, destination))
|
||||
self.log.info('hardlinking')
|
||||
|
||||
# Store the hashes from hash to destination to include in the
|
||||
# database
|
||||
|
|
@ -231,11 +240,12 @@ class ExtractLook(pype.api.Extractor):
|
|||
# ensure after context it's still the original value.
|
||||
color_space_attr = resource["node"] + ".colorSpace"
|
||||
color_space = cmds.getAttr(color_space_attr)
|
||||
|
||||
if files_metadata[source]["color_space"] == "raw":
|
||||
# set colorpsace to raw if we linearized it
|
||||
color_space = "Raw"
|
||||
# Remap file node filename to destination
|
||||
attr = resource["attribute"]
|
||||
remap[attr] = destinations[source]
|
||||
|
||||
remap[color_space_attr] = color_space
|
||||
|
||||
self.log.info("Finished remapping destinations ...")
|
||||
|
|
@ -310,6 +320,12 @@ class ExtractLook(pype.api.Extractor):
|
|||
# Source hash for the textures
|
||||
instance.data["sourceHashes"] = hashes
|
||||
|
||||
"""
|
||||
self.log.info("Returning colorspaces to their original values ...")
|
||||
for attr, value in remap.items():
|
||||
self.log.info(" - {}: {}".format(attr, value))
|
||||
cmds.setAttr(attr, value, type="string")
|
||||
"""
|
||||
self.log.info("Extracted instance '%s' to: %s" % (instance.name,
|
||||
maya_path))
|
||||
|
||||
|
|
@ -330,7 +346,7 @@ class ExtractLook(pype.api.Extractor):
|
|||
instance.data["assumedDestination"], "resources", basename + ext
|
||||
)
|
||||
|
||||
def _process_texture(self, filepath, do_maketx, staging, linearise):
|
||||
def _process_texture(self, filepath, do_maketx, staging, linearise, force):
|
||||
"""Process a single texture file on disk for publishing.
|
||||
This will:
|
||||
1. Check whether it's already published, if so it will do hardlink
|
||||
|
|
@ -352,7 +368,7 @@ class ExtractLook(pype.api.Extractor):
|
|||
# If source has been published before with the same settings,
|
||||
# then don't reprocess but hardlink from the original
|
||||
existing = find_paths_by_hash(texture_hash)
|
||||
if existing:
|
||||
if existing and not force:
|
||||
self.log.info("Found hash in database, preparing hardlink..")
|
||||
source = next((p for p in existing if os.path.exists(p)), None)
|
||||
if filepath:
|
||||
|
|
|
|||
|
|
@ -34,6 +34,7 @@ class CreateWriteRender(plugin.PypeCreator):
|
|||
data.update({k: v})
|
||||
|
||||
self.data = data
|
||||
self.nodes = nuke.selectedNodes()
|
||||
self.log.info("self.data: '{}'".format(self.data))
|
||||
|
||||
def process(self):
|
||||
|
|
@ -46,9 +47,9 @@ class CreateWriteRender(plugin.PypeCreator):
|
|||
|
||||
# use selection
|
||||
if (self.options or {}).get("useSelection"):
|
||||
nodes = nuke.selectedNodes()
|
||||
nodes = self.nodes
|
||||
|
||||
assert len(nodes) == 1, self.log.error("Select only one node. The node you want to connect to, or tick off `Use selection`")
|
||||
assert len(nodes) < 2, self.log.error("Select only one node. The node you want to connect to, or tick off `Use selection`")
|
||||
|
||||
selected_node = nodes[0]
|
||||
inputs = [selected_node]
|
||||
|
|
|
|||
|
|
@ -32,7 +32,7 @@ class CollectNukeInstances(pyblish.api.ContextPlugin):
|
|||
continue
|
||||
except Exception as E:
|
||||
self.log.warning(E)
|
||||
continue
|
||||
|
||||
|
||||
# get data from avalon knob
|
||||
self.log.debug("node[name]: {}".format(node['name'].value()))
|
||||
|
|
@ -86,7 +86,11 @@ class CollectNukeInstances(pyblish.api.ContextPlugin):
|
|||
node.end()
|
||||
|
||||
family = avalon_knob_data["family"]
|
||||
families = [avalon_knob_data["families"]]
|
||||
families = avalon_knob_data.get("families")
|
||||
if families:
|
||||
families = [families]
|
||||
else:
|
||||
families = [family]
|
||||
|
||||
# Get format
|
||||
format = root['format'].value()
|
||||
|
|
@ -95,7 +99,9 @@ class CollectNukeInstances(pyblish.api.ContextPlugin):
|
|||
pixel_aspect = format.pixelAspect()
|
||||
|
||||
if node.Class() not in "Read":
|
||||
if node["render"].value():
|
||||
if "render" not in node.knobs().keys():
|
||||
families.insert(0, family)
|
||||
elif node["render"].value():
|
||||
self.log.info("flagged for render")
|
||||
add_family = "render.local"
|
||||
# dealing with local/farm rendering
|
||||
|
|
|
|||
|
|
@ -11,7 +11,7 @@ class CollectNukeWrites(pyblish.api.InstancePlugin):
|
|||
order = pyblish.api.CollectorOrder + 0.1
|
||||
label = "Collect Writes"
|
||||
hosts = ["nuke", "nukeassist"]
|
||||
families = ["render", "render.local", "render.farm"]
|
||||
families = ["write"]
|
||||
|
||||
def process(self, instance):
|
||||
|
||||
|
|
@ -96,7 +96,7 @@ class CollectNukeWrites(pyblish.api.InstancePlugin):
|
|||
"frameEnd": last_frame - handle_end,
|
||||
"version": int(version),
|
||||
"colorspace": node["colorspace"].value(),
|
||||
"families": [instance.data["family"]] + instance.data["families"],
|
||||
"families": [instance.data["family"]],
|
||||
"subset": instance.data["subset"],
|
||||
"fps": instance.context.data["fps"]
|
||||
}
|
||||
|
|
@ -110,6 +110,7 @@ class CollectNukeWrites(pyblish.api.InstancePlugin):
|
|||
if "deadlinePriority" in group_node.knobs():
|
||||
deadlinePriority = group_node["deadlinePriority"].value()
|
||||
|
||||
families = [f for f in instance.data["families"] if "write" not in f]
|
||||
instance.data.update({
|
||||
"versionData": version_data,
|
||||
"path": path,
|
||||
|
|
@ -120,10 +121,13 @@ class CollectNukeWrites(pyblish.api.InstancePlugin):
|
|||
"frameStart": first_frame,
|
||||
"frameEnd": last_frame,
|
||||
"outputType": output_type,
|
||||
"family": "write",
|
||||
"families": families,
|
||||
"colorspace": node["colorspace"].value(),
|
||||
"deadlineChunkSize": deadlineChunkSize,
|
||||
"deadlinePriority": deadlinePriority,
|
||||
"subsetGroup": "renders"
|
||||
})
|
||||
|
||||
|
||||
self.log.debug("instance.data: {}".format(instance.data))
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue