mirror of
https://github.com/ynput/ayon-core.git
synced 2026-01-01 16:34:53 +01:00
308 lines
11 KiB
Python
308 lines
11 KiB
Python
import os
|
|
import subprocess
|
|
import tempfile
|
|
|
|
import pyblish.api
|
|
from openpype.lib import (
|
|
get_ffmpeg_tool_args,
|
|
get_oiio_tool_args,
|
|
is_oiio_supported,
|
|
|
|
run_subprocess,
|
|
path_to_subprocess_arg,
|
|
)
|
|
|
|
|
|
class ExtractThumbnail(pyblish.api.InstancePlugin):
|
|
"""Create jpg thumbnail from sequence using ffmpeg"""
|
|
|
|
label = "Extract Thumbnail"
|
|
order = pyblish.api.ExtractorOrder + 0.49
|
|
families = [
|
|
"imagesequence", "render", "render2d", "prerender",
|
|
"source", "clip", "take", "online", "image"
|
|
]
|
|
hosts = [
|
|
"shell",
|
|
"fusion",
|
|
"resolve",
|
|
"traypublisher",
|
|
"substancepainter",
|
|
"nuke",
|
|
]
|
|
enabled = False
|
|
|
|
# presentable attribute
|
|
ffmpeg_args = None
|
|
|
|
def process(self, instance):
|
|
# run main process
|
|
self._main_process(instance)
|
|
|
|
# Make sure cleanup happens to representations which are having both
|
|
# tags `delete` and `need_thumbnail`
|
|
for repre in tuple(instance.data["representations"]):
|
|
tags = repre.get("tags") or []
|
|
# skip representations which are going to be published on farm
|
|
if "publish_on_farm" in tags:
|
|
continue
|
|
if (
|
|
"delete" in tags
|
|
and "need_thumbnail" in tags
|
|
):
|
|
self.log.debug(
|
|
"Removing representation: {}".format(repre)
|
|
)
|
|
instance.data["representations"].remove(repre)
|
|
|
|
def _main_process(self, instance):
|
|
subset_name = instance.data["subset"]
|
|
instance_repres = instance.data.get("representations")
|
|
if not instance_repres:
|
|
self.log.debug((
|
|
"Instance {} does not have representations. Skipping"
|
|
).format(subset_name))
|
|
return
|
|
|
|
self.log.debug(
|
|
"Processing instance with subset name {}".format(subset_name)
|
|
)
|
|
|
|
# Skip if instance have 'review' key in data set to 'False'
|
|
if not self._is_review_instance(instance):
|
|
self.log.debug("Skipping - no review set on instance.")
|
|
return
|
|
|
|
# Check if already has thumbnail created
|
|
if self._already_has_thumbnail(instance_repres):
|
|
self.log.debug("Thumbnail representation already present.")
|
|
return
|
|
|
|
# skip crypto passes.
|
|
# TODO: This is just a quick fix and has its own side-effects - it is
|
|
# affecting every subset name with `crypto` in its name.
|
|
# This must be solved properly, maybe using tags on
|
|
# representation that can be determined much earlier and
|
|
# with better precision.
|
|
if "crypto" in subset_name.lower():
|
|
self.log.debug("Skipping crypto passes.")
|
|
return
|
|
|
|
# first check for any explicitly marked representations for thumbnail
|
|
explicit_repres = self._get_explicit_repres_for_thumbnail(instance)
|
|
if explicit_repres:
|
|
filtered_repres = explicit_repres
|
|
else:
|
|
filtered_repres = self._get_filtered_repres(instance)
|
|
|
|
if not filtered_repres:
|
|
self.log.info(
|
|
"Instance doesn't have representations that can be used "
|
|
"as source for thumbnail. Skipping thumbnail extraction."
|
|
)
|
|
return
|
|
|
|
# Create temp directory for thumbnail
|
|
# - this is to avoid "override" of source file
|
|
dst_staging = tempfile.mkdtemp(prefix="pyblish_tmp_")
|
|
self.log.debug(
|
|
"Create temp directory {} for thumbnail".format(dst_staging)
|
|
)
|
|
# Store new staging to cleanup paths
|
|
instance.context.data["cleanupFullPaths"].append(dst_staging)
|
|
|
|
thumbnail_created = False
|
|
oiio_supported = is_oiio_supported()
|
|
for repre in filtered_repres:
|
|
repre_files = repre["files"]
|
|
if not isinstance(repre_files, (list, tuple)):
|
|
input_file = repre_files
|
|
else:
|
|
file_index = int(float(len(repre_files)) * 0.5)
|
|
input_file = repre_files[file_index]
|
|
|
|
src_staging = os.path.normpath(repre["stagingDir"])
|
|
full_input_path = os.path.join(src_staging, input_file)
|
|
self.log.debug("input {}".format(full_input_path))
|
|
filename = os.path.splitext(input_file)[0]
|
|
jpeg_file = filename + "_thumb.jpg"
|
|
full_output_path = os.path.join(dst_staging, jpeg_file)
|
|
|
|
if oiio_supported:
|
|
self.log.debug("Trying to convert with OIIO")
|
|
# If the input can read by OIIO then use OIIO method for
|
|
# conversion otherwise use ffmpeg
|
|
thumbnail_created = self.create_thumbnail_oiio(
|
|
full_input_path, full_output_path
|
|
)
|
|
|
|
# Try to use FFMPEG if OIIO is not supported or for cases when
|
|
# oiiotool isn't available
|
|
if not thumbnail_created:
|
|
if oiio_supported:
|
|
self.log.debug(
|
|
"Converting with FFMPEG because input"
|
|
" can't be read by OIIO."
|
|
)
|
|
|
|
thumbnail_created = self.create_thumbnail_ffmpeg(
|
|
full_input_path, full_output_path
|
|
)
|
|
|
|
# Skip representation and try next one if wasn't created
|
|
if not thumbnail_created:
|
|
continue
|
|
|
|
if len(explicit_repres) > 1:
|
|
repre_name = "thumbnail_{}".format(repre["outputName"])
|
|
else:
|
|
repre_name = "thumbnail"
|
|
|
|
# add thumbnail path to instance data for integrator
|
|
instance_thumb_path = instance.data.get("thumbnailPath")
|
|
if (
|
|
not instance_thumb_path
|
|
or not os.path.isfile(instance_thumb_path)
|
|
):
|
|
instance.data["thumbnailPath"] = full_output_path
|
|
|
|
new_repre = {
|
|
"name": repre_name,
|
|
"ext": "jpg",
|
|
"files": jpeg_file,
|
|
"stagingDir": dst_staging,
|
|
"thumbnail": True,
|
|
"tags": ["thumbnail"]
|
|
}
|
|
|
|
# adding representation
|
|
instance.data["representations"].append(new_repre)
|
|
|
|
if explicit_repres:
|
|
# this key will then align assetVersion ftrack thumbnail sync
|
|
new_repre["outputName"] = repre["outputName"]
|
|
self.log.debug(
|
|
"Adding explicit thumbnail representation: {}".format(
|
|
new_repre))
|
|
else:
|
|
self.log.debug(
|
|
"Adding thumbnail representation: {}".format(new_repre)
|
|
)
|
|
# There is no need to create more then one thumbnail
|
|
break
|
|
|
|
if not thumbnail_created:
|
|
self.log.warning("Thumbnail has not been created.")
|
|
|
|
def _is_review_instance(self, instance):
|
|
# TODO: We should probably handle "not creating" of thumbnail
|
|
# other way then checking for "review" key on instance data?
|
|
if instance.data.get("review", True):
|
|
return True
|
|
return False
|
|
|
|
def _already_has_thumbnail(self, repres):
|
|
for repre in repres:
|
|
self.log.debug("repre {}".format(repre))
|
|
if repre["name"] == "thumbnail":
|
|
return True
|
|
return False
|
|
|
|
def _get_explicit_repres_for_thumbnail(self, instance):
|
|
src_repres = instance.data.get("representations") or []
|
|
# This is mainly for Nuke where we have multiple representations for
|
|
# one instance. We want to use only one representation for thumbnail
|
|
# first check if any of the representations have
|
|
# `need_thumbnail` in tags and add them to filtered_repres
|
|
need_thumb_repres = [
|
|
repre for repre in src_repres
|
|
if "need_thumbnail" in repre.get("tags", [])
|
|
if "publish_on_farm" not in repre.get("tags", [])
|
|
]
|
|
if not need_thumb_repres:
|
|
return []
|
|
|
|
self.log.info(
|
|
"Instance has representation with tag `need_thumbnail`. "
|
|
"Using only this representations for thumbnail creation. "
|
|
)
|
|
self.log.debug(
|
|
"Representations: {}".format(need_thumb_repres)
|
|
)
|
|
return need_thumb_repres
|
|
|
|
def _get_filtered_repres(self, instance):
|
|
filtered_repres = []
|
|
src_repres = instance.data.get("representations") or []
|
|
for repre in src_repres:
|
|
self.log.debug(repre)
|
|
tags = repre.get("tags") or []
|
|
valid = "review" in tags or "thumb-nuke" in tags
|
|
if not valid:
|
|
continue
|
|
|
|
if not repre.get("files"):
|
|
self.log.debug((
|
|
"Representation \"{}\" doesn't have files. Skipping"
|
|
).format(repre["name"]))
|
|
continue
|
|
|
|
filtered_repres.append(repre)
|
|
return filtered_repres
|
|
|
|
def create_thumbnail_oiio(self, src_path, dst_path):
|
|
self.log.debug("Extracting thumbnail with OIIO: {}".format(dst_path))
|
|
oiio_cmd = get_oiio_tool_args(
|
|
"oiiotool",
|
|
"-a", src_path,
|
|
"-o", dst_path
|
|
)
|
|
self.log.debug("running: {}".format(" ".join(oiio_cmd)))
|
|
try:
|
|
run_subprocess(oiio_cmd, logger=self.log)
|
|
return True
|
|
except Exception:
|
|
self.log.warning(
|
|
"Failed to create thumbnail using oiiotool",
|
|
exc_info=True
|
|
)
|
|
return False
|
|
|
|
def create_thumbnail_ffmpeg(self, src_path, dst_path):
|
|
self.log.debug("Extracting thumbnail with FFMPEG: {}".format(dst_path))
|
|
|
|
ffmpeg_path_args = get_ffmpeg_tool_args("ffmpeg")
|
|
ffmpeg_args = self.ffmpeg_args or {}
|
|
|
|
jpeg_items = [
|
|
subprocess.list2cmdline(ffmpeg_path_args)
|
|
]
|
|
# flag for large file sizes
|
|
max_int = 2147483647
|
|
jpeg_items.extend([
|
|
"-y",
|
|
"-analyzeduration", str(max_int),
|
|
"-probesize", str(max_int),
|
|
])
|
|
# use same input args like with mov
|
|
jpeg_items.extend(ffmpeg_args.get("input") or [])
|
|
# input file
|
|
jpeg_items.extend(["-i", path_to_subprocess_arg(src_path)])
|
|
# output arguments from presets
|
|
jpeg_items.extend(ffmpeg_args.get("output") or [])
|
|
# we just want one frame from movie files
|
|
jpeg_items.extend(["-vframes", "1"])
|
|
# output file
|
|
jpeg_items.append(path_to_subprocess_arg(dst_path))
|
|
subprocess_command = " ".join(jpeg_items)
|
|
try:
|
|
run_subprocess(
|
|
subprocess_command, shell=True, logger=self.log
|
|
)
|
|
return True
|
|
except Exception:
|
|
self.log.warning(
|
|
"Failed to create thumbnail using ffmpeg",
|
|
exc_info=True
|
|
)
|
|
return False
|