use xml format to get information about input file from oiio

This commit is contained in:
Jakub Trllo 2022-02-02 20:59:01 +01:00
parent 13266d005c
commit f386045a0d

View file

@ -4,12 +4,35 @@ import logging
import collections
import tempfile
import xml.etree.ElementTree
from .execute import run_subprocess
from .vendor_bin_utils import (
get_oiio_tools_path,
is_oiio_supported
)
# Max length of string that is supported by ffmpeg
MAX_FFMPEG_STRING_LEN = 8196
# OIIO known xml tags
STRING_TAGS = {
"format"
}
INT_TAGS = {
"x", "y", "z",
"width", "height", "depth",
"full_x", "full_y", "full_z",
"full_width", "full_height", "full_depth",
"tile_width", "tile_height", "tile_depth",
"nchannels",
"alpha_channel",
"z_channel",
"deep",
"subimages",
}
# Regex to parse array attributes
ARRAY_TYPE_REGEX = re.compile(r"^(int|float|string)\[\d+\]$")
def get_transcode_temp_directory():
"""Creates temporary folder for transcoding.
@ -24,87 +47,211 @@ def get_transcode_temp_directory():
def get_oiio_info_for_input(filepath, logger=None):
"""Call oiiotool to get information about input and return stdout."""
args = [
get_oiio_tools_path(), "--info", "-v", filepath
]
return run_subprocess(args, logger=logger)
"""Call oiiotool to get information about input and return stdout.
def parse_oiio_info(oiio_info):
"""Create an object based on output from oiiotool.
Removes quotation marks from compression value. Parse channels into
dictionary - key is channel name value is determined type of channel
(e.g. 'uint', 'float').
Args:
oiio_info (str): Output of calling "oiiotool --info -v <path>"
Returns:
dict: Loaded data from output.
Stdout should contain xml format string.
"""
lines = [
line.strip()
for line in oiio_info.split("\n")
args = [
get_oiio_tools_path(), "--info", "-v", "-i:infoformat=xml", filepath
]
# Each line should contain information about one key
# key - value are separated with ": "
oiio_sep = ": "
data_map = {}
for line in lines:
parts = line.split(oiio_sep)
if len(parts) < 2:
output = run_subprocess(args, logger=logger)
output = output.replace("\r\n", "\n")
xml_started = False
lines = []
for line in output.split("\n"):
if not xml_started:
if not line.startswith("<"):
continue
xml_started = True
if xml_started:
lines.append(line)
if not xml_started:
raise ValueError(
"Failed to read input file \"{}\".\nOutput:\n{}".format(
filepath, output
)
)
xml_text = "\n".join(lines)
return parse_oiio_xml_output(xml_text, logger=logger)
class RationalToInt:
"""Rational value stored as division of 2 integers using string."""
def __init__(self, string_value):
parts = string_value.split("/")
top = float(parts[0])
bottom = 1.0
if len(parts) != 1:
bottom = float(parts[1])
self._value = top / bottom
self._string_value = string_value
@property
def value(self):
return self._value
@property
def string_value(self):
return self._string_value
def __format__(self, *args, **kwargs):
return self._string_value.__format__(*args, **kwargs)
def __float__(self):
return self._value
def __str__(self):
return self._string_value
def __repr__(self):
return "<{}> {}".format(self.__class__.__name__, self._string_value)
def convert_value_by_type_name(value_type, value, logger=None):
"""Convert value to proper type based on type name.
In some cases value types have custom python class.
"""
if logger is None:
logger = logging.getLogger(__name__)
# Simple types
if value_type == "string":
return value
if value_type == "int":
return int(value)
if value_type == "float":
return float(value)
# Vectors will probably have more types
if value_type == "vec2f":
return [float(item) for item in value.split(",")]
# Matrix should be always have square size of element 3x3, 4x4
# - are returned as list of lists
if value_type == "matrix":
output = []
current_index = -1
parts = value.split(",")
parts_len = len(parts)
if parts_len == 1:
divisor = 1
elif parts_len == 4:
divisor = 2
elif parts_len == 9:
divisor == 3
elif parts_len == 16:
divisor = 4
else:
logger.info("Unknown matrix resolution {}. Value: \"{}\"".format(
parts_len, value
))
for part in parts:
output.append(float(part))
return output
for idx, item in enumerate(parts):
list_index = idx % divisor
if list_index > current_index:
current_index = list_index
output.append([])
output[list_index].append(float(item))
return output
if value_type == "rational2i":
return RationalToInt(value)
# Array of other types is converted to list
re_result = ARRAY_TYPE_REGEX.findall(value_type)
if re_result:
array_type = re_result[0]
output = []
for item in value.split(","):
output.append(convert_value_by_type_name(array_type, item, logger=logger))
return output
logger.info((
"MISSING IMPLEMENTATION:"
" Unknown attrib type \"{}\". Value: {}"
).format(value_type, value))
return value
def parse_oiio_xml_output(xml_string, logger=None):
"""Parse xml output from OIIO info command."""
output = {}
if not xml_string:
return output
if logger is None:
logger = logging.getLogger("OIIO-xml-parse")
tree = xml.etree.ElementTree.fromstring(xml_string)
attribs = {}
output["attribs"] = attribs
for child in tree:
tag_name = child.tag
if tag_name == "attrib":
attrib_def = child.attrib
value = convert_value_by_type_name(attrib_def["type"], child.text, logger=logger)
attribs[attrib_def["name"]] = value
continue
key = parts.pop(0)
value = oiio_sep.join(parts)
data_map[key] = value
if "compression" in data_map:
value = data_map["compression"]
data_map["compression"] = value.replace("\"", "")
# Channels are stored as tex on each child
if tag_name == "channelnames":
value = []
for channel in child:
value.append(channel.text)
channels_info = {}
channels_value = data_map.get("channel list") or ""
if channels_value:
channels = channels_value.split(", ")
type_regex = re.compile(r"(?P<name>[^\(]+) \((?P<type>[^\)]+)\)")
for channel in channels:
match = type_regex.search(channel)
if not match:
channel_name = channel
channel_type = "uint"
else:
channel_name = match.group("name")
channel_type = match.group("type")
channels_info[channel_name] = channel_type
data_map["channels_info"] = channels_info
return data_map
# Convert known integer type tags to int
elif tag_name in INT_TAGS:
value = int(child.text)
# Keep value of known string tags
elif tag_name in STRING_TAGS:
value = child.text
# Keep value as text for unknown tags
# - feel free to add more tags
else:
value = child.text
logger.info((
"MISSING IMPLEMENTATION:"
" Unknown tag \"{}\". Value \"{}\""
).format(tag_name, value))
output[child.tag] = value
return output
def get_convert_rgb_channels(channels_info):
def get_convert_rgb_channels(channel_names):
"""Get first available RGB(A) group from channels info.
## Examples
```
# Ideal situation
channels_info: {
"R": ...,
"G": ...,
"B": ...,
"A": ...
channels_info: [
"R", "G", "B", "A"
}
```
Result will be `("R", "G", "B", "A")`
```
# Not ideal situation
channels_info: {
"beauty.red": ...,
"beuaty.green": ...,
"beauty.blue": ...,
"depth.Z": ...
}
channels_info: [
"beauty.red",
"beuaty.green",
"beauty.blue",
"depth.Z"
]
```
Result will be `("beauty.red", "beauty.green", "beauty.blue", None)`
@ -116,7 +263,7 @@ def get_convert_rgb_channels(channels_info):
"""
rgb_by_main_name = collections.defaultdict(dict)
main_name_order = [""]
for channel_name in channels_info.keys():
for channel_name in channel_names:
name_parts = channel_name.split(".")
rgb_part = name_parts.pop(-1).lower()
main_name = ".".join(name_parts)
@ -166,17 +313,18 @@ def should_convert_for_ffmpeg(src_filepath):
return None
# Load info about info from oiio tool
oiio_info = get_oiio_info_for_input(src_filepath)
input_info = parse_oiio_info(oiio_info)
input_info = get_oiio_info_for_input(src_filepath)
if not input_info:
return None
# Check compression
compression = input_info["compression"]
compression = input_info["attribs"].get("compression")
if compression in ("dwaa", "dwab"):
return True
# Check channels
channels_info = input_info["channels_info"]
review_channels = get_convert_rgb_channels(channels_info)
channel_names = input_info["channelnames"]
review_channels = get_convert_rgb_channels(channel_names)
if review_channels is None:
return None
@ -221,12 +369,11 @@ def convert_for_ffmpeg(
if input_frame_start is not None and input_frame_end is not None:
is_sequence = int(input_frame_end) != int(input_frame_start)
oiio_info = get_oiio_info_for_input(first_input_path)
input_info = parse_oiio_info(oiio_info)
input_info = get_oiio_info_for_input(first_input_path)
# Change compression only if source compression is "dwaa" or "dwab"
# - they're not supported in ffmpeg
compression = input_info["compression"]
compression = input_info["attribs"].get("compression")
if compression in ("dwaa", "dwab"):
compression = "none"
@ -237,8 +384,9 @@ def convert_for_ffmpeg(
first_input_path
]
channels_info = input_info["channels_info"]
review_channels = get_convert_rgb_channels(channels_info)
# Collect channels to export
channel_names = input_info["channelnames"]
review_channels = get_convert_rgb_channels(channel_names)
if review_channels is None:
raise ValueError(
"Couldn't find channels that can be used for conversion."