removed deprecated doctor actions

This commit is contained in:
iLLiCiTiT 2020-06-04 16:59:21 +02:00
parent 3fd6e8fb29
commit f32fc2dfe2
3 changed files with 0 additions and 809 deletions

View file

@ -1,284 +0,0 @@
import os
import ftrack_api
from pype.modules.ftrack import BaseAction
from pype.modules.ftrack.lib.io_nonsingleton import DbConnector
class AttributesRemapper(BaseAction):
'''Edit meta data action.'''
ignore_me = True
#: Action identifier.
identifier = 'attributes.remapper'
#: Action label.
label = "Pype Doctor"
variant = '- Attributes Remapper'
#: Action description.
description = 'Remaps attributes in avalon DB'
#: roles that are allowed to register this action
role_list = ["Pypeclub", "Administrator"]
icon = '{}/ftrack/action_icons/PypeDoctor.svg'.format(
os.environ.get('PYPE_STATICS_SERVER', '')
)
db_con = DbConnector()
keys_to_change = {
"fstart": "frameStart",
"startFrame": "frameStart",
"edit_in": "frameStart",
"fend": "frameEnd",
"endFrame": "frameEnd",
"edit_out": "frameEnd",
"handle_start": "handleStart",
"handle_end": "handleEnd",
"handles": ["handleEnd", "handleStart"],
"frameRate": "fps",
"framerate": "fps",
"resolution_width": "resolutionWidth",
"resolution_height": "resolutionHeight",
"pixel_aspect": "pixelAspect"
}
def discover(self, session, entities, event):
''' Validation '''
return True
def interface(self, session, entities, event):
if event['data'].get('values', {}):
return
title = 'Select Projects where attributes should be remapped'
items = []
selection_enum = {
'label': 'Process type',
'type': 'enumerator',
'name': 'process_type',
'data': [
{
'label': 'Selection',
'value': 'selection'
}, {
'label': 'Inverted selection',
'value': 'except'
}
],
'value': 'selection'
}
selection_label = {
'type': 'label',
'value': (
'Selection based variants:<br/>'
'- `Selection` - '
'NOTHING is processed when nothing is selected<br/>'
'- `Inverted selection` - '
'ALL Projects are processed when nothing is selected'
)
}
items.append(selection_enum)
items.append(selection_label)
item_splitter = {'type': 'label', 'value': '---'}
all_projects = session.query('Project').all()
for project in all_projects:
item_label = {
'type': 'label',
'value': '{} (<i>{}</i>)'.format(
project['full_name'], project['name']
)
}
item = {
'name': project['id'],
'type': 'boolean',
'value': False
}
if len(items) > 0:
items.append(item_splitter)
items.append(item_label)
items.append(item)
if len(items) == 0:
return {
'success': False,
'message': 'Didn\'t found any projects'
}
else:
return {
'items': items,
'title': title
}
def launch(self, session, entities, event):
if 'values' not in event['data']:
return
values = event['data']['values']
process_type = values.pop('process_type')
selection = True
if process_type == 'except':
selection = False
interface_messages = {}
projects_to_update = []
for project_id, update_bool in values.items():
if not update_bool and selection:
continue
if update_bool and not selection:
continue
project = session.query(
'Project where id is "{}"'.format(project_id)
).one()
projects_to_update.append(project)
if not projects_to_update:
self.log.debug('Nothing to update')
return {
'success': True,
'message': 'Nothing to update'
}
self.db_con.install()
relevant_types = ["project", "asset", "version"]
for ft_project in projects_to_update:
self.log.debug(
"Processing project \"{}\"".format(ft_project["full_name"])
)
self.db_con.Session["AVALON_PROJECT"] = ft_project["full_name"]
project = self.db_con.find_one({'type': 'project'})
if not project:
key = "Projects not synchronized to db"
if key not in interface_messages:
interface_messages[key] = []
interface_messages[key].append(ft_project["full_name"])
continue
# Get all entities in project collection from MongoDB
_entities = self.db_con.find({})
for _entity in _entities:
ent_t = _entity.get("type", "*unknown type")
name = _entity.get("name", "*unknown name")
self.log.debug(
"- {} ({})".format(name, ent_t)
)
# Skip types that do not store keys to change
if ent_t.lower() not in relevant_types:
self.log.debug("-- skipping - type is not relevant")
continue
# Get data which will change
updating_data = {}
source_data = _entity["data"]
for key_from, key_to in self.keys_to_change.items():
# continue if final key already exists
if type(key_to) == list:
for key in key_to:
# continue if final key was set in update_data
if key in updating_data:
continue
# continue if source key not exist or value is None
value = source_data.get(key_from)
if value is None:
continue
self.log.debug(
"-- changing key {} to {}".format(
key_from,
key
)
)
updating_data[key] = value
else:
if key_to in source_data:
continue
# continue if final key was set in update_data
if key_to in updating_data:
continue
# continue if source key not exist or value is None
value = source_data.get(key_from)
if value is None:
continue
self.log.debug(
"-- changing key {} to {}".format(key_from, key_to)
)
updating_data[key_to] = value
# Pop out old keys from entity
is_obsolete = False
for key in self.keys_to_change:
if key not in source_data:
continue
is_obsolete = True
source_data.pop(key)
# continue if there is nothing to change
if not is_obsolete and not updating_data:
self.log.debug("-- nothing to change")
continue
source_data.update(updating_data)
self.db_con.update_many(
{"_id": _entity["_id"]},
{"$set": {"data": source_data}}
)
self.db_con.uninstall()
if interface_messages:
self.show_interface_from_dict(
messages=interface_messages,
title="Errors during remapping attributes",
event=event
)
return True
def show_interface_from_dict(self, event, messages, title=""):
items = []
for key, value in messages.items():
if not value:
continue
subtitle = {'type': 'label', 'value': '# {}'.format(key)}
items.append(subtitle)
if isinstance(value, list):
for item in value:
message = {
'type': 'label', 'value': '<p>{}</p>'.format(item)
}
items.append(message)
else:
message = {'type': 'label', 'value': '<p>{}</p>'.format(value)}
items.append(message)
self.show_interface(items=items, title=title, event=event)
def register(session, plugins_presets={}):
'''Register plugin. Called when used as an plugin.'''
AttributesRemapper(session, plugins_presets).register()

View file

@ -1,336 +0,0 @@
import os
import sys
import json
import argparse
import logging
import ftrack_api
from pype.modules.ftrack import BaseAction
class CustomAttributeDoctor(BaseAction):
ignore_me = True
#: Action identifier.
identifier = 'custom.attributes.doctor'
#: Action label.
label = "Pype Doctor"
variant = '- Custom Attributes Doctor'
#: Action description.
description = (
'Fix hierarchical custom attributes mainly handles, fstart'
' and fend'
)
icon = '{}/ftrack/action_icons/PypeDoctor.svg'.format(
os.environ.get('PYPE_STATICS_SERVER', '')
)
hierarchical_ca = ['handleStart', 'handleEnd', 'frameStart', 'frameEnd']
hierarchical_alternatives = {
'handleStart': 'handles',
'handleEnd': 'handles',
"frameStart": "fstart",
"frameEnd": "fend"
}
# Roles for new custom attributes
read_roles = ['ALL',]
write_roles = ['ALL',]
data_ca = {
'handleStart': {
'label': 'Frame handles start',
'type': 'number',
'config': json.dumps({'isdecimal': False})
},
'handleEnd': {
'label': 'Frame handles end',
'type': 'number',
'config': json.dumps({'isdecimal': False})
},
'frameStart': {
'label': 'Frame start',
'type': 'number',
'config': json.dumps({'isdecimal': False})
},
'frameEnd': {
'label': 'Frame end',
'type': 'number',
'config': json.dumps({'isdecimal': False})
}
}
def discover(self, session, entities, event):
''' Validation '''
return True
def interface(self, session, entities, event):
if event['data'].get('values', {}):
return
title = 'Select Project to fix Custom attributes'
items = []
item_splitter = {'type': 'label', 'value': '---'}
all_projects = session.query('Project').all()
for project in all_projects:
item_label = {
'type': 'label',
'value': '{} (<i>{}</i>)'.format(
project['full_name'], project['name']
)
}
item = {
'name': project['id'],
'type': 'boolean',
'value': False
}
if len(items) > 0:
items.append(item_splitter)
items.append(item_label)
items.append(item)
if len(items) == 0:
return {
'success': False,
'message': 'Didn\'t found any projects'
}
else:
return {
'items': items,
'title': title
}
def launch(self, session, entities, event):
if 'values' not in event['data']:
return
values = event['data']['values']
projects_to_update = []
for project_id, update_bool in values.items():
if not update_bool:
continue
project = session.query(
'Project where id is "{}"'.format(project_id)
).one()
projects_to_update.append(project)
if not projects_to_update:
self.log.debug('Nothing to update')
return {
'success': True,
'message': 'Nothing to update'
}
self.security_roles = {}
self.to_process = {}
# self.curent_default_values = {}
existing_attrs = session.query('CustomAttributeConfiguration').all()
self.prepare_custom_attributes(existing_attrs)
self.projects_data = {}
for project in projects_to_update:
self.process_data(project)
return True
def process_data(self, entity):
cust_attrs = entity.get('custom_attributes')
if not cust_attrs:
return
for dst_key, src_key in self.to_process.items():
if src_key in cust_attrs:
value = cust_attrs[src_key]
entity['custom_attributes'][dst_key] = value
self.session.commit()
for child in entity.get('children', []):
self.process_data(child)
def prepare_custom_attributes(self, existing_attrs):
to_process = {}
to_create = []
all_keys = {attr['key']: attr for attr in existing_attrs}
for key in self.hierarchical_ca:
if key not in all_keys:
self.log.debug(
'Custom attribute "{}" does not exist at all'.format(key)
)
to_create.append(key)
if key in self.hierarchical_alternatives:
alt_key = self.hierarchical_alternatives[key]
if alt_key in all_keys:
self.log.debug((
'Custom attribute "{}" will use values from "{}"'
).format(key, alt_key))
to_process[key] = alt_key
obj = all_keys[alt_key]
# if alt_key not in self.curent_default_values:
# self.curent_default_values[alt_key] = obj['default']
obj['default'] = None
self.session.commit()
else:
obj = all_keys[key]
new_key = key + '_old'
if obj['is_hierarchical']:
if new_key not in all_keys:
self.log.info((
'Custom attribute "{}" is already hierarchical'
' and can\'t find old one'
).format(key)
)
continue
to_process[key] = new_key
continue
# default_value = obj['default']
# if new_key not in self.curent_default_values:
# self.curent_default_values[new_key] = default_value
obj['key'] = new_key
obj['label'] = obj['label'] + '(old)'
obj['default'] = None
self.session.commit()
to_create.append(key)
to_process[key] = new_key
self.to_process = to_process
for key in to_create:
data = {
'key': key,
'entity_type': 'show',
'is_hierarchical': True,
'default': None
}
for _key, _value in self.data_ca.get(key, {}).items():
if _key == 'type':
_value = self.session.query((
'CustomAttributeType where name is "{}"'
).format(_value)).first()
data[_key] = _value
avalon_group = self.session.query(
'CustomAttributeGroup where name is "avalon"'
).first()
if avalon_group:
data['group'] = avalon_group
read_roles = self.get_security_role(self.read_roles)
write_roles = self.get_security_role(self.write_roles)
data['read_security_roles'] = read_roles
data['write_security_roles'] = write_roles
self.session.create('CustomAttributeConfiguration', data)
self.session.commit()
# def return_back_defaults(self):
# existing_attrs = self.session.query(
# 'CustomAttributeConfiguration'
# ).all()
#
# for attr_key, default in self.curent_default_values.items():
# for attr in existing_attrs:
# if attr['key'] != attr_key:
# continue
# attr['default'] = default
# self.session.commit()
# break
def get_security_role(self, security_roles):
roles = []
if len(security_roles) == 0 or security_roles[0] == 'ALL':
roles = self.get_role_ALL()
elif security_roles[0] == 'except':
excepts = security_roles[1:]
all = self.get_role_ALL()
for role in all:
if role['name'] not in excepts:
roles.append(role)
if role['name'] not in self.security_roles:
self.security_roles[role['name']] = role
else:
for role_name in security_roles:
if role_name in self.security_roles:
roles.append(self.security_roles[role_name])
continue
try:
query = 'SecurityRole where name is "{}"'.format(role_name)
role = self.session.query(query).one()
self.security_roles[role_name] = role
roles.append(role)
except Exception:
self.log.warning(
'Securit role "{}" does not exist'.format(role_name)
)
continue
return roles
def get_role_ALL(self):
role_name = 'ALL'
if role_name in self.security_roles:
all_roles = self.security_roles[role_name]
else:
all_roles = self.session.query('SecurityRole').all()
self.security_roles[role_name] = all_roles
for role in all_roles:
if role['name'] not in self.security_roles:
self.security_roles[role['name']] = role
return all_roles
def register(session, plugins_presets={}):
'''Register plugin. Called when used as an plugin.'''
CustomAttributeDoctor(session, plugins_presets).register()
def main(arguments=None):
'''Set up logging and register action.'''
if arguments is None:
arguments = []
parser = argparse.ArgumentParser()
# Allow setting of logging level from arguments.
loggingLevels = {}
for level in (
logging.NOTSET, logging.DEBUG, logging.INFO, logging.WARNING,
logging.ERROR, logging.CRITICAL
):
loggingLevels[logging.getLevelName(level).lower()] = level
parser.add_argument(
'-v', '--verbosity',
help='Set the logging output verbosity.',
choices=loggingLevels.keys(),
default='info'
)
namespace = parser.parse_args(arguments)
# Set up basic logging
logging.basicConfig(level=loggingLevels[namespace.verbosity])
session = ftrack_api.Session()
register(session)
# Wait for events
logging.info(
'Registered actions and listening for events. Use Ctrl-C to abort.'
)
session.event_hub.wait()
if __name__ == '__main__':
raise SystemExit(main(sys.argv[1:]))

View file

@ -1,189 +0,0 @@
import os
from pype.modules.ftrack import BaseAction
from pype.modules.ftrack.lib.io_nonsingleton import DbConnector
class PypeUpdateFromV2_2_0(BaseAction):
"""This action is to remove silo field from database and changes asset
schema to newer version
WARNING: it is NOT for situations when you want to switch from avalon-core
to Pype's avalon-core!!!
"""
#: Action identifier.
identifier = "silos.doctor"
#: Action label.
label = "Pype Update"
variant = "- v2.2.0 to v2.3.0 or higher"
#: Action description.
description = "Use when Pype was updated from v2.2.0 to v2.3.0 or higher"
#: roles that are allowed to register this action
role_list = ["Pypeclub", "Administrator"]
icon = "{}/ftrack/action_icons/PypeUpdate.svg".format(
os.environ.get("PYPE_STATICS_SERVER", "")
)
# connector to MongoDB (Avalon mongo)
db_con = DbConnector()
def discover(self, session, entities, event):
""" Validation """
if len(entities) != 1:
return False
if entities[0].entity_type.lower() != "project":
return False
return True
def interface(self, session, entities, event):
if event['data'].get('values', {}):
return
items = []
item_splitter = {'type': 'label', 'value': '---'}
title = "Updated Pype from v 2.2.0 to v2.3.0 or higher"
items.append({
"type": "label",
"value": (
"NOTE: This doctor action should be used ONLY when Pype"
" was updated from v2.2.0 to v2.3.0 or higher.<br><br><br>"
)
})
items.append({
"type": "label",
"value": (
"Select if want to process <b>all synchronized projects</b>"
" or <b>selection</b>."
)
})
items.append({
"type": "enumerator",
"name": "__process_all__",
"data": [{
"label": "All synchronized projects",
"value": True
}, {
"label": "Selection",
"value": False
}],
"value": False
})
items.append({
"type": "label",
"value": (
"<br/><br/><h2>Synchronized projects:</h2>"
"<i>(ignore if <strong>\"ALL projects\"</strong> selected)</i>"
)
})
self.log.debug("Getting all Ftrack projects")
# Get all Ftrack projects
all_ftrack_projects = [
project["full_name"] for project in session.query("Project").all()
]
self.log.debug("Getting Avalon projects that are also in the Ftrack")
# Get Avalon projects that are in Ftrack
self.db_con.install()
possible_projects = [
project["name"] for project in self.db_con.projects()
if project["name"] in all_ftrack_projects
]
for project in possible_projects:
item_label = {
"type": "label",
"value": project
}
item = {
"label": "- process",
"name": project,
"type": 'boolean',
"value": False
}
items.append(item_splitter)
items.append(item_label)
items.append(item)
if len(possible_projects) == 0:
return {
"success": False,
"message": (
"Nothing to process."
" There are not projects synchronized to avalon."
)
}
else:
return {
"items": items,
"title": title
}
def launch(self, session, entities, event):
if 'values' not in event['data']:
return
projects_selection = {
True: [],
False: []
}
process_all = None
values = event['data']['values']
for key, value in values.items():
if key == "__process_all__":
process_all = value
continue
projects_selection[value].append(key)
# Skip if process_all value is not boolean
# - may happen when user delete string line in combobox
if not isinstance(process_all, bool):
self.log.warning(
"Nothing was processed. User didn't select if want to process"
" selection or all projects!"
)
return {
"success": False,
"message": (
"Nothing was processed. You must select if want to process"
" \"selection\" or \"all projects\"!"
)
}
projects_to_process = projects_selection[True]
if process_all:
projects_to_process.extend(projects_selection[False])
self.db_con.install()
for project in projects_to_process:
self.log.debug("Processing project \"{}\"".format(project))
self.db_con.Session["AVALON_PROJECT"] = project
self.log.debug("- Unsetting silos on assets")
self.db_con.update_many(
{"type": "asset"},
{"$unset": {"silo": ""}}
)
self.log.debug("- setting schema of assets to v.3")
self.db_con.update_many(
{"type": "asset"},
{"$set": {"schema": "avalon-core:asset-3.0"}}
)
return True
def register(session, plugins_presets={}):
"""Register plugin. Called when used as an plugin."""
PypeUpdateFromV2_2_0(session, plugins_presets).register()