Marked 'process_single_file' as deprecated and moved to pipeline delivery as 'deliver_single_file'

This commit is contained in:
Jakub Trllo 2022-08-29 15:07:45 +02:00
parent dc77d4a609
commit eaff50b23e
4 changed files with 87 additions and 43 deletions

View file

@ -179,53 +179,38 @@ def check_destination_path(repre_id,
)
@deprecated("openpype.pipeline.delivery.deliver_single_file")
def process_single_file(
src_path, repre, anatomy, template_name, anatomy_data, format_dict,
report_items, log
):
"""Copy single file to calculated path based on template
Args:
src_path(str): path of source representation file
_repre (dict): full repre, used only in process_sequence, here only
as to share same signature
anatomy (Anatomy)
template_name (string): user selected delivery template name
anatomy_data (dict): data from repre to fill anatomy with
format_dict (dict): root dictionary with names and values
report_items (collections.defaultdict): to return error messages
log (Logger): for log printing
Returns:
(collections.defaultdict , int)
Args:
src_path(str): path of source representation file
_repre (dict): full repre, used only in process_sequence, here only
as to share same signature
anatomy (Anatomy)
template_name (string): user selected delivery template name
anatomy_data (dict): data from repre to fill anatomy with
format_dict (dict): root dictionary with names and values
report_items (collections.defaultdict): to return error messages
log (Logger): for log printing
Returns:
(collections.defaultdict , int)
Deprecated:
Function was moved to different location and will be removed
after 3.16.* release.
"""
# Make sure path is valid for all platforms
src_path = os.path.normpath(src_path.replace("\\", "/"))
if not os.path.exists(src_path):
msg = "{} doesn't exist for {}".format(src_path, repre["_id"])
report_items["Source file was not found"].append(msg)
return report_items, 0
from openpype.pipeline.delivery import deliver_single_file
anatomy_filled = anatomy.format(anatomy_data)
if format_dict:
template_result = anatomy_filled["delivery"][template_name]
delivery_path = template_result.rootless.format(**format_dict)
else:
delivery_path = anatomy_filled["delivery"][template_name]
# Backwards compatibility when extension contained `.`
delivery_path = delivery_path.replace("..", ".")
# Make sure path is valid for all platforms
delivery_path = os.path.normpath(delivery_path.replace("\\", "/"))
delivery_folder = os.path.dirname(delivery_path)
if not os.path.exists(delivery_folder):
os.makedirs(delivery_folder)
log.debug("Copying single: {} -> {}".format(src_path, delivery_path))
copy_file(src_path, delivery_path)
return report_items, 1
return deliver_single_file(
src_path, repre, anatomy, template_name, anatomy_data, format_dict,
report_items, log
)
def process_sequence(

View file

@ -21,9 +21,9 @@ from openpype.pipeline.load import get_representation_path_with_anatomy
from openpype.pipeline.delivery import (
get_format_dict,
check_destination_path,
deliver_single_file,
)
from openpype.lib.delivery import (
process_single_file,
process_sequence
)
@ -596,7 +596,7 @@ class Delivery(BaseAction):
self.log
)
if not frame:
process_single_file(*args)
deliver_single_file(*args)
else:
process_sequence(*args)

View file

@ -1,6 +1,8 @@
"""Functions useful for delivery of published representations."""
import os
import shutil
import glob
import clique
import collections
from openpype.lib import create_hard_link
@ -107,3 +109,60 @@ def check_destination_path(
report_items[msg].append(sub_msg)
return report_items
def deliver_single_file(
src_path,
repre,
anatomy,
template_name,
anatomy_data,
format_dict,
report_items,
log
):
"""Copy single file to calculated path based on template
Args:
src_path(str): path of source representation file
repre (dict): full repre, used only in process_sequence, here only
as to share same signature
anatomy (Anatomy)
template_name (string): user selected delivery template name
anatomy_data (dict): data from repre to fill anatomy with
format_dict (dict): root dictionary with names and values
report_items (collections.defaultdict): to return error messages
log (logging.Logger): for log printing
Returns:
(collections.defaultdict, int)
"""
# Make sure path is valid for all platforms
src_path = os.path.normpath(src_path.replace("\\", "/"))
if not os.path.exists(src_path):
msg = "{} doesn't exist for {}".format(src_path, repre["_id"])
report_items["Source file was not found"].append(msg)
return report_items, 0
anatomy_filled = anatomy.format(anatomy_data)
if format_dict:
template_result = anatomy_filled["delivery"][template_name]
delivery_path = template_result.rootless.format(**format_dict)
else:
delivery_path = anatomy_filled["delivery"][template_name]
# Backwards compatibility when extension contained `.`
delivery_path = delivery_path.replace("..", ".")
# Make sure path is valid for all platforms
delivery_path = os.path.normpath(delivery_path.replace("\\", "/"))
delivery_folder = os.path.dirname(delivery_path)
if not os.path.exists(delivery_folder):
os.makedirs(delivery_folder)
log.debug("Copying single: {} -> {}".format(src_path, delivery_path))
_copy_file(src_path, delivery_path)
return report_items, 1

View file

@ -16,9 +16,9 @@ from openpype.pipeline.load import get_representation_path_with_anatomy
from openpype.pipeline.delivery import (
get_format_dict,
check_destination_path,
deliver_single_file,
)
from openpype.lib.delivery import (
process_single_file,
process_sequence,
)
@ -208,7 +208,7 @@ class DeliveryOptionsDialog(QtWidgets.QDialog):
args[0] = src_path
if frame:
anatomy_data["frame"] = frame
new_report_items, uploaded = process_single_file(*args)
new_report_items, uploaded = deliver_single_file(*args)
report_items.update(new_report_items)
self._update_progress(uploaded)
else: # fallback for Pype2 and representations without files
@ -217,7 +217,7 @@ class DeliveryOptionsDialog(QtWidgets.QDialog):
repre["context"]["frame"] = len(str(frame)) * "#"
if not frame:
new_report_items, uploaded = process_single_file(*args)
new_report_items, uploaded = deliver_single_file(*args)
else:
new_report_items, uploaded = process_sequence(*args)
report_items.update(new_report_items)