OP-4465 - extract burnin is triggered by profiles in Settings

Cleaned up obsolete methods
This commit is contained in:
Petr Kalis 2022-12-12 17:41:51 +01:00
parent b50e609874
commit 3296ac68ef

View file

@ -1,5 +1,4 @@
import os
import re
import json
import copy
import tempfile
@ -21,6 +20,7 @@ from openpype.lib import (
CREATE_NO_WINDOW
)
from openpype.lib.profiles_filtering import filter_profiles
class ExtractBurnin(publish.Extractor):
@ -34,25 +34,7 @@ class ExtractBurnin(publish.Extractor):
label = "Extract burnins"
order = pyblish.api.ExtractorOrder + 0.03
families = ["review", "burnin"]
hosts = [
"nuke",
"maya",
"shell",
"hiero",
"premiere",
"traypublisher",
"standalonepublisher",
"harmony",
"fusion",
"aftereffects",
"tvpaint",
"webpublisher",
"aftereffects",
"photoshop",
"flame"
# "resolve"
]
optional = True
positions = [
@ -69,11 +51,15 @@ class ExtractBurnin(publish.Extractor):
"y_offset": 5
}
# Preset attributes
# Configurable by Settings
profiles = None
options = None
def process(self, instance):
if not self.profiles:
self.log.warning("No profiles present for create burnin")
return
# QUESTION what is this for and should we raise an exception?
if "representations" not in instance.data:
raise RuntimeError("Burnin needs already created mov to work on.")
@ -137,18 +123,29 @@ class ExtractBurnin(publish.Extractor):
return filtered_repres
def main_process(self, instance):
# TODO get these data from context
host_name = instance.context.data["hostName"]
task_name = os.environ["AVALON_TASK"]
family = self.main_family_from_instance(instance)
host_name = instance.data["anatomyData"]["app"]
families = list(set(instance.data["family"]).union(
set(instance.data["families"])))
task_data = instance.data["anatomyData"].get("task", {})
task_name = task_data.get("name")
task_type = task_data.get("type")
subset = instance.data["subset"]
filtering_criteria = {
"hosts": host_name,
"families": families,
"task_names": task_name,
"task_types": task_type,
"subset": subset
}
profile = filter_profiles(self.profiles, filtering_criteria,
logger=self.log)
# Find profile most matching current host, task and instance family
profile = self.find_matching_profile(host_name, task_name, family)
if not profile:
self.log.info((
"Skipped instance. None of profiles in presets are for"
" Host: \"{}\" | Family: \"{}\" | Task \"{}\""
).format(host_name, family, task_name))
" Host: \"{}\" | Families: \"{}\" | Task \"{}\" | Task type \"{}\" | Subset \"{}\" "
).format(host_name, families, task_name, task_type, subset))
return
self.log.debug("profile: {}".format(profile))
@ -158,8 +155,8 @@ class ExtractBurnin(publish.Extractor):
if not burnin_defs:
self.log.info((
"Skipped instance. Burnin definitions are not set for profile"
" Host: \"{}\" | Family: \"{}\" | Task \"{}\" | Profile \"{}\""
).format(host_name, family, task_name, profile))
" Host: \"{}\" | Families: \"{}\" | Task \"{}\" | Profile \"{}\""
).format(host_name, families, task_name, profile))
return
burnin_options = self._get_burnin_options()
@ -693,130 +690,6 @@ class ExtractBurnin(publish.Extractor):
)
})
def find_matching_profile(self, host_name, task_name, family):
""" Filter profiles by Host name, Task name and main Family.
Filtering keys are "hosts" (list), "tasks" (list), "families" (list).
If key is not find or is empty than it's expected to match.
Args:
profiles (list): Profiles definition from presets.
host_name (str): Current running host name.
task_name (str): Current context task name.
family (str): Main family of current Instance.
Returns:
dict/None: Return most matching profile or None if none of profiles
match at least one criteria.
"""
matching_profiles = None
highest_points = -1
for profile in self.profiles or tuple():
profile_points = 0
profile_value = []
# Host filtering
host_names = profile.get("hosts")
match = self.validate_value_by_regexes(host_name, host_names)
if match == -1:
continue
profile_points += match
profile_value.append(bool(match))
# Task filtering
task_names = profile.get("tasks")
match = self.validate_value_by_regexes(task_name, task_names)
if match == -1:
continue
profile_points += match
profile_value.append(bool(match))
# Family filtering
families = profile.get("families")
match = self.validate_value_by_regexes(family, families)
if match == -1:
continue
profile_points += match
profile_value.append(bool(match))
if profile_points > highest_points:
matching_profiles = []
highest_points = profile_points
if profile_points == highest_points:
profile["__value__"] = profile_value
matching_profiles.append(profile)
if not matching_profiles:
return
if len(matching_profiles) == 1:
return matching_profiles[0]
return self.profile_exclusion(matching_profiles)
def profile_exclusion(self, matching_profiles):
"""Find out most matching profile by host, task and family match.
Profiles are selectivelly filtered. Each profile should have
"__value__" key with list of booleans. Each boolean represents
existence of filter for specific key (host, taks, family).
Profiles are looped in sequence. In each sequence are split into
true_list and false_list. For next sequence loop are used profiles in
true_list if there are any profiles else false_list is used.
Filtering ends when only one profile left in true_list. Or when all
existence booleans loops passed, in that case first profile from left
profiles is returned.
Args:
matching_profiles (list): Profiles with same values.
Returns:
dict: Most matching profile.
"""
self.log.info(
"Search for first most matching profile in match order:"
" Host name -> Task name -> Family."
)
# Filter all profiles with highest points value. First filter profiles
# with matching host if there are any then filter profiles by task
# name if there are any and lastly filter by family. Else use first in
# list.
idx = 0
final_profile = None
while True:
profiles_true = []
profiles_false = []
for profile in matching_profiles:
value = profile["__value__"]
# Just use first profile when idx is greater than values.
if not idx < len(value):
final_profile = profile
break
if value[idx]:
profiles_true.append(profile)
else:
profiles_false.append(profile)
if final_profile is not None:
break
if profiles_true:
matching_profiles = profiles_true
else:
matching_profiles = profiles_false
if len(matching_profiles) == 1:
final_profile = matching_profiles[0]
break
idx += 1
final_profile.pop("__value__")
return final_profile
def filter_burnins_defs(self, profile, instance):
"""Filter outputs by their values from settings.
@ -909,56 +782,6 @@ class ExtractBurnin(publish.Extractor):
return True
return False
def compile_list_of_regexes(self, in_list):
"""Convert strings in entered list to compiled regex objects."""
regexes = []
if not in_list:
return regexes
for item in in_list:
if not item:
continue
try:
regexes.append(re.compile(item))
except TypeError:
self.log.warning((
"Invalid type \"{}\" value \"{}\"."
" Expected string based object. Skipping."
).format(str(type(item)), str(item)))
return regexes
def validate_value_by_regexes(self, value, in_list):
"""Validate in any regexe from list match entered value.
Args:
in_list (list): List with regexes.
value (str): String where regexes is checked.
Returns:
int: Returns `0` when list is not set or is empty. Returns `1` when
any regex match value and returns `-1` when none of regexes
match value entered.
"""
if not in_list:
return 0
output = -1
regexes = self.compile_list_of_regexes(in_list)
for regex in regexes:
if re.match(regex, value):
output = 1
break
return output
def main_family_from_instance(self, instance):
"""Return main family of entered instance."""
family = instance.data.get("family")
if not family:
family = instance.data["families"][0]
return family
def families_from_instance(self, instance):
"""Return all families of entered instance."""
families = []