Deadline Pype 3 - adds GlobalJobPreLoad.py as previous approach persisted first picked env vars on job indefinitely

This commit is contained in:
Petr Kalis 2021-03-02 17:16:42 +01:00
parent d08947269f
commit 8d2c87592f
3 changed files with 112 additions and 184 deletions

View file

@ -6,6 +6,7 @@ import json
from pathlib import Path
from pype.lib import PypeLogger
from pype.api import get_app_environments_for_context
class PypeCommands:
@ -63,6 +64,14 @@ class PypeCommands:
import pyblish.api
import pyblish.util
env = get_app_environments_for_context(
os.environ["AVALON_PROJECT"],
os.environ["AVALON_ASSET"],
os.environ["AVALON_TASK"],
os.environ["AVALON_APP_NAME"]
)
os.environ.update(env)
log = Logger.get_logger()
install()

View file

@ -35,8 +35,6 @@ class PypeEventListener(Deadline.Events.DeadlineEventListener):
Only directory path is needed.
"""
ALREADY_INJECTED = False
def __init__(self):
self.OnJobSubmittedCallback += self.OnJobSubmitted
self.OnJobStartedCallback += self.OnJobStarted
@ -66,8 +64,6 @@ class PypeEventListener(Deadline.Events.DeadlineEventListener):
self.OnThermalShutdownCallback += self.OnThermalShutdown
self.OnMachineRestartCallback += self.OnMachineRestart
self.ALREADY_INJECTED = False
def Cleanup(self):
del self.OnJobSubmittedCallback
del self.OnJobStartedCallback
@ -97,128 +93,32 @@ class PypeEventListener(Deadline.Events.DeadlineEventListener):
del self.OnThermalShutdownCallback
del self.OnMachineRestartCallback
def inject_pype_environment(self, job, additonalData=None):
if self.ALREADY_INJECTED:
self.LogInfo("Environment injected previously")
return
pype_render_job = job.GetJobEnvironmentKeyValue('PYPE_RENDER_JOB') \
or '0'
pype_publish_job = job.GetJobEnvironmentKeyValue('PYPE_PUBLISH_JOB') \
or '0'
if pype_publish_job == '1' and pype_render_job == '1':
raise RuntimeError("Misconfiguration. Job couldn't be both " +
"render and publish.")
if pype_publish_job == '1':
self.LogInfo("Publish job, skipping inject.")
return
elif pype_render_job == '0':
# not pype triggered job
return
# adding python search paths
paths = self.GetConfigEntryWithDefault("PythonSearchPaths", "").strip()
paths = paths.split(";")
for path in paths:
self.LogInfo("Extending sys.path with: " + str(path))
sys.path.append(path)
self.LogInfo("inject_pype_environment start")
try:
pype_app = self.get_pype_executable_path()
# tempfile.TemporaryFile cannot be used because of locking
export_url = os.path.join(tempfile.gettempdir(),
time.strftime('%Y%m%d%H%M%S'),
'env.json') # add HHMMSS + delete later
self.LogInfo("export_url {}".format(export_url))
args = [
pype_app,
'extractenvironments',
export_url
]
add_args = {}
add_args['project'] = \
job.GetJobEnvironmentKeyValue('AVALON_PROJECT')
add_args['asset'] = job.GetJobEnvironmentKeyValue('AVALON_ASSET')
add_args['task'] = job.GetJobEnvironmentKeyValue('AVALON_TASK')
add_args['app'] = job.GetJobEnvironmentKeyValue('AVALON_APP_NAME')
if all(add_args.values()):
for key, value in add_args.items():
args.append("--{}".format(key))
args.append(value)
else:
msg = "Required env vars: AVALON_PROJECT, AVALON_ASSET, " + \
"AVALON_TASK, AVALON_APP_NAME"
raise RuntimeError(msg)
self.LogInfo("args::{}".format(args))
exit_code = subprocess.call(args, shell=True)
if exit_code != 0:
raise RuntimeError("Publishing failed, check worker's log")
with open(export_url) as fp:
contents = json.load(fp)
self.LogInfo("contents::{}".format(contents))
for key, value in contents.items():
job.SetJobEnvironmentKeyValue(key, value)
Deadline.Scripting.RepositoryUtils.SaveJob(job) # IMPORTANT
self.ALREADY_INJECTED = True
os.remove(export_url)
self.LogInfo("inject_pype_environment end")
except Exception:
import traceback
self.LogInfo(traceback.format_exc())
self.LogInfo("inject_pype_environment failed")
Deadline.Scripting.RepositoryUtils.FailJob(job)
raise
def get_pype_executable_path(self):
def set_pype_executable_path(self, job):
"""
Returns calculated path based on settings and platform
Sets configurable PypeExecutable value to job extra infos.
Uses 'pype_console' executable
GlobalJobPreLoad takes this value, pulls env vars for each task
from specific worker itself. GlobalJobPreLoad is not easily
configured, so we are configuring Event itself.
"""
pype_command = "pype_console"
if platform.system().lower() == "linux":
pype_command = "pype_console.sh"
if platform.system().lower() == "windows":
pype_command = "pype_console.exe"
pype_execs = self.GetConfigEntryWithDefault("PypeExecutable", "")
job.SetJobExtraInfoKeyValue("pype_executables", pype_execs)
pype_root = self.GetConfigEntryWithDefault("PypeExecutable", "")
pype_app = os.path.join(pype_root.strip(), pype_command)
if not os.path.exists(pype_app):
raise RuntimeError("App '{}' doesn't exist. " +
"Fix it in Tools > Configure Events > " +
"pype".format(pype_app))
return pype_app
Deadline.Scripting.RepositoryUtils.SaveJob(job)
def updateFtrackStatus(self, job, statusName, createIfMissing=False):
"""Updates version status on ftrack"""
pass
def OnJobSubmitted(self, job):
self.LogInfo("OnJobSubmitted LOGGING")
# self.LogInfo("OnJobSubmitted LOGGING")
# for 1st time submit
self.inject_pype_environment(job)
self.set_pype_executable_path(job)
self.updateFtrackStatus(job, "Render Queued")
def OnJobStarted(self, job):
self.LogInfo("OnJobStarted")
# inject_pype_environment shouldnt be here, too late already
# self.LogInfo("OnJobStarted")
self.set_pype_executable_path(job)
self.updateFtrackStatus(job, "Rendering")
def OnJobFinished(self, job):
@ -226,22 +126,24 @@ class PypeEventListener(Deadline.Events.DeadlineEventListener):
self.updateFtrackStatus(job, "Artist Review")
def OnJobRequeued(self, job):
self.LogInfo("OnJobRequeued LOGGING")
self.inject_pype_environment(job)
# self.LogInfo("OnJobRequeued LOGGING")
self.set_pype_executable_path(job)
def OnJobFailed(self, job):
pass
def OnJobSuspended(self, job):
self.LogInfo("OnJobSuspended LOGGING")
# self.LogInfo("OnJobSuspended LOGGING")
self.updateFtrackStatus(job, "Render Queued")
def OnJobResumed(self, job):
self.LogInfo("OnJobResumed LOGGING")
# self.LogInfo("OnJobResumed LOGGING")
self.set_pype_executable_path(job)
self.updateFtrackStatus(job, "Rendering")
def OnJobPended(self, job):
self.LogInfo("OnJobPended LOGGING")
# self.LogInfo("OnJobPended LOGGING")
pass
def OnJobReleased(self, job):
pass
@ -277,9 +179,8 @@ class PypeEventListener(Deadline.Events.DeadlineEventListener):
pass
def OnSlaveStartingJob(self, host_name, job):
self.LogInfo("OnSlaveStartingJob LOGGING")
# inject params must be here for Resubmits
self.inject_pype_environment(job)
# self.LogInfo("OnSlaveStartingJob LOGGING")
self.set_pype_executable_path(job)
def OnSlaveStalled(self, job):
pass

View file

@ -1,76 +1,94 @@
# -*- coding: utf-8 -*-
"""Remap pype path and PYPE_METADATA_PATH."""
import platform
import os
import tempfile
import time
import subprocess
import json
from Deadline.Scripting import *
from Deadline.Scripting import RepositoryUtils
def pype_command_line(executable, arguments, workingDirectory):
"""Remap paths in comand line argument string.
Using Deadline rempper it will remap all path found in command-line.
Args:
executable (str): path to executable
arguments (str): arguments passed to executable
workingDirectory (str): working directory path
Returns:
Tuple(executable, arguments, workingDirectory)
"""
print("-" * 40)
print("executable: {}".format(executable))
print("arguments: {}".format(arguments))
print("workingDirectory: {}".format(workingDirectory))
print("-" * 40)
print("Remapping arguments ...")
arguments = RepositoryUtils.CheckPathMapping(arguments)
print("* {}".format(arguments))
print("-" * 40)
return executable, arguments, workingDirectory
def pype(deadlinePlugin):
"""Remaps `PYPE_METADATA_FILE` and `PYPE_PYTHON_EXE` environment vars.
`PYPE_METADATA_FILE` is used on farm to point to rendered data. This path
originates on platform from which this job was published. To be able to
publish on different platform, this path needs to be remapped.
`PYPE_PYTHON_EXE` can be used to specify custom location of python
interpreter to use for Pype. This is remappeda also if present even
though it probably doesn't make much sense.
Arguments:
deadlinePlugin: Deadline job plugin passed by Deadline
"""
def inject_pype_environment(deadlinePlugin):
job = deadlinePlugin.GetJob()
pype_metadata = job.GetJobEnvironmentKeyValue("PYPE_METADATA_FILE")
pype_python = job.GetJobEnvironmentKeyValue("PYPE_PYTHON_EXE")
# test if it is pype publish job.
if pype_metadata:
pype_metadata = RepositoryUtils.CheckPathMapping(pype_metadata)
if platform.system().lower() == "linux":
pype_metadata = pype_metadata.replace("\\", "/")
job = RepositoryUtils.GetJob(job.JobId, True) # invalidates cache
print("- remapping PYPE_METADATA_FILE: {}".format(pype_metadata))
job.SetJobEnvironmentKeyValue("PYPE_METADATA_FILE", pype_metadata)
deadlinePlugin.SetProcessEnvironmentVariable(
"PYPE_METADATA_FILE", pype_metadata)
pype_render_job = job.GetJobEnvironmentKeyValue('PYPE_RENDER_JOB') \
or '0'
pype_publish_job = job.GetJobEnvironmentKeyValue('PYPE_PUBLISH_JOB') \
or '0'
if pype_python:
pype_python = RepositoryUtils.CheckPathMapping(pype_python)
if platform.system().lower() == "linux":
pype_python = pype_python.replace("\\", "/")
if pype_publish_job == '1' and pype_render_job == '1':
raise RuntimeError("Misconfiguration. Job couldn't be both " +
"render and publish.")
print("- remapping PYPE_PYTHON_EXE: {}".format(pype_python))
job.SetJobEnvironmentKeyValue("PYPE_PYTHON_EXE", pype_python)
deadlinePlugin.SetProcessEnvironmentVariable(
"PYPE_PYTHON_EXE", pype_python)
if pype_publish_job == '1':
print("Publish job, skipping inject.")
return
elif pype_render_job == '0':
# not pype triggered job
return
deadlinePlugin.ModifyCommandLineCallback += pype_command_line
print("inject_pype_environment start")
try:
exe_list = job.GetJobExtraInfoKeyValue("pype_executables")
pype_app = FileUtils.SearchFileList(exe_list)
if pype_app == "":
raise RuntimeError(
"Pype executable was not found " +
"in the semicolon separated list \"" + exe_list + "\". " +
"The path to the render executable can be configured " +
"from the Plugin Configuration in the Deadline Monitor.")
# tempfile.TemporaryFile cannot be used because of locking
export_url = os.path.join(tempfile.gettempdir(),
time.strftime('%Y%m%d%H%M%S'),
'env.json') # add HHMMSS + delete later
print("export_url {}".format(export_url))
args = [
pype_app,
'extractenvironments',
export_url
]
add_args = {}
add_args['project'] = \
job.GetJobEnvironmentKeyValue('AVALON_PROJECT')
add_args['asset'] = job.GetJobEnvironmentKeyValue('AVALON_ASSET')
add_args['task'] = job.GetJobEnvironmentKeyValue('AVALON_TASK')
add_args['app'] = job.GetJobEnvironmentKeyValue('AVALON_APP_NAME')
if all(add_args.values()):
for key, value in add_args.items():
args.append("--{}".format(key))
args.append(value)
else:
msg = "Required env vars: AVALON_PROJECT, AVALON_ASSET, " + \
"AVALON_TASK, AVALON_APP_NAME"
raise RuntimeError(msg)
print("args::{}".format(args))
exit_code = subprocess.call(args, shell=True)
if exit_code != 0:
raise RuntimeError("Publishing failed, check worker's log")
with open(export_url) as fp:
contents = json.load(fp)
print("contents::{}".format(contents))
for key, value in contents.items():
deadlinePlugin.SetEnvironmentVariable(key, value)
os.remove(export_url)
print("inject_pype_environment end")
except Exception:
import traceback
print(traceback.format_exc())
print("inject_pype_environment failed")
RepositoryUtils.FailJob(job)
raise
def __main__(deadlinePlugin):
pype(deadlinePlugin)
inject_pype_environment(deadlinePlugin)