Merge remote-tracking branch 'origin/2.x/develop' into 2.x/develop

This commit is contained in:
Milan Kolar 2020-08-12 22:37:08 +02:00
commit c8f4dccae7
5 changed files with 487 additions and 52 deletions

View file

@@ -41,6 +41,7 @@ from .lib import (
get_last_version_from_path,
modified_environ,
add_tool_to_environment,
source_hash,
get_latest_version
)
@@ -59,6 +60,7 @@ __all__ = [
# Resources
"resources",
# plugin classes
"Extractor",
# ordering
@@ -85,6 +87,7 @@ __all__ = [
"get_last_version_from_path",
"modified_environ",
"add_tool_to_environment",
"source_hash",
"subprocess",
"get_latest_version"

View file

@@ -17,7 +17,6 @@ import six
import avalon.api
from .api import config
log = logging.getLogger(__name__)
@@ -1383,6 +1382,27 @@ def ffprobe_streams(path_to_file):
return json.loads(popen_output)["streams"]
def source_hash(filepath, *args):
"""Generate simple identifier for a source file.
This is used to identify whether a source file has previously been
processed into the pipeline, e.g. a texture.
The hash is based on source filepath, modification time and file size.
This is only used to identify whether a specific source file was already
published before from the same location with the same modification date.
We opt to do it this way, as opposed to an Avalanche C4 hash, as this is much
faster and predictable enough for all our production use cases.
Args:
filepath (str): The source file path.
You can specify additional arguments in the function
to allow for specific 'processing' values to be included.
"""
# We replace dots with commas because '.' cannot appear in a pymongo dict key.
file_name = os.path.basename(filepath)
time = str(os.path.getmtime(filepath))
size = str(os.path.getsize(filepath))
return "|".join([file_name, time, size] + list(args)).replace(".", ",")
def get_latest_version(asset_name, subset_name):
"""Retrieve latest version from `asset_name`, and `subset_name`.

View file

@@ -11,6 +11,8 @@ from pymongo import DeleteOne, InsertOne
import pyblish.api
from avalon import io
from avalon.vendor import filelink
import pype.api
from datetime import datetime
# this is needed until speedcopy for linux is fixed
if sys.platform == "win32":
@@ -44,6 +46,7 @@ class IntegrateAssetNew(pyblish.api.InstancePlugin):
"frameStart"
"frameEnd"
'fps'
"data": additional metadata for each representation.
"""
label = "Integrate Asset New"
@@ -94,18 +97,28 @@ class IntegrateAssetNew(pyblish.api.InstancePlugin):
default_template_name = "publish"
template_name_profiles = None
def process(self, instance):
# file_url : file_size of all published and uploaded files
integrated_file_sizes = {}
TMP_FILE_EXT = 'tmp' # suffix to denote temporary files, use without '.'
def process(self, instance):
self.integrated_file_sizes = {}
if [ef for ef in self.exclude_families
if instance.data["family"] in ef]:
return
self.register(instance)
self.log.info("Integrating Asset in to the database ...")
self.log.info("instance.data: {}".format(instance.data))
if instance.data.get('transfer', True):
self.integrate(instance)
try:
self.register(instance)
self.log.info("Integrated Asset in to the database ...")
self.log.info("instance.data: {}".format(instance.data))
self.handle_destination_files(self.integrated_file_sizes,
'finalize')
except Exception:
# clean destination
self.log.critical("Error when registering", exc_info=True)
self.handle_destination_files(self.integrated_file_sizes, 'remove')
six.reraise(*sys.exc_info())
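The try/except above gives the integration a simple two-phase behaviour: files are first copied under a temporary name, and only after the DB registration succeeds are they renamed into place; on any failure the temporary copies are removed. A minimal standalone sketch of the same pattern, with a hypothetical register_in_db callable:

    import os
    import shutil

    def publish(src, dst, register_in_db):
        tmp = dst + '.tmp'          # write under a temporary name first
        shutil.copyfile(src, tmp)
        try:
            register_in_db(dst)     # may raise; nothing final exists yet
            os.rename(tmp, dst)     # 'finalize': drop the '.tmp' suffix
        except Exception:
            os.remove(tmp)          # 'remove': destination stays clean
            raise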
def register(self, instance):
# Required environment variables
@@ -268,13 +281,21 @@ class IntegrateAssetNew(pyblish.api.InstancePlugin):
representations = []
destination_list = []
orig_transfers = []
if 'transfers' not in instance.data:
instance.data['transfers'] = []
else:
orig_transfers = list(instance.data['transfers'])
template_name = self.template_name_from_instance(instance)
published_representations = {}
for idx, repre in enumerate(instance.data["representations"]):
# reset transfers for next representation
# instance.data['transfers'] is used as a global variable
# in the current codebase
instance.data['transfers'] = list(orig_transfers)
published_files = []
# create template data for Anatomy
@@ -455,13 +476,15 @@ class IntegrateAssetNew(pyblish.api.InstancePlugin):
if repre_id is None:
repre_id = io.ObjectId()
data = repre.get("data") or {}
data.update({'path': dst, 'template': template})
representation = {
"_id": repre_id,
"schema": "pype:representation-2.0",
"type": "representation",
"parent": version_id,
"name": repre['name'],
"data": {'path': dst, 'template': template},
"data": data,
"dependencies": instance.data.get("dependencies", "").split(),
# Imprint shortcut to context
@@ -477,6 +500,24 @@ class IntegrateAssetNew(pyblish.api.InstancePlugin):
dst_padding_exp % int(repre.get("frameStart"))
)
# any file that should be physically copied is expected in
# 'transfers' or 'hardlinks'
if instance.data.get('transfers', False) or \
instance.data.get('hardlinks', False):
# may raise an exception, which will be caught in 'process';
# all DB integration happens together below,
# so no rollback is needed here
self.log.debug("Integrating source files to destination ...")
self.integrated_file_sizes.update(self.integrate(instance))
self.log.debug("Integrated files {}".
format(self.integrated_file_sizes))
# get 'files' info for representation and all attached resources
self.log.debug("Preparing files information ...")
representation["files"] = self.get_files_info(
instance,
self.integrated_file_sizes)
self.log.debug("__ representation: {}".format(representation))
destination_list.append(dst)
self.log.debug("__ destination_list: {}".format(destination_list))
@@ -514,10 +555,19 @@ class IntegrateAssetNew(pyblish.api.InstancePlugin):
Args:
instance: the instance to integrate
Returns:
integrated_file_sizes: dictionary of destination file url and
its size in bytes
"""
transfers = instance.data.get("transfers", list())
# store destination url and size for reporting and rollback
integrated_file_sizes = {}
transfers = list(instance.data.get("transfers", list()))
for src, dest in transfers:
self.copy_file(src, dest)
if os.path.normpath(src) != os.path.normpath(dest):
dest = self.get_dest_temp_url(dest)
self.copy_file(src, dest)
# TODO needs to be updated during site implementation
integrated_file_sizes[dest] = os.path.getsize(dest)
# Produce hardlinked copies
# Note: hardlink can only be produced between two files on the same
@@ -526,8 +576,15 @@ class IntegrateAssetNew(pyblish.api.InstancePlugin):
# to ensure publishes remain safe and non-edited.
hardlinks = instance.data.get("hardlinks", list())
for src, dest in hardlinks:
self.log.debug("Hardlinking file .. {} -> {}".format(src, dest))
self.hardlink_file(src, dest)
dest = self.get_dest_temp_url(dest)
self.log.debug("Hardlinking file ... {} -> {}".format(src, dest))
if not os.path.exists(dest):
self.hardlink_file(src, dest)
# TODO needs to be updated during site implementation
integrated_file_sizes[dest] = os.path.getsize(dest)
return integrated_file_sizes
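For reference, the returned mapping pairs each temporary destination url with its size in bytes, e.g. (hypothetical paths):

    {'/projects/Test/publish/v001/asset.exr.tmp': 87236,
     '/projects/Test/publish/v001/asset.mb.tmp': 10240}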
def copy_file(self, src, dst):
""" Copy given source to destination
@@ -540,7 +597,7 @@ class IntegrateAssetNew(pyblish.api.InstancePlugin):
"""
src = os.path.normpath(src)
dst = os.path.normpath(dst)
self.log.debug("Copying file .. {} -> {}".format(src, dst))
self.log.debug("Copying file ... {} -> {}".format(src, dst))
dirname = os.path.dirname(dst)
try:
os.makedirs(dirname)
@@ -549,20 +606,21 @@ class IntegrateAssetNew(pyblish.api.InstancePlugin):
pass
else:
self.log.critical("An unexpected error occurred.")
raise
six.reraise(*sys.exc_info())
# copy file with speedcopy and check if the source and destination sizes match
while True:
import shutil
try:
copyfile(src, dst)
except shutil.SameFileError as sfe:
self.log.critical("files are the same {} to {}".format(src, dst))
except shutil.SameFileError:
self.log.critical("files are the same {} to {}".format(src,
dst))
os.remove(dst)
try:
shutil.copyfile(src, dst)
self.log.debug("Copying files with shutil...")
except (OSError) as e:
except OSError as e:
self.log.critical("Cannot copy {} to {}".format(src, dst))
self.log.critical(e)
six.reraise(*sys.exc_info())
@@ -579,7 +637,7 @@ class IntegrateAssetNew(pyblish.api.InstancePlugin):
pass
else:
self.log.critical("An unexpected error occurred.")
raise
six.reraise(*sys.exc_info())
filelink.create(src, dst, filelink.HARDLINK)
@@ -592,7 +650,7 @@ class IntegrateAssetNew(pyblish.api.InstancePlugin):
})
if subset is None:
self.log.info("Subset '%s' not found, creating.." % subset_name)
self.log.info("Subset '%s' not found, creating ..." % subset_name)
self.log.debug("families. %s" % instance.data.get('families'))
self.log.debug(
"families. %s" % type(instance.data.get('families')))
@@ -662,16 +720,7 @@ class IntegrateAssetNew(pyblish.api.InstancePlugin):
else:
source = context.data["currentFile"]
anatomy = instance.context.data["anatomy"]
success, rootless_path = (
anatomy.find_root_template_from_path(source)
)
if success:
source = rootless_path
else:
self.log.warning((
"Could not find root path for remapping \"{}\"."
" This may cause issues on farm."
).format(source))
source = self.get_rootless_path(anatomy, source)
self.log.debug("Source: {}".format(source))
version_data = {
@@ -770,3 +819,151 @@ class IntegrateAssetNew(pyblish.api.InstancePlugin):
).format(family, task_name, template_name))
return template_name
def get_rootless_path(self, anatomy, path):
""" Returns, if possible, path without absolute portion from host
(eg. 'c:\' or '/opt/..')
This information is host dependent and shouldn't be captured.
Example:
'c:/projects/MyProject1/Assets/publish...' >
'{root}/MyProject1/Assets...'
Args:
anatomy: anatomy part from instance
path: path (absolute)
Returns:
path: modified path if possible, or unmodified path
+ warning logged
"""
success, rootless_path = (
anatomy.find_root_template_from_path(path)
)
if success:
path = rootless_path
else:
self.log.warning((
"Could not find root path for remapping \"{}\"."
" This may cause issues on farm."
).format(path))
return path
def get_files_info(self, instance, integrated_file_sizes):
""" Prepare 'files' portion for attached resources and main asset.
Combines records from the 'transfers' and 'hardlinks' parts of
the instance.
All attached resources should be added, currently without
Context info.
Arguments:
instance: the current instance being published
integrated_file_sizes: dictionary of destination path (absolute)
and its file size
Returns:
output_resources: array of dictionaries to be added to 'files' key
in representation
"""
resources = list(instance.data.get("transfers", []))
resources.extend(list(instance.data.get("hardlinks", [])))
self.log.debug("get_resource_files_info.resources:{}".
format(resources))
output_resources = []
anatomy = instance.context.data["anatomy"]
for _src, dest in resources:
path = self.get_rootless_path(anatomy, dest)
dest = self.get_dest_temp_url(dest)
file_hash = pype.api.source_hash(dest)
if self.TMP_FILE_EXT and \
',{}'.format(self.TMP_FILE_EXT) in file_hash:
file_hash = file_hash.replace(',{}'.format(self.TMP_FILE_EXT),
'')
file_info = self.prepare_file_info(path,
integrated_file_sizes[dest],
file_hash)
output_resources.append(file_info)
return output_resources
def get_dest_temp_url(self, dest):
""" Enhance destination path with TMP_FILE_EXT to denote temporary
file.
Temporary files will be renamed after successful registration
into DB and full copy to destination
Arguments:
dest: destination url of published file (absolute)
Returns:
dest: destination path + '.TMP_FILE_EXT'
"""
if self.TMP_FILE_EXT and '.{}'.format(self.TMP_FILE_EXT) not in dest:
dest += '.{}'.format(self.TMP_FILE_EXT)
return dest
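For example (hypothetical path, with TMP_FILE_EXT = 'tmp'):

    self.get_dest_temp_url('/publish/v001/asset.exr')      # '/publish/v001/asset.exr.tmp'
    self.get_dest_temp_url('/publish/v001/asset.exr.tmp')  # unchanged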
def prepare_file_info(self, path, size=None, file_hash=None, sites=None):
""" Prepare information for one file (asset or resource)
Arguments:
path: destination url of published file (rootless)
size(optional): size of file in bytes
file_hash(optional): hash of file for synchronization validation
sites(optional): dictionary of published locations,
{'studio': {'created_dt': date}} by default
expected keys: 'studio', 'site1', 'gdrive1'
Returns:
rec: dictionary with filled info
"""
rec = {
"_id": io.ObjectId(),
"path": path
}
if size:
rec["size"] = size
if file_hash:
rec["hash"] = file_hash
if sites:
rec["sites"] = sites
else:
meta = {"created_dt": datetime.now()}
rec["sites"] = {"studio": meta}
return rec
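A record for a hypothetical rootless path would look roughly like:

    {
        "_id": ObjectId('...'),
        "path": "{root}/Test/Assets/Cylinder/publish/asset.mb",
        "size": 87236,
        "hash": "asset,mb|1597230000,0|87236",
        "sites": {"studio": {"created_dt": datetime.now()}}
    }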
def handle_destination_files(self, integrated_file_sizes, mode):
""" Clean destination files
Called when error happened during integrating to DB or to disk
OR called to rename uploaded files from temporary name to final to
highlight publishing in progress/broken
Used to clean unwanted files
Arguments:
integrated_file_sizes: dictionary, file urls as keys, size as value
mode: 'remove' - clean files,
'finalize' - rename files,
remove TMP_FILE_EXT suffix denoting temp file
"""
if integrated_file_sizes:
for file_url, _file_size in integrated_file_sizes.items():
try:
if mode == 'remove':
self.log.debug("Removing file ...{}".format(file_url))
os.remove(file_url)
if mode == 'finalize':
self.log.debug("Renaming file ...{}".format(file_url))
import re
os.rename(file_url,
re.sub(r'\.{}$'.format(self.TMP_FILE_EXT),
'',
file_url)
)
except FileNotFoundError:
pass # file not there, nothing to delete
except OSError:
self.log.error("Cannot {} file {}".format(mode, file_url),
exc_info=True)
six.reraise(*sys.exc_info())
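Usage, following the process() flow above (hypothetical path):

    sizes = {'/publish/v001/asset.exr.tmp': 87236}
    self.handle_destination_files(sizes, 'finalize')  # renames to asset.exr
    self.handle_destination_files(sizes, 'remove')    # deletes the temp file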

View file

@@ -21,27 +21,6 @@ COPY = 1
HARDLINK = 2
def source_hash(filepath, *args):
"""Generate simple identifier for a source file.
This is used to identify whether a source file has previously been
processed into the pipeline, e.g. a texture.
The hash is based on source filepath, modification time and file size.
This is only used to identify whether a specific source file was already
published before from the same location with the same modification date.
We opt to do it this way, as opposed to an Avalanche C4 hash, as this is much
faster and predictable enough for all our production use cases.
Args:
filepath (str): The source file path.
You can specify additional arguments in the function
to allow for specific 'processing' values to be included.
"""
# We replace dots with commas because '.' cannot appear in a pymongo dict key.
file_name = os.path.basename(filepath)
time = str(os.path.getmtime(filepath))
size = str(os.path.getsize(filepath))
return "|".join([file_name, time, size] + list(args)).replace(".", ",")
def find_paths_by_hash(texture_hash):
# Find the texture hash key in the dictionary and all paths that
# originate from it.
@@ -363,7 +342,7 @@ class ExtractLook(pype.api.Extractor):
args = []
if do_maketx:
args.append("maketx")
texture_hash = source_hash(filepath, *args)
texture_hash = pype.api.source_hash(filepath, *args)
# If source has been published before with the same settings,
# then don't reprocess but hardlink from the original

View file

@@ -0,0 +1,236 @@
import pymongo
import bson
import random
import re
import time
class TestPerformance():
'''
Class for testing performance of representations and their 'files' parts.
The question is whether an embedded array:
'files' : [ {'_id': '1111', 'path': '...'},
            {'_id': ...} ]
OR a document:
'files' : {
    '1111': {'path': '...'},
    '2222': {'path': '...'}
}
is faster.
Current results: without an additional partial index, the document
version is 3x faster; with the index, the array version is 50x faster
than the document version.
A partial index, something like:
db.getCollection('performance_test').createIndex(
    {'files._id': 1},
    {partialFilterExpression: {'files': {'$exists': true}}})
DIDN'T work for me; I had to create it manually in Compass.
'''
MONGO_URL = 'mongodb://localhost:27017'
MONGO_DB = 'performance_test'
MONGO_COLLECTION = 'performance_test'
inserted_ids = []
def __init__(self, version='array'):
'''
Creates and fills the collection, based on the value of 'version'.
:param version: 'array' - files as an embedded array,
                'doc' - files as a document
'''
self.client = pymongo.MongoClient(self.MONGO_URL)
self.db = self.client[self.MONGO_DB]
self.collection_name = self.MONGO_COLLECTION
self.version = version
if self.version != 'array':
self.collection_name = self.MONGO_COLLECTION + '_doc'
self.collection = self.db[self.collection_name]
self.ids = [] # for testing
self.inserted_ids = []
def prepare(self, no_of_records=100000):
'''
Produce 'no_of_records' representations with a 'files' segment.
The shape depends on the 'version' value from the constructor,
'array' or 'doc'.
:return: None
'''
print('Purging {} collection'.format(self.collection_name))
self.collection.delete_many({})
parent_id = bson.objectid.ObjectId()
insert_recs = []
for i in range(no_of_records):
file_id = bson.objectid.ObjectId()
file_id2 = bson.objectid.ObjectId()
file_id3 = bson.objectid.ObjectId()
self.inserted_ids.extend([file_id, file_id2, file_id3])
document = {"files": self.get_files(self.version, i,
file_id, file_id2, file_id3)
,
"context": {
"subset": "workfileLookdev",
"username": "petrk",
"task": "lookdev",
"family": "workfile",
"hierarchy": "Assets",
"project": {"code": "test", "name": "Test"},
"version": 1,
"asset": "Cylinder",
"representation": "mb",
"root": "C:/projects"
},
"dependencies": [],
"name": "mb",
"parent": {"oid": '{}'.format(id)},
"data": {
"path": "C:\\projects\\Test\\Assets\\Cylinder\\publish\\workfile\\workfileLookdev\\v001\\test_Cylinder_workfileLookdev_v001.mb",
"template": "{root}\\{project[name]}\\{hierarchy}\\{asset}\\publish\\{family}\\{subset}\\v{version:0>3}\\{project[code]}_{asset}_{subset}_v{version:0>3}<_{output}><.{frame:0>4}>.{representation}"
},
"type": "representation",
"schema": "pype:representation-2.0"
}
insert_recs.append(document)
print('Prepared {} records in {} collection'.
format(no_of_records, self.collection_name))
self.collection.insert_many(insert_recs)
# TODO: refactor to produce a real array instead of needing the ugly regex
self.collection.insert_one({"inserted_id": self.inserted_ids})
print('-' * 50)
def run(self, queries=1000, loops=3):
'''
Run 'queries' find() calls against the collection, repeated 'loops' times.
:param queries: how many find(...) calls to issue per loop
:param loops: how many times to repeat the batch of queries
:return: None
'''
print('Testing version {} on {}'.format(self.version,
self.collection_name))
inserted_ids = list(self.collection.
find({"inserted_id": {"$exists": True}}))
self.ids = re.findall("'[0-9a-z]*'", str(inserted_ids))
found_cnt = 0
for _ in range(loops):
start = time.time()
for _ in range(queries):
val = random.choice(self.ids)
val = val.replace("'", '')
if self.version == 'array':
# prepared for the partial index; without the 'files' $exists
# clause it won't engage
found = self.collection.\
find_one({'files': {"$exists": True},
'files._id': "{}".format(val)})
else:
key = "files.{}".format(val)
found = self.collection.find_one({key: {"$exists": True}})
if found:
found_cnt += 1
end = time.time()
print('duration per loop {}'.format(end - start))
print("found_cnt {}".format(found_cnt))
def get_files(self, mode, i, file_id, file_id2, file_id3):
'''
Wrapper deciding whether the 'array' or the 'doc' version should be used
:param mode: 'array'|'doc'
:param i: step number
:param file_id: ObjectId of first dummy file
:param file_id2: ObjectId of second dummy file
:param file_id3: ObjectId of third dummy file
:return: list ('array') or dict ('doc') of file records
'''
if mode == 'array':
return self.get_files_array(i, file_id, file_id2, file_id3)
else:
return self.get_files_doc(i, file_id, file_id2, file_id3)
def get_files_array(self, i, file_id, file_id2, file_id3):
return [
{
"path": "c:/Test/Assets/Cylinder/publish/workfile/"
"workfileLookdev/v001/"
"test_CylinderA_workfileLookdev_v{0:03}.mb".format(i),
"_id": '{}'.format(file_id),
"hash": "temphash",
"sites": ["studio"],
"size":87236
},
{
"path": "c:/Test/Assets/Cylinder/publish/workfile/"
"workfileLookdev/v001/"
"test_CylinderB_workfileLookdev_v{0:03}.mb".format(i),
"_id": '{}'.format(file_id2),
"hash": "temphash",
"sites": ["studio"],
"size": 87236
},
{
"path": "c:/Test/Assets/Cylinder/publish/workfile/"
"workfileLookdev/v001/"
"test_CylinderC_workfileLookdev_v{0:03}.mb".format(i),
"_id": '{}'.format(file_id3),
"hash": "temphash",
"sites": ["studio"],
"size": 87236
}
]
def get_files_doc(self, i, file_id, file_id2, file_id3):
ret = {}
ret['{}'.format(file_id)] = {
"path": "c:/Test/Assets/Cylinder/publish/workfile/workfileLookdev/"
"v001/test_CylinderA_workfileLookdev_v{0:03}.mb".format(i),
"hash": "temphash",
"sites": ["studio"],
"size": 87236
}
ret['{}'.format(file_id2)] = {
"path": "c:/Test/Assets/Cylinder/publish/workfile/workfileLookdev/"
"v001/test_CylinderB_workfileLookdev_v{0:03}.mb".format(i),
"hash": "temphash",
"sites": ["studio"],
"size": 87236
}
ret['{}'.format(file_id3)] = {
"path": "c:/Test/Assets/Cylinder/publish/workfile/workfileLookdev/"
"v001/test_CylinderC_workfileLookdev_v{0:03}.mb".format(i),
"hash": "temphash",
"sites": ["studio"],
"size": 87236
}
return ret
if __name__ == '__main__':
tp = TestPerformance('array')
tp.prepare() # enable to prepare data
tp.run(1000, 3)
print('-'*50)
tp = TestPerformance('doc')
tp.prepare() # enable to prepare data
tp.run(1000, 3)
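As a footnote to the class docstring above: the shell createIndex reportedly failed, but the same partial index can also be created from pymongo (a sketch, assuming the same local database):

    import pymongo

    client = pymongo.MongoClient('mongodb://localhost:27017')
    collection = client['performance_test']['performance_test']
    # partial index: only documents that actually have 'files' are indexed
    collection.create_index(
        [('files._id', pymongo.ASCENDING)],
        partialFilterExpression={'files': {'$exists': True}}
    )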