added few docstrings

This commit is contained in:
iLLiCiTiT 2021-11-11 16:08:05 +01:00
parent bedb58daa7
commit 83ba1115a4
2 changed files with 153 additions and 2 deletions

View file

@ -33,6 +33,17 @@ class JobFailed(Exception):
@six.add_metaclass(ABCMeta)
class BaseCommand:
"""Abstract TVPaint command which can be executed through worker.
Each command must have unique name and implemented 'execute' and
'from_existing' methods.
Command also have id which is created on command creation.
The idea is that command is just a data container on sender side send
througth server to a worker where is replicated one by one, executed and
result sent back to sender through server.
"""
@abstractproperty
def name(self):
"""Command name (must be unique)."""
@ -44,6 +55,7 @@ class BaseCommand:
else:
data = copy.deepcopy(data)
# Use 'id' from data when replicating on process side
command_id = data.get("id")
if command_id is None:
command_id = str(uuid4())
@ -56,6 +68,11 @@ class BaseCommand:
self._done = False
def job_queue_root(self):
"""Access to job queue root.
Job queue root is shared access point to files shared across senders
and workers.
"""
if self._parent is None:
return None
return self._parent.job_queue_root()
@ -65,30 +82,41 @@ class BaseCommand:
@property
def id(self):
"""Command id."""
return self._command_data["id"]
@property
def parent(self):
"""Parent of command expected type of 'TVPaintCommands'."""
return self._parent
@property
def communicator(self):
"""TVPaint communicator.
Available only on worker side.
"""
return self._parent.communicator
@property
def done(self):
"""Is command done."""
return self._done
def set_done(self):
"""Change state of done."""
self._done = True
def set_result(self, result):
"""Set result of executed command."""
self._result = result
def result(self):
"""Result of command."""
return copy.deepcopy(self._result)
def response_data(self):
"""Data send as response to sender."""
return {
"id": self.id,
"result": self._result,
@ -96,25 +124,35 @@ class BaseCommand:
}
def command_data(self):
"""Raw command data."""
return copy.deepcopy(self._command_data)
@abstractmethod
def execute(self):
"""Execute command on worker side."""
pass
@classmethod
@abstractmethod
def from_existing(cls, data):
"""Recreate object based on passed data."""
pass
def execute_george(self, george_script):
"""Execute george script in TVPaint."""
return self.parent.execute_george(george_script)
def execute_george_through_file(self, george_script):
"""Execute george script through temp file in TVPaint."""
return self.parent.execute_george_through_file(george_script)
class ExecuteSimpleGeorgeScript(BaseCommand):
"""Execute simple george script in TVPaint.
Args:
script(str): Script that will be executed.
"""
name = "execute_george_simple"
def __init__(self, script, data=None):
@ -133,6 +171,18 @@ class ExecuteSimpleGeorgeScript(BaseCommand):
class ExecuteGeorgeScript(BaseCommand):
"""Execute multiline george script in TVPaint.
Args:
script_lines(list): Lines that will be executed in george script
through temp george file.
tmp_file_keys(list): List of formatting keys in george script that
require replacement with path to a temp file where result will be
stored. The content of file is stored to result by the key.
root_dir_key(str): Formatting key that will be replaced in george
script with job queue root which can be different on worker side.
data(dict): Raw data about command.
"""
name = "execute_george_through_file"
def __init__(
@ -156,6 +206,7 @@ class ExecuteGeorgeScript(BaseCommand):
if isinstance(script, list):
script = "\n".join(script)
# Replace temporary files in george script
for key in self._tmp_file_keys:
output_file = tempfile.NamedTemporaryFile(
mode="w", prefix=TMP_FILE_PREFIX, suffix=".txt", delete=False
@ -166,6 +217,7 @@ class ExecuteGeorgeScript(BaseCommand):
script = script.replace(format_key, output_path)
filepath_by_key[key] = output_path
# Replace job queue root in script
if self._root_dir_key:
job_queue_root = self.job_queue_root()
format_key = "{" + self._root_dir_key + "}"
@ -173,8 +225,10 @@ class ExecuteGeorgeScript(BaseCommand):
format_key, job_queue_root.replace("\\", "/")
)
# Execute the script
self.execute_george_through_file(script)
# Store result of temporary files
result = {}
for key, filepath in filepath_by_key.items():
with open(filepath, "r") as stream:
@ -186,6 +240,7 @@ class ExecuteGeorgeScript(BaseCommand):
@classmethod
def from_existing(cls, data):
"""Recreate the object from data."""
script_lines = data.pop("script_lines")
tmp_file_keys = data.pop("tmp_file_keys", None)
root_dir_key = data.pop("root_dir_key", None)
@ -193,6 +248,11 @@ class ExecuteGeorgeScript(BaseCommand):
class CollectSceneData(BaseCommand):
"""Helper command which will collect all usefull info about workfile.
Result is dictionary with all layers data, exposure frames by layer ids
pre/post behavior of layers by their ids, group information and scene data.
"""
name = "collect_scene_data"
def execute(self):
@ -230,10 +290,22 @@ class CollectSceneData(BaseCommand):
return cls(data)
@six.add_metaclass(ABCMeta)
class TVPaintCommands:
"""Wrapper around TVPaint commands to be able send multiple commands.
Commands may send one or multiple commands at once. Also gives api access
for commands info.
Base for sender and receiver which are extending the logic for their
purposes. One of differences is preparation of workfile path.
Args:
workfile(str): Path to workfile.
job_queue_module(JobQueueModule): Object of OpenPype module JobQueue.
"""
def __init__(self, workfile, job_queue_module=None):
self._log = None
self._workfile = workfile
self._commands = []
self._command_classes_by_name = None
if job_queue_module is None:
@ -241,17 +313,31 @@ class TVPaintCommands:
job_queue_module = manager.modules_by_name["job_queue"]
self._job_queue_module = job_queue_module
self._workfile = self._prepare_workfile(workfile)
@abstractmethod
def _prepare_workfile(self, workfile):
"""Modification of workfile path on initialization to match platorm."""
pass
def job_queue_root(self):
"""Job queue root for current platform using current settings."""
return self._job_queue_module.get_jobs_root_from_settings()
@property
def log(self):
"""Access to logger object."""
if self._log is None:
self._log = PypeLogger.get_logger(self.__class__.__name__)
return self._log
@property
def classes_by_name(self):
"""Prepare commands classes for validation and recreation of commands.
It is expected that all commands are defined in this python file so
we're looking for all implementation of BaseCommand in globals.
"""
if self._command_classes_by_name is None:
command_classes_by_name = {}
for attr in globals().values():
@ -272,16 +358,19 @@ class TVPaintCommands:
return self._command_classes_by_name
def add_command(self, command):
"""Add command to process."""
command.set_parent(self)
self._commands.append(command)
def result(self):
"""Result of commands in list in which they were processed."""
return [
command.result()
for command in self._commands
]
def response_data(self):
"""Data which should be send from worker."""
return [
command.response_data()
for command in self._commands
@ -289,13 +378,30 @@ class TVPaintCommands:
class SenderTVPaintCommands(TVPaintCommands):
"""Sender implementation of TVPaint Commands."""
def _prepare_workfile(self, workfile):
"""Remove job queue root from workfile path.
It is expected that worker will add it's root before passed workfile.
"""
new_workfile = workfile.replace("\\", "/")
job_queue_root = self.job_queue_root.replace("\\", "/")
if job_queue_root not in new_workfile:
raise ValueError((
"Workfile is not located in JobQueue root."
" Workfile path: \"{}\". JobQueue root: \"{}\""
).format(workfile, job_queue_root))
return new_workfile.replace(job_queue_root, "")
def commands_data(self):
"""Commands data to be able recreate them."""
return [
command.command_data()
for command in self._commands
]
def to_job_data(self):
"""Convert commands to job data before sending to workers server."""
return {
"workfile": self._workfile,
"function": "commands",
@ -314,6 +420,7 @@ class SenderTVPaintCommands(TVPaintCommands):
command.set_done()
def _send_job(self):
"""Send job to a workers server."""
# Send job data to job queue server
job_data = self.to_job_data()
self.log.debug("Sending job to JobQueue server.\n{}".format(
@ -328,6 +435,13 @@ class SenderTVPaintCommands(TVPaintCommands):
return job_id
def send_job_and_wait(self):
"""Send job to workers server and wait for response.
Result of job is stored into the object.
Raises:
JobFailed: When job was finished but not successfully.
"""
job_id = self._send_job()
while True:
job_status = self._job_queue_module.get_job_status(job_id)
@ -345,6 +459,13 @@ class SenderTVPaintCommands(TVPaintCommands):
class ProcessTVPaintCommands(TVPaintCommands):
"""Worker side of TVPaint Commands.
It is expected this object is created only on worker's side from existing
data loaded from job.
Workfile path logic is based on 'SenderTVPaintCommands'.
"""
def __init__(self, workfile, commands, communicator):
super(ProcessTVPaintCommands, self).__init__(workfile)
@ -352,11 +473,22 @@ class ProcessTVPaintCommands(TVPaintCommands):
self.commands_from_data(commands)
def _prepare_workfile(self, workfile):
"""Preprend job queue root before passed workfile."""
workfile = workfile.replace("\\", "/")
job_queue_root = self.job_queue_root.replace("\\", "/")
new_workfile = "/".join([job_queue_root, workfile])
while "//" in new_workfile:
new_workfile = new_workfile.replace("//", "/")
return os.path.normpath(new_workfile)
@property
def communicator(self):
"""Access to TVPaint communicator."""
return self._communicator
def commands_from_data(self, commands_data):
"""Recreate command from passed data."""
for command_data in commands_data:
command_name = command_data["command"]
@ -365,9 +497,11 @@ class ProcessTVPaintCommands(TVPaintCommands):
self.add_command(command)
def execute_george(self, george_script):
"""Helper method to execute george script."""
return self.communicator.execute_george(george_script)
def execute_george_through_file(self, george_script):
"""Helper method to execute george script through temp file."""
temporary_file = tempfile.NamedTemporaryFile(
mode="w", prefix=TMP_FILE_PREFIX, suffix=".grg", delete=False
)
@ -378,19 +512,26 @@ class ProcessTVPaintCommands(TVPaintCommands):
os.remove(temp_file_path)
def _open_workfile(self):
workfile = self._workfile.replace("\\", "/")
"""Open workfile in TVPaint."""
workfile = self._workfile
print("Opening workfile {}".format(workfile))
george_script = "tv_LoadProject '\"'\"{}\"'\"'".format(workfile)
self.execute_george_through_file(george_script)
def _close_workfile(self):
"""Close workfile in TVPaint."""
print("Closing workfile")
self.execute_george_through_file("tv_projectclose")
def execute(self):
"""Execute commands."""
# First open the workfile
self._open_workfile()
# Execute commands one by one
# TODO maybe stop processing when command fails?
print("Commands execution started ({})".format(len(self._commands)))
for command in self._commands:
command.execute()
command.set_done()
# Finally close workfile
self._close_workfile()

View file

@ -44,6 +44,13 @@ class WorkerClient(JsonRpcClient):
class WorkerJobsConnection:
"""WS connection to Job server.
Helper class to create a connection to process jobs from job server.
To be able receive jobs is needed to create a connection and then register
as worker for specific host.
"""
retry_time_seconds = 5
def __init__(self, server_url, host_name, loop=None):
@ -73,6 +80,7 @@ class WorkerJobsConnection:
return None
def finish_job(self, success=True, message=None, data=None):
"""Worker finished job and sets the result which is send to server."""
if self.client is None:
print((
"Couldn't sent job status to server because"
@ -82,6 +90,7 @@ class WorkerJobsConnection:
self.client.finish_job(success, message, data)
async def main_loop(self, register_worker=True):
"""Main loop of connection which keep connection to server alive."""
self._is_running = True
while not self._stopped:
@ -156,6 +165,7 @@ class WorkerJobsConnection:
await self._stop_cleanup()
def register_as_worker(self):
"""Register as worker ready to work on server side."""
asyncio.ensure_future(self._register_as_worker(), loop=self._loop)
async def _register_as_worker(self):