diff --git a/openpype/hosts/tvpaint/worker/worker_job.py b/openpype/hosts/tvpaint/worker/worker_job.py index 1f30777901..308fbf3bd7 100644 --- a/openpype/hosts/tvpaint/worker/worker_job.py +++ b/openpype/hosts/tvpaint/worker/worker_job.py @@ -33,6 +33,17 @@ class JobFailed(Exception): @six.add_metaclass(ABCMeta) class BaseCommand: + """Abstract TVPaint command which can be executed through worker. + + Each command must have unique name and implemented 'execute' and + 'from_existing' methods. + + Command also have id which is created on command creation. + + The idea is that command is just a data container on sender side send + througth server to a worker where is replicated one by one, executed and + result sent back to sender through server. + """ @abstractproperty def name(self): """Command name (must be unique).""" @@ -44,6 +55,7 @@ class BaseCommand: else: data = copy.deepcopy(data) + # Use 'id' from data when replicating on process side command_id = data.get("id") if command_id is None: command_id = str(uuid4()) @@ -56,6 +68,11 @@ class BaseCommand: self._done = False def job_queue_root(self): + """Access to job queue root. + + Job queue root is shared access point to files shared across senders + and workers. + """ if self._parent is None: return None return self._parent.job_queue_root() @@ -65,30 +82,41 @@ class BaseCommand: @property def id(self): + """Command id.""" return self._command_data["id"] @property def parent(self): + """Parent of command expected type of 'TVPaintCommands'.""" return self._parent @property def communicator(self): + """TVPaint communicator. + + Available only on worker side. + """ return self._parent.communicator @property def done(self): + """Is command done.""" return self._done def set_done(self): + """Change state of done.""" self._done = True def set_result(self, result): + """Set result of executed command.""" self._result = result def result(self): + """Result of command.""" return copy.deepcopy(self._result) def response_data(self): + """Data send as response to sender.""" return { "id": self.id, "result": self._result, @@ -96,25 +124,35 @@ class BaseCommand: } def command_data(self): + """Raw command data.""" return copy.deepcopy(self._command_data) @abstractmethod def execute(self): + """Execute command on worker side.""" pass @classmethod @abstractmethod def from_existing(cls, data): + """Recreate object based on passed data.""" pass def execute_george(self, george_script): + """Execute george script in TVPaint.""" return self.parent.execute_george(george_script) def execute_george_through_file(self, george_script): + """Execute george script through temp file in TVPaint.""" return self.parent.execute_george_through_file(george_script) class ExecuteSimpleGeorgeScript(BaseCommand): + """Execute simple george script in TVPaint. + + Args: + script(str): Script that will be executed. + """ name = "execute_george_simple" def __init__(self, script, data=None): @@ -133,6 +171,18 @@ class ExecuteSimpleGeorgeScript(BaseCommand): class ExecuteGeorgeScript(BaseCommand): + """Execute multiline george script in TVPaint. + + Args: + script_lines(list): Lines that will be executed in george script + through temp george file. + tmp_file_keys(list): List of formatting keys in george script that + require replacement with path to a temp file where result will be + stored. The content of file is stored to result by the key. + root_dir_key(str): Formatting key that will be replaced in george + script with job queue root which can be different on worker side. + data(dict): Raw data about command. + """ name = "execute_george_through_file" def __init__( @@ -156,6 +206,7 @@ class ExecuteGeorgeScript(BaseCommand): if isinstance(script, list): script = "\n".join(script) + # Replace temporary files in george script for key in self._tmp_file_keys: output_file = tempfile.NamedTemporaryFile( mode="w", prefix=TMP_FILE_PREFIX, suffix=".txt", delete=False @@ -166,6 +217,7 @@ class ExecuteGeorgeScript(BaseCommand): script = script.replace(format_key, output_path) filepath_by_key[key] = output_path + # Replace job queue root in script if self._root_dir_key: job_queue_root = self.job_queue_root() format_key = "{" + self._root_dir_key + "}" @@ -173,8 +225,10 @@ class ExecuteGeorgeScript(BaseCommand): format_key, job_queue_root.replace("\\", "/") ) + # Execute the script self.execute_george_through_file(script) + # Store result of temporary files result = {} for key, filepath in filepath_by_key.items(): with open(filepath, "r") as stream: @@ -186,6 +240,7 @@ class ExecuteGeorgeScript(BaseCommand): @classmethod def from_existing(cls, data): + """Recreate the object from data.""" script_lines = data.pop("script_lines") tmp_file_keys = data.pop("tmp_file_keys", None) root_dir_key = data.pop("root_dir_key", None) @@ -193,6 +248,11 @@ class ExecuteGeorgeScript(BaseCommand): class CollectSceneData(BaseCommand): + """Helper command which will collect all usefull info about workfile. + + Result is dictionary with all layers data, exposure frames by layer ids + pre/post behavior of layers by their ids, group information and scene data. + """ name = "collect_scene_data" def execute(self): @@ -230,10 +290,22 @@ class CollectSceneData(BaseCommand): return cls(data) +@six.add_metaclass(ABCMeta) class TVPaintCommands: + """Wrapper around TVPaint commands to be able send multiple commands. + + Commands may send one or multiple commands at once. Also gives api access + for commands info. + + Base for sender and receiver which are extending the logic for their + purposes. One of differences is preparation of workfile path. + + Args: + workfile(str): Path to workfile. + job_queue_module(JobQueueModule): Object of OpenPype module JobQueue. + """ def __init__(self, workfile, job_queue_module=None): self._log = None - self._workfile = workfile self._commands = [] self._command_classes_by_name = None if job_queue_module is None: @@ -241,17 +313,31 @@ class TVPaintCommands: job_queue_module = manager.modules_by_name["job_queue"] self._job_queue_module = job_queue_module + self._workfile = self._prepare_workfile(workfile) + + @abstractmethod + def _prepare_workfile(self, workfile): + """Modification of workfile path on initialization to match platorm.""" + pass + def job_queue_root(self): + """Job queue root for current platform using current settings.""" return self._job_queue_module.get_jobs_root_from_settings() @property def log(self): + """Access to logger object.""" if self._log is None: self._log = PypeLogger.get_logger(self.__class__.__name__) return self._log @property def classes_by_name(self): + """Prepare commands classes for validation and recreation of commands. + + It is expected that all commands are defined in this python file so + we're looking for all implementation of BaseCommand in globals. + """ if self._command_classes_by_name is None: command_classes_by_name = {} for attr in globals().values(): @@ -272,16 +358,19 @@ class TVPaintCommands: return self._command_classes_by_name def add_command(self, command): + """Add command to process.""" command.set_parent(self) self._commands.append(command) def result(self): + """Result of commands in list in which they were processed.""" return [ command.result() for command in self._commands ] def response_data(self): + """Data which should be send from worker.""" return [ command.response_data() for command in self._commands @@ -289,13 +378,30 @@ class TVPaintCommands: class SenderTVPaintCommands(TVPaintCommands): + """Sender implementation of TVPaint Commands.""" + def _prepare_workfile(self, workfile): + """Remove job queue root from workfile path. + + It is expected that worker will add it's root before passed workfile. + """ + new_workfile = workfile.replace("\\", "/") + job_queue_root = self.job_queue_root.replace("\\", "/") + if job_queue_root not in new_workfile: + raise ValueError(( + "Workfile is not located in JobQueue root." + " Workfile path: \"{}\". JobQueue root: \"{}\"" + ).format(workfile, job_queue_root)) + return new_workfile.replace(job_queue_root, "") + def commands_data(self): + """Commands data to be able recreate them.""" return [ command.command_data() for command in self._commands ] def to_job_data(self): + """Convert commands to job data before sending to workers server.""" return { "workfile": self._workfile, "function": "commands", @@ -314,6 +420,7 @@ class SenderTVPaintCommands(TVPaintCommands): command.set_done() def _send_job(self): + """Send job to a workers server.""" # Send job data to job queue server job_data = self.to_job_data() self.log.debug("Sending job to JobQueue server.\n{}".format( @@ -328,6 +435,13 @@ class SenderTVPaintCommands(TVPaintCommands): return job_id def send_job_and_wait(self): + """Send job to workers server and wait for response. + + Result of job is stored into the object. + + Raises: + JobFailed: When job was finished but not successfully. + """ job_id = self._send_job() while True: job_status = self._job_queue_module.get_job_status(job_id) @@ -345,6 +459,13 @@ class SenderTVPaintCommands(TVPaintCommands): class ProcessTVPaintCommands(TVPaintCommands): + """Worker side of TVPaint Commands. + + It is expected this object is created only on worker's side from existing + data loaded from job. + + Workfile path logic is based on 'SenderTVPaintCommands'. + """ def __init__(self, workfile, commands, communicator): super(ProcessTVPaintCommands, self).__init__(workfile) @@ -352,11 +473,22 @@ class ProcessTVPaintCommands(TVPaintCommands): self.commands_from_data(commands) + def _prepare_workfile(self, workfile): + """Preprend job queue root before passed workfile.""" + workfile = workfile.replace("\\", "/") + job_queue_root = self.job_queue_root.replace("\\", "/") + new_workfile = "/".join([job_queue_root, workfile]) + while "//" in new_workfile: + new_workfile = new_workfile.replace("//", "/") + return os.path.normpath(new_workfile) + @property def communicator(self): + """Access to TVPaint communicator.""" return self._communicator def commands_from_data(self, commands_data): + """Recreate command from passed data.""" for command_data in commands_data: command_name = command_data["command"] @@ -365,9 +497,11 @@ class ProcessTVPaintCommands(TVPaintCommands): self.add_command(command) def execute_george(self, george_script): + """Helper method to execute george script.""" return self.communicator.execute_george(george_script) def execute_george_through_file(self, george_script): + """Helper method to execute george script through temp file.""" temporary_file = tempfile.NamedTemporaryFile( mode="w", prefix=TMP_FILE_PREFIX, suffix=".grg", delete=False ) @@ -378,19 +512,26 @@ class ProcessTVPaintCommands(TVPaintCommands): os.remove(temp_file_path) def _open_workfile(self): - workfile = self._workfile.replace("\\", "/") + """Open workfile in TVPaint.""" + workfile = self._workfile print("Opening workfile {}".format(workfile)) george_script = "tv_LoadProject '\"'\"{}\"'\"'".format(workfile) self.execute_george_through_file(george_script) def _close_workfile(self): + """Close workfile in TVPaint.""" print("Closing workfile") self.execute_george_through_file("tv_projectclose") def execute(self): + """Execute commands.""" + # First open the workfile self._open_workfile() + # Execute commands one by one + # TODO maybe stop processing when command fails? print("Commands execution started ({})".format(len(self._commands))) for command in self._commands: command.execute() command.set_done() + # Finally close workfile self._close_workfile() diff --git a/openpype/modules/default_modules/job_queue/job_workers/base_worker.py b/openpype/modules/default_modules/job_queue/job_workers/base_worker.py index 2336f91be2..85506565f4 100644 --- a/openpype/modules/default_modules/job_queue/job_workers/base_worker.py +++ b/openpype/modules/default_modules/job_queue/job_workers/base_worker.py @@ -44,6 +44,13 @@ class WorkerClient(JsonRpcClient): class WorkerJobsConnection: + """WS connection to Job server. + + Helper class to create a connection to process jobs from job server. + + To be able receive jobs is needed to create a connection and then register + as worker for specific host. + """ retry_time_seconds = 5 def __init__(self, server_url, host_name, loop=None): @@ -73,6 +80,7 @@ class WorkerJobsConnection: return None def finish_job(self, success=True, message=None, data=None): + """Worker finished job and sets the result which is send to server.""" if self.client is None: print(( "Couldn't sent job status to server because" @@ -82,6 +90,7 @@ class WorkerJobsConnection: self.client.finish_job(success, message, data) async def main_loop(self, register_worker=True): + """Main loop of connection which keep connection to server alive.""" self._is_running = True while not self._stopped: @@ -156,6 +165,7 @@ class WorkerJobsConnection: await self._stop_cleanup() def register_as_worker(self): + """Register as worker ready to work on server side.""" asyncio.ensure_future(self._register_as_worker(), loop=self._loop) async def _register_as_worker(self):