add ffmpeg-python, and publish quicktime to ftrack

This commit is contained in:
Milan Kolar 2018-12-07 17:38:16 +01:00
parent bdbd1a7189
commit dee6f9a7a7
27 changed files with 2268 additions and 49 deletions

View file

@ -275,6 +275,7 @@ def collect_animation_data():
# get scene values as defaults
start = cmds.playbackOptions(query=True, animationStartTime=True)
end = cmds.playbackOptions(query=True, animationEndTime=True)
fps = mel.eval('currentTimeUnitToFPS()')
# build attributes
data = OrderedDict()
@ -282,6 +283,7 @@ def collect_animation_data():
data["endFrame"] = end
data["handles"] = 1
data["step"] = 1.0
data["fps"] = fps
return data

View file

@ -64,7 +64,7 @@ class IntegrateFtrackApi(pyblish.api.InstancePlugin):
# Create a new entity if none exits.
if not assettype_entity:
assettype_entity = session.create("AssetType", assettype_data)
self.log.info(
self.log.debug(
"Created new AssetType with data: ".format(assettype_data)
)
@ -88,7 +88,7 @@ class IntegrateFtrackApi(pyblish.api.InstancePlugin):
# Create a new entity if none exits.
if not asset_entity:
asset_entity = session.create("Asset", asset_data)
self.log.info(
self.log.debug(
info_msg.format(
entity_type="Asset",
data=asset_data,
@ -123,7 +123,7 @@ class IntegrateFtrackApi(pyblish.api.InstancePlugin):
assetversion_entity = session.create(
"AssetVersion", assetversion_data
)
self.log.info(
self.log.debug(
info_msg.format(
entity_type="AssetVersion",
data=assetversion_data,
@ -271,6 +271,9 @@ class IntegrateFtrackApi(pyblish.api.InstancePlugin):
existing_component_metadata.update(component_metadata)
component_entity["metadata"] = existing_component_metadata
# if component_data['name'] = 'ftrackreview-mp4-mp4':
# assetversion_entity["thumbnail_id"]
# Setting assetversion thumbnail
if data.get("thumbnail", False):
assetversion_entity["thumbnail_id"] = component_entity["id"]

View file

@ -1,6 +1,7 @@
import pyblish.api
import os
import clique
import json
class IntegrateFtrackInstance(pyblish.api.InstancePlugin):
@ -13,6 +14,7 @@ class IntegrateFtrackInstance(pyblish.api.InstancePlugin):
order = pyblish.api.IntegratorOrder + 0.48
label = 'Integrate Ftrack Component'
families = ["ftrack"]
family_mapping = {'camera': 'cam',
'look': 'look',
@ -21,7 +23,6 @@ class IntegrateFtrackInstance(pyblish.api.InstancePlugin):
'rig': 'rig',
'setdress': 'setdress',
'pointcache': 'cache',
'review': 'mov',
'write': 'img',
'render': 'render'}
@ -53,6 +54,18 @@ class IntegrateFtrackInstance(pyblish.api.InstancePlugin):
filename, ext = os.path.splitext(file)
self.log.debug('dest ext: ' + ext)
if family == 'review':
component_name = "ftrackreview-mp4"
location = ft_session.query(
'Location where name is "ftrack.server"').one()
metadata = {'ftr_meta': json.dumps({
'frameIn': int(instance.data["startFrame"]),
'frameOut': int(instance.data["startFrame"]),
'frameRate': 25})}
else:
component_name = ext[1:]
metadata = None
componentList.append({"assettype_data": {
"short": asset_type,
},
@ -63,11 +76,12 @@ class IntegrateFtrackInstance(pyblish.api.InstancePlugin):
"version": version_number,
},
"component_data": {
"name": ext[1:], # Default component name is "main".
"name": component_name, # Default component name is "main".
"metadata": metadata
},
"component_path": file,
'component_location': location,
"component_overwrite": False,
"component_overwrite": False
}
)

View file

@ -0,0 +1,87 @@
import pyblish.api
import os
import clique
import json
class IntegrateFtrackInstance(pyblish.api.InstancePlugin):
"""Collect ftrack component data
Add ftrack component list to instance.
"""
order = pyblish.api.IntegratorOrder + 0.48
label = 'Integrate Ftrack Component'
families = ['review', 'ftrack']
family_mapping = {'camera': 'cam',
'look': 'look',
'mayaAscii': 'scene',
'model': 'geo',
'rig': 'rig',
'setdress': 'setdress',
'pointcache': 'cache',
'review': 'mov',
'write': 'img',
'render': 'render'}
def process(self, instance):
self.log.debug('instance {}'.format(instance))
assumed_data = instance.data["assumedTemplateData"]
assumed_version = assumed_data["version"]
version_number = int(assumed_version)
family = instance.data['family'].lower()
asset_type = ''
asset_type = self.family_mapping[family]
componentList = []
dst_list = instance.data['destination_list']
ft_session = instance.context.data["ftrackSession"]
for file in instance.data['destination_list']:
self.log.debug('file {}'.format(file))
for file in dst_list:
filename, ext = os.path.splitext(file)
self.log.debug('dest ext: ' + ext)
component_name = "ftrackreview-mp4"
location = ft_session.query(
'Location where name is "ftrack.server"').one()
metadata = {'ftr_meta': json.dumps({
'frameIn': int(instance.data["startFrame"]),
'frameOut': int(instance.data["startFrame"]),
'frameRate': 25})}
componentList.append({"assettype_data": {
"short": asset_type,
},
"asset_data": {
"name": instance.data["subset"],
},
"assetversion_data": {
"version": version_number,
},
"component_data": {
"name": component_name, # Default component name is "main".
"metadata": metadata
},
"component_path": file,
'component_location': location,
"component_overwrite": False
}
)
self.log.debug('componentsList: {}'.format(str(componentList)))
instance.data["ftrackComponentsList"] = componentList

View file

@ -18,16 +18,16 @@ class CleanUp(pyblish.api.InstancePlugin):
import tempfile
staging_dir = instance.data.get("stagingDir", None)
if not staging_dir or not os.path.exists(staging_dir):
self.log.info("No staging directory found: %s" % staging_dir)
return
temp_root = tempfile.gettempdir()
if not os.path.normpath(staging_dir).startswith(temp_root):
self.log.info("Skipping cleanup. Staging directory is not in the "
"temp folder: %s" % staging_dir)
return
self.log.info("Removing temporary folder ...")
shutil.rmtree(staging_dir)
# staging_dir = instance.data.get("stagingDir", None)
# if not staging_dir or not os.path.exists(staging_dir):
# self.log.info("No staging directory found: %s" % staging_dir)
# return
#
# temp_root = tempfile.gettempdir()
# if not os.path.normpath(staging_dir).startswith(temp_root):
# self.log.info("Skipping cleanup. Staging directory is not in the "
# "temp folder: %s" % staging_dir)
# return
#
# self.log.info("Removing temporary folder ...")
# shutil.rmtree(staging_dir)

View file

@ -15,7 +15,7 @@ class CollectAnimationOutputGeometry(pyblish.api.InstancePlugin):
"""
order = pyblish.api.CollectorOrder + 0.4
order = pyblish.api.CollectorOrder + 0.2
families = ["animation"]
label = "Collect Animation Output Geometry"
hosts = ["maya"]

View file

@ -203,7 +203,7 @@ class CollectLook(pyblish.api.InstancePlugin):
"""
order = pyblish.api.CollectorOrder + 0.4
order = pyblish.api.CollectorOrder + 0.2
families = ["look"]
label = "Collect Look"
hosts = ["maya"]

View file

@ -15,7 +15,8 @@ class CollectModelData(pyblish.api.InstancePlugin):
"""
order = pyblish.api.CollectorOrder + 0.499
order = pyblish.api.CollectorOrder + 0.2
label = 'Collect Model Data'
families = ["model"]

View file

@ -7,7 +7,7 @@ class CollectReviewData(pyblish.api.InstancePlugin):
"""
order = pyblish.api.CollectorOrder + 0.499
order = pyblish.api.CollectorOrder + 0.3
label = 'Collect Review Data'
families = ["review"]

View file

@ -23,7 +23,7 @@ class CollectSetDress(pyblish.api.InstancePlugin):
"""
order = pyblish.api.CollectorOrder + 0.49
order = pyblish.api.CollectorOrder + 0.2
label = "Set Dress"
families = ["setdress"]

View file

@ -26,7 +26,7 @@ class CollectYetiCache(pyblish.api.InstancePlugin):
Other information is the name of the transform and it's Colorbleed ID
"""
order = pyblish.api.CollectorOrder + 0.45
order = pyblish.api.CollectorOrder + 0.2
label = "Collect Yeti Cache"
families = ["yetiRig", "yeticache"]
hosts = ["maya"]

View file

@ -19,7 +19,7 @@ SETTINGS = {"renderDensity",
class CollectYetiRig(pyblish.api.InstancePlugin):
"""Collect all information of the Yeti Rig"""
order = pyblish.api.CollectorOrder + 0.4
order = pyblish.api.CollectorOrder + 0.2
label = "Collect Yeti Rig"
families = ["yetiRig"]
hosts = ["maya"]

View file

@ -1,15 +1,18 @@
import os
import contextlib
import time
import sys
import capture_gui
import clique
import pype.maya.lib as lib
import pype.api
from maya import cmds
import pymel.core as pm
from pype.vendor import ffmpeg
reload(ffmpeg)
import avalon.maya
@ -33,7 +36,7 @@ class ExtractQuicktime(pype.api.Extractor):
"""
label = "Quicktime (Alembic)"
label = "Quicktime"
hosts = ["maya"]
families = ["review"]
@ -126,29 +129,40 @@ class ExtractQuicktime(pype.api.Extractor):
preset['filename'] = path
preset['overwrite'] = True
# pm.refresh(f=True)
#
# refreshFrameInt = int(pm.playbackOptions(q=True, minTime=True))
# pm.currentTime(refreshFrameInt - 1, edit=True)
# pm.currentTime(refreshFrameInt, edit=True)
pm.refresh(f=True)
refreshFrameInt = int(pm.playbackOptions(q=True, minTime=True))
pm.currentTime(refreshFrameInt - 1, edit=True)
pm.currentTime(refreshFrameInt, edit=True)
with maintained_time():
playblast = capture_gui.lib.capture_scene(preset)
if "files" not in instance.data:
instance.data["files"] = list()
instance.data["files"].append(playblast)
self.log.info("file list {}".format(playblast))
# self.log.info("Calculating HUD data overlay")
# movieFullPth = path + ".mov"
# fls = [os.path.join(dir_path, f).replace("\\","/") for f in os.listdir( dir_path ) if f.endswith(preset['compression'])]
#self.log.info(" these %s" % fls[0])
# stagingdir = "C:/Users/milan.kolar/AppData/Local/Temp/pyblish_tmp_ucsymm"
collected_frames = os.listdir(stagingdir)
collections, remainder = clique.assemble(collected_frames)
input_path = os.path.join(stagingdir, collections[0].format('{head}{padding}{tail}'))
self.log.info("input {}".format(input_path))
movieFile = filename + ".mov"
full_movie_path = os.path.join(stagingdir, movieFile)
self.log.info("output {}".format(full_movie_path))
# fls = [os.path.join(stagingdir, filename).replace("\\","/") for f in os.listdir( dir_path ) if f.endswith(preset['compression'])]
# self.log.info("file list {}}".format(fls[0]))
out, err = (
ffmpeg
.input(input_path, framerate=25)
.output(full_movie_path)
.run(overwrite_output=True)
)
if "files" not in instance.data:
instance.data["files"] = list()
instance.data["files"].append(movieFile)
# ftrackStrings = fStrings.annotationData()
# nData = ftrackStrings.niceData
@ -172,10 +186,6 @@ class ExtractQuicktime(pype.api.Extractor):
# playblast = (ann.expPth).replace("\\","/")
instance.data["outputPath_qt"] = playblast
self.log.info("Outputting video to %s" % playblast)
@contextlib.contextmanager
def maintained_time():
ct = cmds.currentTime(query=True)

12
pype/vendor/builtins/__init__.py vendored Normal file
View file

@ -0,0 +1,12 @@
from __future__ import absolute_import
import sys
__future_module__ = True
if sys.version_info[0] < 3:
from __builtin__ import *
# Overwrite any old definitions with the equivalent future.builtins ones:
from future.builtins import *
else:
raise ImportError('This package should not be accessible on Python 3. '
'Either you are trying to run from the python-future src folder '
'or your installation of python-future is corrupted.')

10
pype/vendor/ffmpeg/__init__.py vendored Normal file
View file

@ -0,0 +1,10 @@
from __future__ import unicode_literals
from . import _filters, _ffmpeg, _run, _probe
from ._filters import *
from ._ffmpeg import *
from ._run import *
from ._view import *
from ._probe import *
__all__ = _filters.__all__ + _ffmpeg.__all__ + _run.__all__ + _view.__all__ + _probe.__all__

102
pype/vendor/ffmpeg/_ffmpeg.py vendored Normal file
View file

@ -0,0 +1,102 @@
from __future__ import unicode_literals
#from past.builtins import basestring
from ._utils import basestring
from .nodes import (
filter_operator,
GlobalNode,
InputNode,
MergeOutputsNode,
OutputNode,
output_operator,
)
def input(filename, **kwargs):
"""Input file URL (ffmpeg ``-i`` option)
Any supplied kwargs are passed to ffmpeg verbatim (e.g. ``t=20``,
``f='mp4'``, ``acodec='pcm'``, etc.).
To tell ffmpeg to read from stdin, use ``pipe:`` as the filename.
Official documentation: `Main options <https://ffmpeg.org/ffmpeg.html#Main-options>`__
"""
kwargs['filename'] = filename
fmt = kwargs.pop('f', None)
if fmt:
if 'format' in kwargs:
raise ValueError("Can't specify both `format` and `f` kwargs")
kwargs['format'] = fmt
return InputNode(input.__name__, kwargs=kwargs).stream()
@output_operator()
def global_args(stream, *args):
"""Add extra global command-line argument(s), e.g. ``-progress``.
"""
return GlobalNode(stream, global_args.__name__, args).stream()
@output_operator()
def overwrite_output(stream):
"""Overwrite output files without asking (ffmpeg ``-y`` option)
Official documentation: `Main options <https://ffmpeg.org/ffmpeg.html#Main-options>`__
"""
return GlobalNode(stream, overwrite_output.__name__, ['-y']).stream()
@output_operator()
def merge_outputs(*streams):
"""Include all given outputs in one ffmpeg command line
"""
return MergeOutputsNode(streams, merge_outputs.__name__).stream()
@filter_operator()
def output(*streams_and_filename, **kwargs):
"""Output file URL
Syntax:
`ffmpeg.output(stream1[, stream2, stream3...], filename, **ffmpeg_args)`
Any supplied keyword arguments are passed to ffmpeg verbatim (e.g.
``t=20``, ``f='mp4'``, ``acodec='pcm'``, ``vcodec='rawvideo'``,
etc.). Some keyword-arguments are handled specially, as shown below.
Args:
video_bitrate: parameter for ``-b:v``, e.g. ``video_bitrate=1000``.
audio_bitrate: parameter for ``-b:a``, e.g. ``audio_bitrate=200``.
format: alias for ``-f`` parameter, e.g. ``format='mp4'``
(equivalent to ``f='mp4'``).
If multiple streams are provided, they are mapped to the same
output.
To tell ffmpeg to write to stdout, use ``pipe:`` as the filename.
Official documentation: `Synopsis <https://ffmpeg.org/ffmpeg.html#Synopsis>`__
"""
streams_and_filename = list(streams_and_filename)
if 'filename' not in kwargs:
if not isinstance(streams_and_filename[-1], basestring):
raise ValueError('A filename must be provided')
kwargs['filename'] = streams_and_filename.pop(-1)
streams = streams_and_filename
fmt = kwargs.pop('f', None)
if fmt:
if 'format' in kwargs:
raise ValueError("Can't specify both `format` and `f` kwargs")
kwargs['format'] = fmt
return OutputNode(streams, output.__name__, kwargs=kwargs).stream()
__all__ = [
'input',
'merge_outputs',
'output',
'overwrite_output',
]

453
pype/vendor/ffmpeg/_filters.py vendored Normal file
View file

@ -0,0 +1,453 @@
from __future__ import unicode_literals
from .nodes import FilterNode, filter_operator
from ._utils import escape_chars
@filter_operator()
def filter_multi_output(stream_spec, filter_name, *args, **kwargs):
"""Apply custom filter with one or more outputs.
This is the same as ``filter_`` except that the filter can produce more than one output.
To reference an output stream, use either the ``.stream`` operator or bracket shorthand:
Example:
```
split = ffmpeg.input('in.mp4').filter_multi_output('split')
split0 = split.stream(0)
split1 = split[1]
ffmpeg.concat(split0, split1).output('out.mp4').run()
```
"""
return FilterNode(stream_spec, filter_name, args=args, kwargs=kwargs, max_inputs=None)
@filter_operator()
def filter(stream_spec, filter_name, *args, **kwargs):
"""Apply custom filter.
``filter_`` is normally used by higher-level filter functions such as ``hflip``, but if a filter implementation
is missing from ``fmpeg-python``, you can call ``filter_`` directly to have ``fmpeg-python`` pass the filter name
and arguments to ffmpeg verbatim.
Args:
stream_spec: a Stream, list of Streams, or label-to-Stream dictionary mapping
filter_name: ffmpeg filter name, e.g. `colorchannelmixer`
*args: list of args to pass to ffmpeg verbatim
**kwargs: list of keyword-args to pass to ffmpeg verbatim
The function name is suffixed with ``_`` in order avoid confusion with the standard python ``filter`` function.
Example:
``ffmpeg.input('in.mp4').filter('hflip').output('out.mp4').run()``
"""
return filter_multi_output(stream_spec, filter_name, *args, **kwargs).stream()
@filter_operator()
def filter_(stream_spec, filter_name, *args, **kwargs):
"""Alternate name for ``filter``, so as to not collide with the
built-in python ``filter`` operator.
"""
return filter(stream_spec, filter_name, *args, **kwargs)
@filter_operator()
def split(stream):
return FilterNode(stream, split.__name__)
@filter_operator()
def asplit(stream):
return FilterNode(stream, asplit.__name__)
@filter_operator()
def setpts(stream, expr):
"""Change the PTS (presentation timestamp) of the input frames.
Args:
expr: The expression which is evaluated for each frame to construct its timestamp.
Official documentation: `setpts, asetpts <https://ffmpeg.org/ffmpeg-filters.html#setpts_002c-asetpts>`__
"""
return FilterNode(stream, setpts.__name__, args=[expr]).stream()
@filter_operator()
def trim(stream, **kwargs):
"""Trim the input so that the output contains one continuous subpart of the input.
Args:
start: Specify the time of the start of the kept section, i.e. the frame with the timestamp start will be the
first frame in the output.
end: Specify the time of the first frame that will be dropped, i.e. the frame immediately preceding the one
with the timestamp end will be the last frame in the output.
start_pts: This is the same as start, except this option sets the start timestamp in timebase units instead of
seconds.
end_pts: This is the same as end, except this option sets the end timestamp in timebase units instead of
seconds.
duration: The maximum duration of the output in seconds.
start_frame: The number of the first frame that should be passed to the output.
end_frame: The number of the first frame that should be dropped.
Official documentation: `trim <https://ffmpeg.org/ffmpeg-filters.html#trim>`__
"""
return FilterNode(stream, trim.__name__, kwargs=kwargs).stream()
@filter_operator()
def overlay(main_parent_node, overlay_parent_node, eof_action='repeat', **kwargs):
"""Overlay one video on top of another.
Args:
x: Set the expression for the x coordinates of the overlaid video on the main video. Default value is 0. In
case the expression is invalid, it is set to a huge value (meaning that the overlay will not be displayed
within the output visible area).
y: Set the expression for the y coordinates of the overlaid video on the main video. Default value is 0. In
case the expression is invalid, it is set to a huge value (meaning that the overlay will not be displayed
within the output visible area).
eof_action: The action to take when EOF is encountered on the secondary input; it accepts one of the following
values:
* ``repeat``: Repeat the last frame (the default).
* ``endall``: End both streams.
* ``pass``: Pass the main input through.
eval: Set when the expressions for x, and y are evaluated.
It accepts the following values:
* ``init``: only evaluate expressions once during the filter initialization or when a command is
processed
* ``frame``: evaluate expressions for each incoming frame
Default value is ``frame``.
shortest: If set to 1, force the output to terminate when the shortest input terminates. Default value is 0.
format: Set the format for the output video.
It accepts the following values:
* ``yuv420``: force YUV420 output
* ``yuv422``: force YUV422 output
* ``yuv444``: force YUV444 output
* ``rgb``: force packed RGB output
* ``gbrp``: force planar RGB output
Default value is ``yuv420``.
rgb (deprecated): If set to 1, force the filter to accept inputs in the RGB color space. Default value is 0.
This option is deprecated, use format instead.
repeatlast: If set to 1, force the filter to draw the last overlay frame over the main input until the end of
the stream. A value of 0 disables this behavior. Default value is 1.
Official documentation: `overlay <https://ffmpeg.org/ffmpeg-filters.html#overlay-1>`__
"""
kwargs['eof_action'] = eof_action
return FilterNode([main_parent_node, overlay_parent_node], overlay.__name__, kwargs=kwargs, max_inputs=2).stream()
@filter_operator()
def hflip(stream):
"""Flip the input video horizontally.
Official documentation: `hflip <https://ffmpeg.org/ffmpeg-filters.html#hflip>`__
"""
return FilterNode(stream, hflip.__name__).stream()
@filter_operator()
def vflip(stream):
"""Flip the input video vertically.
Official documentation: `vflip <https://ffmpeg.org/ffmpeg-filters.html#vflip>`__
"""
return FilterNode(stream, vflip.__name__).stream()
@filter_operator()
def crop(stream, x, y, width, height, **kwargs):
"""Crop the input video.
Args:
x: The horizontal position, in the input video, of the left edge of
the output video.
y: The vertical position, in the input video, of the top edge of the
output video.
width: The width of the output video. Must be greater than 0.
heigth: The height of the output video. Must be greater than 0.
Official documentation: `crop <https://ffmpeg.org/ffmpeg-filters.html#crop>`__
"""
return FilterNode(
stream,
crop.__name__,
args=[width, height, x, y],
kwargs=kwargs
).stream()
@filter_operator()
def drawbox(stream, x, y, width, height, color, thickness=None, **kwargs):
"""Draw a colored box on the input image.
Args:
x: The expression which specifies the top left corner x coordinate of the box. It defaults to 0.
y: The expression which specifies the top left corner y coordinate of the box. It defaults to 0.
width: Specify the width of the box; if 0 interpreted as the input width. It defaults to 0.
heigth: Specify the height of the box; if 0 interpreted as the input height. It defaults to 0.
color: Specify the color of the box to write. For the general syntax of this option, check the "Color" section
in the ffmpeg-utils manual. If the special value invert is used, the box edge color is the same as the
video with inverted luma.
thickness: The expression which sets the thickness of the box edge. Default value is 3.
w: Alias for ``width``.
h: Alias for ``height``.
c: Alias for ``color``.
t: Alias for ``thickness``.
Official documentation: `drawbox <https://ffmpeg.org/ffmpeg-filters.html#drawbox>`__
"""
if thickness:
kwargs['t'] = thickness
return FilterNode(stream, drawbox.__name__, args=[x, y, width, height, color], kwargs=kwargs).stream()
@filter_operator()
def drawtext(stream, text=None, x=0, y=0, escape_text=True, **kwargs):
"""Draw a text string or text from a specified file on top of a video, using the libfreetype library.
To enable compilation of this filter, you need to configure FFmpeg with ``--enable-libfreetype``. To enable default
font fallback and the font option you need to configure FFmpeg with ``--enable-libfontconfig``. To enable the
text_shaping option, you need to configure FFmpeg with ``--enable-libfribidi``.
Args:
box: Used to draw a box around text using the background color. The value must be either 1 (enable) or 0
(disable). The default value of box is 0.
boxborderw: Set the width of the border to be drawn around the box using boxcolor. The default value of
boxborderw is 0.
boxcolor: The color to be used for drawing box around text. For the syntax of this option, check the "Color"
section in the ffmpeg-utils manual. The default value of boxcolor is "white".
line_spacing: Set the line spacing in pixels of the border to be drawn around the box using box. The default
value of line_spacing is 0.
borderw: Set the width of the border to be drawn around the text using bordercolor. The default value of
borderw is 0.
bordercolor: Set the color to be used for drawing border around text. For the syntax of this option, check the
"Color" section in the ffmpeg-utils manual. The default value of bordercolor is "black".
expansion: Select how the text is expanded. Can be either none, strftime (deprecated) or normal (default). See
the Text expansion section below for details.
basetime: Set a start time for the count. Value is in microseconds. Only applied in the deprecated strftime
expansion mode. To emulate in normal expansion mode use the pts function, supplying the start time (in
seconds) as the second argument.
fix_bounds: If true, check and fix text coords to avoid clipping.
fontcolor: The color to be used for drawing fonts. For the syntax of this option, check the "Color" section in
the ffmpeg-utils manual. The default value of fontcolor is "black".
fontcolor_expr: String which is expanded the same way as text to obtain dynamic fontcolor value. By default
this option has empty value and is not processed. When this option is set, it overrides fontcolor option.
font: The font family to be used for drawing text. By default Sans.
fontfile: The font file to be used for drawing text. The path must be included. This parameter is mandatory if
the fontconfig support is disabled.
alpha: Draw the text applying alpha blending. The value can be a number between 0.0 and 1.0. The expression
accepts the same variables x, y as well. The default value is 1. Please see fontcolor_expr.
fontsize: The font size to be used for drawing text. The default value of fontsize is 16.
text_shaping: If set to 1, attempt to shape the text (for example, reverse the order of right-to-left text and
join Arabic characters) before drawing it. Otherwise, just draw the text exactly as given. By default 1 (if
supported).
ft_load_flags: The flags to be used for loading the fonts. The flags map the corresponding flags supported by
libfreetype, and are a combination of the following values:
* ``default``
* ``no_scale``
* ``no_hinting``
* ``render``
* ``no_bitmap``
* ``vertical_layout``
* ``force_autohint``
* ``crop_bitmap``
* ``pedantic``
* ``ignore_global_advance_width``
* ``no_recurse``
* ``ignore_transform``
* ``monochrome``
* ``linear_design``
* ``no_autohint``
Default value is "default". For more information consult the documentation for the FT_LOAD_* libfreetype
flags.
shadowcolor: The color to be used for drawing a shadow behind the drawn text. For the syntax of this option,
check the "Color" section in the ffmpeg-utils manual. The default value of shadowcolor is "black".
shadowx: The x offset for the text shadow position with respect to the position of the text. It can be either
positive or negative values. The default value is "0".
shadowy: The y offset for the text shadow position with respect to the position of the text. It can be either
positive or negative values. The default value is "0".
start_number: The starting frame number for the n/frame_num variable. The default value is "0".
tabsize: The size in number of spaces to use for rendering the tab. Default value is 4.
timecode: Set the initial timecode representation in "hh:mm:ss[:;.]ff" format. It can be used with or without
text parameter. timecode_rate option must be specified.
rate: Set the timecode frame rate (timecode only).
timecode_rate: Alias for ``rate``.
r: Alias for ``rate``.
tc24hmax: If set to 1, the output of the timecode option will wrap around at 24 hours. Default is 0 (disabled).
text: The text string to be drawn. The text must be a sequence of UTF-8 encoded characters. This parameter is
mandatory if no file is specified with the parameter textfile.
textfile: A text file containing text to be drawn. The text must be a sequence of UTF-8 encoded characters.
This parameter is mandatory if no text string is specified with the parameter text. If both text and
textfile are specified, an error is thrown.
reload: If set to 1, the textfile will be reloaded before each frame. Be sure to update it atomically, or it
may be read partially, or even fail.
x: The expression which specifies the offset where text will be drawn within the video frame. It is relative to
the left border of the output image. The default value is "0".
y: The expression which specifies the offset where text will be drawn within the video frame. It is relative to
the top border of the output image. The default value is "0". See below for the list of accepted constants
and functions.
Expression constants:
The parameters for x and y are expressions containing the following constants and functions:
- dar: input display aspect ratio, it is the same as ``(w / h) * sar``
- hsub: horizontal chroma subsample values. For example for the pixel format "yuv422p" hsub is 2 and vsub
is 1.
- vsub: vertical chroma subsample values. For example for the pixel format "yuv422p" hsub is 2 and vsub
is 1.
- line_h: the height of each text line
- lh: Alias for ``line_h``.
- main_h: the input height
- h: Alias for ``main_h``.
- H: Alias for ``main_h``.
- main_w: the input width
- w: Alias for ``main_w``.
- W: Alias for ``main_w``.
- ascent: the maximum distance from the baseline to the highest/upper grid coordinate used to place a glyph
outline point, for all the rendered glyphs. It is a positive value, due to the grid's orientation with the Y
axis upwards.
- max_glyph_a: Alias for ``ascent``.
- descent: the maximum distance from the baseline to the lowest grid coordinate used to place a glyph outline
point, for all the rendered glyphs. This is a negative value, due to the grid's orientation, with the Y axis
upwards.
- max_glyph_d: Alias for ``descent``.
- max_glyph_h: maximum glyph height, that is the maximum height for all the glyphs contained in the rendered
text, it is equivalent to ascent - descent.
- max_glyph_w: maximum glyph width, that is the maximum width for all the glyphs contained in the rendered
text.
- n: the number of input frame, starting from 0
- rand(min, max): return a random number included between min and max
- sar: The input sample aspect ratio.
- t: timestamp expressed in seconds, NAN if the input timestamp is unknown
- text_h: the height of the rendered text
- th: Alias for ``text_h``.
- text_w: the width of the rendered text
- tw: Alias for ``text_w``.
- x: the x offset coordinates where the text is drawn.
- y: the y offset coordinates where the text is drawn.
These parameters allow the x and y expressions to refer each other, so you can for example specify
``y=x/dar``.
Official documentation: `drawtext <https://ffmpeg.org/ffmpeg-filters.html#drawtext>`__
"""
if text is not None:
if escape_text:
text = escape_chars(text, '\\\'%')
kwargs['text'] = text
if x != 0:
kwargs['x'] = x
if y != 0:
kwargs['y'] = y
return filter(stream, drawtext.__name__, **kwargs)
@filter_operator()
def concat(*streams, **kwargs):
"""Concatenate audio and video streams, joining them together one after the other.
The filter works on segments of synchronized video and audio streams. All segments must have the same number of
streams of each type, and that will also be the number of streams at output.
Args:
unsafe: Activate unsafe mode: do not fail if segments have a different format.
Related streams do not always have exactly the same duration, for various reasons including codec frame size or
sloppy authoring. For that reason, related synchronized streams (e.g. a video and its audio track) should be
concatenated at once. The concat filter will use the duration of the longest stream in each segment (except the
last one), and if necessary pad shorter audio streams with silence.
For this filter to work correctly, all segments must start at timestamp 0.
All corresponding streams must have the same parameters in all segments; the filtering system will automatically
select a common pixel format for video streams, and a common sample format, sample rate and channel layout for
audio streams, but other settings, such as resolution, must be converted explicitly by the user.
Different frame rates are acceptable but will result in variable frame rate at output; be sure to configure the
output file to handle it.
Official documentation: `concat <https://ffmpeg.org/ffmpeg-filters.html#concat>`__
"""
video_stream_count = kwargs.get('v', 1)
audio_stream_count = kwargs.get('a', 0)
stream_count = video_stream_count + audio_stream_count
if len(streams) % stream_count != 0:
raise ValueError(
'Expected concat input streams to have length multiple of {} (v={}, a={}); got {}'
.format(stream_count, video_stream_count, audio_stream_count, len(streams)))
kwargs['n'] = int(len(streams) / stream_count)
return FilterNode(streams, concat.__name__, kwargs=kwargs, max_inputs=None).stream()
@filter_operator()
def zoompan(stream, **kwargs):
"""Apply Zoom & Pan effect.
Args:
zoom: Set the zoom expression. Default is 1.
x: Set the x expression. Default is 0.
y: Set the y expression. Default is 0.
d: Set the duration expression in number of frames. This sets for how many number of frames effect will last
for single input image.
s: Set the output image size, default is ``hd720``.
fps: Set the output frame rate, default is 25.
z: Alias for ``zoom``.
Official documentation: `zoompan <https://ffmpeg.org/ffmpeg-filters.html#zoompan>`__
"""
return FilterNode(stream, zoompan.__name__, kwargs=kwargs).stream()
@filter_operator()
def hue(stream, **kwargs):
"""Modify the hue and/or the saturation of the input.
Args:
h: Specify the hue angle as a number of degrees. It accepts an expression, and defaults to "0".
s: Specify the saturation in the [-10,10] range. It accepts an expression and defaults to "1".
H: Specify the hue angle as a number of radians. It accepts an expression, and defaults to "0".
b: Specify the brightness in the [-10,10] range. It accepts an expression and defaults to "0".
Official documentation: `hue <https://ffmpeg.org/ffmpeg-filters.html#hue>`__
"""
return FilterNode(stream, hue.__name__, kwargs=kwargs).stream()
@filter_operator()
def colorchannelmixer(stream, *args, **kwargs):
"""Adjust video input frames by re-mixing color channels.
Official documentation: `colorchannelmixer <https://ffmpeg.org/ffmpeg-filters.html#colorchannelmixer>`__
"""
return FilterNode(stream, colorchannelmixer.__name__, kwargs=kwargs).stream()
__all__ = [
'colorchannelmixer',
'concat',
'crop',
'drawbox',
'drawtext',
'filter',
'filter_',
'filter_multi_output',
'hflip',
'hue',
'overlay',
'setpts',
'trim',
'vflip',
'zoompan',
]

25
pype/vendor/ffmpeg/_probe.py vendored Normal file
View file

@ -0,0 +1,25 @@
import json
import subprocess
from ._run import Error
def probe(filename, cmd='ffprobe'):
"""Run ffprobe on the specified file and return a JSON representation of the output.
Raises:
:class:`ffmpeg.Error`: if ffprobe returns a non-zero exit code,
an :class:`Error` is returned with a generic error message.
The stderr output can be retrieved by accessing the
``stderr`` property of the exception.
"""
args = [cmd, '-show_format', '-show_streams', '-of', 'json', filename]
p = subprocess.Popen(args, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
out, err = p.communicate()
if p.returncode != 0:
raise Error('ffprobe', out, err)
return json.loads(out.decode('utf-8'))
__all__ = [
'probe',
]

222
pype/vendor/ffmpeg/_run.py vendored Normal file
View file

@ -0,0 +1,222 @@
from __future__ import unicode_literals
from .dag import get_outgoing_edges, topo_sort
from ._utils import basestring
from builtins import str
from functools import reduce
import collections
import copy
import operator
import subprocess
from ._ffmpeg import (
input,
output,
)
from .nodes import (
get_stream_spec_nodes,
FilterNode,
GlobalNode,
InputNode,
OutputNode,
output_operator,
)
class Error(Exception):
def __init__(self, cmd, stdout, stderr):
super(Error, self).__init__('{} error (see stderr output for detail)'.format(cmd))
self.stdout = stdout
self.stderr = stderr
def _convert_kwargs_to_cmd_line_args(kwargs):
args = []
for k in sorted(kwargs.keys()):
v = kwargs[k]
args.append('-{}'.format(k))
if v is not None:
args.append('{}'.format(v))
return args
def _get_input_args(input_node):
if input_node.name == input.__name__:
kwargs = copy.copy(input_node.kwargs)
filename = kwargs.pop('filename')
fmt = kwargs.pop('format', None)
video_size = kwargs.pop('video_size', None)
args = []
if fmt:
args += ['-f', fmt]
if video_size:
args += ['-video_size', '{}x{}'.format(video_size[0], video_size[1])]
args += _convert_kwargs_to_cmd_line_args(kwargs)
args += ['-i', filename]
else:
raise ValueError('Unsupported input node: {}'.format(input_node))
return args
def _format_input_stream_name(stream_name_map, edge, is_final_arg=False):
prefix = stream_name_map[edge.upstream_node, edge.upstream_label]
if not edge.upstream_selector:
suffix = ''
else:
suffix = ':{}'.format(edge.upstream_selector)
if is_final_arg and isinstance(edge.upstream_node, InputNode):
## Special case: `-map` args should not have brackets for input
## nodes.
fmt = '{}{}'
else:
fmt = '[{}{}]'
return fmt.format(prefix, suffix)
def _format_output_stream_name(stream_name_map, edge):
return '[{}]'.format(stream_name_map[edge.upstream_node, edge.upstream_label])
def _get_filter_spec(node, outgoing_edge_map, stream_name_map):
incoming_edges = node.incoming_edges
outgoing_edges = get_outgoing_edges(node, outgoing_edge_map)
inputs = [_format_input_stream_name(stream_name_map, edge) for edge in incoming_edges]
outputs = [_format_output_stream_name(stream_name_map, edge) for edge in outgoing_edges]
filter_spec = '{}{}{}'.format(''.join(inputs), node._get_filter(outgoing_edges), ''.join(outputs))
return filter_spec
def _allocate_filter_stream_names(filter_nodes, outgoing_edge_maps, stream_name_map):
stream_count = 0
for upstream_node in filter_nodes:
outgoing_edge_map = outgoing_edge_maps[upstream_node]
for upstream_label, downstreams in list(outgoing_edge_map.items()):
if len(downstreams) > 1:
# TODO: automatically insert `splits` ahead of time via graph transformation.
raise ValueError(
'Encountered {} with multiple outgoing edges with same upstream label {!r}; a '
'`split` filter is probably required'.format(upstream_node, upstream_label))
stream_name_map[upstream_node, upstream_label] = 's{}'.format(stream_count)
stream_count += 1
def _get_filter_arg(filter_nodes, outgoing_edge_maps, stream_name_map):
_allocate_filter_stream_names(filter_nodes, outgoing_edge_maps, stream_name_map)
filter_specs = [_get_filter_spec(node, outgoing_edge_maps[node], stream_name_map) for node in filter_nodes]
return ';'.join(filter_specs)
def _get_global_args(node):
return list(node.args)
def _get_output_args(node, stream_name_map):
if node.name != output.__name__:
raise ValueError('Unsupported output node: {}'.format(node))
args = []
if len(node.incoming_edges) == 0:
raise ValueError('Output node {} has no mapped streams'.format(node))
for edge in node.incoming_edges:
# edge = node.incoming_edges[0]
stream_name = _format_input_stream_name(stream_name_map, edge, is_final_arg=True)
if stream_name != '0' or len(node.incoming_edges) > 1:
args += ['-map', stream_name]
kwargs = copy.copy(node.kwargs)
filename = kwargs.pop('filename')
if 'format' in kwargs:
args += ['-f', kwargs.pop('format')]
if 'video_bitrate' in kwargs:
args += ['-b:v', str(kwargs.pop('video_bitrate'))]
if 'audio_bitrate' in kwargs:
args += ['-b:a', str(kwargs.pop('audio_bitrate'))]
if 'video_size' in kwargs:
video_size = kwargs.pop('video_size')
if not isinstance(video_size, basestring) and isinstance(video_size, collections.Iterable):
video_size = '{}x{}'.format(video_size[0], video_size[1])
args += ['-video_size', video_size]
args += _convert_kwargs_to_cmd_line_args(kwargs)
args += [filename]
return args
@output_operator()
def get_args(stream_spec, overwrite_output=False):
"""Build command-line arguments to be passed to ffmpeg."""
nodes = get_stream_spec_nodes(stream_spec)
args = []
# TODO: group nodes together, e.g. `-i somefile -r somerate`.
sorted_nodes, outgoing_edge_maps = topo_sort(nodes)
input_nodes = [node for node in sorted_nodes if isinstance(node, InputNode)]
output_nodes = [node for node in sorted_nodes if isinstance(node, OutputNode)]
global_nodes = [node for node in sorted_nodes if isinstance(node, GlobalNode)]
filter_nodes = [node for node in sorted_nodes if isinstance(node, FilterNode)]
stream_name_map = {(node, None): str(i) for i, node in enumerate(input_nodes)}
filter_arg = _get_filter_arg(filter_nodes, outgoing_edge_maps, stream_name_map)
args += reduce(operator.add, [_get_input_args(node) for node in input_nodes])
if filter_arg:
args += ['-filter_complex', filter_arg]
args += reduce(operator.add, [_get_output_args(node, stream_name_map) for node in output_nodes])
args += reduce(operator.add, [_get_global_args(node) for node in global_nodes], [])
if overwrite_output:
args += ['-y']
return args
@output_operator()
def compile(stream_spec, cmd='ffmpeg', overwrite_output=False):
"""Build command-line for invoking ffmpeg.
The :meth:`run` function uses this to build the commnad line
arguments and should work in most cases, but calling this function
directly is useful for debugging or if you need to invoke ffmpeg
manually for whatever reason.
This is the same as calling :meth:`get_args` except that it also
includes the ``ffmpeg`` command as the first argument.
"""
if isinstance(cmd, basestring):
cmd = [cmd]
elif type(cmd) != list:
cmd = list(cmd)
return cmd + get_args(stream_spec, overwrite_output=overwrite_output)
@output_operator()
def run(
stream_spec, cmd='ffmpeg', capture_stdout=False, capture_stderr=False, input=None,
quiet=False, overwrite_output=False):
"""Ivoke ffmpeg for the supplied node graph.
Args:
capture_stdout: if True, capture stdout (to be used with
``pipe:`` ffmpeg outputs).
capture_stderr: if True, capture stderr.
quiet: shorthand for setting ``capture_stdout`` and ``capture_stderr``.
input: text to be sent to stdin (to be used with ``pipe:``
ffmpeg inputs)
**kwargs: keyword-arguments passed to ``get_args()`` (e.g.
``overwrite_output=True``).
Returns: (out, err) tuple containing captured stdout and stderr data.
"""
args = compile(stream_spec, cmd, overwrite_output=overwrite_output)
stdin_stream = subprocess.PIPE if input else None
stdout_stream = subprocess.PIPE if capture_stdout or quiet else None
stderr_stream = subprocess.PIPE if capture_stderr or quiet else None
p = subprocess.Popen(args, stdin=stdin_stream, stdout=stdout_stream, stderr=stderr_stream)
out, err = p.communicate(input)
retcode = p.poll()
if retcode:
raise Error('ffmpeg', out, err)
print err
return out, err
__all__ = [
'compile',
'Error',
'get_args',
'run',
]

80
pype/vendor/ffmpeg/_utils.py vendored Normal file
View file

@ -0,0 +1,80 @@
from __future__ import unicode_literals
from builtins import str
#from past.builtins import basestring
import hashlib
import sys
if sys.version_info.major == 2:
# noinspection PyUnresolvedReferences,PyShadowingBuiltins
str = str
# `past.builtins.basestring` module can't be imported on Python3 in some environments (Ubuntu).
# This code is copy-pasted from it to avoid crashes.
class BaseBaseString(type):
def __instancecheck__(cls, instance):
return isinstance(instance, (bytes, str))
def __subclasshook__(cls, thing):
# TODO: What should go here?
raise NotImplemented
def with_metaclass(meta, *bases):
class metaclass(meta):
__call__ = type.__call__
__init__ = type.__init__
def __new__(cls, name, this_bases, d):
if this_bases is None:
return type.__new__(cls, name, (), d)
return meta(name, bases, d)
return metaclass('temporary_class', None, {})
if sys.version_info.major >= 3:
class basestring(with_metaclass(BaseBaseString)):
pass
else:
# noinspection PyUnresolvedReferences,PyCompatibility
from builtins import basestring
def _recursive_repr(item):
"""Hack around python `repr` to deterministically represent dictionaries.
This is able to represent more things than json.dumps, since it does not require things to be JSON serializable
(e.g. datetimes).
"""
if isinstance(item, basestring):
result = str(item)
elif isinstance(item, list):
result = '[{}]'.format(', '.join([_recursive_repr(x) for x in item]))
elif isinstance(item, dict):
kv_pairs = ['{}: {}'.format(_recursive_repr(k), _recursive_repr(item[k])) for k in sorted(item)]
result = '{' + ', '.join(kv_pairs) + '}'
else:
result = repr(item)
return result
def get_hash(item):
repr_ = _recursive_repr(item).encode('utf-8')
return hashlib.md5(repr_).hexdigest()
def get_hash_int(item):
return int(get_hash(item), base=16)
def escape_chars(text, chars):
"""Helper function to escape uncomfortable characters."""
text = str(text)
chars = list(set(chars))
if '\\' in chars:
chars.remove('\\')
chars.insert(0, '\\')
for ch in chars:
text = text.replace(ch, '\\' + ch)
return text

98
pype/vendor/ffmpeg/_view.py vendored Normal file
View file

@ -0,0 +1,98 @@
from __future__ import unicode_literals
from builtins import str
from .dag import get_outgoing_edges
from ._run import topo_sort
import tempfile
from ffmpeg.nodes import (
FilterNode,
get_stream_spec_nodes,
InputNode,
OutputNode,
stream_operator,
)
_RIGHT_ARROW = '\u2192'
def _get_node_color(node):
if isinstance(node, InputNode):
color = '#99cc00'
elif isinstance(node, OutputNode):
color = '#99ccff'
elif isinstance(node, FilterNode):
color = '#ffcc00'
else:
color = None
return color
@stream_operator()
def view(stream_spec, detail=False, filename=None, pipe=False, **kwargs):
try:
import graphviz
except ImportError:
raise ImportError('failed to import graphviz; please make sure graphviz is installed (e.g. `pip install '
'graphviz`)')
show_labels = kwargs.pop('show_labels', True)
if pipe and filename is not None:
raise ValueError('Can\'t specify both `filename` and `pipe`')
elif not pipe and filename is None:
filename = tempfile.mktemp()
nodes = get_stream_spec_nodes(stream_spec)
sorted_nodes, outgoing_edge_maps = topo_sort(nodes)
graph = graphviz.Digraph(format='png')
graph.attr(rankdir='LR')
if len(list(kwargs.keys())) != 0:
raise ValueError('Invalid kwargs key(s): {}'.format(', '.join(list(kwargs.keys()))))
for node in sorted_nodes:
color = _get_node_color(node)
if detail:
lines = [node.short_repr]
lines += ['{!r}'.format(arg) for arg in node.args]
lines += ['{}={!r}'.format(key, node.kwargs[key]) for key in sorted(node.kwargs)]
node_text = '\n'.join(lines)
else:
node_text = node.short_repr
graph.node(str(hash(node)), node_text, shape='box', style='filled', fillcolor=color)
outgoing_edge_map = outgoing_edge_maps.get(node, {})
for edge in get_outgoing_edges(node, outgoing_edge_map):
kwargs = {}
up_label = edge.upstream_label
down_label = edge.downstream_label
up_selector = edge.upstream_selector
if show_labels and (up_label is not None or down_label is not None or up_selector is not None):
if up_label is None:
up_label = ''
if up_selector is not None:
up_label += ":" + up_selector
if down_label is None:
down_label = ''
if up_label != '' and down_label != '':
middle = ' {} '.format(_RIGHT_ARROW)
else:
middle = ''
kwargs['label'] = '{} {} {}'.format(up_label, middle, down_label)
upstream_node_id = str(hash(edge.upstream_node))
downstream_node_id = str(hash(edge.downstream_node))
graph.edge(upstream_node_id, downstream_node_id, **kwargs)
if pipe:
return graph.pipe()
else:
graph.view(filename, cleanup=True)
return stream_spec
__all__ = [
'view',
]

182
pype/vendor/ffmpeg/dag.py vendored Normal file
View file

@ -0,0 +1,182 @@
from __future__ import unicode_literals
from ._utils import get_hash, get_hash_int
from builtins import object
from collections import namedtuple
class DagNode(object):
"""Node in a directed-acyclic graph (DAG).
Edges:
DagNodes are connected by edges. An edge connects two nodes with a label for each side:
- ``upstream_node``: upstream/parent node
- ``upstream_label``: label on the outgoing side of the upstream node
- ``downstream_node``: downstream/child node
- ``downstream_label``: label on the incoming side of the downstream node
For example, DagNode A may be connected to DagNode B with an edge labelled "foo" on A's side, and "bar" on B's
side:
_____ _____
| | | |
| A >[foo]---[bar]> B |
|_____| |_____|
Edge labels may be integers or strings, and nodes cannot have more than one incoming edge with the same label.
DagNodes may have any number of incoming edges and any number of outgoing edges. DagNodes keep track only of
their incoming edges, but the entire graph structure can be inferred by looking at the furthest downstream
nodes and working backwards.
Hashing:
DagNodes must be hashable, and two nodes are considered to be equivalent if they have the same hash value.
Nodes are immutable, and the hash should remain constant as a result. If a node with new contents is required,
create a new node and throw the old one away.
String representation:
In order for graph visualization tools to show useful information, nodes must be representable as strings. The
``repr`` operator should provide a more or less "full" representation of the node, and the ``short_repr``
property should be a shortened, concise representation.
Again, because nodes are immutable, the string representations should remain constant.
"""
def __hash__(self):
"""Return an integer hash of the node."""
raise NotImplementedError()
def __eq__(self, other):
"""Compare two nodes; implementations should return True if (and only if) hashes match."""
raise NotImplementedError()
def __repr__(self, other):
"""Return a full string representation of the node."""
raise NotImplementedError()
@property
def short_repr(self):
"""Return a partial/concise representation of the node."""
raise NotImplementedError()
@property
def incoming_edge_map(self):
"""Provides information about all incoming edges that connect to this node.
The edge map is a dictionary that maps an ``incoming_label`` to ``(outgoing_node, outgoing_label)``. Note that
implicity, ``incoming_node`` is ``self``. See "Edges" section above.
"""
raise NotImplementedError()
DagEdge = namedtuple('DagEdge', ['downstream_node', 'downstream_label', 'upstream_node', 'upstream_label', 'upstream_selector'])
def get_incoming_edges(downstream_node, incoming_edge_map):
edges = []
for downstream_label, upstream_info in list(incoming_edge_map.items()):
upstream_node, upstream_label, upstream_selector = upstream_info
edges += [DagEdge(downstream_node, downstream_label, upstream_node, upstream_label, upstream_selector)]
return edges
def get_outgoing_edges(upstream_node, outgoing_edge_map):
edges = []
for upstream_label, downstream_infos in list(outgoing_edge_map.items()):
for downstream_info in downstream_infos:
downstream_node, downstream_label, downstream_selector = downstream_info
edges += [DagEdge(downstream_node, downstream_label, upstream_node, upstream_label, downstream_selector)]
return edges
class KwargReprNode(DagNode):
"""A DagNode that can be represented as a set of args+kwargs.
"""
@property
def __upstream_hashes(self):
hashes = []
for downstream_label, upstream_info in list(self.incoming_edge_map.items()):
upstream_node, upstream_label, upstream_selector = upstream_info
hashes += [hash(x) for x in [downstream_label, upstream_node, upstream_label, upstream_selector]]
return hashes
@property
def __inner_hash(self):
props = {'args': self.args, 'kwargs': self.kwargs}
return get_hash(props)
def __get_hash(self):
hashes = self.__upstream_hashes + [self.__inner_hash]
return get_hash_int(hashes)
def __init__(self, incoming_edge_map, name, args, kwargs):
self.__incoming_edge_map = incoming_edge_map
self.name = name
self.args = args
self.kwargs = kwargs
self.__hash = self.__get_hash()
def __hash__(self):
return self.__hash
def __eq__(self, other):
return hash(self) == hash(other)
@property
def short_hash(self):
return '{:x}'.format(abs(hash(self)))[:12]
def long_repr(self, include_hash=True):
formatted_props = ['{!r}'.format(arg) for arg in self.args]
formatted_props += ['{}={!r}'.format(key, self.kwargs[key]) for key in sorted(self.kwargs)]
out = '{}({})'.format(self.name, ', '.join(formatted_props))
if include_hash:
out += ' <{}>'.format(self.short_hash)
return out
def __repr__(self):
return self.long_repr()
@property
def incoming_edges(self):
return get_incoming_edges(self, self.incoming_edge_map)
@property
def incoming_edge_map(self):
return self.__incoming_edge_map
@property
def short_repr(self):
return self.name
def topo_sort(downstream_nodes):
marked_nodes = []
sorted_nodes = []
outgoing_edge_maps = {}
def visit(upstream_node, upstream_label, downstream_node, downstream_label, downstream_selector=None):
if upstream_node in marked_nodes:
raise RuntimeError('Graph is not a DAG')
if downstream_node is not None:
outgoing_edge_map = outgoing_edge_maps.get(upstream_node, {})
outgoing_edge_infos = outgoing_edge_map.get(upstream_label, [])
outgoing_edge_infos += [(downstream_node, downstream_label, downstream_selector)]
outgoing_edge_map[upstream_label] = outgoing_edge_infos
outgoing_edge_maps[upstream_node] = outgoing_edge_map
if upstream_node not in sorted_nodes:
marked_nodes.append(upstream_node)
for edge in upstream_node.incoming_edges:
visit(edge.upstream_node, edge.upstream_label, edge.downstream_node, edge.downstream_label, edge.upstream_selector)
marked_nodes.remove(upstream_node)
sorted_nodes.append(upstream_node)
unmarked_nodes = [(node, None) for node in downstream_nodes]
while unmarked_nodes:
upstream_node, upstream_label = unmarked_nodes.pop()
visit(upstream_node, upstream_label, None, None)
return sorted_nodes, outgoing_edge_maps

289
pype/vendor/ffmpeg/nodes.py vendored Normal file
View file

@ -0,0 +1,289 @@
from __future__ import unicode_literals
# #from past.builtins import basestring
# import basestring
from .dag import KwargReprNode
from ._utils import escape_chars, get_hash_int
from builtins import object, basestring
import os
def _is_of_types(obj, types):
valid = False
for stream_type in types:
if isinstance(obj, stream_type):
valid = True
break
return valid
def _get_types_str(types):
return ', '.join(['{}.{}'.format(x.__module__, x.__name__) for x in types])
class Stream(object):
"""Represents the outgoing edge of an upstream node; may be used to create more downstream nodes."""
def __init__(self, upstream_node, upstream_label, node_types, upstream_selector=None):
if not _is_of_types(upstream_node, node_types):
raise TypeError('Expected upstream node to be of one of the following type(s): {}; got {}'.format(
_get_types_str(node_types), type(upstream_node)))
self.node = upstream_node
self.label = upstream_label
self.selector = upstream_selector
def __hash__(self):
return get_hash_int([hash(self.node), hash(self.label)])
def __eq__(self, other):
return hash(self) == hash(other)
def __repr__(self):
node_repr = self.node.long_repr(include_hash=False)
selector = ''
if self.selector:
selector = ':{}'.format(self.selector)
out = '{}[{!r}{}] <{}>'.format(node_repr, self.label, selector, self.node.short_hash)
return out
def __getitem__(self, index):
"""
Select a component (audio, video) of the stream.
Example:
Process the audio and video portions of a stream independently::
input = ffmpeg.input('in.mp4')
audio = input[:'a'].filter("aecho", 0.8, 0.9, 1000, 0.3)
video = input[:'v'].hflip()
out = ffmpeg.output(audio, video, 'out.mp4')
"""
if self.selector is not None:
raise ValueError('Stream already has a selector: {}'.format(self))
elif not isinstance(index, basestring):
raise TypeError("Expected string index (e.g. 'a'); got {!r}".format(index))
return self.node.stream(label=self.label, selector=index)
def get_stream_map(stream_spec):
if stream_spec is None:
stream_map = {}
elif isinstance(stream_spec, Stream):
stream_map = {None: stream_spec}
elif isinstance(stream_spec, (list, tuple)):
stream_map = dict(enumerate(stream_spec))
elif isinstance(stream_spec, dict):
stream_map = stream_spec
return stream_map
def get_stream_map_nodes(stream_map):
nodes = []
for stream in list(stream_map.values()):
if not isinstance(stream, Stream):
raise TypeError('Expected Stream; got {}'.format(type(stream)))
nodes.append(stream.node)
return nodes
def get_stream_spec_nodes(stream_spec):
stream_map = get_stream_map(stream_spec)
return get_stream_map_nodes(stream_map)
class Node(KwargReprNode):
"""Node base"""
@classmethod
def __check_input_len(cls, stream_map, min_inputs, max_inputs):
if min_inputs is not None and len(stream_map) < min_inputs:
raise ValueError('Expected at least {} input stream(s); got {}'.format(min_inputs, len(stream_map)))
elif max_inputs is not None and len(stream_map) > max_inputs:
raise ValueError('Expected at most {} input stream(s); got {}'.format(max_inputs, len(stream_map)))
@classmethod
def __check_input_types(cls, stream_map, incoming_stream_types):
for stream in list(stream_map.values()):
if not _is_of_types(stream, incoming_stream_types):
raise TypeError('Expected incoming stream(s) to be of one of the following types: {}; got {}'
.format(_get_types_str(incoming_stream_types), type(stream)))
@classmethod
def __get_incoming_edge_map(cls, stream_map):
incoming_edge_map = {}
for downstream_label, upstream in list(stream_map.items()):
incoming_edge_map[downstream_label] = (upstream.node, upstream.label, upstream.selector)
return incoming_edge_map
def __init__(self, stream_spec, name, incoming_stream_types, outgoing_stream_type, min_inputs,
max_inputs, args=[], kwargs={}):
stream_map = get_stream_map(stream_spec)
self.__check_input_len(stream_map, min_inputs, max_inputs)
self.__check_input_types(stream_map, incoming_stream_types)
incoming_edge_map = self.__get_incoming_edge_map(stream_map)
super(Node, self).__init__(incoming_edge_map, name, args, kwargs)
self.__outgoing_stream_type = outgoing_stream_type
self.__incoming_stream_types = incoming_stream_types
def stream(self, label=None, selector=None):
"""Create an outgoing stream originating from this node.
More nodes may be attached onto the outgoing stream.
"""
return self.__outgoing_stream_type(self, label, upstream_selector=selector)
def __getitem__(self, item):
"""Create an outgoing stream originating from this node; syntactic sugar for ``self.stream(label)``.
It can also be used to apply a selector: e.g. ``node[0:'a']`` returns a stream with label 0 and
selector ``'a'``, which is the same as ``node.stream(label=0, selector='a')``.
Example:
Process the audio and video portions of a stream independently::
input = ffmpeg.input('in.mp4')
audio = input[:'a'].filter("aecho", 0.8, 0.9, 1000, 0.3)
video = input[:'v'].hflip()
out = ffmpeg.output(audio, video, 'out.mp4')
"""
if isinstance(item, slice):
return self.stream(label=item.start, selector=item.stop)
else:
return self.stream(label=item)
class FilterableStream(Stream):
def __init__(self, upstream_node, upstream_label, upstream_selector=None):
super(FilterableStream, self).__init__(upstream_node, upstream_label, {InputNode, FilterNode},
upstream_selector)
# noinspection PyMethodOverriding
class InputNode(Node):
"""InputNode type"""
def __init__(self, name, args=[], kwargs={}):
super(InputNode, self).__init__(
stream_spec=None,
name=name,
incoming_stream_types={},
outgoing_stream_type=FilterableStream,
min_inputs=0,
max_inputs=0,
args=args,
kwargs=kwargs
)
@property
def short_repr(self):
return os.path.basename(self.kwargs['filename'])
# noinspection PyMethodOverriding
class FilterNode(Node):
def __init__(self, stream_spec, name, max_inputs=1, args=[], kwargs={}):
super(FilterNode, self).__init__(
stream_spec=stream_spec,
name=name,
incoming_stream_types={FilterableStream},
outgoing_stream_type=FilterableStream,
min_inputs=1,
max_inputs=max_inputs,
args=args,
kwargs=kwargs
)
"""FilterNode"""
def _get_filter(self, outgoing_edges):
args = self.args
kwargs = self.kwargs
if self.name in ('split', 'asplit'):
args = [len(outgoing_edges)]
out_args = [escape_chars(x, '\\\'=:') for x in args]
out_kwargs = {}
for k, v in list(kwargs.items()):
k = escape_chars(k, '\\\'=:')
v = escape_chars(v, '\\\'=:')
out_kwargs[k] = v
arg_params = [escape_chars(v, '\\\'=:') for v in out_args]
kwarg_params = ['{}={}'.format(k, out_kwargs[k]) for k in sorted(out_kwargs)]
params = arg_params + kwarg_params
params_text = escape_chars(self.name, '\\\'=:')
if params:
params_text += '={}'.format(':'.join(params))
return escape_chars(params_text, '\\\'[],;')
# noinspection PyMethodOverriding
class OutputNode(Node):
def __init__(self, stream, name, args=[], kwargs={}):
super(OutputNode, self).__init__(
stream_spec=stream,
name=name,
incoming_stream_types={FilterableStream},
outgoing_stream_type=OutputStream,
min_inputs=1,
max_inputs=None,
args=args,
kwargs=kwargs
)
@property
def short_repr(self):
return os.path.basename(self.kwargs['filename'])
class OutputStream(Stream):
def __init__(self, upstream_node, upstream_label, upstream_selector=None):
super(OutputStream, self).__init__(upstream_node, upstream_label, {OutputNode, GlobalNode, MergeOutputsNode},
upstream_selector=upstream_selector)
# noinspection PyMethodOverriding
class MergeOutputsNode(Node):
def __init__(self, streams, name):
super(MergeOutputsNode, self).__init__(
stream_spec=streams,
name=name,
incoming_stream_types={OutputStream},
outgoing_stream_type=OutputStream,
min_inputs=1,
max_inputs=None
)
# noinspection PyMethodOverriding
class GlobalNode(Node):
def __init__(self, stream, name, args=[], kwargs={}):
super(GlobalNode, self).__init__(
stream_spec=stream,
name=name,
incoming_stream_types={OutputStream},
outgoing_stream_type=OutputStream,
min_inputs=1,
max_inputs=1,
args=args,
kwargs=kwargs
)
def stream_operator(stream_classes={Stream}, name=None):
def decorator(func):
func_name = name or func.__name__
[setattr(stream_class, func_name, func) for stream_class in stream_classes]
return func
return decorator
def filter_operator(name=None):
return stream_operator(stream_classes={FilterableStream}, name=name)
def output_operator(name=None):
return stream_operator(stream_classes={OutputStream}, name=name)

0
pype/vendor/ffmpeg/tests/__init__.py vendored Normal file
View file

Binary file not shown.

Binary file not shown.

After

Width:  |  Height:  |  Size: 2.2 KiB

629
pype/vendor/ffmpeg/tests/test_ffmpeg.py vendored Normal file
View file

@ -0,0 +1,629 @@
from __future__ import unicode_literals
from builtins import str
from builtins import bytes
from builtins import range
import ffmpeg
import os
import pytest
import random
import re
import subprocess
TEST_DIR = os.path.dirname(__file__)
SAMPLE_DATA_DIR = os.path.join(TEST_DIR, 'sample_data')
TEST_INPUT_FILE1 = os.path.join(SAMPLE_DATA_DIR, 'in1.mp4')
TEST_OVERLAY_FILE = os.path.join(SAMPLE_DATA_DIR, 'overlay.png')
TEST_OUTPUT_FILE1 = os.path.join(SAMPLE_DATA_DIR, 'out1.mp4')
TEST_OUTPUT_FILE2 = os.path.join(SAMPLE_DATA_DIR, 'out2.mp4')
BOGUS_INPUT_FILE = os.path.join(SAMPLE_DATA_DIR, 'bogus')
subprocess.check_call(['ffmpeg', '-version'])
def test_escape_chars():
assert ffmpeg._utils.escape_chars('a:b', ':') == 'a\:b'
assert ffmpeg._utils.escape_chars('a\\:b', ':\\') == 'a\\\\\\:b'
assert ffmpeg._utils.escape_chars('a:b,c[d]e%{}f\'g\'h\\i', '\\\':,[]%') == 'a\\:b\\,c\\[d\\]e\\%{}f\\\'g\\\'h\\\\i'
assert ffmpeg._utils.escape_chars(123, ':\\') == '123'
def test_fluent_equality():
base1 = ffmpeg.input('dummy1.mp4')
base2 = ffmpeg.input('dummy1.mp4')
base3 = ffmpeg.input('dummy2.mp4')
t1 = base1.trim(start_frame=10, end_frame=20)
t2 = base1.trim(start_frame=10, end_frame=20)
t3 = base1.trim(start_frame=10, end_frame=30)
t4 = base2.trim(start_frame=10, end_frame=20)
t5 = base3.trim(start_frame=10, end_frame=20)
assert t1 == t2
assert t1 != t3
assert t1 == t4
assert t1 != t5
def test_fluent_concat():
base = ffmpeg.input('dummy.mp4')
trimmed1 = base.trim(start_frame=10, end_frame=20)
trimmed2 = base.trim(start_frame=30, end_frame=40)
trimmed3 = base.trim(start_frame=50, end_frame=60)
concat1 = ffmpeg.concat(trimmed1, trimmed2, trimmed3)
concat2 = ffmpeg.concat(trimmed1, trimmed2, trimmed3)
concat3 = ffmpeg.concat(trimmed1, trimmed3, trimmed2)
assert concat1 == concat2
assert concat1 != concat3
def test_fluent_output():
(ffmpeg
.input('dummy.mp4')
.trim(start_frame=10, end_frame=20)
.output('dummy2.mp4')
)
def test_fluent_complex_filter():
in_file = ffmpeg.input('dummy.mp4')
return (ffmpeg
.concat(
in_file.trim(start_frame=10, end_frame=20),
in_file.trim(start_frame=30, end_frame=40),
in_file.trim(start_frame=50, end_frame=60)
)
.output('dummy2.mp4')
)
def test_node_repr():
in_file = ffmpeg.input('dummy.mp4')
trim1 = ffmpeg.trim(in_file, start_frame=10, end_frame=20)
trim2 = ffmpeg.trim(in_file, start_frame=30, end_frame=40)
trim3 = ffmpeg.trim(in_file, start_frame=50, end_frame=60)
concatted = ffmpeg.concat(trim1, trim2, trim3)
output = ffmpeg.output(concatted, 'dummy2.mp4')
assert repr(in_file.node) == 'input(filename={!r}) <{}>'.format('dummy.mp4', in_file.node.short_hash)
assert repr(trim1.node) == 'trim(end_frame=20, start_frame=10) <{}>'.format(trim1.node.short_hash)
assert repr(trim2.node) == 'trim(end_frame=40, start_frame=30) <{}>'.format(trim2.node.short_hash)
assert repr(trim3.node) == 'trim(end_frame=60, start_frame=50) <{}>'.format(trim3.node.short_hash)
assert repr(concatted.node) == 'concat(n=3) <{}>'.format(concatted.node.short_hash)
assert repr(output.node) == 'output(filename={!r}) <{}>'.format('dummy2.mp4', output.node.short_hash)
def test_stream_repr():
in_file = ffmpeg.input('dummy.mp4')
assert repr(in_file) == 'input(filename={!r})[None] <{}>'.format('dummy.mp4', in_file.node.short_hash)
split0 = in_file.filter_multi_output('split')[0]
assert repr(split0) == 'split()[0] <{}>'.format(split0.node.short_hash)
dummy_out = in_file.filter_multi_output('dummy')['out']
assert repr(dummy_out) == 'dummy()[{!r}] <{}>'.format(dummy_out.label, dummy_out.node.short_hash)
def test__get_args__simple():
out_file = ffmpeg.input('dummy.mp4').output('dummy2.mp4')
assert out_file.get_args() == ['-i', 'dummy.mp4', 'dummy2.mp4']
def test_global_args():
out_file = ffmpeg.input('dummy.mp4').output('dummy2.mp4').global_args('-progress', 'someurl')
assert out_file.get_args() == ['-i', 'dummy.mp4', 'dummy2.mp4', '-progress', 'someurl']
def _get_simple_example():
return ffmpeg.input(TEST_INPUT_FILE1).output(TEST_OUTPUT_FILE1)
def _get_complex_filter_example():
split = (ffmpeg
.input(TEST_INPUT_FILE1)
.vflip()
.split()
)
split0 = split[0]
split1 = split[1]
overlay_file = ffmpeg.input(TEST_OVERLAY_FILE)
overlay_file = ffmpeg.crop(overlay_file, 10, 10, 158, 112)
return (ffmpeg
.concat(
split0.trim(start_frame=10, end_frame=20),
split1.trim(start_frame=30, end_frame=40),
)
.overlay(overlay_file.hflip())
.drawbox(50, 50, 120, 120, color='red', thickness=5)
.output(TEST_OUTPUT_FILE1)
.overwrite_output()
)
def test__get_args__complex_filter():
out = _get_complex_filter_example()
args = ffmpeg.get_args(out)
assert args == ['-i', TEST_INPUT_FILE1,
'-i', TEST_OVERLAY_FILE,
'-filter_complex',
'[0]vflip[s0];' \
'[s0]split=2[s1][s2];' \
'[s1]trim=end_frame=20:start_frame=10[s3];' \
'[s2]trim=end_frame=40:start_frame=30[s4];' \
'[s3][s4]concat=n=2[s5];' \
'[1]crop=158:112:10:10[s6];' \
'[s6]hflip[s7];' \
'[s5][s7]overlay=eof_action=repeat[s8];' \
'[s8]drawbox=50:50:120:120:red:t=5[s9]',
'-map', '[s9]', TEST_OUTPUT_FILE1,
'-y'
]
def test_combined_output():
i1 = ffmpeg.input(TEST_INPUT_FILE1)
i2 = ffmpeg.input(TEST_OVERLAY_FILE)
out = ffmpeg.output(i1, i2, TEST_OUTPUT_FILE1)
assert out.get_args() == [
'-i', TEST_INPUT_FILE1,
'-i', TEST_OVERLAY_FILE,
'-map', '0',
'-map', '1',
TEST_OUTPUT_FILE1
]
def test_filter_with_selector():
i = ffmpeg.input(TEST_INPUT_FILE1)
v1 = i['v'].hflip()
a1 = i['a'].filter('aecho', 0.8, 0.9, 1000, 0.3)
out = ffmpeg.output(a1, v1, TEST_OUTPUT_FILE1)
assert out.get_args() == [
'-i', TEST_INPUT_FILE1,
'-filter_complex',
'[0:a]aecho=0.8:0.9:1000:0.3[s0];' \
'[0:v]hflip[s1]',
'-map', '[s0]', '-map', '[s1]',
TEST_OUTPUT_FILE1
]
def test_get_item_with_bad_selectors():
input = ffmpeg.input(TEST_INPUT_FILE1)
with pytest.raises(ValueError) as excinfo:
input['a']['a']
assert str(excinfo.value).startswith('Stream already has a selector:')
with pytest.raises(TypeError) as excinfo:
input[:'a']
assert str(excinfo.value).startswith("Expected string index (e.g. 'a')")
with pytest.raises(TypeError) as excinfo:
input[5]
assert str(excinfo.value).startswith("Expected string index (e.g. 'a')")
def _get_complex_filter_asplit_example():
split = (ffmpeg
.input(TEST_INPUT_FILE1)
.vflip()
.asplit()
)
split0 = split[0]
split1 = split[1]
return (ffmpeg
.concat(
split0.filter('atrim', start=10, end=20),
split1.filter('atrim', start=30, end=40),
)
.output(TEST_OUTPUT_FILE1)
.overwrite_output()
)
def test_filter_concat__video_only():
in1 = ffmpeg.input('in1.mp4')
in2 = ffmpeg.input('in2.mp4')
args = (
ffmpeg
.concat(in1, in2)
.output('out.mp4')
.get_args()
)
assert args == [
'-i',
'in1.mp4',
'-i',
'in2.mp4',
'-filter_complex',
'[0][1]concat=n=2[s0]',
'-map',
'[s0]',
'out.mp4',
]
def test_filter_concat__audio_only():
in1 = ffmpeg.input('in1.mp4')
in2 = ffmpeg.input('in2.mp4')
args = (
ffmpeg
.concat(in1, in2, v=0, a=1)
.output('out.mp4')
.get_args()
)
assert args == [
'-i',
'in1.mp4',
'-i',
'in2.mp4',
'-filter_complex',
'[0][1]concat=a=1:n=2:v=0[s0]',
'-map',
'[s0]',
'out.mp4'
]
def test_filter_concat__audio_video():
in1 = ffmpeg.input('in1.mp4')
in2 = ffmpeg.input('in2.mp4')
joined = ffmpeg.concat(in1['v'], in1['a'], in2.hflip(), in2['a'], v=1, a=1).node
args = (
ffmpeg
.output(joined[0], joined[1], 'out.mp4')
.get_args()
)
assert args == [
'-i',
'in1.mp4',
'-i',
'in2.mp4',
'-filter_complex',
'[1]hflip[s0];[0:v][0:a][s0][1:a]concat=a=1:n=2:v=1[s1][s2]',
'-map',
'[s1]',
'-map',
'[s2]',
'out.mp4',
]
def test_filter_concat__wrong_stream_count():
in1 = ffmpeg.input('in1.mp4')
in2 = ffmpeg.input('in2.mp4')
with pytest.raises(ValueError) as excinfo:
ffmpeg.concat(in1['v'], in1['a'], in2.hflip(), v=1, a=1).node
assert str(excinfo.value) == \
'Expected concat input streams to have length multiple of 2 (v=1, a=1); got 3'
def test_filter_asplit():
out = _get_complex_filter_asplit_example()
args = out.get_args()
assert args == [
'-i',
TEST_INPUT_FILE1,
'-filter_complex',
'[0]vflip[s0];[s0]asplit=2[s1][s2];[s1]atrim=end=20:start=10[s3];[s2]atrim=end=40:start=30[s4];[s3]'
'[s4]concat=n=2[s5]',
'-map',
'[s5]',
TEST_OUTPUT_FILE1,
'-y'
]
def test__output__bitrate():
args = (
ffmpeg
.input('in')
.output('out', video_bitrate=1000, audio_bitrate=200)
.get_args()
)
assert args == ['-i', 'in', '-b:v', '1000', '-b:a', '200', 'out']
@pytest.mark.parametrize('video_size', [(320, 240), '320x240'])
def test__output__video_size(video_size):
args = (
ffmpeg
.input('in')
.output('out', video_size=video_size)
.get_args()
)
assert args == ['-i', 'in', '-video_size', '320x240', 'out']
def test_filter_normal_arg_escape():
"""Test string escaping of normal filter args (e.g. ``font`` param of ``drawtext`` filter)."""
def _get_drawtext_font_repr(font):
"""Build a command-line arg using drawtext ``font`` param and extract the ``-filter_complex`` arg."""
args = (ffmpeg
.input('in')
.drawtext('test', font='a{}b'.format(font))
.output('out')
.get_args()
)
assert args[:3] == ['-i', 'in', '-filter_complex']
assert args[4:] == ['-map', '[s0]', 'out']
match = re.match(r'\[0\]drawtext=font=a((.|\n)*)b:text=test\[s0\]', args[3], re.MULTILINE)
assert match is not None, 'Invalid -filter_complex arg: {!r}'.format(args[3])
return match.group(1)
expected_backslash_counts = {
'x': 0,
'\'': 3,
'\\': 3,
'%': 0,
':': 2,
',': 1,
'[': 1,
']': 1,
'=': 2,
'\n': 0,
}
for ch, expected_backslash_count in list(expected_backslash_counts.items()):
expected = '{}{}'.format('\\' * expected_backslash_count, ch)
actual = _get_drawtext_font_repr(ch)
assert expected == actual
def test_filter_text_arg_str_escape():
"""Test string escaping of normal filter args (e.g. ``text`` param of ``drawtext`` filter)."""
def _get_drawtext_text_repr(text):
"""Build a command-line arg using drawtext ``text`` param and extract the ``-filter_complex`` arg."""
args = (ffmpeg
.input('in')
.drawtext('a{}b'.format(text))
.output('out')
.get_args()
)
assert args[:3] == ['-i', 'in', '-filter_complex']
assert args[4:] == ['-map', '[s0]', 'out']
match = re.match(r'\[0\]drawtext=text=a((.|\n)*)b\[s0\]', args[3], re.MULTILINE)
assert match is not None, 'Invalid -filter_complex arg: {!r}'.format(args[3])
return match.group(1)
expected_backslash_counts = {
'x': 0,
'\'': 7,
'\\': 7,
'%': 4,
':': 2,
',': 1,
'[': 1,
']': 1,
'=': 2,
'\n': 0,
}
for ch, expected_backslash_count in list(expected_backslash_counts.items()):
expected = '{}{}'.format('\\' * expected_backslash_count, ch)
actual = _get_drawtext_text_repr(ch)
assert expected == actual
#def test_version():
# subprocess.check_call(['ffmpeg', '-version'])
def test__compile():
out_file = ffmpeg.input('dummy.mp4').output('dummy2.mp4')
assert out_file.compile() == ['ffmpeg', '-i', 'dummy.mp4', 'dummy2.mp4']
assert out_file.compile(cmd='ffmpeg.old') == ['ffmpeg.old', '-i', 'dummy.mp4', 'dummy2.mp4']
def test__run():
stream = _get_complex_filter_example()
out, err = ffmpeg.run(stream)
assert out is None
assert err is None
@pytest.mark.parametrize('capture_stdout', [True, False])
@pytest.mark.parametrize('capture_stderr', [True, False])
def test__run__capture_out(mocker, capture_stdout, capture_stderr):
mocker.patch.object(ffmpeg._run, 'compile', return_value=['echo', 'test'])
stream = _get_simple_example()
out, err = ffmpeg.run(stream, capture_stdout=capture_stdout, capture_stderr=capture_stderr)
if capture_stdout:
assert out == 'test\n'.encode()
else:
assert out is None
if capture_stderr:
assert err == ''.encode()
else:
assert err is None
def test__run__input_output(mocker):
mocker.patch.object(ffmpeg._run, 'compile', return_value=['cat'])
stream = _get_simple_example()
out, err = ffmpeg.run(stream, input='test'.encode(), capture_stdout=True)
assert out == 'test'.encode()
assert err is None
@pytest.mark.parametrize('capture_stdout', [True, False])
@pytest.mark.parametrize('capture_stderr', [True, False])
def test__run__error(mocker, capture_stdout, capture_stderr):
mocker.patch.object(ffmpeg._run, 'compile', return_value=['ffmpeg'])
stream = _get_complex_filter_example()
with pytest.raises(ffmpeg.Error) as excinfo:
out, err = ffmpeg.run(stream, capture_stdout=capture_stdout, capture_stderr=capture_stderr)
assert str(excinfo.value) == 'ffmpeg error (see stderr output for detail)'
out = excinfo.value.stdout
err = excinfo.value.stderr
if capture_stdout:
assert out == ''.encode()
else:
assert out is None
if capture_stderr:
assert err.decode().startswith('ffmpeg version')
else:
assert err is None
def test__run__multi_output():
in_ = ffmpeg.input(TEST_INPUT_FILE1)
out1 = in_.output(TEST_OUTPUT_FILE1)
out2 = in_.output(TEST_OUTPUT_FILE2)
ffmpeg.run([out1, out2], overwrite_output=True)
def test__run__dummy_cmd():
stream = _get_complex_filter_example()
ffmpeg.run(stream, cmd='true')
def test__run__dummy_cmd_list():
stream = _get_complex_filter_example()
ffmpeg.run(stream, cmd=['true', 'ignored'])
def test__filter__custom():
stream = ffmpeg.input('dummy.mp4')
stream = ffmpeg.filter(stream, 'custom_filter', 'a', 'b', kwarg1='c')
stream = ffmpeg.output(stream, 'dummy2.mp4')
assert stream.get_args() == [
'-i', 'dummy.mp4',
'-filter_complex', '[0]custom_filter=a:b:kwarg1=c[s0]',
'-map', '[s0]',
'dummy2.mp4'
]
def test__filter__custom_fluent():
stream = (ffmpeg
.input('dummy.mp4')
.filter('custom_filter', 'a', 'b', kwarg1='c')
.output('dummy2.mp4')
)
assert stream.get_args() == [
'-i', 'dummy.mp4',
'-filter_complex', '[0]custom_filter=a:b:kwarg1=c[s0]',
'-map', '[s0]',
'dummy2.mp4'
]
def test__merge_outputs():
in_ = ffmpeg.input('in.mp4')
out1 = in_.output('out1.mp4')
out2 = in_.output('out2.mp4')
assert ffmpeg.merge_outputs(out1, out2).get_args() == [
'-i', 'in.mp4', 'out1.mp4', 'out2.mp4'
]
assert ffmpeg.get_args([out1, out2]) == [
'-i', 'in.mp4', 'out2.mp4', 'out1.mp4'
]
def test__input__start_time():
assert ffmpeg.input('in', ss=10.5).output('out').get_args() == ['-ss', '10.5', '-i', 'in', 'out']
assert ffmpeg.input('in', ss=0.0).output('out').get_args() == ['-ss', '0.0', '-i', 'in', 'out']
def test_multi_passthrough():
out1 = ffmpeg.input('in1.mp4').output('out1.mp4')
out2 = ffmpeg.input('in2.mp4').output('out2.mp4')
out = ffmpeg.merge_outputs(out1, out2)
assert ffmpeg.get_args(out) == [
'-i', 'in1.mp4',
'-i', 'in2.mp4',
'out1.mp4',
'-map', '1',
'out2.mp4'
]
assert ffmpeg.get_args([out1, out2]) == [
'-i', 'in2.mp4',
'-i', 'in1.mp4',
'out2.mp4',
'-map', '1',
'out1.mp4'
]
def test_passthrough_selectors():
i1 = ffmpeg.input(TEST_INPUT_FILE1)
args = (
ffmpeg
.output(i1['1'], i1['2'], TEST_OUTPUT_FILE1)
.get_args()
)
assert args == [
'-i', TEST_INPUT_FILE1,
'-map', '0:1',
'-map', '0:2',
TEST_OUTPUT_FILE1,
]
def test_mixed_passthrough_selectors():
i1 = ffmpeg.input(TEST_INPUT_FILE1)
args = (
ffmpeg
.output(i1['1'].hflip(), i1['2'], TEST_OUTPUT_FILE1)
.get_args()
)
assert args == [
'-i', TEST_INPUT_FILE1,
'-filter_complex',
'[0:1]hflip[s0]',
'-map', '[s0]',
'-map', '0:2',
TEST_OUTPUT_FILE1,
]
def test_pipe():
width = 32
height = 32
frame_size = width * height * 3 # 3 bytes for rgb24
frame_count = 10
start_frame = 2
out = (ffmpeg
.input('pipe:0', format='rawvideo', pixel_format='rgb24', video_size=(width, height), framerate=10)
.trim(start_frame=start_frame)
.output('pipe:1', format='rawvideo')
)
args = out.get_args()
assert args == [
'-f', 'rawvideo',
'-video_size', '{}x{}'.format(width, height),
'-framerate', '10',
'-pixel_format', 'rgb24',
'-i', 'pipe:0',
'-filter_complex',
'[0]trim=start_frame=2[s0]',
'-map', '[s0]',
'-f', 'rawvideo',
'pipe:1'
]
cmd = ['ffmpeg'] + args
p = subprocess.Popen(cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
in_data = bytes(bytearray([random.randint(0,255) for _ in range(frame_size * frame_count)]))
p.stdin.write(in_data) # note: this could block, in which case need to use threads
p.stdin.close()
out_data = p.stdout.read()
assert len(out_data) == frame_size * (frame_count - start_frame)
assert out_data == in_data[start_frame*frame_size:]
def test__probe():
data = ffmpeg.probe(TEST_INPUT_FILE1)
assert set(data.keys()) == {'format', 'streams'}
assert data['format']['duration'] == '7.036000'
def test__probe__exception():
with pytest.raises(ffmpeg.Error) as excinfo:
ffmpeg.probe(BOGUS_INPUT_FILE)
assert str(excinfo.value) == 'ffprobe error (see stderr output for detail)'
assert 'No such file or directory'.encode() in excinfo.value.stderr