Added filling by previous published version

This commit is contained in:
Petr Kalis 2025-04-09 18:23:43 +02:00
parent d076f152fd
commit 6c404c88e9

View file

@ -1,3 +1,5 @@
from __future__ import annotations
import os
import re
import copy
@ -5,6 +7,7 @@ import json
import shutil
import subprocess
from abc import ABC, abstractmethod
from typing import Dict, Any
import clique
import speedcopy
@ -29,6 +32,7 @@ from ayon_core.pipeline.publish import (
get_publish_instance_label,
)
from ayon_core.pipeline.publish.lib import add_repre_files_for_cleanup
from ayon_api import get_last_version_by_product_name, get_representations
def frame_to_timecode(frame: int, fps: float) -> str:
@ -430,7 +434,24 @@ class ExtractReview(pyblish.api.InstancePlugin):
resolution_height=temp_data["resolution_height"],
extension=temp_data["ext"],
)
elif fill_type == "previous":
added_frames_and_files = self.fill_sequence_gaps_with_previous(
collection=collection,
staging_dir=new_repre["stagingDir"],
instance=instance,
current_repre=repre,
start_frame=temp_data["frame_start"],
end_frame=temp_data["frame_end"],
)
# fallback to original workflow
if added_frames_and_files is None:
added_frames_and_files = self.fill_sequence_gaps_from_existing(
collection=collection,
staging_dir=new_repre["stagingDir"],
start_frame=temp_data["frame_start"],
end_frame=temp_data["frame_end"],
)
temp_data["filled_files"] = added_frames_and_files
# create or update outputName
output_name = new_repre.get("outputName", "")
@ -906,6 +927,99 @@ class ExtractReview(pyblish.api.InstancePlugin):
return all_args
def fill_sequence_gaps_with_previous(
self,
collection: str,
staging_dir: str,
instance: pyblish.plugin.Instance,
current_repre: Dict[Any, Any],
start_frame: int,
end_frame: int
) -> Dict[int, str] | None:
"""Tries to replace missing frames from ones from last version"""
repre_file_paths = self._get_last_version_files(
instance, current_repre)
if repre_file_paths is None:
# issues in getting last version files, falling back
return None
prev_collection = clique.assemble(
repre_file_paths,
patterns=[clique.PATTERNS["frames"]],
minimum_items=1
)[0][0]
prev_col_format = prev_collection.format("{head}{padding}{tail}")
added_files = {}
anatomy = instance.context.data["anatomy"]
col_format = collection.format("{head}{padding}{tail}")
for frame in range(start_frame, end_frame + 1):
if frame in collection.indexes:
continue
hole_fpath = os.path.join(staging_dir, col_format % frame)
previous_version_path = prev_col_format % frame
# limits too large padding coming from Anatomy
previous_version_path = (
os.path.join(
anatomy.fill_root(os.path.dirname(previous_version_path)),
os.path.basename(previous_version_path)
)
)
if not os.path.exists(previous_version_path):
self.log.warning(
"Missing frame should be replaced from "
f"'{previous_version_path}' but that doesn't exist. "
"Falling back to filling from currently last rendered."
)
return None
self.log.warning(
f"Replacing missing '{hole_fpath}' with "
f"'{previous_version_path}'"
)
speedcopy.copyfile(previous_version_path, hole_fpath)
added_files[frame] = hole_fpath
return added_files
def _get_last_version_files(
self,
instance: pyblish.plugin.Instance,
current_repre: Dict[Any, Any],
):
product_name = instance.data["productName"]
project_name = instance.data["projectEntity"]["name"]
folder_entity = instance.data["folderEntity"]
version_entity = get_last_version_by_product_name(
project_name,
product_name,
folder_entity["id"],
fields={"id"}
)
if not version_entity:
return None
repres = get_representations(
project_name,
version_ids=[version_entity["id"]]
)
matching_repre = None
for repre in repres:
if repre["name"] == current_repre["name"]:
matching_repre = repre
break
if not matching_repre:
return None
repre_file_paths = [
file_info["path"]
for file_info in matching_repre["files"]
]
return repre_file_paths
def fill_sequence_gaps_with_blanks(
self,
collection: str,
@ -1052,6 +1166,14 @@ class ExtractReview(pyblish.api.InstancePlugin):
# Make sure to have full path to one input file
full_input_path_single_file = full_input_path
filled_files = temp_data.get("filled_files", {})
if filled_files:
first_frame, first_file = list(filled_files.items())[0]
if first_file < full_input_path_single_file:
self.log.warning(f"Using filled frame: '{first_file}'")
full_input_path_single_file = first_file
temp_data["first_sequence_frame"] = first_frame
filename_suffix = output_def["filename_suffix"]
output_ext = output_def.get("ext")