hierarchical mongo id attribute automatically replace previous access

This commit is contained in:
iLLiCiTiT 2020-05-04 19:54:12 +02:00
parent f345da3715
commit 9935972a85

View file

@ -1,4 +1,5 @@
import os
import collections
import json
import arrow
import ftrack_api
@ -131,11 +132,6 @@ class CustomAttributes(BaseAction):
return True
def launch(self, session, entities, event):
self.types = {}
self.object_type_ids = {}
self.groups = {}
self.security_roles = None
# JOB SETTINGS
userId = event['source']['user']['id']
user = session.query('User where id is ' + userId).one()
@ -149,7 +145,8 @@ class CustomAttributes(BaseAction):
})
session.commit()
try:
self.avalon_mongo_id_attributes(session)
self.prepare_global_data(session)
self.avalon_mongo_id_attributes(session, event)
self.custom_attributes_from_file(session, event)
job['status'] = 'done'
@ -166,60 +163,92 @@ class CustomAttributes(BaseAction):
return True
def avalon_mongo_id_attributes(self, session):
def prepare_global_data(self, session):
self.types_per_name = {
attr_type["name"].lower(): attr_type
for attr_type in session.query("CustomAttributeType").all()
}
self.security_roles = {
role["name"].lower(): role
for role in session.query("SecurityRole").all()
}
object_types = session.query("ObjectType").all()
self.object_types_per_id = {
object_type["id"]: object_type for object_type in object_types
}
self.object_types_per_name = {
object_type["name"].lower(): object_type
for object_type in object_types
}
self.groups = {}
def avalon_mongo_id_attributes(self, session, event):
hierarchical_attr, object_type_attrs = (
self.mongo_id_custom_attributes(session)
)
if hierarchical_attr is None:
self.create_hierarchical_mongo_attr(session)
hierarchical_attr, object_type_attrs = (
self.mongo_id_custom_attributes(session)
)
if hierarchical_attr is None:
return
if object_type_attrs:
self.convert_mongo_id_to_hierarchical(
hierarchical_attr, object_type_attrs, session, event
)
def mongo_id_custom_attributes(self, session):
cust_attrs_query = (
"select id, entity_type, object_type_id, is_hierarchical, default"
" from CustomAttributeConfiguration"
" where key = \"{}\""
).format(CustAttrIdKey)
mongo_id_avalon_attr = session.query(cust_attrs_query).all()
heirarchical_attr = None
object_type_attrs = []
for cust_attr in mongo_id_avalon_attr:
if cust_attr["is_hierarchical"]:
heirarchical_attr = cust_attr
else:
object_type_attrs.append(cust_attr)
return heirarchical_attr, object_type_attrs
def create_hierarchical_mongo_attr(self, session):
# Attribute Name and Label
cust_attr_label = 'Avalon/Mongo Id'
# Types that don't need object_type_id
base = {'show'}
# Don't create custom attribute on these entity types:
exceptions = ['task', 'milestone']
exceptions.extend(base)
# Get all possible object types
all_obj_types = session.query('ObjectType').all()
# Filter object types by exceptions
filtered_types_id = set()
for obj_type in all_obj_types:
name = obj_type['name']
if " " in name:
name = name.replace(' ', '')
if obj_type['name'] not in self.object_type_ids:
self.object_type_ids[name] = obj_type['id']
if name.lower() not in exceptions:
filtered_types_id.add(obj_type['id'])
cust_attr_label = "Avalon/Mongo ID"
# Set security roles for attribute
role_list = ("API", "Administrator", "Pypeclub")
roles = self.get_security_roles(role_list)
# Set Text type of Attribute
custom_attribute_type = self.get_type('text')
custom_attribute_type = self.types_per_name["text"]
# Set group to 'avalon'
group = self.get_group('avalon')
group = self.get_group("avalon")
data = {}
data['key'] = CustAttrIdKey
data['label'] = cust_attr_label
data['type'] = custom_attribute_type
data['default'] = ''
data['write_security_roles'] = roles
data['read_security_roles'] = roles
data['group'] = group
data['config'] = json.dumps({'markdown': False})
data = {
"key": CustAttrIdKey,
"label": cust_attr_label,
"type": custom_attribute_type,
"default": "",
"write_security_roles": roles,
"read_security_roles": roles,
"group": group,
"is_hierarchical": True,
"entity_type": "show",
"config": json.dumps({"markdown": False})
}
for entity_type in base:
data['entity_type'] = entity_type
self.process_attribute(data)
data['entity_type'] = 'task'
for object_type_id in filtered_types_id:
data['object_type_id'] = str(object_type_id)
self.process_attribute(data)
self.process_attribute(data)
def convert_mongo_id_to_hierarchical(
self, hierarchical_attr, object_type_attrs, session, event
@ -401,11 +430,11 @@ class CustomAttributes(BaseAction):
'Type {} is not valid'.format(attr['type'])
)
type_name = attr['type'].lower()
output['key'] = attr['key']
output['label'] = attr['label']
output['type'] = self.get_type(type_name)
type_name = attr['type'].lower()
output['type'] = self.types_per_name[type_name]
config = None
if type_name == 'number':
@ -500,35 +529,25 @@ class CustomAttributes(BaseAction):
'Found more than one group "{}"'.format(group_name)
)
def query_roles(self):
if self.security_roles is None:
self.security_roles = {}
for role in self.session.query("SecurityRole").all():
key = role["name"].lower()
self.security_roles[key] = role
return self.security_roles
def get_security_roles(self, security_roles):
security_roles = self.query_roles()
security_roles_lowered = tuple(name.lower() for name in security_roles)
if (
len(security_roles_lowered) == 0
or "all" in security_roles_lowered
):
return tuple(security_roles.values())
return list(self.security_roles.values())
output = []
if security_roles_lowered[0] == "except":
excepts = security_roles_lowered[1:]
for role_name, role in security_roles.items():
for role_name, role in self.security_roles.items():
if role_name not in excepts:
output.append(role)
else:
for role_name in security_roles_lowered:
if role_name in security_roles:
output.append(security_roles[role_name])
if role_name in self.security_roles:
output.append(self.security_roles[role_name])
else:
raise CustAttrException((
"Securit role \"{}\" was not found in Ftrack."
@ -593,27 +612,12 @@ class CustomAttributes(BaseAction):
return output
def get_type(self, type_name):
if type_name in self.types:
return self.types[type_name]
query = 'CustomAttributeType where name is "{}"'.format(type_name)
type = self.session.query(query).one()
self.types[type_name] = type
return type
def get_entity_type(self, attr):
if 'is_hierarchical' in attr:
if attr['is_hierarchical'] is True:
type = 'show'
if 'entity_type' in attr:
type = attr['entity_type']
return {
'is_hierarchical': True,
'entity_type': type
}
if attr.get("is_hierarchical", False):
return {
"is_hierarchical": True,
"entity_type": attr.get("entity_type") or "show"
}
if 'entity_type' not in attr:
raise CustAttrException('Missing entity_type')
@ -625,23 +629,16 @@ class CustomAttributes(BaseAction):
raise CustAttrException('Missing object_type')
object_type_name = attr['object_type']
if object_type_name not in self.object_type_ids:
try:
query = 'ObjectType where name is "{}"'.format(
object_type_name
)
object_type_id = self.session.query(query).one()['id']
except Exception:
raise CustAttrException((
'Object type with name "{}" don\'t exist'
).format(object_type_name))
self.object_type_ids[object_type_name] = object_type_id
else:
object_type_id = self.object_type_ids[object_type_name]
object_type_name_low = object_type_name.lower()
object_type = self.object_types_per_name.get(object_type_name_low)
if not object_type:
raise CustAttrException((
'Object type with name "{}" don\'t exist'
).format(object_type_name))
return {
'entity_type': attr['entity_type'],
'object_type_id': object_type_id
'object_type_id': object_type["id"]
}