Submit to Deadline functionality

This commit is contained in:
Ondřej Samohel 2020-11-06 16:19:29 +01:00
parent 25b87bca12
commit 739baf70f5
No known key found for this signature in database
GPG key ID: 02376E18990A97C6


@@ -1,9 +1,14 @@
# -*- coding: utf-8 -*-
"""Abstract class for submitting jobs to Deadline."""
"""Abstract package for submitting jobs to Deadline.
It provides the Deadline JobInfo data class.
"""
import os
from abc import ABCMeta, abstractmethod
import platform
import getpass
from collections import OrderedDict
import six
import attr
@@ -171,13 +176,13 @@ class DeadlineJobInfo:
"""Return all environment key values formatted for Deadline.
Returns:
list of tuples: as `[('EnvironmentKeyValue0', 'key=value')]`
dict: as `{'EnvironmentKeyValue0': 'key=value'}`
"""
out = []
index = 0
for v in self._environmentKeyValue:
out.append(("EnvironmentKeyValue{}".format(index), v))
out["EnvironmentKeyValue{}".format(index)] = v
index += 1
return out
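# Illustrative sketch, separate from the class itself: every indexed
# property above flattens its private list into 'Name0', 'Name1', ...
# keys, the flat key style Deadline expects inside JobInfo. With
# hypothetical values:
example_env = ["AVALON_PROJECT=demo", "AVALON_ASSET=shot010"]
example_formatted = {
    "EnvironmentKeyValue{}".format(i): v for i, v in enumerate(example_env)
}
# example_formatted == {'EnvironmentKeyValue0': 'AVALON_PROJECT=demo',
#                       'EnvironmentKeyValue1': 'AVALON_ASSET=shot010'}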
@@ -199,13 +204,13 @@ class DeadlineJobInfo:
"""Return all ExtraInfo values formatted for Deadline.
Returns:
list of tuples: as `[('ExtraInfo0', 'value')]`
dict: as `{'ExtraInfo0': 'value'}`
"""
out = []
index = 0
for v in self._extraInfos:
out.append(("ExtraInfo{}".format(index), v))
out["ExtraInfo{}".format(index)] = v
index += 1
return out
@@ -218,13 +223,13 @@ class DeadlineJobInfo:
"""Return all ExtraInfoKeyValue values formatted for Deadline.
Returns:
list of tuples: as `[('ExtraInfoKeyValue0', 'key=value')]`
dict: as `{'ExtraInfoKeyValue0': 'key=value'}`
"""
out = []
index = 0
for v in self._extraInfoKeyValues:
out.append(("ExtraInfoKeyValue{}".format(index), v))
out["ExtraInfoKeyValue{}".format(index)] = v
index += 1
return out
@@ -242,13 +247,13 @@ class DeadlineJobInfo:
"""Return all TaskExtraInfoName values formatted for Deadline.
Returns:
list of tuples: as `[('TaskExtraInfoName0', 'value')]`
dict: as `{'TaskExtraInfoName0': 'value'}`
"""
out = []
index = 0
for v in self._taskExtraInfos:
out.append(("TaskExtraInfoName{}".format(index), v))
out["TaskExtraInfoName{}".format(index)] = v
index += 1
return out
@@ -267,13 +272,13 @@ class DeadlineJobInfo:
"""Return all OutputFilename values formatted for Deadline.
Returns:
list of tuples: as `[('OutputFilename0', 'filename')]`
dict: as `{'OutputFilename0': 'filename'}`
"""
out = []
index = 0
for v in self._outputFilename:
out.append(("OutputFilename{}".format(index), v))
out["OutputFilename{}".format(index)] = v
index += 1
return out
@@ -286,13 +291,13 @@ class DeadlineJobInfo:
"""Return all OutputFilename#Tile values formatted for Deadline.
Returns:
list of tuples: as `[('OutputFilename#Tile', 'tile')]`
dict: as `{'OutputFilename#Tile': 'tile'}`
"""
out = []
index = 0
for v in self._outputFilenameTile:
out.append(("OutputFilename{}Tile".format(index), v))
out["OutputFilename{}Tile".format(index)] = v
index += 1
return out
@@ -305,13 +310,13 @@ class DeadlineJobInfo:
"""Return all OutputDirectory values formatted for Deadline.
Returns:
list of tuples: as `[('OutputDirectory0', 'dir')]`
dict: as `{'OutputDirectory0': 'dir'}`
"""
out = []
index = 0
for v in self._outputDirectory:
out.append(("OutputDirectory{}".format(index), v))
out["OutputDirectory{}".format(index)] = v
index += 1
return out
@@ -333,21 +338,234 @@ class DeadlineJobInfo:
MaintenanceJobStartFrame = attr.ib(default=None) # Default: 0
MaintenanceJobEndFrame = attr.ib(default=None) # Default: 0
def render(self):
"""Return all data serialized as dictionary.
Returns:
OrderedDict: all serialized data.
"""
def no_privates(a, _):
return not a.name.startswith("_")
serialized = attr.asdict(
self, dict_factory=OrderedDict, filter=no_privates)
serialized.update(self.EnvironmentKeyValue)
serialized.update(self.ExtraInfo)
serialized.update(self.ExtraInfoKeyValue)
serialized.update(self.TaskExtraInfoName)
serialized.update(self.OutputFilename)
serialized.update(self.OutputFilenameTile)
serialized.update(self.OutputDirectory)
return serialized
@attr.s
class DeadlinePluginInfo:
SceneFile = attr.ib()
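# Usage sketch for DeadlineJobInfo.render() (hypothetical values; the full
# attribute list and public setters are not shown in this hunk, so the
# private lists are filled directly here):
#
#     job_info = DeadlineJobInfo(Plugin="MayaBatch")
#     job_info._environmentKeyValue.append("AVALON_PROJECT=demo")
#     job_info._outputDirectory.append("/renders/demo/shot010")
#     job_info.render()
#     # OrderedDict([('Plugin', 'MayaBatch'), ...,
#     #              ('EnvironmentKeyValue0', 'AVALON_PROJECT=demo'),
#     #              ('OutputDirectory0', '/renders/demo/shot010')])
#
# no_privates() keeps the private lists out of attr.asdict(); the indexed
# properties then merge them back in as flat keys, so the result is one
# mapping usable directly as the "JobInfo" part of the submission payload.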
@six.add_metaclass(ABCMeta)
class AbstractSubmitDeadline(pyblish.api.InstancePlugin):
"""Class abstracting access to Deadline."""
label = "Submit to Deadline"
order = pyblish.api.IntegratorOrder + 0.1
use_published = True
asset_dependencies = False
def process(self, instance):
"""Plugin entry point."""
self._instance = instance
context = instance.context
self._deadline_url = os.environ.get(
"DEADLINE_REST_URL", "http://localhost:8082")
assert self._deadline_url, "Requires DEADLINE_REST_URL"
file_path = None
if self.use_published:
file_path = self.from_published_scene()
# fallback if nothing was set
if not file_path:
self.log.warning("Falling back to workfile")
file_path = context.data["currentFile"]
self.scene_path = file_path
self.log.info("Using {} for render/export.".format(file_path))
self.job_info = self.get_job_info()
self.plugin_info = self.get_plugin_info()
self.aux_files = self.get_aux_files()
def process_submission(self):
"""Process data for submission.
This takes the Deadline JobInfo, PluginInfo and AuxFiles, creates a
payload from them and submits it to Deadline.
Returns:
str: Deadline job ID
"""
payload = self.assemble_payload()
return self.submit(payload)
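# A subclass would typically call this after process() has prepared the
# data, e.g. (sketch; nothing beyond process_submission() is prescribed
# by this class):
#
#     job_id = self.process_submission()
#     self.log.info("Submitted job: {}".format(job_id))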
@abstractmethod
def get_job_info(self):
"""Return filled Deadline JobInfo.
This is a host/plugin specific implementation of how to fill in the data.
See:
:class:`DeadlineJobInfo`
Returns:
dict: Filled Deadline JobInfo.
"""
pass
@abstractmethod
def get_plugin_info(self):
"""Return filled Deadline PluginInfo.
This is a host/plugin specific implementation of how to fill in the data.
See:
:class:`DeadlinePluginInfo`
Returns:
dict: Filled Deadline PluginInfo.
"""
pass
def get_aux_files(self):
"""Return list of auxiliary files for Deadline job.
Override this if needed; otherwise an empty list is returned, as the
field must be present in the Deadline submission even when empty.
Returns:
list: List of files.
"""
return []
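# Minimal sketch of a concrete subclass (hypothetical host and instance
# data keys such as "frameStart" and "auxiliaryFiles"; only names visible
# in this file are otherwise used):
class ExampleSubmitDeadline(AbstractSubmitDeadline):
    label = "Submit Example Host to Deadline"
    hosts = ["example_host"]

    def get_job_info(self):
        # a plain dict in the documented JobInfo key style
        return {
            "Plugin": "MayaBatch",
            "Name": self._instance.data["name"],
            "UserName": getpass.getuser(),
            "Frames": "{}-{}".format(
                self._instance.data.get("frameStart", 1),
                self._instance.data.get("frameEnd", 1)),
        }

    def get_plugin_info(self):
        return attr.asdict(DeadlinePluginInfo(SceneFile=self.scene_path))

    def get_aux_files(self):
        # submit optional side-car files along with the job
        return self._instance.data.get("auxiliaryFiles", [])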
def from_published_scene(self, replace_in_path=True):
"""Switch work scene for published scene.
If rendering/exporting from published scenes is enabled, this will
replace paths from working scene to published scene.
Args:
replace_in_path (bool): if True, it will try to find
old scene name in path of expected files and replace it
with name of published scene.
Returns:
str: Published scene path.
Note:
Published scene path is actually determined from project Anatomy
as at the time this plugin is running scene can still no be
published.
"""
anatomy = self._instance.context.data['anatomy']
for i in self._instance.context:
if "workfile" in i.data["families"]:
# check if there is a workfile instance waiting
# to be published.
assert i.data["publish"] is True, (
"Workfile (scene) must be published along")
# determine published path from Anatomy.
template_data = i.data.get("anatomyData")
rep = i.data.get("representations")[0].get("name")
template_data["representation"] = rep
template_data["ext"] = rep
template_data["comment"] = None
anatomy_filled = anatomy.format(template_data)
template_filled = anatomy_filled["publish"]["path"]
filepath = os.path.normpath(template_filled)
self.log.info("Using published scene for render {}".format(
filepath))
if not os.path.exists(filepath):
self.log.error("published scene does not exist!")
raise
if not replace_in_path:
return filepath
# now we need to switch scene in expected files
# because <scene> token will now point to published
# scene file and that might differ from current one
new_scene = os.path.splitext(
os.path.basename(filepath))[0]
orig_scene = os.path.splitext(
os.path.basename(
self._instance.context.data["currentFile"]))[0]
exp = self._instance.data.get("expectedFiles")
if isinstance(exp[0], dict):
# we have aovs and we need to iterate over them
new_exp = {}
for aov, files in exp[0].items():
replaced_files = []
for f in files:
replaced_files.append(
f.replace(orig_scene, new_scene)
)
new_exp[aov] = replaced_files
self._instance.data["expectedFiles"] = [new_exp]
else:
new_exp = []
for f in exp:
new_exp.append(
f.replace(orig_scene, new_scene)
)
self._instance.data["expectedFiles"] = [new_exp]
self.log.info("Scene name was switched {} -> {}".format(
orig_scene, new_scene
))
return filepath
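# Stand-alone sketch of the scene name replacement above (made-up paths;
# the published scene name usually differs from the work scene name):
orig_scene_example = "shot010_work_v001"
new_scene_example = "shot010_publish_v001"
expected_example = [
    "/renders/beauty/shot010_work_v001.0001.exr",
    "/renders/beauty/shot010_work_v001.0002.exr",
]
replaced_example = [
    f.replace(orig_scene_example, new_scene_example)
    for f in expected_example
]
# every expected file now points to frames named after the published scene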
def assemble_payload(
self, job_info=None, plugin_info=None, aux_files=None):
"""Assemble payload data from its various parts.
Args:
job_info (dict): Deadline JobInfo. You can use
:class:`DeadlineJobInfo` for it.
plugin_info (dict): Deadline PluginInfo. Plugin specific options.
aux_files (list, optional): List of auxiliary files to submit with
the job.
Returns:
dict: Deadline Payload.
"""
return {
"JobInfo": job_info or self.job_info,
"PluginInfo": plugin_info or self.plugin_info,
"AuxFiles": aux_files or self.aux_files
}
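# Sketch of an assembled payload (hypothetical values) in the shape the
# Deadline Web Service /api/jobs end-point accepts:
example_payload = {
    "JobInfo": {
        "Plugin": "MayaBatch",
        "Name": "shot010 - renderMain",
        "Frames": "1001-1010",
    },
    "PluginInfo": {
        "SceneFile": "/projects/demo/publish/shot010_v001.ma",
    },
    "AuxFiles": [],
}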
def submit(self, payload):
"""Submit payload to Deadline API end-point.
This takes the assembled payload, serializes it to JSON and POSTs it
to the Deadline jobs end-point.
Args:
payload (dict): Job payload as returned by :meth:`assemble_payload`.
Returns:
str: Resulting Deadline job id.
Raises:
RuntimeError: if submission fails.
"""
url = "{}/api/jobs".format(self._deadline_url)
response = self._requests_post(url, json=payload)
if not response.ok:
@@ -357,8 +575,8 @@ class AbstractSubmitDeadline(pyblish.api.InstancePlugin):
self.log.debug(payload)
raise RuntimeError(response.text)
dependency = response.json()
return dependency["_id"]
result = response.json()
return result["_id"]
def _requests_post(self, *args, **kwargs):
"""Wrap request post method.