diff --git a/pype/api.py b/pype/api.py index ce13688d13..c722757a3c 100644 --- a/pype/api.py +++ b/pype/api.py @@ -41,6 +41,7 @@ from .lib import ( get_last_version_from_path, modified_environ, add_tool_to_environment, + source_hash, get_latest_version ) @@ -59,6 +60,7 @@ __all__ = [ # Resources "resources", + # plugin classes "Extractor", # ordering @@ -85,6 +87,7 @@ __all__ = [ "get_last_version_from_path", "modified_environ", "add_tool_to_environment", + "source_hash", "subprocess", "get_latest_version" diff --git a/pype/lib.py b/pype/lib.py index 4d38ae8478..103d5a15f1 100644 --- a/pype/lib.py +++ b/pype/lib.py @@ -17,7 +17,6 @@ import six import avalon.api from .api import config - log = logging.getLogger(__name__) @@ -1383,6 +1382,27 @@ def ffprobe_streams(path_to_file): return json.loads(popen_output)["streams"] +def source_hash(filepath, *args): + """Generate simple identifier for a source file. + This is used to identify whether a source file has previously been + processed into the pipeline, e.g. a texture. + The hash is based on source filepath, modification time and file size. + This is only used to identify whether a specific source file was already + published before from the same location with the same modification date. + We opt to do it this way as opposed to Avalanche C4 hash as this is much + faster and predictable enough for all our production use cases. + Args: + filepath (str): The source file path. + You can specify additional arguments in the function + to allow for specific 'processing' values to be included. + """ + # We replace dots with comma because . cannot be a key in a pymongo dict. + file_name = os.path.basename(filepath) + time = str(os.path.getmtime(filepath)) + size = str(os.path.getsize(filepath)) + return "|".join([file_name, time, size] + list(args)).replace(".", ",") + + def get_latest_version(asset_name, subset_name): """Retrieve latest version from `asset_name`, and `subset_name`. diff --git a/pype/plugins/global/publish/integrate_new.py b/pype/plugins/global/publish/integrate_new.py index 1d50e24e86..88267e1b0a 100644 --- a/pype/plugins/global/publish/integrate_new.py +++ b/pype/plugins/global/publish/integrate_new.py @@ -11,6 +11,8 @@ from pymongo import DeleteOne, InsertOne import pyblish.api from avalon import io from avalon.vendor import filelink +import pype.api +from datetime import datetime # this is needed until speedcopy for linux is fixed if sys.platform == "win32": @@ -44,6 +46,7 @@ class IntegrateAssetNew(pyblish.api.InstancePlugin): "frameStart" "frameEnd" 'fps' + "data": additional metadata for each representation. """ label = "Integrate Asset New" @@ -94,18 +97,28 @@ class IntegrateAssetNew(pyblish.api.InstancePlugin): default_template_name = "publish" template_name_profiles = None - def process(self, instance): + # file_url : file_size of all published and uploaded files + integrated_file_sizes = {} + TMP_FILE_EXT = 'tmp' # suffix to denote temporary files, use without '.'
+ + def process(self, instance): + self.integrated_file_sizes = {} if [ef for ef in self.exclude_families if instance.data["family"] in ef]: return - self.register(instance) - - self.log.info("Integrating Asset in to the database ...") - self.log.info("instance.data: {}".format(instance.data)) - if instance.data.get('transfer', True): - self.integrate(instance) + try: + self.register(instance) + self.log.info("Integrated Asset in to the database ...") + self.log.info("instance.data: {}".format(instance.data)) + self.handle_destination_files(self.integrated_file_sizes, + 'finalize') + except Exception: + # clean destination + self.log.critical("Error when registering", exc_info=True) + self.handle_destination_files(self.integrated_file_sizes, 'remove') + six.reraise(*sys.exc_info()) def register(self, instance): # Required environment variables @@ -268,13 +281,21 @@ class IntegrateAssetNew(pyblish.api.InstancePlugin): representations = [] destination_list = [] + orig_transfers = [] if 'transfers' not in instance.data: instance.data['transfers'] = [] + else: + orig_transfers = list(instance.data['transfers']) template_name = self.template_name_from_instance(instance) published_representations = {} for idx, repre in enumerate(instance.data["representations"]): + # reset transfers for next representation + # instance.data['transfers'] is used as a global variable + # in current codebase + instance.data['transfers'] = list(orig_transfers) + published_files = [] # create template data for Anatomy @@ -455,13 +476,15 @@ class IntegrateAssetNew(pyblish.api.InstancePlugin): if repre_id is None: repre_id = io.ObjectId() + data = repre.get("data") or {} + data.update({'path': dst, 'template': template}) representation = { "_id": repre_id, "schema": "pype:representation-2.0", "type": "representation", "parent": version_id, "name": repre['name'], - "data": {'path': dst, 'template': template}, + "data": data, "dependencies": instance.data.get("dependencies", "").split(), # Imprint shortcut to context @@ -477,6 +500,24 @@ class IntegrateAssetNew(pyblish.api.InstancePlugin): dst_padding_exp % int(repre.get("frameStart")) ) + # any file that should be physically copied is expected in + # 'transfers' or 'hardlinks' + if instance.data.get('transfers', False) or \ + instance.data.get('hardlinks', False): + # could throw exception, will be caught in 'process' + # all integration to DB is being done together lower, + # so no rollback needed + self.log.debug("Integrating source files to destination ...") + self.integrated_file_sizes.update(self.integrate(instance)) + self.log.debug("Integrated files {}". 
+ format(self.integrated_file_sizes)) + + # get 'files' info for representation and all attached resources + self.log.debug("Preparing files information ...") + representation["files"] = self.get_files_info( + instance, + self.integrated_file_sizes) + self.log.debug("__ representation: {}".format(representation)) destination_list.append(dst) self.log.debug("__ destination_list: {}".format(destination_list)) @@ -514,10 +555,19 @@ class IntegrateAssetNew(pyblish.api.InstancePlugin): Args: instance: the instance to integrate + Returns: + integrated_file_sizes: dictionary of destination file url and + its size in bytes """ - transfers = instance.data.get("transfers", list()) + # store destination url and size for reporting and rollback + integrated_file_sizes = {} + transfers = list(instance.data.get("transfers", list())) for src, dest in transfers: - self.copy_file(src, dest) + if os.path.normpath(src) != os.path.normpath(dest): + dest = self.get_dest_temp_url(dest) + self.copy_file(src, dest) + # TODO needs to be updated during site implementation + integrated_file_sizes[dest] = os.path.getsize(dest) # Produce hardlinked copies # Note: hardlink can only be produced between two files on the same @@ -526,8 +576,15 @@ class IntegrateAssetNew(pyblish.api.InstancePlugin): # to ensure publishes remain safe and non-edited. hardlinks = instance.data.get("hardlinks", list()) for src, dest in hardlinks: - self.log.debug("Hardlinking file .. {} -> {}".format(src, dest)) - self.hardlink_file(src, dest) + dest = self.get_dest_temp_url(dest) + self.log.debug("Hardlinking file ... {} -> {}".format(src, dest)) + if not os.path.exists(dest): + self.hardlink_file(src, dest) + + # TODO needs to be updated during site implementation + integrated_file_sizes[dest] = os.path.getsize(dest) + + return integrated_file_sizes def copy_file(self, src, dst): """ Copy given source to destination @@ -540,7 +597,7 @@ class IntegrateAssetNew(pyblish.api.InstancePlugin): """ src = os.path.normpath(src) dst = os.path.normpath(dst) - self.log.debug("Copying file .. {} -> {}".format(src, dst)) + self.log.debug("Copying file ... {} -> {}".format(src, dst)) dirname = os.path.dirname(dst) try: os.makedirs(dirname) @@ -549,20 +606,21 @@ class IntegrateAssetNew(pyblish.api.InstancePlugin): pass else: self.log.critical("An unexpected error occurred.") - raise + six.reraise(*sys.exc_info()) # copy file with speedcopy and check if size of files are simetrical while True: import shutil try: copyfile(src, dst) - except shutil.SameFileError as sfe: - self.log.critical("files are the same {} to {}".format(src, dst)) + except shutil.SameFileError: + self.log.critical("files are the same {} to {}".format(src, + dst)) os.remove(dst) try: shutil.copyfile(src, dst) self.log.debug("Copying files with shutil...") - except (OSError) as e: + except OSError as e: self.log.critical("Cannot copy {} to {}".format(src, dst)) self.log.critical(e) six.reraise(*sys.exc_info()) @@ -579,7 +637,7 @@ class IntegrateAssetNew(pyblish.api.InstancePlugin): pass else: self.log.critical("An unexpected error occurred.") - raise + six.reraise(*sys.exc_info()) filelink.create(src, dst, filelink.HARDLINK) @@ -592,7 +650,7 @@ class IntegrateAssetNew(pyblish.api.InstancePlugin): }) if subset is None: - self.log.info("Subset '%s' not found, creating.." % subset_name) + self.log.info("Subset '%s' not found, creating ..." % subset_name) self.log.debug("families. %s" % instance.data.get('families')) self.log.debug( "families. 
%s" % type(instance.data.get('families'))) @@ -662,16 +720,7 @@ class IntegrateAssetNew(pyblish.api.InstancePlugin): else: source = context.data["currentFile"] anatomy = instance.context.data["anatomy"] - success, rootless_path = ( - anatomy.find_root_template_from_path(source) - ) - if success: - source = rootless_path - else: - self.log.warning(( - "Could not find root path for remapping \"{}\"." - " This may cause issues on farm." - ).format(source)) + source = self.get_rootless_path(anatomy, source) self.log.debug("Source: {}".format(source)) version_data = { @@ -770,3 +819,151 @@ class IntegrateAssetNew(pyblish.api.InstancePlugin): ).format(family, task_name, template_name)) return template_name + + def get_rootless_path(self, anatomy, path): + """ Returns, if possible, path without absolute portion from host + (eg. 'c:\' or '/opt/..') + This information is host dependent and shouldn't be captured. + Example: + 'c:/projects/MyProject1/Assets/publish...' > + '{root}/MyProject1/Assets...' + + Args: + anatomy: anatomy part from instance + path: path (absolute) + Returns: + path: modified path if possible, or unmodified path + + warning logged + """ + success, rootless_path = ( + anatomy.find_root_template_from_path(path) + ) + if success: + path = rootless_path + else: + self.log.warning(( + "Could not find root path for remapping \"{}\"." + " This may cause issues on farm." + ).format(path)) + return path + + def get_files_info(self, instance, integrated_file_sizes): + """ Prepare 'files' portion for attached resources and main asset. + Combining records from 'transfers' and 'hardlinks' parts from + instance. + All attached resources should be added, currently without + Context info. + + Arguments: + instance: the current instance being published + integrated_file_sizes: dictionary of destination path (absolute) + and its file size + Returns: + output_resources: array of dictionaries to be added to 'files' key + in representation + """ + resources = list(instance.data.get("transfers", [])) + resources.extend(list(instance.data.get("hardlinks", []))) + + self.log.debug("get_resource_files_info.resources:{}". + format(resources)) + + output_resources = [] + anatomy = instance.context.data["anatomy"] + for _src, dest in resources: + path = self.get_rootless_path(anatomy, dest) + dest = self.get_dest_temp_url(dest) + file_hash = pype.api.source_hash(dest) + if self.TMP_FILE_EXT and \ + ',{}'.format(self.TMP_FILE_EXT) in file_hash: + file_hash = file_hash.replace(',{}'.format(self.TMP_FILE_EXT), + '') + + file_info = self.prepare_file_info(path, + integrated_file_sizes[dest], + file_hash) + output_resources.append(file_info) + + return output_resources + + def get_dest_temp_url(self, dest): + """ Enhance destination path with TMP_FILE_EXT to denote temporary + file. 
+ Temporary files will be renamed after successful registration + into DB and full copy to destination + + Arguments: + dest: destination url of published file (absolute) + Returns: + dest: destination path + '.TMP_FILE_EXT' + """ + if self.TMP_FILE_EXT and '.{}'.format(self.TMP_FILE_EXT) not in dest: + dest += '.{}'.format(self.TMP_FILE_EXT) + return dest + + def prepare_file_info(self, path, size=None, file_hash=None, sites=None): + """ Prepare information for one file (asset or resource) + + Arguments: + path: destination url of published file (rootless) + size(optional): size of file in bytes + file_hash(optional): hash of file for synchronization validation + sites(optional): array of published locations, + ['studio': {'created_dt':date}] by default + keys expected ['studio', 'site1', 'gdrive1'] + Returns: + rec: dictionary with filled info + """ + + rec = { + "_id": io.ObjectId(), + "path": path + } + if size: + rec["size"] = size + + if file_hash: + rec["hash"] = file_hash + + if sites: + rec["sites"] = sites + else: + meta = {"created_dt": datetime.now()} + rec["sites"] = {"studio": meta} + + return rec + + def handle_destination_files(self, integrated_file_sizes, mode): + """ Clean destination files + Called when error happened during integrating to DB or to disk + OR called to rename uploaded files from temporary name to final to + highlight publishing in progress/broken + Used to clean unwanted files + + Arguments: + integrated_file_sizes: dictionary, file urls as keys, size as value + mode: 'remove' - clean files, + 'finalize' - rename files, + remove TMP_FILE_EXT suffix denoting temp file + """ + if integrated_file_sizes: + for file_url, _file_size in integrated_file_sizes.items(): + try: + if mode == 'remove': + self.log.debug("Removing file ...{}".format(file_url)) + os.remove(file_url) + if mode == 'finalize': + self.log.debug("Renaming file ...{}".format(file_url)) + import re + os.rename(file_url, + re.sub('\.{}$'.format(self.TMP_FILE_EXT), + '', + file_url) + ) + + except FileNotFoundError: + pass # file not there, nothing to delete + except OSError: + self.log.error("Cannot {} file {}".format(mode, file_url), + exc_info=True) + six.reraise(*sys.exc_info()) diff --git a/pype/plugins/maya/publish/extract_look.py b/pype/plugins/maya/publish/extract_look.py index f402f61329..6bd202093f 100644 --- a/pype/plugins/maya/publish/extract_look.py +++ b/pype/plugins/maya/publish/extract_look.py @@ -21,27 +21,6 @@ COPY = 1 HARDLINK = 2 -def source_hash(filepath, *args): - """Generate simple identifier for a source file. - This is used to identify whether a source file has previously been - processe into the pipeline, e.g. a texture. - The hash is based on source filepath, modification time and file size. - This is only used to identify whether a specific source file was already - published before from the same location with the same modification date. - We opt to do it this way as opposed to Avalanch C4 hash as this is much - faster and predictable enough for all our production use cases. - Args: - filepath (str): The source file path. - You can specify additional arguments in the function - to allow for specific 'processing' values to be included. - """ - # We replace dots with comma because . cannot be a key in a pymongo dict. 
- file_name = os.path.basename(filepath) - time = str(os.path.getmtime(filepath)) - size = str(os.path.getsize(filepath)) - return "|".join([file_name, time, size] + list(args)).replace(".", ",") - - def find_paths_by_hash(texture_hash): # Find the texture hash key in the dictionary and all paths that # originate from it. @@ -363,7 +342,7 @@ class ExtractLook(pype.api.Extractor): args = [] if do_maketx: args.append("maketx") - texture_hash = source_hash(filepath, *args) + texture_hash = pype.api.source_hash(filepath, *args) # If source has been published before with the same settings, # then don't reprocess but hardlink from the original diff --git a/pype/tests/test_mongo_performance.py b/pype/tests/test_mongo_performance.py new file mode 100644 index 0000000000..6b62f0fd1c --- /dev/null +++ b/pype/tests/test_mongo_performance.py @@ -0,0 +1,236 @@ +import pymongo +import bson +import random + + +class TestPerformance(): + ''' + Class for testing performance of representations and their 'files' parts. + The question is whether an embedded array: + 'files' : [ {'_id': '1111', 'path': '....'}, + {'_id'...}] + OR a document: + 'files' : { + '1111': {'path':'....'}, + '2222': {'path':'...'} + } + is faster. + + Current results: without an additional partial index the document version is 3x faster. + With the partial index the array version is 50x faster than the document version. + + The partial index is something like: + db.getCollection('performance_test').createIndex + ({'files._id': 1}, + {partialFilterExpression: {'files': {'$exists': true}}}) + This didn't work for me from the shell; I had to create the index manually in Compass. + + ''' + + MONGO_URL = 'mongodb://localhost:27017' + MONGO_DB = 'performance_test' + MONGO_COLLECTION = 'performance_test' + + inserted_ids = [] + + def __init__(self, version='array'): + ''' + Creates and fills the collection, based on the value of 'version'. + + :param version: 'array' - files as embedded array, + 'doc' - as document + ''' + self.client = pymongo.MongoClient(self.MONGO_URL) + self.db = self.client[self.MONGO_DB] + self.collection_name = self.MONGO_COLLECTION + + self.version = version + + if self.version != 'array': + self.collection_name = self.MONGO_COLLECTION + '_doc' + + self.collection = self.db[self.collection_name] + + self.ids = [] # for testing + self.inserted_ids = [] + + def prepare(self, no_of_records=100000): + ''' + Produce 'no_of_records' representations with a 'files' segment.
+ It depends on the 'version' value in the constructor, 'array' or 'doc' + :return: + ''' + print('Purging {} collection'.format(self.collection_name)) + self.collection.delete_many({}) + + id = bson.objectid.ObjectId() + + insert_recs = [] + for i in range(no_of_records): + file_id = bson.objectid.ObjectId() + file_id2 = bson.objectid.ObjectId() + file_id3 = bson.objectid.ObjectId() + + self.inserted_ids.extend([file_id, file_id2, file_id3]) + + document = {"files": self.get_files(self.version, i, + file_id, file_id2, file_id3) + , + "context": { + "subset": "workfileLookdev", + "username": "petrk", + "task": "lookdev", + "family": "workfile", + "hierarchy": "Assets", + "project": {"code": "test", "name": "Test"}, + "version": 1, + "asset": "Cylinder", + "representation": "mb", + "root": "C:/projects" + }, + "dependencies": [], + "name": "mb", + "parent": {"oid": '{}'.format(id)}, + "data": { + "path": "C:\\projects\\Test\\Assets\\Cylinder\\publish\\workfile\\workfileLookdev\\v001\\test_Cylinder_workfileLookdev_v001.mb", + "template": "{root}\\{project[name]}\\{hierarchy}\\{asset}\\publish\\{family}\\{subset}\\v{version:0>3}\\{project[code]}_{asset}_{subset}_v{version:0>3}<_{output}><.{frame:0>4}>.{representation}" + }, + "type": "representation", + "schema": "pype:representation-2.0" + } + + insert_recs.append(document) + + print('Prepared {} records in {} collection'. + format(no_of_records, self.collection_name)) + + self.collection.insert_many(insert_recs) + # TODO refactor to produce a real array instead of needing the ugly regex + self.collection.insert_one({"inserted_id": self.inserted_ids}) + print('-' * 50) + + def run(self, queries=1000, loops=3): + ''' + Run 'queries' find() calls against the collection, repeated 'loops' times + :param queries: how many ..find(...) calls per loop + :param loops: how many loops of 'queries' to run + :return: None + ''' + print('Testing version {} on {}'.format(self.version, + self.collection_name)) + + inserted_ids = list(self.collection. + find({"inserted_id": {"$exists": True}})) + import re + self.ids = re.findall("'[0-9a-z]*'", str(inserted_ids)) + + import time + + found_cnt = 0 + for _ in range(loops): + start = time.time() + for _ in range(queries): + val = random.choice(self.ids) + val = val.replace("'", '') + + if (self.version == 'array'): + # prepared for the partial index; without the 'files' $exists clause + # it won't engage + found = self.collection.\ + find_one({'files': {"$exists": True}, + 'files._id': "{}".format(val)}) + else: + key = "files.{}".format(val) + found = self.collection.find_one({key: {"$exists": True}}) + if found: + found_cnt += 1 + + end = time.time() + print('duration per loop {}'.format(end - start)) + print("found_cnt {}".format(found_cnt)) + + def get_files(self, mode, i, file_id, file_id2, file_id3): + ''' + Wrapper to decide whether the 'array' or the document version should be used + :param mode: 'array'|'doc' + :param i: step number + :param file_id: ObjectId of first dummy file + :param file_id2: .. + :param file_id3: ..
+ :return: + ''' + if mode == 'array': + return self.get_files_array(i, file_id, file_id2, file_id3) + else: + return self.get_files_doc(i, file_id, file_id2, file_id3) + + def get_files_array(self, i, file_id, file_id2, file_id3): + return [ + { + "path": "c:/Test/Assets/Cylinder/publish/workfile/" + "workfileLookdev/v001/" + "test_CylinderA_workfileLookdev_v{0:03}.mb".format(i), + "_id": '{}'.format(file_id), + "hash": "temphash", + "sites": ["studio"], + "size":87236 + }, + { + "path": "c:/Test/Assets/Cylinder/publish/workfile/" + "workfileLookdev/v001/" + "test_CylinderB_workfileLookdev_v{0:03}.mb".format(i), + "_id": '{}'.format(file_id2), + "hash": "temphash", + "sites": ["studio"], + "size": 87236 + }, + { + "path": "c:/Test/Assets/Cylinder/publish/workfile/" + "workfileLookdev/v001/" + "test_CylinderC_workfileLookdev_v{0:03}.mb".format(i), + "_id": '{}'.format(file_id3), + "hash": "temphash", + "sites": ["studio"], + "size": 87236 + } + + ] + + def get_files_doc(self, i, file_id, file_id2, file_id3): + ret = {} + ret['{}'.format(file_id)] = { + "path": "c:/Test/Assets/Cylinder/publish/workfile/workfileLookdev/" + "v001/test_CylinderA_workfileLookdev_v{0:03}.mb".format(i), + "hash": "temphash", + "sites": ["studio"], + "size": 87236 + } + + ret['{}'.format(file_id2)] = { + "path": "c:/Test/Assets/Cylinder/publish/workfile/workfileLookdev/" + "v001/test_CylinderB_workfileLookdev_v{0:03}.mb".format(i), + "hash": "temphash", + "sites": ["studio"], + "size": 87236 + } + ret['{}'.format(file_id3)] = { + "path": "c:/Test/Assets/Cylinder/publish/workfile/workfileLookdev/" + "v001/test_CylinderC_workfileLookdev_v{0:03}.mb".format(i), + "hash": "temphash", + "sites": ["studio"], + "size": 87236 + } + + return ret + + +if __name__ == '__main__': + tp = TestPerformance('array') + tp.prepare() # enable to prepare data + tp.run(1000, 3) + + print('-'*50) + + tp = TestPerformance('doc') + tp.prepare() # enable to prepare data + tp.run(1000, 3)
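
Note: a minimal standalone sketch of the pype.api.source_hash helper relocated above from extract_look.py into pype/lib.py. The function body is copied from the lib.py hunk; the file path and the sample output in the comments are hypothetical.

import os

def source_hash(filepath, *args):
    # Copy of the helper added to pype/lib.py, for illustration only.
    # Dots become commas because '.' cannot be a key in a pymongo dict.
    file_name = os.path.basename(filepath)
    time = str(os.path.getmtime(filepath))
    size = str(os.path.getsize(filepath))
    return "|".join([file_name, time, size] + list(args)).replace(".", ",")

# Hypothetical usage: the same file with the same mtime and size always maps
# to the same identifier, and extra 'processing' arguments extend the key.
# source_hash("/tmp/texture.exr", "maketx")
# -> something like "texture,exr|1588000000,0|348|maketx"
#    (actual values depend on the file's modification time and size)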
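
A condensed sketch of the temporary-file strategy that integrate(), get_dest_temp_url() and handle_destination_files() implement above: files are first copied to '<dest>.tmp', and only renamed to their final names once registration in the database succeeds; on error the temporary files are removed. The names publish_with_rollback and register_fn are illustrative, not part of the plugin.

import os
import re
import shutil

TMP_FILE_EXT = 'tmp'

def get_dest_temp_url(dest):
    # mirrors the plugin helper: append '.tmp' unless it is already present
    if TMP_FILE_EXT and '.{}'.format(TMP_FILE_EXT) not in dest:
        dest += '.{}'.format(TMP_FILE_EXT)
    return dest

def publish_with_rollback(transfers, register_fn):
    # transfers: list of (src, dest) pairs, as in instance.data['transfers']
    integrated_file_sizes = {}
    try:
        for src, dest in transfers:
            tmp_dest = get_dest_temp_url(dest)
            os.makedirs(os.path.dirname(tmp_dest), exist_ok=True)
            shutil.copyfile(src, tmp_dest)
            integrated_file_sizes[tmp_dest] = os.path.getsize(tmp_dest)
        register_fn()  # stand-in for IntegrateAssetNew.register()
    except Exception:
        # equivalent of handle_destination_files(..., 'remove')
        for file_url in integrated_file_sizes:
            if os.path.exists(file_url):
                os.remove(file_url)
        raise
    # equivalent of handle_destination_files(..., 'finalize'): drop the suffix
    for file_url in integrated_file_sizes:
        os.rename(file_url,
                  re.sub(r'\.{}$'.format(TMP_FILE_EXT), '', file_url))
    return integrated_file_sizes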
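
For reference, the 'files' entries that get_files_info() and prepare_file_info() attach to each representation have roughly this shape; every value below is made up, and 'hash' stands for the pype.api.source_hash value with any ',tmp' marker stripped.

from datetime import datetime
from bson import ObjectId  # the plugin uses io.ObjectId from avalon

example_file_record = {
    "_id": ObjectId(),
    # rootless destination path, via get_rootless_path()
    "path": "{root}/Test/Assets/Cylinder/publish/workfile/workfileLookdev/"
            "v001/test_Cylinder_workfileLookdev_v001.mb",
    "size": 87236,  # bytes, taken from integrated_file_sizes
    "hash": "test_Cylinder_workfileLookdev_v001,mb|1588000000,0|87236",
    # default site entry written by prepare_file_info()
    "sites": {"studio": {"created_dt": datetime.now()}},
}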
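
The TestPerformance docstring above notes that the partial index had to be created manually in Compass; an untested pymongo equivalent, matching the find_one() filter used in run(), would be:

import pymongo

client = pymongo.MongoClient('mongodb://localhost:27017')
collection = client['performance_test']['performance_test']

# partial index for the embedded-array version; only documents that actually
# have a 'files' field are indexed, so queries must include the same
# {'files': {'$exists': True}} clause to use it
collection.create_index(
    [('files._id', pymongo.ASCENDING)],
    partialFilterExpression={'files': {'$exists': True}},
)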