Merge branch 'master' into pype-templates-integration

This commit is contained in:
Milan Kolar 2018-11-14 13:23:29 +01:00 committed by GitHub
commit ab02f68810
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
25 changed files with 3212 additions and 359 deletions

View file

@ -0,0 +1 @@

View file

@ -0,0 +1,131 @@
import sys
import argparse
import logging
import getpass
import ftrack_api
from ftrack_action_handler import BaseAction
class AssetDelete(BaseAction):
'''Custom action.'''
#: Action identifier.
identifier = 'asset.delete'
#: Action label.
label = 'Asset Delete'
def discover(self, session, entities, event):
''' Validation '''
if (len(entities) != 1 or entities[0].entity_type
not in ['Shot', 'Asset Build']):
return False
return True
def interface(self, session, entities, event):
if not event['data'].get('values', {}):
entity = entities[0]
items = []
for asset in entity['assets']:
# get asset name for label
label = 'None'
if asset['name']:
label = asset['name']
items.append({
'label':label,
'name':label,
'value':False,
'type':'boolean'
})
if len(items) < 1:
return {
'success': False,
'message': 'There are no assets to delete'
}
return items
def launch(self, session, entities, event):
entity = entities[0]
# if values were set remove those items
if 'values' in event['data']:
values = event['data']['values']
# get list of assets to delete from form
to_delete = []
for key in values:
if values[key]:
to_delete.append(key)
# delete them by name
for asset in entity['assets']:
if asset['name'] in to_delete:
session.delete(asset)
try:
session.commit()
except:
session.rollback()
raise
return {
'success': True,
'message': 'Asset deleted.'
}
def register(session, **kw):
'''Register action. Called when used as an event plugin.'''
# Validate that session is an instance of ftrack_api.Session. If not,
# assume that register is being called from an old or incompatible API and
# return without doing anything.
if not isinstance(session, ftrack_api.session.Session):
return
action_handler = AssetDelete(session)
action_handler.register()
def main(arguments=None):
'''Set up logging and register action.'''
if arguments is None:
arguments = []
parser = argparse.ArgumentParser()
# Allow setting of logging level from arguments.
loggingLevels = {}
for level in (
logging.NOTSET, logging.DEBUG, logging.INFO, logging.WARNING,
logging.ERROR, logging.CRITICAL
):
loggingLevels[logging.getLevelName(level).lower()] = level
parser.add_argument(
'-v', '--verbosity',
help='Set the logging output verbosity.',
choices=loggingLevels.keys(),
default='info'
)
namespace = parser.parse_args(arguments)
# Set up basic logging
logging.basicConfig(level=loggingLevels[namespace.verbosity])
session = ftrack_api.Session()
register(session)
# Wait for events
logging.info(
'Registered actions and listening for events. Use Ctrl-C to abort.'
)
session.event_hub.wait()
if __name__ == '__main__':
raise SystemExit(main(sys.argv[1:]))

View file

@ -0,0 +1,103 @@
import sys
import argparse
import logging
import os
import getpass
import ftrack_api
from ftrack_action_handler import BaseAction
class ClientReviewSort(BaseAction):
'''Custom action.'''
#: Action identifier.
identifier = 'client.review.sort'
#: Action label.
label = 'Sort Review'
def discover(self, session, entities, event):
''' Validation '''
if (len(entities) == 0 or entities[0].entity_type != 'ReviewSession'):
return False
return True
def launch(self, session, entities, event):
entity = entities[0]
# Get all objects from Review Session and all 'sort order' possibilities
obj_list = []
sort_order_list = []
for obj in entity['review_session_objects']:
obj_list.append(obj)
sort_order_list.append(obj['sort_order'])
# Sort criteria
obj_list = sorted(obj_list, key=lambda k: k['version'])
obj_list = sorted(obj_list, key=lambda k: k['asset_version']['task']['name'])
obj_list = sorted(obj_list, key=lambda k: k['name'])
# Set 'sort order' to sorted list, so they are sorted in Ftrack also
for i in range(len(obj_list)):
obj_list[i]['sort_order'] = sort_order_list[i]
session.commit()
return {
'success': True,
'message': 'Client Review sorted!'
}
def register(session, **kw):
'''Register action. Called when used as an event plugin.'''
if not isinstance(session, ftrack_api.session.Session):
return
action_handler = ClientReviewSort(session)
action_handler.register()
def main(arguments=None):
'''Set up logging and register action.'''
if arguments is None:
arguments = []
parser = argparse.ArgumentParser()
# Allow setting of logging level from arguments.
loggingLevels = {}
for level in (
logging.NOTSET, logging.DEBUG, logging.INFO, logging.WARNING,
logging.ERROR, logging.CRITICAL
):
loggingLevels[logging.getLevelName(level).lower()] = level
parser.add_argument(
'-v', '--verbosity',
help='Set the logging output verbosity.',
choices=loggingLevels.keys(),
default='info'
)
namespace = parser.parse_args(arguments)
# Set up basic logging
logging.basicConfig(level=loggingLevels[namespace.verbosity])
session = ftrack_api.Session()
register(session)
# Wait for events
logging.info(
'Registered actions and listening for events. Use Ctrl-C to abort.'
)
session.event_hub.wait()
if __name__ == '__main__':
raise SystemExit(main(sys.argv[1:]))

View file

@ -0,0 +1,122 @@
# :coding: utf-8
# :copyright: Copyright (c) 2015 Milan Kolar
import sys
import argparse
import logging
import getpass
import subprocess
import os
import ftrack_api
from ftrack_action_handler import BaseAction
class ComponentOpen(BaseAction):
'''Custom action.'''
# Action identifier
identifier = 'component.open'
# Action label
label = 'Open File'
# Action icon
icon = 'https://cdn4.iconfinder.com/data/icons/rcons-application/32/application_go_run-256.png',
def discover(self, session, entities, event):
''' Validation '''
if len(entities) != 1 or entities[0].entity_type != 'FileComponent':
return False
return True
def launch(self, session, entities, event):
entity = entities[0]
# Return error if component is on ftrack server
if entity['component_locations'][0]['location']['name'] == 'ftrack.server':
return {
'success': False,
'message': "This component is stored on ftrack server!"
}
# Get component filepath
# TODO with locations it will be different???
fpath = entity['component_locations'][0]['resource_identifier']
items = fpath.split(os.sep)
items.pop(-1)
fpath = os.sep.join(items)
if os.path.isdir(fpath):
if 'win' in sys.platform: # windows
subprocess.Popen('explorer "%s"' % fpath)
elif sys.platform == 'darwin': # macOS
subprocess.Popen(['open', fpath])
else: # linux
try:
subprocess.Popen(['xdg-open', fpath])
except OSError:
raise OSError('unsupported xdg-open call??')
else:
return {
'success': False,
'message': "Didn't found file: " + fpath
}
return {
'success': True,
'message': 'Component folder Opened'
}
def register(session, **kw):
'''Register action. Called when used as an event plugin.'''
# Validate that session is an instance of ftrack_api.Session. If not,
# assume that register is being called from an old or incompatible API and
# return without doing anything.
if not isinstance(session, ftrack_api.session.Session):
return
action_handler = ComponentOpen(session)
action_handler.register()
def main(arguments=None):
'''Set up logging and register action.'''
if arguments is None:
arguments = []
parser = argparse.ArgumentParser()
# Allow setting of logging level from arguments.
loggingLevels = {}
for level in (
logging.NOTSET, logging.DEBUG, logging.INFO, logging.WARNING,
logging.ERROR, logging.CRITICAL
):
loggingLevels[logging.getLevelName(level).lower()] = level
parser.add_argument(
'-v', '--verbosity',
help='Set the logging output verbosity.',
choices=loggingLevels.keys(),
default='info'
)
namespace = parser.parse_args(arguments)
# Set up basic logging
logging.basicConfig(level=loggingLevels[namespace.verbosity])
session = ftrack_api.Session()
register(session)
# Wait for events
logging.info(
'Registered actions and listening for events. Use Ctrl-C to abort.'
)
session.event_hub.wait()
if __name__ == '__main__':
raise SystemExit(main(sys.argv[1:]))

View file

@ -11,6 +11,7 @@ from ftrack_action_handler import BaseAction
from avalon import io, inventory, lib
from avalon.vendor import toml
class AvalonIdAttribute(BaseAction):
'''Edit meta data action.'''
@ -21,139 +22,21 @@ class AvalonIdAttribute(BaseAction):
#: Action description.
description = 'Creates Avalon/Mongo ID for double check'
def validate_selection(self, session, entities):
'''Return if *entities* is a valid selection.'''
# if (len(entities) != 1):
# # If entities contains more than one item return early since
# # metadata cannot be edited for several entites at the same time.
# return False
# entity_type, entity_id = entities[0]
# if (
# entity_type not in session.types
# ):
# # Return False if the target entity does not have a metadata
# # attribute.
# return False
pass
return True
def discover(self, session, entities, event):
'''Return True if action is valid.'''
''' Validation '''
self.logger.info('Got selection: {0}'.format(entities))
return self.validate_selection(session, entities)
# userId = event['source']['user']['id']
# user = session.query('User where id is ' + userId).one()
# if user['user_security_roles'][0]['security_role']['name'] != 'Administrator':
# return False
return True
def importToAvalon(self, session, entity):
eLinks = []
custAttrName = 'avalon_mongo_id'
# TODO read from file, which data are in scope???
# get needed info of entity and all parents
for e in entity['link']:
tmp = session.get(e['type'], e['id'])
if e['name'].find(" ") == -1:
name = e['name']
else:
name = e['name'].replace(" ", "-")
print("Name of "+tmp.entity_type+" - "+e['name']+" was changed to "+name)
eLinks.append({"type": tmp.entity_type, "name": name, "ftrackId": tmp['id']})
entityProj = session.get(eLinks[0]['type'], eLinks[0]['ftrackId'])
# set AVALON_PROJECT env
os.environ["AVALON_PROJECT"] = entityProj["full_name"]
os.environ["AVALON_ASSET"] = entityProj['full_name']
# Get apps from Ftrack / TODO Exceptions?!!!
apps = []
for app in entityProj['custom_attributes']['applications']:
try:
label = toml.load(lib.which_app(app))['label']
apps.append({'name':app, 'label':label})
except Exception as e:
print('Error with application {0} - {1}'.format(app, e))
# Set project Config
config = {
'schema': 'avalon-core:config-1.0',
'tasks': [{'name': ''}],
'apps': apps,
'template': {'work': '','publish':''}
}
# Set project template
template = {"schema": "avalon-core:inventory-1.0"}
# --- Create project and assets in Avalon ---
io.install()
# If project don't exists -> <Create project> ELSE <Update Config>
if (io.find_one(
{'type': 'project', 'name': entityProj['full_name']}) is None):
inventory.save(entityProj['full_name'], config, template)
else:
io.update_many({'type': 'project','name': entityProj['full_name']},
{'$set':{'config':config}})
# Store info about project (FtrackId)
io.update_many({'type': 'project','name': entityProj['full_name']},
{'$set':{'data':{'ftrackId':entityProj['id'],'entityType':entityProj.entity_type}}})
# Store project Id
projectId = io.find_one({"type": "project", "name": entityProj["full_name"]})["_id"]
if custAttrName in entityProj['custom_attributes'] and entityProj['custom_attributes'][custAttrName] is '':
entityProj['custom_attributes'][custAttrName] = str(projectId)
# If entity is Project or have only 1 entity kill action
if (len(eLinks) > 1) and not (eLinks[-1]['type'] in ['Project']):
# TODO how to check if entity is Asset Library or AssetBuild?
silo = 'Assets' if eLinks[-1]['type'] in ['AssetBuild', 'Library'] else 'Film'
os.environ['AVALON_SILO'] = silo
# Create Assets
assets = []
for i in range(1, len(eLinks)):
assets.append(eLinks[i])
folderStruct = []
parentId = None
data = {'visualParent': parentId, 'parents': folderStruct,
'tasks':None, 'ftrackId': None, 'entityType': None}
for asset in assets:
os.environ['AVALON_ASSET'] = asset['name']
data.update({'ftrackId': asset['ftrackId'], 'entityType': asset['type']})
# Get tasks of each asset
assetEnt = session.get('TypedContext', asset['ftrackId'])
tasks = []
for child in assetEnt['children']:
if child.entity_type in ['Task']:
tasks.append(child['name'])
data.update({'tasks': tasks})
if (io.find_one({'type': 'asset', 'name': asset['name']}) is None):
# Create asset in DB
inventory.create_asset(asset['name'], silo, data, projectId)
print("Asset "+asset['name']+" - created")
else:
io.update_many({'type': 'asset','name': asset['name']},
{'$set':{'data':data}})
# TODO check if is asset in same folder!!! ???? FEATURE FOR FUTURE
print("Asset "+asset["name"]+" - already exist")
parentId = io.find_one({'type': 'asset', 'name': asset['name']})['_id']
data.update({'visualParent': parentId, 'parents': folderStruct})
folderStruct.append(asset['name'])
# Set custom attribute to avalon/mongo id of entity (parentID is last)
if custAttrName in entity['custom_attributes'] and entity['custom_attributes'][custAttrName] is '':
entity['custom_attributes'][custAttrName] = str(parentId)
io.uninstall()
def launch(self, session, entities, event):
# JOB SETTINGS
userId = event['source']['user']['id']
user = session.query('User where id is ' + userId).one()
@ -161,44 +44,90 @@ class AvalonIdAttribute(BaseAction):
'user': user,
'status': 'running',
'data': json.dumps({
'description': 'Synch Ftrack to Avalon.'
'description': 'Custom Attribute creation.'
})
})
session.commit()
try:
print("action <" + self.__class__.__name__ + "> is running")
#TODO It's better to have these env set, are they used anywhere?
os.environ['AVALON_PROJECTS'] = "tmp"
os.environ['AVALON_ASSET'] = "tmp"
os.environ['AVALON_SILO'] = "tmp"
importable = []
# Attribute Name and Label
custAttrName = 'avalon_mongo_id'
custAttrLabel = 'Avalon/Mongo Id'
# Types that don't need object_type_id
base = {'show','asset','assetversion'}
# Don't create custom attribute on these entity types:
exceptions = ['task','milestone','library']
exceptions.extend(base)
# Get all possible object types
all_obj_types = session.query('ObjectType').all()
count_types = len(all_obj_types)
# Filter object types by exceptions
for index in range(count_types):
i = count_types - 1 - index
name = all_obj_types[i]['name'].lower()
def getShotAsset(entity):
if not (entity.entity_type in ['Task']):
if entity not in importable:
importable.append(entity)
if " " in name:
name = name.replace(" ","")
if entity['children']:
childrens = entity['children']
for child in childrens:
getShotAsset(child)
if name in exceptions:
all_obj_types.pop(i)
# get all entities separately
for entity in entities:
entity_type, entity_id = entity
act_ent = session.get(entity_type, entity_id)
getShotAsset(act_ent)
# Get IDs of filtered object types
all_obj_types_id = set()
for obj in all_obj_types:
all_obj_types_id.add(obj['id'])
for e in importable:
self.importToAvalon(session, e)
# Get all custom attributes
current_cust_attr = session.query('CustomAttributeConfiguration').all()
# Filter already existing AvalonMongoID attr.
for attr in current_cust_attr:
if attr['key'] == custAttrName:
if attr['entity_type'] in base:
base.remove(attr['entity_type'])
if attr['object_type_id'] in all_obj_types_id:
all_obj_types_id.remove(attr['object_type_id'])
# Set session back to begin("session.query" raises error on commit)
session.rollback()
# Set security roles for attribute
custAttrSecuRole = session.query('SecurityRole').all()
# Set Text type of Attribute
custom_attribute_type = session.query(
'CustomAttributeType where name is "text"'
).one()
for entity_type in base:
# Create a custom attribute configuration.
session.create('CustomAttributeConfiguration', {
'entity_type': entity_type,
'type': custom_attribute_type,
'label': custAttrLabel,
'key': custAttrName,
'default': '',
'write_security_roles': custAttrSecuRole,
'read_security_roles': custAttrSecuRole,
'config': json.dumps({'markdown': False})
})
for type in all_obj_types_id:
# Create a custom attribute configuration.
session.create('CustomAttributeConfiguration', {
'entity_type': 'task',
'object_type_id': type,
'type': custom_attribute_type,
'label': custAttrLabel,
'key': custAttrName,
'default': '',
'write_security_roles': custAttrSecuRole,
'read_security_roles': custAttrSecuRole,
'config': json.dumps({'markdown': False})
})
job['status'] = 'done'
session.commit()
print('Synchronization to Avalon was successfull!')
except Exception as e:
job['status'] = 'failed'
print('During synchronization to Avalon went something wrong!')
print("Creating custom attributes failed")
print(e)
return True
@ -215,7 +144,6 @@ def register(session, **kw):
action_handler = AvalonIdAttribute(session)
action_handler.register()
print("----- action - <" + action_handler.__class__.__name__ + "> - Has been registered -----")
def main(arguments=None):

View file

@ -0,0 +1,96 @@
import sys
import argparse
import logging
import getpass
import ftrack_api
from ftrack_action_handler import BaseAction
class VersionsCleanup(BaseAction):
'''Custom action.'''
# Action identifier
identifier = 'versions.cleanup'
# Action label
label = 'Versions cleanup'
def discover(self, session, entities, event):
''' Validation '''
# Only 1 AssetVersion is allowed
if len(entities) != 1 or entities[0].entity_type != 'AssetVersion':
return False
return True
def launch(self, session, entities, event):
entity = entities[0]
# Go through all versions in asset
for version in entity['asset']['versions']:
if not version['is_published']:
session.delete(version)
try:
session.commit()
except:
session.rollback()
raise
return {
'success': True,
'message': 'removed hidden versions'
}
def register(session, **kw):
'''Register action. Called when used as an event plugin.'''
# Validate that session is an instance of ftrack_api.Session. If not,
# assume that register is being called from an old or incompatible API and
# return without doing anything.
if not isinstance(session, ftrack_api.session.Session):
return
action_handler = VersionsCleanup(session)
action_handler.register()
def main(arguments=None):
'''Set up logging and register action.'''
if arguments is None:
arguments = []
parser = argparse.ArgumentParser()
# Allow setting of logging level from arguments.
loggingLevels = {}
for level in (
logging.NOTSET, logging.DEBUG, logging.INFO, logging.WARNING,
logging.ERROR, logging.CRITICAL
):
loggingLevels[logging.getLevelName(level).lower()] = level
parser.add_argument(
'-v', '--verbosity',
help='Set the logging output verbosity.',
choices=loggingLevels.keys(),
default='info'
)
namespace = parser.parse_args(arguments)
# Set up basic logging
logging.basicConfig(level=loggingLevels[namespace.verbosity])
session = ftrack_api.Session()
register(session)
# Wait for events
logging.info(
'Registered actions and listening for events. Use Ctrl-C to abort.'
)
session.event_hub.wait()
if __name__ == '__main__':
raise SystemExit(main(sys.argv[1:]))

View file

@ -20,20 +20,14 @@ class JobKiller(BaseAction):
description = 'Killing all running jobs younger than day'
def validate_selection(self, session, entities):
'''Return if *entities* is a valid selection.'''
pass
def discover(self, session, entities, event):
''' Validation '''
return True
def discover(self, session, entities, event):
'''Return True if action is valid.'''
self.logger.info('Got selection: {0}'.format(entities))
return self.validate_selection(session, entities)
def launch(self, session, entities, event):
""" JOB SETTING """
""" GET JOB """
yesterday = datetime.date.today() - datetime.timedelta(days=1)
@ -48,7 +42,10 @@ class JobKiller(BaseAction):
print('Changing Job ({}) status: {} -> failed'.format(job['id'], job['status']))
job['status'] = 'failed'
session.commit()
try:
session.commit()
except:
session.rollback()
print('All running jobs were killed Successfully!')
return {

View file

@ -0,0 +1,125 @@
import sys
import argparse
import logging
import getpass
import ftrack_api
from ftrack_action_handler import BaseAction
class SetVersion(BaseAction):
'''Custom action.'''
#: Action identifier.
identifier = 'version.set'
#: Action label.
label = 'Version Set'
def discover(self, session, entities, event):
''' Validation '''
# Only 1 AssetVersion is allowed
if len(entities) != 1 or entities[0].entity_type != 'AssetVersion':
return False
return True
def interface(self, session, entities, event):
if not event['data'].get('values', {}):
entity = entities[0]
# Get actual version of asset
act_ver = entity['version']
# Set form
items = [{
'label': 'Version number',
'type': 'number',
'name': 'version_number',
'value': act_ver
}]
return items
def launch(self, session, entities, event):
entity = entities[0]
# Do something with the values or return a new form.
values = event['data'].get('values', {})
# Default is action True
scs = True
msg = 'Version was changed to v{0}'.format(values['version_number'])
if not values['version_number']:
scs = False,
msg = "You didn't enter any version."
elif int(values['version_number']) <= 0:
scs = False
msg = 'Negative or zero version is not valid.'
else:
entity['version'] = values['version_number']
try:
session.commit()
except:
session.rollback()
raise
return {
'success': scs,
'message': msg
}
def register(session, **kw):
'''Register action. Called when used as an event plugin.'''
# Validate that session is an instance of ftrack_api.Session. If not,
# assume that register is being called from an old or incompatible API and
# return without doing anything.
if not isinstance(session, ftrack_api.session.Session):
return
action_handler = SetVersion(session)
action_handler.register()
def main(arguments=None):
'''Set up logging and register action.'''
if arguments is None:
arguments = []
parser = argparse.ArgumentParser()
# Allow setting of logging level from arguments.
loggingLevels = {}
for level in (
logging.NOTSET, logging.DEBUG, logging.INFO, logging.WARNING,
logging.ERROR, logging.CRITICAL
):
loggingLevels[logging.getLevelName(level).lower()] = level
parser.add_argument(
'-v', '--verbosity',
help='Set the logging output verbosity.',
choices=loggingLevels.keys(),
default='info'
)
namespace = parser.parse_args(arguments)
# Set up basic logging
logging.basicConfig(level=loggingLevels[namespace.verbosity])
session = ftrack_api.Session()
register(session)
# Wait for events
logging.info(
'Registered actions and listening for events. Use Ctrl-C to abort.'
)
session.event_hub.wait()
if __name__ == '__main__':
raise SystemExit(main(sys.argv[1:]))

View file

@ -4,8 +4,8 @@ import sys
import argparse
import logging
import os
import json
import ftrack_api
import json
from ftrack_action_handler import BaseAction
from avalon import io, inventory, lib
@ -23,27 +23,11 @@ class SyncToAvalon(BaseAction):
#: Action icon.
icon = 'https://cdn1.iconfinder.com/data/icons/hawcons/32/699650-icon-92-inbox-download-512.png'
def validate_selection(self, session, entities):
'''Return if *entities* is a valid selection.'''
# if (len(entities) != 1):
# # If entities contains more than one item return early since
# # metadata cannot be edited for several entites at the same time.
# return False
# entity_type, entity_id = entities[0]
# if (
# entity_type not in session.types
# ):
# # Return False if the target entity does not have a metadata
# # attribute.
# return False
pass
return True
def discover(self, session, entities, event):
'''Return True if action is valid.'''
''' Validation '''
self.logger.info('Got selection: {0}'.format(entities))
return self.validate_selection(session, entities)
return True
def importToAvalon(self, session, entity):
@ -51,6 +35,7 @@ class SyncToAvalon(BaseAction):
custAttrName = 'avalon_mongo_id'
# TODO read from file, which data are in scope???
# get needed info of entity and all parents
for e in entity['link']:
tmp = session.get(e['type'], e['id'])
if e['name'].find(" ") == -1:
@ -92,6 +77,7 @@ class SyncToAvalon(BaseAction):
# --- Create project and assets in Avalon ---
io.install()
## ----- PROJECT ------
# If project don't exists -> <Create project> ELSE <Update Config>
if (io.find_one({'type': 'project',
'name': entityProj['full_name']}) is None):
@ -112,10 +98,12 @@ class SyncToAvalon(BaseAction):
# If entity is Project or have only 1 entity kill action
if (len(eLinks) > 1) and not (eLinks[-1]['type'] in ['Project']):
## ----- ASSETS ------
# Presets:
# TODO how to check if entity is Asset Library or AssetBuild?
silo = 'Assets' if eLinks[-1]['type'] in ['AssetBuild', 'Library'] else 'Film'
os.environ['AVALON_SILO'] = silo
# Create Assets
# Get list of assets without project
assets = []
for i in range(1, len(eLinks)):
assets.append(eLinks[i])
@ -137,23 +125,32 @@ class SyncToAvalon(BaseAction):
tasks.append(child['name'])
data.update({'tasks': tasks})
if (io.find_one({'type': 'asset', 'name': asset['name']}) is None):
# Create asset in DB
# Try to find asset in current database
avalon_asset = io.find_one({'type': 'asset', 'name': asset['name']})
# Create if don't exists
if avalon_asset is None:
inventory.create_asset(asset['name'], silo, data, projectId)
print("Asset "+asset['name']+" - created")
# Raise error if it seems to be different ent. with same name
elif (avalon_asset['data']['ftrackId'] != data['ftrackId'] or
avalon_asset['data']['visualParent'] != data['visualParent'] or
avalon_asset['data']['parents'] != data['parents']):
raise ValueError('Possibility of entity name duplication: {}'.format(asset['name']))
# Else update info
else:
io.update_many({'type': 'asset','name': asset['name']},
{'$set':{'data':data}})
{'$set':{'data':data, 'silo': silo}})
# TODO check if is asset in same folder!!! ???? FEATURE FOR FUTURE
print("Asset "+asset["name"]+" - already exist")
print("Asset "+asset["name"]+" - updated")
# Get parent ID and store it to data
parentId = io.find_one({'type': 'asset', 'name': asset['name']})['_id']
hierarchy = os.path.sep.join(folderStruct)
data.update({'visualParent': parentId, 'parents': folderStruct,
'hierarchy': hierarchy})
folderStruct.append(asset['name'])
## FTRACK FEATURE - FTRACK MUST HAVE avalon_mongo_id FOR EACH ENTITY TYPE EXCEPT TASK
# Set custom attribute to avalon/mongo id of entity (parentID is last)
if custAttrName in entity['custom_attributes'] and entity['custom_attributes'][custAttrName] is '':
entity['custom_attributes'][custAttrName] = str(parentId)
@ -161,6 +158,8 @@ class SyncToAvalon(BaseAction):
io.uninstall()
def launch(self, session, entities, event):
message = ""
# JOB SETTINGS
userId = event['source']['user']['id']
user = session.query('User where id is ' + userId).one()
@ -175,10 +174,8 @@ class SyncToAvalon(BaseAction):
try:
print("action <" + self.__class__.__name__ + "> is running")
#TODO It's better to have these env set, are they used anywhere?
# os.environ['AVALON_PROJECTS'] = "tmp"
os.environ['AVALON_ASSET'] = "tmp"
os.environ['AVALON_SILO'] = "tmp"
#TODO AVALON_PROJECTS, AVALON_ASSET, AVALON_SILO should be set up otherwise console log shows avalon debug
importable = []
def getShotAsset(entity):
@ -191,25 +188,33 @@ class SyncToAvalon(BaseAction):
for child in childrens:
getShotAsset(child)
# get all entities separately
# get all entities separately/unique
for entity in entities:
entity_type, entity_id = entity
act_ent = session.get(entity_type, entity_id)
getShotAsset(act_ent)
getShotAsset(entity)
for e in importable:
self.importToAvalon(session, e)
job['status'] = 'done'
session.commit()
print('Synchronization to Avalon was successfull!')
except Exception as e:
job['status'] = 'failed'
print('During synchronization to Avalon went something wrong!')
print(e)
message = str(e)
return True
if len(message) > 0:
return {
'success': False,
'message': message
}
return {
'success': True,
'message': "Synchronization was successfull"
}
def register(session, **kw):
@ -223,7 +228,6 @@ def register(session, **kw):
action_handler = SyncToAvalon(session)
action_handler.register()
print("----- action - <" + action_handler.__class__.__name__ + "> - Has been registered -----")
def main(arguments=None):

View file

@ -18,26 +18,38 @@ class TestAction(BaseAction):
#: Action identifier.
identifier = 'test.action'
#: Action label.
label = 'Test action'
#: Action description.
description = 'Test action'
def validate_selection(self, session, entities):
'''Return if *entities* is a valid selection.'''
pass
return True
def discover(self, session, entities, event):
'''Return True if action is valid.'''
''' Validation '''
return True
self.logger.info('Got selection: {0}'.format(entities))
return self.validate_selection(session, entities)
def launch(self, session, entities, event):
for entity in entities:
index = 0
name = entity['components'][index]['name']
filetype = entity['components'][index]['file_type']
path = entity['components'][index]['component_locations'][0]['resource_identifier']
# entity['components'][index]['component_locations'][0]['resource_identifier'] = r"C:\Users\jakub.trllo\Desktop\test\exr\int_c022_lighting_v001_main_AO.%04d.exr"
location = entity['components'][0]['component_locations'][0]['location']
component = entity['components'][0]
# print(location.get_filesystem_path(component))
# for k in p:
# print(100*"-")
# print(k)
# print(p[k])
return True

View file

@ -0,0 +1,114 @@
# :coding: utf-8
# :copyright: Copyright (c) 2015 Milan Kolar
import sys
import argparse
import logging
import getpass
import json
import ftrack_api
from ftrack_action_handler import BaseAction
class ThumbToChildren(BaseAction):
'''Custom action.'''
# Action identifier
identifier = 'thumb.to.children'
# Action label
label = 'Thumbnail to Children'
# Action icon
icon = "https://cdn3.iconfinder.com/data/icons/transfers/100/239322-download_transfer-128.png"
def discover(self, session, entities, event):
''' Validation '''
if (len(entities) != 1 or entities[0].entity_type in ['Project']):
return False
return True
def launch(self, session, entities, event):
'''Callback method for action.'''
userId = event['source']['user']['id']
user = session.query('User where id is ' + userId).one()
job = session.create('Job', {
'user': user,
'status': 'running',
'data': json.dumps({
'description': 'Push thumbnails to Childrens'
})
})
try:
for entity in entities:
thumbid = entity['thumbnail_id']
if thumbid:
for child in entity['children']:
child['thumbnail_id'] = thumbid
# inform the user that the job is done
job['status'] = 'done'
except:
# fail the job if something goes wrong
job['status'] = 'failed'
raise
finally:
session.commit()
return {
'success': True,
'message': 'Created job for updating thumbnails!'
}
def register(session, **kw):
'''Register action. Called when used as an event plugin.'''
if not isinstance(session, ftrack_api.session.Session):
return
action_handler = ThumbToChildren(session)
action_handler.register()
def main(arguments=None):
'''Set up logging and register action.'''
if arguments is None:
arguments = []
parser = argparse.ArgumentParser()
# Allow setting of logging level from arguments.
loggingLevels = {}
for level in (
logging.NOTSET, logging.DEBUG, logging.INFO, logging.WARNING,
logging.ERROR, logging.CRITICAL
):
loggingLevels[logging.getLevelName(level).lower()] = level
parser.add_argument(
'-v', '--verbosity',
help='Set the logging output verbosity.',
choices=loggingLevels.keys(),
default='info'
)
namespace = parser.parse_args(arguments)
# Set up basic logging
logging.basicConfig(level=loggingLevels[namespace.verbosity])
session = ftrack_api.Session()
register(session)
# Wait for events
logging.info(
'Registered actions and listening for events. Use Ctrl-C to abort.'
)
session.event_hub.wait()
if __name__ == '__main__':
raise SystemExit(main(sys.argv[1:]))

View file

@ -0,0 +1,130 @@
# :coding: utf-8
# :copyright: Copyright (c) 2015 Milan Kolar
import sys
import argparse
import logging
import getpass
import json
import ftrack_api
from ftrack_action_handler import BaseAction
class ThumbToParent(BaseAction):
'''Custom action.'''
# Action identifier
identifier = 'thumb.to.parent'
# Action label
label = 'Thumbnail to Parent'
# Action icon
icon = "https://cdn3.iconfinder.com/data/icons/transfers/100/239419-upload_transfer-512.png"
def discover(self, session, entities, event):
'''Return action config if triggered on asset versions.'''
if len(entities) <= 0 or entities[0].entity_type in ['Project']:
return False
return True
def launch(self, session, entities, event):
'''Callback method for action.'''
userId = event['source']['user']['id']
user = session.query('User where id is ' + userId).one()
job = session.create('Job', {
'user': user,
'status': 'running',
'data': json.dumps({
'description': 'Push thumbnails to parents'
})
})
try:
for entity in entities:
parent = None
thumbid = None
if entity.entity_type.lower() == 'assetversion':
try:
parent = entity['task']
except:
par_ent = entity['link'][-2]
parent = session.get(par_ent['type'], par_ent['id'])
else:
try:
parent = entity['parent']
except:
print("Durin Action 'Thumb to Parent' went something wrong")
thumbid = entity['thumbnail_id']
if parent and thumbid:
parent['thumbnail_id'] = thumbid
status = 'done'
else:
status = 'failed'
# inform the user that the job is done
job['status'] = status or 'done'
except:
# fail the job if something goes wrong
job['status'] = 'failed'
raise
finally:
session.commit()
return {
'success': True,
'message': 'Created job for updating thumbnails!'
}
def register(session, **kw):
'''Register action. Called when used as an event plugin.'''
if not isinstance(session, ftrack_api.session.Session):
return
action_handler = ThumbToParent(session)
action_handler.register()
def main(arguments=None):
'''Set up logging and register action.'''
if arguments is None:
arguments = []
parser = argparse.ArgumentParser()
# Allow setting of logging level from arguments.
loggingLevels = {}
for level in (
logging.NOTSET, logging.DEBUG, logging.INFO, logging.WARNING,
logging.ERROR, logging.CRITICAL
):
loggingLevels[logging.getLevelName(level).lower()] = level
parser.add_argument(
'-v', '--verbosity',
help='Set the logging output verbosity.',
choices=loggingLevels.keys(),
default='info'
)
namespace = parser.parse_args(arguments)
# Set up basic logging
logging.basicConfig(level=loggingLevels[namespace.verbosity])
session = ftrack_api.Session()
register(session)
# Wait for events
logging.info(
'Registered actions and listening for events. Use Ctrl-C to abort.'
)
session.event_hub.wait()
if __name__ == '__main__':
raise SystemExit(main(sys.argv[1:]))

View file

@ -0,0 +1,403 @@
import logging
import subprocess
import sys
import os
import re
from operator import itemgetter
import ftrack_api
class DJVViewAction(object):
"""Launch DJVView action."""
identifier = "djvview-launch-action"
# label = "DJV View"
# icon = "http://a.fsdn.com/allura/p/djv/icon"
def __init__(self, session):
'''Expects a ftrack_api.Session instance'''
self.logger = logging.getLogger(
'{0}.{1}'.format(__name__, self.__class__.__name__)
)
if self.identifier is None:
raise ValueError(
'Action missing identifier.'
)
self.session = session
def is_valid_selection(self, event):
selection = event["data"].get("selection", [])
if not selection:
return
entityType = selection[0]["entityType"]
if entityType not in ["assetversion", "task"]:
return False
return True
def discover(self, event):
"""Return available actions based on *event*. """
if not self.is_valid_selection(event):
return
items = []
applications = self.get_applications()
applications = sorted(
applications, key=lambda application: application["label"]
)
for application in applications:
self.djv_path = application.get("path", None)
applicationIdentifier = application["identifier"]
label = application["label"]
items.append({
"actionIdentifier": self.identifier,
"label": label,
"variant": application.get("variant", None),
"description": application.get("description", None),
"icon": application.get("icon", "default"),
"applicationIdentifier": applicationIdentifier
})
return {
"items": items
}
def register(self):
'''Registers the action, subscribing the discover and launch topics.'''
self.session.event_hub.subscribe(
'topic=ftrack.action.discover and source.user.username={0}'.format(
self.session.api_user
), self.discover
)
self.session.event_hub.subscribe(
'topic=ftrack.action.launch and data.actionIdentifier={0} and source.user.username={1}'.format(
self.identifier,
self.session.api_user
),
self.launch
)
print("----- action - <" + self.__class__.__name__ + "> - Has been registered -----")
def get_applications(self):
applications = []
label = "DJVView {version}"
versionExpression = re.compile(r"(?P<version>\d+.\d+.\d+)")
applicationIdentifier = "djvview"
description = "DJV View Launcher"
icon = "http://a.fsdn.com/allura/p/djv/icon"
expression = []
if sys.platform == "win32":
expression = ["C:\\", "Program Files", "djv-\d.+",
"bin", "djv_view.exe"]
elif sys.platform == "darwin":
expression = ["Application", "DJV.app", "Contents", "MacOS", "DJV"]
# Linuxs
else:
expression = ["usr", "local", "djv", "djv_view"]
pieces = expression[:]
start = pieces.pop(0)
if sys.platform == 'win32':
# On Windows C: means current directory so convert roots that look
# like drive letters to the C:\ format.
if start and start[-1] == ':':
start += '\\'
if not os.path.exists(start):
raise ValueError(
'First part "{0}" of expression "{1}" must match exactly to an '
'existing entry on the filesystem.'
.format(start, expression)
)
expressions = list(map(re.compile, pieces))
expressionsCount = len(expression)-1
for location, folders, files in os.walk(start, topdown=True, followlinks=True):
level = location.rstrip(os.path.sep).count(os.path.sep)
expression = expressions[level]
if level < (expressionsCount - 1):
# If not yet at final piece then just prune directories.
folders[:] = [folder for folder in folders
if expression.match(folder)]
else:
# Match executable. Note that on OSX executable might equate to
# a folder (.app).
for entry in folders + files:
match = expression.match(entry)
if match:
# Extract version from full matching path.
path = os.path.join(start, location, entry)
versionMatch = versionExpression.search(path)
if versionMatch:
version = versionMatch.group('version')
applications.append({
'identifier': applicationIdentifier.format(
version=version
),
'path': path,
'version': version,
'label': label.format(version=version),
'icon': icon,
# 'variant': variant.format(version=version),
'description': description
})
else:
self.logger.debug(
'Discovered application executable, but it '
'does not appear to o contain required version '
'information: {0}'.format(path)
)
# Don't descend any further as out of patterns to match.
del folders[:]
return applications
def translate_event(self, session, event):
'''Return *event* translated structure to be used with the API.'''
selection = event['data'].get('selection', [])
entities = list()
for entity in selection:
entities.append(
(session.get(self.get_entity_type(entity), entity.get('entityId')))
)
return entities
def get_entity_type(self, entity):
entity_type = entity.get('entityType').replace('_', '').lower()
for schema in self.session.schemas:
alias_for = schema.get('alias_for')
if (
alias_for and isinstance(alias_for, str) and
alias_for.lower() == entity_type
):
return schema['id']
for schema in self.session.schemas:
if schema['id'].lower() == entity_type:
return schema['id']
raise ValueError(
'Unable to translate entity type: {0}.'.format(entity_type)
)
def launch(self, event):
"""Callback method for DJVView action."""
session = self.session
entities = self.translate_event(session, event)
# Launching application
if "values" in event["data"]:
filename = event['data']['values']['path']
file_type = filename.split(".")[-1]
# TODO Is this proper way?
try:
fps = int(entities[0]['custom_attributes']['fps'])
except:
fps = 24
# TODO issequence is probably already built-in validation in ftrack
isseq = re.findall('%[0-9]*d', filename)
if len(isseq) > 0:
if len(isseq) == 1:
frames = []
padding = re.findall('%[0-9]*d', filename).pop()
index = filename.find(padding)
full_file = filename[0:index-1]
file = full_file.split(os.sep)[-1]
folder = os.path.dirname(full_file)
for fname in os.listdir(path=folder):
if fname.endswith(file_type) and file in fname:
frames.append(int(fname.split(".")[-2]))
if len(frames) > 0:
start = min(frames)
end = max(frames)
range = (padding % start) + '-' + (padding % end)
filename = re.sub('%[0-9]*d', range, filename)
else:
print("")
return {
'success': False,
'message': 'DJV View - Filename has more than one seqence identifier.'
}
cmd = []
# DJV path
cmd.append(os.path.normpath(self.djv_path))
# DJV Options Start ##############################################
# cmd.append('-file_layer (value)') #layer name
cmd.append('-file_proxy 1/2') # Proxy scale: 1/2, 1/4, 1/8
cmd.append('-file_cache True') # Cache: True, False.
# cmd.append('-window_fullscreen') #Start in full screen
# cmd.append("-window_toolbar False") # Toolbar controls: False, True.
# cmd.append("-window_playbar False") # Window controls: False, True.
# cmd.append("-view_grid None") # Grid overlay: None, 1x1, 10x10, 100x100.
# cmd.append("-view_hud True") # Heads up display: True, False.
cmd.append("-playback Forward") # Playback: Stop, Forward, Reverse.
# cmd.append("-playback_frame (value)") # Frame.
cmd.append("-playback_speed " + str(fps))
# cmd.append("-playback_timer (value)") # Timer: Sleep, Timeout. Value: Sleep.
# cmd.append("-playback_timer_resolution (value)") # Timer resolution (seconds): 0.001.
cmd.append("-time_units Frames") # Time units: Timecode, Frames.
# DJV Options End ################################################
# PATH TO COMPONENT
cmd.append(os.path.normpath(filename))
# Run DJV with these commands
subprocess.Popen(' '.join(cmd))
return {
'success': True,
'message': 'DJV View started.'
}
if 'items' not in event["data"]:
event["data"]['items'] = []
try:
for entity in entities:
versions = []
allowed_types = ["img", "mov", "exr"]
if entity.entity_type.lower() == "assetversion":
if entity['components'][0]['file_type'] in allowed_types:
versions.append(entity)
if entity.entity_type.lower() == "task":
# AssetVersions are obtainable only from shot!
shotentity = entity['parent']
for asset in shotentity['assets']:
for version in asset['versions']:
# Get only AssetVersion of selected task
if version['task']['id'] != entity['id']:
continue
# Get only components with allowed type
if version['components'][0]['file_type'] in allowed_types:
versions.append(version)
# Raise error if no components were found
if len(versions) < 1:
raise ValueError('There are no Asset Versions to open.')
for version in versions:
for component in version['components']:
label = "v{0} - {1} - {2}"
label = label.format(
str(version['version']).zfill(3),
version['asset']['type']['name'],
component['name']
)
try:
# TODO This is proper way to get filepath!!!
# THIS WON'T WORK RIGHT NOW
location = component['component_locations'][0]['location']
file_path = location.get_filesystem_path(component)
# if component.isSequence():
# if component.getMembers():
# frame = int(component.getMembers()[0].getName())
# file_path = file_path % frame
except:
# This works but is NOT proper way
file_path = component['component_locations'][0]['resource_identifier']
event["data"]["items"].append(
{"label": label, "value": file_path}
)
except Exception as e:
return {
'success': False,
'message': str(e)
}
return {
"items": [
{
"label": "Items to view",
"type": "enumerator",
"name": "path",
"data": sorted(
event["data"]['items'],
key=itemgetter("label"),
reverse=True
)
}
]
}
def register(session, **kw):
"""Register hooks."""
if not isinstance(session, ftrack_api.session.Session):
return
action = DJVViewAction(session)
action.register()
def main(arguments=None):
'''Set up logging and register action.'''
if arguments is None:
arguments = []
parser = argparse.ArgumentParser()
# Allow setting of logging level from arguments.
loggingLevels = {}
for level in (
logging.NOTSET, logging.DEBUG, logging.INFO, logging.WARNING,
logging.ERROR, logging.CRITICAL
):
loggingLevels[logging.getLevelName(level).lower()] = level
parser.add_argument(
'-v', '--verbosity',
help='Set the logging output verbosity.',
choices=loggingLevels.keys(),
default='info'
)
namespace = parser.parse_args(arguments)
# Set up basic logging
logging.basicConfig(level=loggingLevels[namespace.verbosity])
session = ftrack_api.Session()
register(session)
# Wait for events
logging.info(
'Registered actions and listening for events. Use Ctrl-C to abort.'
)
session.event_hub.wait()
if __name__ == '__main__':
raise SystemExit(main(sys.argv[1:]))

View file

@ -0,0 +1,428 @@
import os
import operator
import ftrack_api
import collections
import sys
import json
import base64
# sys.path.append(os.path.dirname(os.path.dirname(__file__)))
# from ftrack_kredenc.lucidity.vendor import yaml
# from ftrack_kredenc import lucidity
#
#
# def get_ftrack_connect_path():
#
# ftrack_connect_root = os.path.abspath(os.getenv('FTRACK_CONNECT_PACKAGE'))
#
# return ftrack_connect_root
#
#
# def from_yaml(filepath):
# ''' Parse a Schema from a YAML file at the given *filepath*.
# '''
# with open(filepath, 'r') as f:
# data = yaml.safe_load(f)
# return data
#
#
# def get_task_enviro(entity, environment=None):
#
# context = get_context(entity)
#
# if not environment:
# environment = {}
#
# for key in context:
# os.environ[key.upper()] = context[key]['name']
# environment[key.upper()] = context[key]['name']
#
# if key == 'Project':
# os.putenv('PROJECT_ROOT', context[key]['root'])
# os.environ['PROJECT_ROOT'] = context[key]['root']
# environment['PROJECT_ROOT'] = context[key]['root']
# print('PROJECT_ROOT: ' + context[key]['root'])
# print(key + ': ' + context[key]['name'])
#
# return environment
#
#
# def get_entity():
# decodedEventData = json.loads(
# base64.b64decode(
# os.environ.get('FTRACK_CONNECT_EVENT')
# )
# )
#
# entity = decodedEventData.get('selection')[0]
#
# if entity['entityType'] == 'task':
# return ftrack_api.Task(entity['entityId'])
# else:
# return None
#
#
# def set_env_vars():
#
# entity = get_entity()
#
# if entity:
# if not os.environ.get('project_root'):
# enviro = get_task_enviro(entity)
#
# print(enviro)
#
#
def get_context(entity):
entityName = entity['name']
entityId = entity['id']
entityType = entity.entity_type
entityDescription = entity['description']
print(100*"*")
for k in entity['ancestors']:
print(k['name'])
print(100*"*")
hierarchy = entity.getParents()
ctx = collections.OrderedDict()
if entity.get('entityType') == 'task' and entityType == 'Task':
taskType = entity.getType().getName()
entityDic = {
'type': taskType,
'name': entityName,
'id': entityId,
'description': entityDescription
}
elif entity.get('entityType') == 'task':
entityDic = {
'name': entityName,
'id': entityId,
'description': entityDescription
}
ctx[entityType] = entityDic
folder_counter = 0
for ancestor in hierarchy:
tempdic = {}
if isinstance(ancestor, ftrack_api.Component):
# Ignore intermediate components.
continue
tempdic['name'] = ancestor.getName()
tempdic['id'] = ancestor.getId()
try:
objectType = ancestor.getObjectType()
tempdic['description'] = ancestor.getDescription()
except AttributeError:
objectType = 'Project'
tempdic['description'] = ''
if objectType == 'Asset Build':
tempdic['type'] = ancestor.getType().get('name')
objectType = objectType.replace(' ', '_')
elif objectType == 'Project':
tempdic['code'] = tempdic['name']
tempdic['name'] = ancestor.get('fullname')
tempdic['root'] = ancestor.getRoot()
if objectType == 'Folder':
objectType = objectType + str(folder_counter)
folder_counter += 1
ctx[objectType] = tempdic
return ctx
def getNewContext(entity):
parents = []
item = entity
while True:
item = item['parent']
if not item:
break
parents.append(item)
ctx = collections.OrderedDict()
entityDic = {
'name': entity['name'],
'id': entity['id'],
}
try:
entityDic['type'] = entity['type']['name']
except:
pass
ctx[entity['object_type']['name']] = entityDic
print(100*"-")
for p in parents:
print(p)
# add all parents to the context
for parent in parents:
tempdic = {}
if not parent.get('project_schema'):
tempdic = {
'name': parent['full_name'],
'code': parent['name'],
'id': parent['id'],
}
tempdic = {
'name': parent['name'],
'id': parent['id'],
}
object_type = parent['object_type']['name']
ctx[object_type] = tempdic
# add project to the context
project = entity['project']
ctx['Project'] = {
'name': project['full_name'],
'code': project['name'],
'id': project['id'],
'root': project['root'],
},
return ctx
#
#
# def get_frame_range():
#
# entity = get_entity()
# entityType = entity.getObjectType()
# environment = {}
#
# if entityType == 'Task':
# try:
# environment['FS'] = str(int(entity.getFrameStart()))
# except Exception:
# environment['FS'] = '1'
# try:
# environment['FE'] = str(int(entity.getFrameEnd()))
# except Exception:
# environment['FE'] = '1'
# else:
# try:
# environment['FS'] = str(int(entity.getFrameStart()))
# except Exception:
# environment['FS'] = '1'
# try:
# environment['FE'] = str(int(entity.getFrameEnd()))
# except Exception:
# environment['FE'] = '1'
#
#
# def get_asset_name_by_id(id):
# for t in ftrack_api.getAssetTypes():
# try:
# if t.get('typeid') == id:
# return t.get('name')
# except:
# return None
#
#
# def get_status_by_name(name):
# statuses = ftrack_api.getTaskStatuses()
#
# result = None
# for s in statuses:
# if s.get('name').lower() == name.lower():
# result = s
#
# return result
#
#
# def sort_types(types):
# data = {}
# for t in types:
# data[t] = t.get('sort')
#
# data = sorted(data.items(), key=operator.itemgetter(1))
# results = []
# for item in data:
# results.append(item[0])
#
# return results
#
#
# def get_next_task(task):
# shot = task.getParent()
# tasks = shot.getTasks()
#
# types_sorted = sort_types(ftrack_api.getTaskTypes())
#
# next_types = None
# for t in types_sorted:
# if t.get('typeid') == task.get('typeid'):
# try:
# next_types = types_sorted[(types_sorted.index(t) + 1):]
# except:
# pass
#
# for nt in next_types:
# for t in tasks:
# if nt.get('typeid') == t.get('typeid'):
# return t
#
# return None
#
#
# def get_latest_version(versions):
# latestVersion = None
# if len(versions) > 0:
# versionNumber = 0
# for item in versions:
# if item.get('version') > versionNumber:
# versionNumber = item.getVersion()
# latestVersion = item
# return latestVersion
#
#
# def get_thumbnail_recursive(task):
# if task.get('thumbid'):
# thumbid = task.get('thumbid')
# return ftrack_api.Attachment(id=thumbid)
# if not task.get('thumbid'):
# parent = ftrack_api.Task(id=task.get('parent_id'))
# return get_thumbnail_recursive(parent)
#
#
# # paths_collected
#
# def getFolderHierarchy(context):
# '''Return structure for *hierarchy*.
# '''
#
# hierarchy = []
# for key in reversed(context):
# hierarchy.append(context[key]['name'])
# print(hierarchy)
#
# return os.path.join(*hierarchy[1:-1])
#
#
def tweakContext(context, include=False):
for key in context:
if key == 'Asset Build':
context['Asset_Build'] = context.pop(key)
key = 'Asset_Build'
description = context[key].get('description')
if description:
context[key]['description'] = '_' + description
hierarchy = []
for key in reversed(context):
hierarchy.append(context[key]['name'])
if include:
hierarchy = os.path.join(*hierarchy[1:])
else:
hierarchy = os.path.join(*hierarchy[1:-1])
context['ft_hierarchy'] = hierarchy
def getSchema(entity):
project = entity['project']
schema = project['project_schema']['name']
tools = os.path.abspath(os.environ.get('studio_tools'))
schema_path = os.path.join(tools, 'studio', 'templates', (schema + '_' + project['name'] + '.yml'))
if not os.path.exists(schema_path):
schema_path = os.path.join(tools, 'studio', 'templates', (schema + '.yml'))
if not os.path.exists(schema_path):
schema_path = os.path.join(tools, 'studio', 'templates', 'default.yml')
schema = lucidity.Schema.from_yaml(schema_path)
print(schema_path)
return schema
# def getAllPathsYaml(entity, root=''):
#
# if isinstance(entity, str) or isinstance(entity, unicode):
# entity = ftrack_api.Task(entity)
#
# context = get_context(entity)
#
# tweakContext(context)
#
# schema = getSchema(entity)
#
# paths = schema.format_all(context)
# paths_collected = []
#
# for path in paths:
# tweak_path = path[0].replace(" ", '_').replace('\'', '').replace('\\', '/')
#
# tempPath = os.path.join(root, tweak_path)
# path = list(path)
# path[0] = tempPath
# paths_collected.append(path)
#
# return paths_collected
#
def getPathsYaml(entity, templateList=None, root=None, **kwargs):
'''
version=None
ext=None
item=None
family=None
subset=None
'''
context = get_context(entity)
if entity.entity_type != 'Task':
tweakContext(context, include=True)
else:
tweakContext(context)
context.update(kwargs)
host = sys.executable.lower()
ext = None
if not context.get('ext'):
if "nuke" in host:
ext = 'nk'
elif "maya" in host:
ext = 'ma'
elif "houdini" in host:
ext = 'hip'
if ext:
context['ext'] = ext
if not context.get('subset'):
context['subset'] = ''
else:
context['subset'] = '_' + context['subset']
schema = getSchema(entity)
paths = schema.format_all(context)
paths_collected = set([])
for temp_mask in templateList:
for path in paths:
if temp_mask in path[1].name:
path = path[0].lower().replace(" ", '_').replace('\'', '').replace('\\', '/')
path_list = path.split('/')
if path_list[0].endswith(':'):
path_list[0] = path_list[0] + os.path.sep
path = os.path.join(*path_list)
temppath = os.path.join(root, path)
paths_collected.add(temppath)
return list(paths_collected)

View file

@ -19,7 +19,6 @@ t = Templates(
)
class AppAction(object):
'''Custom Action base class
@ -53,14 +52,13 @@ class AppAction(object):
self.icon = icon
self.description = description
@property
def session(self):
'''Return current session.'''
return self._session
def register(self):
'''Registers the action, subscribing the the discover and launch topics.'''
'''Registers the action, subscribing the discover and launch topics.'''
self.session.event_hub.subscribe(
'topic=ftrack.action.discover and source.user.username={0}'.format(
self.session.api_user
@ -85,6 +83,7 @@ class AppAction(object):
)
if accepts:
self.logger.info('Selection is valid')
return {
'items': [{
'label': self.label,
@ -94,17 +93,18 @@ class AppAction(object):
'icon': self.icon,
}]
}
else:
self.logger.info('Selection is _not_ valid')
def discover(self, session, entities, event):
'''Return true if we can handle the selected entities.
*session* is a `ftrack_api.Session` instance
*entities* is a list of tuples each containing the entity type and the entity id.
If the entity is a hierarchical you will always get the entity
type TypedContext, once retrieved through a get operation you
will have the "real" entity type ie. example Shot, Sequence
*entities* is a list of tuples each containing the entity type and
the entity id. If the entity is a hierarchical you will always get
the entity type TypedContext, once retrieved through a get operation
you will have the "real" entity type ie. example Shot, Sequence
or Asset Build.
*event* the unmodified original event
@ -221,6 +221,7 @@ class AppAction(object):
*event* the unmodified original event
'''
# TODO Delete this line
print("Action - {0} ({1}) - just started".format(self.label, self.identifier))
@ -228,8 +229,8 @@ class AppAction(object):
entity = session.get(entity, id)
silo = "Film"
if entity.entity_type=="AssetBuild":
silo= "Asset"
if entity.entity_type == "AssetBuild":
silo = "Asset"
# set environments for Avalon
os.environ["AVALON_PROJECT"] = entity['project']['full_name']
@ -239,24 +240,22 @@ class AppAction(object):
os.environ["AVALON_APP"] = self.identifier
os.environ["AVALON_APP_NAME"] = self.identifier + "_" + self.variant
anatomy = t.anatomy
io.install()
hierarchy = io.find_one({"type":'asset', "name":entity['parent']['name']})['data']['parents']
hierarchy = io.find_one({"type": 'asset', "name": entity['parent']['name']})['data']['parents']
io.uninstall()
if hierarchy:
# hierarchy = os.path.sep.join(hierarchy)
hierarchy = os.path.join(*hierarchy)
data = { "project": {"name": entity['project']['full_name'],
data = {"project": {"name": entity['project']['full_name'],
"code": entity['project']['name']},
"task": entity['name'],
"asset": entity['parent']['name'],
"hierarchy": hierarchy}
"task": entity['name'],
"asset": entity['parent']['name'],
"hierarchy": hierarchy}
anatomy = anatomy.format(data)
os.environ["AVALON_WORKDIR"] = os.path.join(anatomy.work.root, anatomy.work.folder)
# TODO Add paths to avalon setup from tomls
@ -309,6 +308,13 @@ class AppAction(object):
'message': "We didn't found launcher for {0}".format(self.label)
}
# RUN TIMER IN FTRACK
username = event['source']['user']['username']
user = session.query('User where username is "{}"'.format(username)).one()
task = session.query('Task where id is {}'.format(entity['id'])).one()
print('Starting timer for task: ' + task['name'])
user.start_timer(task, force=True)
return {
'success': True,
'message': "Launching {0}".format(self.label)
@ -424,6 +430,7 @@ class BaseAction(object):
),
self._launch
)
print("----- action - <" + self.__class__.__name__ + "> - Has been registered -----")
def _discover(self, event):
args = self._translate_event(
@ -435,6 +442,7 @@ class BaseAction(object):
)
if accepts:
self.logger.info(u'Discovering action with selection: {0}'.format(args[1]['data'].get('selection', [])))
return {
'items': [{
'label': self.label,
@ -472,7 +480,8 @@ class BaseAction(object):
for entity in _selection:
_entities.append(
(
self._get_entity_type(entity), entity.get('entityId')
session.get(self._get_entity_type(entity), entity.get('entityId'))
# self._get_entity_type(entity), entity.get('entityId')
)
)

View file

@ -1,51 +0,0 @@
import ftrack_utils
import ftrack_api
session = ftrack_api.Session(
server_url="https://pype.ftrackapp.com",
api_key="4e01eda0-24b3-4451-8e01-70edc03286be",
api_user="jakub.trllo",
)
objTypes = set()
# TODO get all entity types ---- NOT TASK,MILESTONE,LIBRARY --> should be editable!!!
allObjTypes = session.query('ObjectType').all()
for object in range(len(allObjTypes)):
index = len(allObjTypes)-object-1
if (str(allObjTypes[index]['name']) in ['Task','Milestone','Library']):
allObjTypes.pop(index)
for k in allObjTypes:
print(k['name'])
# Name & Label for export Avalon-mongo ID to Ftrack
# allCustAttr = session.query('CustomAttributeConfiguration').all()
# curCustAttr = []
# for ca in allCustAttr:
# curCustAttr.append(ca['key'])
#
# custAttrName = 'avalon_mongo_id'
# custAttrLabel = 'Avalon/Mongo Id'
# custAttrType = session.query('CustomAttributeType where name is "text"').one()
# # TODO WHICH SECURITY ROLE IS RIGHT
# custAttrSecuRole = session.query('SecurityRole').all()
# for custAttrObjType in objTypes:
# # Create Custom attribute if not exists
# if custAttrName not in curCustAttr:
# session.create('CustomAttributeConfiguration', {
# 'entity_type': 'task',
# 'object_type_id': custAttrObjType['id'],
# 'type': custAttrType,
# 'label': custAttrLabel,
# 'key': custAttrName,
# 'default': '',
# 'write_security_roles': custAttrSecuRole,
# 'read_security_roles': custAttrSecuRole,
# 'config': json.dumps({'markdown': False}),
# })
# session.commit()

View file

@ -0,0 +1,65 @@
import os
import toml
import ftrack_api
import appdirs
config_path = os.path.normpath(appdirs.user_data_dir('pype-app','pype'))
config_name = 'ftrack_cred.toml'
fpath = os.path.join(config_path, config_name)
folder = os.path.dirname(fpath)
if not os.path.isdir(folder):
os.makedirs(folder)
def _get_credentials():
folder = os.path.dirname(fpath)
if not os.path.isdir(folder):
os.makedirs(folder)
try:
file = open(fpath, 'r')
except:
filecreate = open(fpath, 'w')
filecreate.close()
file = open(fpath, 'r')
credentials = toml.load(file)
file.close()
return credentials
def _save_credentials(username, apiKey):
file = open(fpath, 'w')
data = {
'username':username,
'apiKey':apiKey
}
credentials = toml.dumps(data)
file.write(credentials)
file.close()
def _clear_credentials():
file = open(fpath, 'w').close()
def _set_env(username, apiKey):
os.environ['FTRACK_API_USER'] = username
os.environ['FTRACK_API_KEY'] = apiKey
def _check_credentials(username=None, apiKey=None):
if username and apiKey:
_set_env(username, apiKey)
try:
session = ftrack_api.Session()
session.close()
except Exception as e:
print(e)
return False
return True

56
pype/ftrack/ftrackRun.py Normal file
View file

@ -0,0 +1,56 @@
import sys
import os
import argparse
from app.lib.utils import forward
from pype.ftrack import credentials, login_dialog as login_dialog
# Validation if alredy logged into Ftrack
def validate():
validation = False
cred = credentials._get_credentials()
if 'username' in cred and 'apiKey' in cred:
validation = credentials._check_credentials(
cred['username'],
cred['apiKey']
)
if validation is False:
login_dialog.run_login()
else:
login_dialog.run_login()
validation = credentials._check_credentials()
if not validation:
print("We are unable to connect to Ftrack")
sys.exit()
# Entered arguments
parser = argparse.ArgumentParser()
parser.add_argument("--actionserver", action="store_true",
help="launch action server for ftrack")
parser.add_argument("--eventserver", action="store_true",
help="launch action server for ftrack")
parser.add_argument("--logout", action="store_true",
help="launch action server for ftrack")
kwargs, args = parser.parse_known_args()
if kwargs.logout:
credentials._clear_credentials()
sys.exit()
else:
validate()
if kwargs.eventserver:
fname = os.path.join(os.environ["FTRACK_ACTION_SERVER"], "eventServer.py")
returncode = forward([
sys.executable, "-u", fname
])
else:
fname = os.path.join(os.environ["FTRACK_ACTION_SERVER"], "actionServer.py")
returncode = forward([
sys.executable, "-u", fname
])
sys.exit(returncode)

View file

@ -1,62 +1,14 @@
# fttrack help functions
# import ftrack
import ftrack_api
import os
from pprint import *
def deleteAssetsForTask(taskId):
#taskId = os.environ['FTRACK_TASKID']
task = ftrack.Task(taskId)
def checkLogin():
# check Environments FTRACK_API_USER, FTRACK_API_KEY
pass
taskAssets = task.getAssets()
print(taskAssets)
for a in taskAssets:
print(a.getName())
a.delete()
#shot = task.getParent()
#shotAssets = shot.getAssets()
def deleteAssetsFromShotByName(shotId, assNm=None):
if not assNm:
return
shot = ftrack.Task(shotId)
shotAssets = shot.getAssets()
for a in shotAssets:
nm = a.getName()
if nm == assNm:
a.delete()
# Created as action
def killRunningTasks(tm=None):
import datetime
import ftrack_api
session = ftrack_api.Session()
# Query all jobs created prior to yesterday which has not yet been completed.
yesterday = datetime.date.today() - datetime.timedelta(days=1)
if tm:
yesterday = tm
print(yesterday)
jobs = session.query(
'select id, status from Job '
'where status in ("queued", "running") and created_at > {0}'.format(yesterday)
)
# Update all the queried jobs, setting the status to failed.
for job in jobs:
print(job['created_at'])
print('Changing Job ({}) status: {} -> failed'.format(job['id'], job['status']))
job['status'] = 'failed'
session.commit()
print('Complete')
def checkRegex():
# _handle_result -> would be solution?
@ -83,3 +35,122 @@ def checkRegex():
'message': 'Entity name contains invalid character!'
}
)
def get_context(entity):
parents = []
item = entity
while True:
item = item['parent']
if not item:
break
parents.append(item)
ctx = collections.OrderedDict()
folder_counter = 0
entityDic = {
'name': entity['name'],
'id': entity['id'],
}
try:
entityDic['type'] = entity['type']['name']
except:
pass
ctx[entity['object_type']['name']] = entityDic
# add all parents to the context
for parent in parents:
tempdic = {}
if not parent.get('project_schema'):
tempdic = {
'name': parent['name'],
'id': parent['id'],
}
object_type = parent['object_type']['name']
if object_type == 'Folder':
object_type = object_type + str(folder_counter)
folder_counter += 1
ctx[object_type] = tempdic
# add project to the context
project = entity['project']
ctx['Project'] = {
'name': project['full_name'],
'code': project['name'],
'id': project['id'],
'root': project['root']
}
return ctx
def get_status_by_name(name):
statuses = ftrack.getTaskStatuses()
result = None
for s in statuses:
if s.get('name').lower() == name.lower():
result = s
return result
def sort_types(types):
data = {}
for t in types:
data[t] = t.get('sort')
data = sorted(data.items(), key=operator.itemgetter(1))
results = []
for item in data:
results.append(item[0])
return results
def get_next_task(task):
shot = task.getParent()
tasks = shot.getTasks()
types_sorted = sort_types(ftrack.getTaskTypes())
next_types = None
for t in types_sorted:
if t.get('typeid') == task.get('typeid'):
try:
next_types = types_sorted[(types_sorted.index(t) + 1):]
except:
pass
for nt in next_types:
for t in tasks:
if nt.get('typeid') == t.get('typeid'):
return t
return None
def get_latest_version(versions):
latestVersion = None
if len(versions) > 0:
versionNumber = 0
for item in versions:
if item.get('version') > versionNumber:
versionNumber = item.getVersion()
latestVersion = item
return latestVersion
def get_thumbnail_recursive(task):
if task.get('thumbid'):
thumbid = task.get('thumbid')
return ftrack.Attachment(id=thumbid)
if not task.get('thumbid'):
parent = ftrack.Task(id=task.get('parent_id'))
return get_thumbnail_recursive(parent)

302
pype/ftrack/login_dialog.py Normal file
View file

@ -0,0 +1,302 @@
import sys
import os
import requests
from PyQt5 import QtCore, QtGui, QtWidgets
from app import style
from . import credentials, login_tools
class Login_Dialog_ui(QtWidgets.QWidget):
SIZE_W = 300
SIZE_H = 230
loginSignal = QtCore.pyqtSignal(object, object, object)
_login_server_thread = None
inputs = []
buttons = []
labels = []
def __init__(self):
super().__init__()
self.loginSignal.connect(self.loginWithCredentials)
self._translate = QtCore.QCoreApplication.translate
self.font = QtGui.QFont()
self.font.setFamily("DejaVu Sans Condensed")
self.font.setPointSize(9)
self.font.setBold(True)
self.font.setWeight(50)
self.font.setKerning(True)
self.resize(self.SIZE_W, self.SIZE_H)
self.setMinimumSize(QtCore.QSize(self.SIZE_W, self.SIZE_H))
self.setStyleSheet(style.load_stylesheet())
self.setLayout(self._main())
self.setWindowTitle('FTrack Login')
self.show()
def _main(self):
self.main = QtWidgets.QVBoxLayout()
self.main.setObjectName("main")
self.form = QtWidgets.QFormLayout()
self.form.setContentsMargins(10, 15, 10, 5)
self.form.setObjectName("form")
self.ftsite_label = QtWidgets.QLabel("FTrack URL:")
self.ftsite_label.setFont(self.font)
self.ftsite_label.setCursor(QtGui.QCursor(QtCore.Qt.ArrowCursor))
self.ftsite_label.setTextFormat(QtCore.Qt.RichText)
self.ftsite_label.setObjectName("user_label")
self.ftsite_input = QtWidgets.QLineEdit()
self.ftsite_input.setEnabled(True)
self.ftsite_input.setFrame(True)
self.ftsite_input.setEnabled(False)
self.ftsite_input.setReadOnly(True)
self.ftsite_input.setObjectName("ftsite_input")
self.user_label = QtWidgets.QLabel("Username:")
self.user_label.setFont(self.font)
self.user_label.setCursor(QtGui.QCursor(QtCore.Qt.ArrowCursor))
self.user_label.setTextFormat(QtCore.Qt.RichText)
self.user_label.setObjectName("user_label")
self.user_input = QtWidgets.QLineEdit()
self.user_input.setEnabled(True)
self.user_input.setFrame(True)
self.user_input.setObjectName("user_input")
self.user_input.setPlaceholderText(self._translate("main","user.name"))
self.user_input.textChanged.connect(self._user_changed)
self.api_label = QtWidgets.QLabel("API Key:")
self.api_label.setFont(self.font)
self.api_label.setCursor(QtGui.QCursor(QtCore.Qt.ArrowCursor))
self.api_label.setTextFormat(QtCore.Qt.RichText)
self.api_label.setObjectName("api_label")
self.api_input = QtWidgets.QLineEdit()
self.api_input.setEnabled(True)
self.api_input.setFrame(True)
self.api_input.setObjectName("api_input")
self.api_input.setPlaceholderText(self._translate("main","e.g. xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx"))
self.api_input.textChanged.connect(self._api_changed)
self.error_label = QtWidgets.QLabel("")
self.error_label.setFont(self.font)
self.error_label.setTextFormat(QtCore.Qt.RichText)
self.error_label.setObjectName("error_label")
self.error_label.setWordWrap(True);
self.error_label.hide()
self.form.addRow(self.ftsite_label, self.ftsite_input)
self.form.addRow(self.user_label, self.user_input)
self.form.addRow(self.api_label,self.api_input)
self.form.addRow(self.error_label)
self.btnGroup = QtWidgets.QHBoxLayout()
self.btnGroup.addStretch(1)
self.btnGroup.setObjectName("btnGroup")
self.btnEnter = QtWidgets.QPushButton("Login")
self.btnEnter.setToolTip('Set Username and API Key with entered values')
self.btnEnter.clicked.connect(self.enter_credentials)
self.btnClose = QtWidgets.QPushButton("Close")
self.btnClose.setToolTip('Close this window')
self.btnClose.clicked.connect(self._close_widget)
self.btnFtrack = QtWidgets.QPushButton("Ftrack")
self.btnFtrack.setToolTip('Open browser for Login to Ftrack')
self.btnFtrack.clicked.connect(self.open_ftrack)
self.btnGroup.addWidget(self.btnFtrack)
self.btnGroup.addWidget(self.btnEnter)
self.btnGroup.addWidget(self.btnClose)
self.main.addLayout(self.form)
self.main.addLayout(self.btnGroup)
self.inputs.append(self.api_input)
self.inputs.append(self.user_input)
self.inputs.append(self.ftsite_input)
self.enter_site()
return self.main
def enter_site(self):
try:
url = os.getenv('FTRACK_SERVER')
newurl = self.checkUrl(url)
if newurl is None:
self.btnEnter.setEnabled(False)
self.btnFtrack.setEnabled(False)
for input in self.inputs:
input.setEnabled(False)
newurl = url
self.ftsite_input.setText(newurl)
except Exception as e:
self.setError("FTRACK_SERVER is not set in templates")
self.btnEnter.setEnabled(False)
self.btnFtrack.setEnabled(False)
for input in self.inputs:
input.setEnabled(False)
def setError(self, msg):
self.error_label.setText(msg)
self.error_label.show()
def _user_changed(self):
self.user_input.setStyleSheet("")
def _api_changed(self):
self.api_input.setStyleSheet("")
def _invalid_input(self,entity):
entity.setStyleSheet("border: 1px solid red;")
def enter_credentials(self):
user = self.user_input.text().strip()
api = self.api_input.text().strip()
msg = "You didn't enter "
missing = []
if user == "":
missing.append("Username")
self._invalid_input(self.user_input)
if api == "":
missing.append("API Key")
self._invalid_input(self.api_input)
if len(missing) > 0:
self.setError("{0} {1}".format(msg, " and ".join(missing)))
return
verification = credentials._check_credentials(user, api)
if verification:
credentials._save_credentials(username, apiKey)
credentials._set_env(username, apiKey)
self._close_widget()
else:
self._invalid_input(self.user_input)
self._invalid_input(self.api_input)
self.setError("We're unable to sign in to Ftrack with these credentials")
def open_ftrack(self):
url = self.ftsite_input.text()
self.loginWithCredentials(url,None,None)
def checkUrl(self, url):
url = url.strip('/ ')
if not url:
self.setError("There is no URL set in Templates")
return
if not 'http' in url:
if url.endswith('ftrackapp.com'):
url = 'https://' + url
else:
url = 'https://{0}.ftrackapp.com'.format(url)
try:
result = requests.get(
url,
allow_redirects=False # Old python API will not work with redirect.
)
except requests.exceptions.RequestException:
self.setError(
'The server URL set in Templates could not be reached.'
)
return
if (
result.status_code != 200 or 'FTRACK_VERSION' not in result.headers
):
self.setError(
'The server URL set in Templates is not a valid ftrack server.'
)
return
return url
def loginWithCredentials(self, url, username, apiKey):
url = url.strip('/ ')
if not url:
self.setError(
'You need to specify a valid server URL, '
'for example https://server-name.ftrackapp.com'
)
return
if not 'http' in url:
if url.endswith('ftrackapp.com'):
url = 'https://' + url
else:
url = 'https://{0}.ftrackapp.com'.format(url)
try:
result = requests.get(
url,
allow_redirects=False # Old python API will not work with redirect.
)
except requests.exceptions.RequestException:
self.setError(
'The server URL you provided could not be reached.'
)
return
if (
result.status_code != 200 or 'FTRACK_VERSION' not in result.headers
):
self.setError(
'The server URL you provided is not a valid ftrack server.'
)
return
# If there is an existing server thread running we need to stop it.
if self._login_server_thread:
self._login_server_thread.quit()
self._login_server_thread = None
# If credentials are not properly set, try to get them using a http
# server.
if not username or not apiKey:
self._login_server_thread = login_tools.LoginServerThread()
self._login_server_thread.loginSignal.connect(self.loginSignal)
self._login_server_thread.start(url)
return
verification = credentials._check_credentials(username, apiKey)
if verification is True:
credentials._save_credentials(username, apiKey)
credentials._set_env(username, apiKey)
self._close_widget()
def _close_widget(self):
self.close()
class Login_Dialog(Login_Dialog_ui):
def __init__(self):
super(Login_Dialog, self).__init__()
def getApp():
return QtWidgets.QApplication(sys.argv)
def run_login():
app = getApp()
ui = Login_Dialog()
ui.show()
app.exec_()

View file

@ -0,0 +1,120 @@
import os
import sys
import requests
import argparse
from pprint import pprint
from PyQt5 import QtCore, QtWidgets
from app import style
from . import credentials, login_tools
class Login_Dialog(QtWidgets.QWidget):
loginSignal = QtCore.pyqtSignal(object, object, object)
_login_server_thread = None
def __init__(self):
super().__init__()
self.loginSignal.connect(self.loginWithCredentials)
def run(self):
try:
url = os.getenv('FTRACK_SERVER')
except:
print("Environment variable 'FTRACK_SERVER' is not set.")
return
self.url = self.checkUrl(url)
self.open_ftrack()
def open_ftrack(self):
self.loginWithCredentials(self.url, None, None)
def checkUrl(self, url):
url = url.strip('/ ')
if not url:
print("Url is empty!")
return
if not 'http' in url:
if url.endswith('ftrackapp.com'):
url = 'https://' + url
else:
url = 'https://{0}.ftrackapp.com'.format(url)
try:
result = requests.get(
url,
allow_redirects=False # Old python API will not work with redirect.
)
except requests.exceptions.RequestException:
print('The server URL set in Templates could not be reached.')
return
if (
result.status_code != 200 or 'FTRACK_VERSION' not in result.headers
):
print('The server URL set in Templates is not a valid ftrack server.')
return
return url
def loginWithCredentials(self, url, username, apiKey):
url = url.strip('/ ')
if not url:
print(
'You need to specify a valid server URL, '
'for example https://server-name.ftrackapp.com'
)
return
if not 'http' in url:
if url.endswith('ftrackapp.com'):
url = 'https://' + url
else:
url = 'https://{0}.ftrackapp.com'.format(url)
try:
result = requests.get(
url,
allow_redirects=False # Old python API will not work with redirect.
)
except requests.exceptions.RequestException:
print('The server URL you provided could not be reached.')
return
if (
result.status_code != 200 or 'FTRACK_VERSION' not in result.headers
):
print('The server URL you provided is not a valid ftrack server.')
return
# If there is an existing server thread running we need to stop it.
if self._login_server_thread:
self._login_server_thread.quit()
self._login_server_thread = None
# If credentials are not properly set, try to get them using a http
# server.
if not username or not apiKey:
self._login_server_thread = login_tools.LoginServerThread()
self._login_server_thread.loginSignal.connect(self.loginSignal)
self._login_server_thread.start(url)
verification = credentials._check_credentials(username, apiKey)
if verification is True:
credentials._save_credentials(username, apiKey)
credentials._set_env(username, apiKey)
self.close()
def run_login():
app = QtWidgets.QApplication(sys.argv)
applogin = Login_Dialog()
applogin.run()
app.exec_()
if __name__ == '__main__':
run_login()

111
pype/ftrack/login_tools.py Normal file
View file

@ -0,0 +1,111 @@
from http.server import BaseHTTPRequestHandler, HTTPServer
from urllib import parse
import webbrowser
import functools
from PyQt5 import QtCore
# class LoginServerHandler(BaseHTTPServer.BaseHTTPRequestHandler):
class LoginServerHandler(BaseHTTPRequestHandler):
'''Login server handler.'''
def __init__(self, login_callback, *args, **kw):
'''Initialise handler.'''
self.login_callback = login_callback
BaseHTTPRequestHandler.__init__(self, *args, **kw)
def do_GET(self):
'''Override to handle requests ourselves.'''
parsed_path = parse.urlparse(self.path)
query = parsed_path.query
api_user = None
api_key = None
login_credentials = None
if 'api_user' and 'api_key' in query:
login_credentials = parse.parse_qs(query)
api_user = login_credentials['api_user'][0]
api_key = login_credentials['api_key'][0]
message = """
<html>
<style type="text/css">
body {{
background-color: #333;
text-align: center;
color: #ccc;
margin-top: 200px;
}}
h1 {{
font-family: "DejaVu Sans";
font-size: 36px;
margin: 20px 0;
}}
h3 {{
font-weight: normal;
font-family: "DejaVu Sans";
margin: 30px 10px;
}}
em {{
color: #fff;
}}
</style>
<body>
<h1>Sign in to Ftrack was successful</h1>
<h3>
You signed in with username <em>{0}</em>.
</h3>
<h3>
You can close this window now.
</h3>
</body>
</html>
""".format(api_user)
else:
message = '<h1>Failed to sign in</h1>'
self.send_response(200)
self.end_headers()
self.wfile.write(message.encode())
if login_credentials:
self.login_callback(
api_user,
api_key
)
class LoginServerThread(QtCore.QThread):
'''Login server thread.'''
# Login signal.
loginSignal = QtCore.pyqtSignal(object, object, object)
def start(self, url):
'''Start thread.'''
self.url = url
super(LoginServerThread, self).start()
def _handle_login(self, api_user, api_key):
'''Login to server with *api_user* and *api_key*.'''
self.loginSignal.emit(self.url, api_user, api_key)
def run(self):
'''Listen for events.'''
# self._server = BaseHTTPServer.HTTPServer(
self._server = HTTPServer(
('localhost', 0),
functools.partial(
LoginServerHandler, self._handle_login
)
)
webbrowser.open_new_tab(
'{0}/user/api_credentials?redirect_url=http://localhost:{1}'.format(
self.url, self._server.server_port
)
)
self._server.handle_request()

View file

@ -1,16 +0,0 @@
# import ftrack_api as local session
import ftrack_api
#
session = ftrack_api.Session()
# ----------------------------------
def test_event(event):
'''just a testing event'''
# start of event procedure ----------------------------------
for entity in event['data'].get('entities', []):
print(100*"_")
print(entity['changes'])
# end of event procedure ----------------------------------

View file

@ -1,16 +0,0 @@
# import ftrack_api as local session
import ftrack_api
#
session = ftrack_api.Session()
# ----------------------------------
def test_event(event):
'''just a testing event'''
# start of event procedure ----------------------------------
for entity in event['data'].get('entities', []):
print(100*"_")
print(entity['keys'])
# end of event procedure ----------------------------------

608
pype/vendor/appdirs.py vendored Normal file
View file

@ -0,0 +1,608 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright (c) 2005-2010 ActiveState Software Inc.
# Copyright (c) 2013 Eddy Petrișor
"""Utilities for determining application-specific dirs.
See <http://github.com/ActiveState/appdirs> for details and usage.
"""
# Dev Notes:
# - MSDN on where to store app data files:
# http://support.microsoft.com/default.aspx?scid=kb;en-us;310294#XSLTH3194121123120121120120
# - Mac OS X: http://developer.apple.com/documentation/MacOSX/Conceptual/BPFileSystem/index.html
# - XDG spec for Un*x: http://standards.freedesktop.org/basedir-spec/basedir-spec-latest.html
__version_info__ = (1, 4, 3)
__version__ = '.'.join(map(str, __version_info__))
import sys
import os
PY3 = sys.version_info[0] == 3
if PY3:
unicode = str
if sys.platform.startswith('java'):
import platform
os_name = platform.java_ver()[3][0]
if os_name.startswith('Windows'): # "Windows XP", "Windows 7", etc.
system = 'win32'
elif os_name.startswith('Mac'): # "Mac OS X", etc.
system = 'darwin'
else: # "Linux", "SunOS", "FreeBSD", etc.
# Setting this to "linux2" is not ideal, but only Windows or Mac
# are actually checked for and the rest of the module expects
# *sys.platform* style strings.
system = 'linux2'
else:
system = sys.platform
def user_data_dir(appname=None, appauthor=None, version=None, roaming=False):
r"""Return full path to the user-specific data dir for this application.
"appname" is the name of application.
If None, just the system directory is returned.
"appauthor" (only used on Windows) is the name of the
appauthor or distributing body for this application. Typically
it is the owning company name. This falls back to appname. You may
pass False to disable it.
"version" is an optional version path element to append to the
path. You might want to use this if you want multiple versions
of your app to be able to run independently. If used, this
would typically be "<major>.<minor>".
Only applied when appname is present.
"roaming" (boolean, default False) can be set True to use the Windows
roaming appdata directory. That means that for users on a Windows
network setup for roaming profiles, this user data will be
sync'd on login. See
<http://technet.microsoft.com/en-us/library/cc766489(WS.10).aspx>
for a discussion of issues.
Typical user data directories are:
Mac OS X: ~/Library/Application Support/<AppName>
Unix: ~/.local/share/<AppName> # or in $XDG_DATA_HOME, if defined
Win XP (not roaming): C:\Documents and Settings\<username>\Application Data\<AppAuthor>\<AppName>
Win XP (roaming): C:\Documents and Settings\<username>\Local Settings\Application Data\<AppAuthor>\<AppName>
Win 7 (not roaming): C:\Users\<username>\AppData\Local\<AppAuthor>\<AppName>
Win 7 (roaming): C:\Users\<username>\AppData\Roaming\<AppAuthor>\<AppName>
For Unix, we follow the XDG spec and support $XDG_DATA_HOME.
That means, by default "~/.local/share/<AppName>".
"""
if system == "win32":
if appauthor is None:
appauthor = appname
const = roaming and "CSIDL_APPDATA" or "CSIDL_LOCAL_APPDATA"
path = os.path.normpath(_get_win_folder(const))
if appname:
if appauthor is not False:
path = os.path.join(path, appauthor, appname)
else:
path = os.path.join(path, appname)
elif system == 'darwin':
path = os.path.expanduser('~/Library/Application Support/')
if appname:
path = os.path.join(path, appname)
else:
path = os.getenv('XDG_DATA_HOME', os.path.expanduser("~/.local/share"))
if appname:
path = os.path.join(path, appname)
if appname and version:
path = os.path.join(path, version)
return path
def site_data_dir(appname=None, appauthor=None, version=None, multipath=False):
r"""Return full path to the user-shared data dir for this application.
"appname" is the name of application.
If None, just the system directory is returned.
"appauthor" (only used on Windows) is the name of the
appauthor or distributing body for this application. Typically
it is the owning company name. This falls back to appname. You may
pass False to disable it.
"version" is an optional version path element to append to the
path. You might want to use this if you want multiple versions
of your app to be able to run independently. If used, this
would typically be "<major>.<minor>".
Only applied when appname is present.
"multipath" is an optional parameter only applicable to *nix
which indicates that the entire list of data dirs should be
returned. By default, the first item from XDG_DATA_DIRS is
returned, or '/usr/local/share/<AppName>',
if XDG_DATA_DIRS is not set
Typical site data directories are:
Mac OS X: /Library/Application Support/<AppName>
Unix: /usr/local/share/<AppName> or /usr/share/<AppName>
Win XP: C:\Documents and Settings\All Users\Application Data\<AppAuthor>\<AppName>
Vista: (Fail! "C:\ProgramData" is a hidden *system* directory on Vista.)
Win 7: C:\ProgramData\<AppAuthor>\<AppName> # Hidden, but writeable on Win 7.
For Unix, this is using the $XDG_DATA_DIRS[0] default.
WARNING: Do not use this on Windows. See the Vista-Fail note above for why.
"""
if system == "win32":
if appauthor is None:
appauthor = appname
path = os.path.normpath(_get_win_folder("CSIDL_COMMON_APPDATA"))
if appname:
if appauthor is not False:
path = os.path.join(path, appauthor, appname)
else:
path = os.path.join(path, appname)
elif system == 'darwin':
path = os.path.expanduser('/Library/Application Support')
if appname:
path = os.path.join(path, appname)
else:
# XDG default for $XDG_DATA_DIRS
# only first, if multipath is False
path = os.getenv('XDG_DATA_DIRS',
os.pathsep.join(['/usr/local/share', '/usr/share']))
pathlist = [os.path.expanduser(x.rstrip(os.sep)) for x in path.split(os.pathsep)]
if appname:
if version:
appname = os.path.join(appname, version)
pathlist = [os.sep.join([x, appname]) for x in pathlist]
if multipath:
path = os.pathsep.join(pathlist)
else:
path = pathlist[0]
return path
if appname and version:
path = os.path.join(path, version)
return path
def user_config_dir(appname=None, appauthor=None, version=None, roaming=False):
r"""Return full path to the user-specific config dir for this application.
"appname" is the name of application.
If None, just the system directory is returned.
"appauthor" (only used on Windows) is the name of the
appauthor or distributing body for this application. Typically
it is the owning company name. This falls back to appname. You may
pass False to disable it.
"version" is an optional version path element to append to the
path. You might want to use this if you want multiple versions
of your app to be able to run independently. If used, this
would typically be "<major>.<minor>".
Only applied when appname is present.
"roaming" (boolean, default False) can be set True to use the Windows
roaming appdata directory. That means that for users on a Windows
network setup for roaming profiles, this user data will be
sync'd on login. See
<http://technet.microsoft.com/en-us/library/cc766489(WS.10).aspx>
for a discussion of issues.
Typical user config directories are:
Mac OS X: same as user_data_dir
Unix: ~/.config/<AppName> # or in $XDG_CONFIG_HOME, if defined
Win *: same as user_data_dir
For Unix, we follow the XDG spec and support $XDG_CONFIG_HOME.
That means, by default "~/.config/<AppName>".
"""
if system in ["win32", "darwin"]:
path = user_data_dir(appname, appauthor, None, roaming)
else:
path = os.getenv('XDG_CONFIG_HOME', os.path.expanduser("~/.config"))
if appname:
path = os.path.join(path, appname)
if appname and version:
path = os.path.join(path, version)
return path
def site_config_dir(appname=None, appauthor=None, version=None, multipath=False):
r"""Return full path to the user-shared data dir for this application.
"appname" is the name of application.
If None, just the system directory is returned.
"appauthor" (only used on Windows) is the name of the
appauthor or distributing body for this application. Typically
it is the owning company name. This falls back to appname. You may
pass False to disable it.
"version" is an optional version path element to append to the
path. You might want to use this if you want multiple versions
of your app to be able to run independently. If used, this
would typically be "<major>.<minor>".
Only applied when appname is present.
"multipath" is an optional parameter only applicable to *nix
which indicates that the entire list of config dirs should be
returned. By default, the first item from XDG_CONFIG_DIRS is
returned, or '/etc/xdg/<AppName>', if XDG_CONFIG_DIRS is not set
Typical site config directories are:
Mac OS X: same as site_data_dir
Unix: /etc/xdg/<AppName> or $XDG_CONFIG_DIRS[i]/<AppName> for each value in
$XDG_CONFIG_DIRS
Win *: same as site_data_dir
Vista: (Fail! "C:\ProgramData" is a hidden *system* directory on Vista.)
For Unix, this is using the $XDG_CONFIG_DIRS[0] default, if multipath=False
WARNING: Do not use this on Windows. See the Vista-Fail note above for why.
"""
if system in ["win32", "darwin"]:
path = site_data_dir(appname, appauthor)
if appname and version:
path = os.path.join(path, version)
else:
# XDG default for $XDG_CONFIG_DIRS
# only first, if multipath is False
path = os.getenv('XDG_CONFIG_DIRS', '/etc/xdg')
pathlist = [os.path.expanduser(x.rstrip(os.sep)) for x in path.split(os.pathsep)]
if appname:
if version:
appname = os.path.join(appname, version)
pathlist = [os.sep.join([x, appname]) for x in pathlist]
if multipath:
path = os.pathsep.join(pathlist)
else:
path = pathlist[0]
return path
def user_cache_dir(appname=None, appauthor=None, version=None, opinion=True):
r"""Return full path to the user-specific cache dir for this application.
"appname" is the name of application.
If None, just the system directory is returned.
"appauthor" (only used on Windows) is the name of the
appauthor or distributing body for this application. Typically
it is the owning company name. This falls back to appname. You may
pass False to disable it.
"version" is an optional version path element to append to the
path. You might want to use this if you want multiple versions
of your app to be able to run independently. If used, this
would typically be "<major>.<minor>".
Only applied when appname is present.
"opinion" (boolean) can be False to disable the appending of
"Cache" to the base app data dir for Windows. See
discussion below.
Typical user cache directories are:
Mac OS X: ~/Library/Caches/<AppName>
Unix: ~/.cache/<AppName> (XDG default)
Win XP: C:\Documents and Settings\<username>\Local Settings\Application Data\<AppAuthor>\<AppName>\Cache
Vista: C:\Users\<username>\AppData\Local\<AppAuthor>\<AppName>\Cache
On Windows the only suggestion in the MSDN docs is that local settings go in
the `CSIDL_LOCAL_APPDATA` directory. This is identical to the non-roaming
app data dir (the default returned by `user_data_dir` above). Apps typically
put cache data somewhere *under* the given dir here. Some examples:
...\Mozilla\Firefox\Profiles\<ProfileName>\Cache
...\Acme\SuperApp\Cache\1.0
OPINION: This function appends "Cache" to the `CSIDL_LOCAL_APPDATA` value.
This can be disabled with the `opinion=False` option.
"""
if system == "win32":
if appauthor is None:
appauthor = appname
path = os.path.normpath(_get_win_folder("CSIDL_LOCAL_APPDATA"))
if appname:
if appauthor is not False:
path = os.path.join(path, appauthor, appname)
else:
path = os.path.join(path, appname)
if opinion:
path = os.path.join(path, "Cache")
elif system == 'darwin':
path = os.path.expanduser('~/Library/Caches')
if appname:
path = os.path.join(path, appname)
else:
path = os.getenv('XDG_CACHE_HOME', os.path.expanduser('~/.cache'))
if appname:
path = os.path.join(path, appname)
if appname and version:
path = os.path.join(path, version)
return path
def user_state_dir(appname=None, appauthor=None, version=None, roaming=False):
r"""Return full path to the user-specific state dir for this application.
"appname" is the name of application.
If None, just the system directory is returned.
"appauthor" (only used on Windows) is the name of the
appauthor or distributing body for this application. Typically
it is the owning company name. This falls back to appname. You may
pass False to disable it.
"version" is an optional version path element to append to the
path. You might want to use this if you want multiple versions
of your app to be able to run independently. If used, this
would typically be "<major>.<minor>".
Only applied when appname is present.
"roaming" (boolean, default False) can be set True to use the Windows
roaming appdata directory. That means that for users on a Windows
network setup for roaming profiles, this user data will be
sync'd on login. See
<http://technet.microsoft.com/en-us/library/cc766489(WS.10).aspx>
for a discussion of issues.
Typical user state directories are:
Mac OS X: same as user_data_dir
Unix: ~/.local/state/<AppName> # or in $XDG_STATE_HOME, if defined
Win *: same as user_data_dir
For Unix, we follow this Debian proposal <https://wiki.debian.org/XDGBaseDirectorySpecification#state>
to extend the XDG spec and support $XDG_STATE_HOME.
That means, by default "~/.local/state/<AppName>".
"""
if system in ["win32", "darwin"]:
path = user_data_dir(appname, appauthor, None, roaming)
else:
path = os.getenv('XDG_STATE_HOME', os.path.expanduser("~/.local/state"))
if appname:
path = os.path.join(path, appname)
if appname and version:
path = os.path.join(path, version)
return path
def user_log_dir(appname=None, appauthor=None, version=None, opinion=True):
r"""Return full path to the user-specific log dir for this application.
"appname" is the name of application.
If None, just the system directory is returned.
"appauthor" (only used on Windows) is the name of the
appauthor or distributing body for this application. Typically
it is the owning company name. This falls back to appname. You may
pass False to disable it.
"version" is an optional version path element to append to the
path. You might want to use this if you want multiple versions
of your app to be able to run independently. If used, this
would typically be "<major>.<minor>".
Only applied when appname is present.
"opinion" (boolean) can be False to disable the appending of
"Logs" to the base app data dir for Windows, and "log" to the
base cache dir for Unix. See discussion below.
Typical user log directories are:
Mac OS X: ~/Library/Logs/<AppName>
Unix: ~/.cache/<AppName>/log # or under $XDG_CACHE_HOME if defined
Win XP: C:\Documents and Settings\<username>\Local Settings\Application Data\<AppAuthor>\<AppName>\Logs
Vista: C:\Users\<username>\AppData\Local\<AppAuthor>\<AppName>\Logs
On Windows the only suggestion in the MSDN docs is that local settings
go in the `CSIDL_LOCAL_APPDATA` directory. (Note: I'm interested in
examples of what some windows apps use for a logs dir.)
OPINION: This function appends "Logs" to the `CSIDL_LOCAL_APPDATA`
value for Windows and appends "log" to the user cache dir for Unix.
This can be disabled with the `opinion=False` option.
"""
if system == "darwin":
path = os.path.join(
os.path.expanduser('~/Library/Logs'),
appname)
elif system == "win32":
path = user_data_dir(appname, appauthor, version)
version = False
if opinion:
path = os.path.join(path, "Logs")
else:
path = user_cache_dir(appname, appauthor, version)
version = False
if opinion:
path = os.path.join(path, "log")
if appname and version:
path = os.path.join(path, version)
return path
class AppDirs(object):
"""Convenience wrapper for getting application dirs."""
def __init__(self, appname=None, appauthor=None, version=None,
roaming=False, multipath=False):
self.appname = appname
self.appauthor = appauthor
self.version = version
self.roaming = roaming
self.multipath = multipath
@property
def user_data_dir(self):
return user_data_dir(self.appname, self.appauthor,
version=self.version, roaming=self.roaming)
@property
def site_data_dir(self):
return site_data_dir(self.appname, self.appauthor,
version=self.version, multipath=self.multipath)
@property
def user_config_dir(self):
return user_config_dir(self.appname, self.appauthor,
version=self.version, roaming=self.roaming)
@property
def site_config_dir(self):
return site_config_dir(self.appname, self.appauthor,
version=self.version, multipath=self.multipath)
@property
def user_cache_dir(self):
return user_cache_dir(self.appname, self.appauthor,
version=self.version)
@property
def user_state_dir(self):
return user_state_dir(self.appname, self.appauthor,
version=self.version)
@property
def user_log_dir(self):
return user_log_dir(self.appname, self.appauthor,
version=self.version)
#---- internal support stuff
def _get_win_folder_from_registry(csidl_name):
"""This is a fallback technique at best. I'm not sure if using the
registry for this guarantees us the correct answer for all CSIDL_*
names.
"""
if PY3:
import winreg as _winreg
else:
import _winreg
shell_folder_name = {
"CSIDL_APPDATA": "AppData",
"CSIDL_COMMON_APPDATA": "Common AppData",
"CSIDL_LOCAL_APPDATA": "Local AppData",
}[csidl_name]
key = _winreg.OpenKey(
_winreg.HKEY_CURRENT_USER,
r"Software\Microsoft\Windows\CurrentVersion\Explorer\Shell Folders"
)
dir, type = _winreg.QueryValueEx(key, shell_folder_name)
return dir
def _get_win_folder_with_pywin32(csidl_name):
from win32com.shell import shellcon, shell
dir = shell.SHGetFolderPath(0, getattr(shellcon, csidl_name), 0, 0)
# Try to make this a unicode path because SHGetFolderPath does
# not return unicode strings when there is unicode data in the
# path.
try:
dir = unicode(dir)
# Downgrade to short path name if have highbit chars. See
# <http://bugs.activestate.com/show_bug.cgi?id=85099>.
has_high_char = False
for c in dir:
if ord(c) > 255:
has_high_char = True
break
if has_high_char:
try:
import win32api
dir = win32api.GetShortPathName(dir)
except ImportError:
pass
except UnicodeError:
pass
return dir
def _get_win_folder_with_ctypes(csidl_name):
import ctypes
csidl_const = {
"CSIDL_APPDATA": 26,
"CSIDL_COMMON_APPDATA": 35,
"CSIDL_LOCAL_APPDATA": 28,
}[csidl_name]
buf = ctypes.create_unicode_buffer(1024)
ctypes.windll.shell32.SHGetFolderPathW(None, csidl_const, None, 0, buf)
# Downgrade to short path name if have highbit chars. See
# <http://bugs.activestate.com/show_bug.cgi?id=85099>.
has_high_char = False
for c in buf:
if ord(c) > 255:
has_high_char = True
break
if has_high_char:
buf2 = ctypes.create_unicode_buffer(1024)
if ctypes.windll.kernel32.GetShortPathNameW(buf.value, buf2, 1024):
buf = buf2
return buf.value
def _get_win_folder_with_jna(csidl_name):
import array
from com.sun import jna
from com.sun.jna.platform import win32
buf_size = win32.WinDef.MAX_PATH * 2
buf = array.zeros('c', buf_size)
shell = win32.Shell32.INSTANCE
shell.SHGetFolderPath(None, getattr(win32.ShlObj, csidl_name), None, win32.ShlObj.SHGFP_TYPE_CURRENT, buf)
dir = jna.Native.toString(buf.tostring()).rstrip("\0")
# Downgrade to short path name if have highbit chars. See
# <http://bugs.activestate.com/show_bug.cgi?id=85099>.
has_high_char = False
for c in dir:
if ord(c) > 255:
has_high_char = True
break
if has_high_char:
buf = array.zeros('c', buf_size)
kernel = win32.Kernel32.INSTANCE
if kernel.GetShortPathName(dir, buf, buf_size):
dir = jna.Native.toString(buf.tostring()).rstrip("\0")
return dir
if system == "win32":
try:
import win32com.shell
_get_win_folder = _get_win_folder_with_pywin32
except ImportError:
try:
from ctypes import windll
_get_win_folder = _get_win_folder_with_ctypes
except ImportError:
try:
import com.sun.jna
_get_win_folder = _get_win_folder_with_jna
except ImportError:
_get_win_folder = _get_win_folder_from_registry
#---- self test code
if __name__ == "__main__":
appname = "MyApp"
appauthor = "MyCompany"
props = ("user_data_dir",
"user_config_dir",
"user_cache_dir",
"user_state_dir",
"user_log_dir",
"site_data_dir",
"site_config_dir")
print("-- app dirs %s --" % __version__)
print("-- app dirs (with optional 'version')")
dirs = AppDirs(appname, appauthor, version="1.0")
for prop in props:
print("%s: %s" % (prop, getattr(dirs, prop)))
print("\n-- app dirs (without optional 'version')")
dirs = AppDirs(appname, appauthor)
for prop in props:
print("%s: %s" % (prop, getattr(dirs, prop)))
print("\n-- app dirs (without optional 'appauthor')")
dirs = AppDirs(appname)
for prop in props:
print("%s: %s" % (prop, getattr(dirs, prop)))
print("\n-- app dirs (with disabled 'appauthor')")
dirs = AppDirs(appname, appauthor=False)
for prop in props:
print("%s: %s" % (prop, getattr(dirs, prop)))