PYPE-1901 - added check that no other batch is currently processed

PS cannot be run twice on a host machine
This commit is contained in:
Petr Kalis 2021-10-19 13:01:28 +02:00
parent b0628953d8
commit 0188febe4f
2 changed files with 81 additions and 21 deletions

View file

@ -100,6 +100,55 @@ def publish_and_log(dbcon, _id, log, close_plugin_name=None):
) )
def fail_batch(_id, batches_in_progress, dbcon):
"""Set current batch as failed as there are some stuck batches."""
running_batches = [str(batch["_id"])
for batch in batches_in_progress
if batch["_id"] != _id]
msg = "There are still running batches {}\n". \
format("\n".join(running_batches))
msg += "Ask admin to check them and reprocess current batch"
dbcon.update_one(
{"_id": _id},
{"$set":
{
"finish_date": datetime.now(),
"status": "error",
"log": msg
}}
)
raise ValueError(msg)
def find_variant_key(application_manager, host):
"""Searches for latest installed variant for 'host'
Args:
application_manager (ApplicationManager)
host (str)
Returns
(string) (optional)
Raises:
(ValueError) if no variant found
"""
app_group = application_manager.app_groups.get(host)
if not app_group or not app_group.enabled:
raise ValueError("No application {} configured".format(host))
found_variant_key = None
# finds most up-to-date variant if any installed
for variant_key, variant in app_group.variants.items():
for executable in variant.executables:
if executable.exists():
found_variant_key = variant_key
if not found_variant_key:
raise ValueError("No executable for {} found".format(host))
return found_variant_key
def _get_close_plugin(close_plugin_name, log): def _get_close_plugin(close_plugin_name, log):
if close_plugin_name: if close_plugin_name:
plugins = pyblish.api.discover() plugins = pyblish.api.discover()

View file

@ -3,7 +3,6 @@
import os import os
import sys import sys
import json import json
from datetime import datetime
import time import time
from openpype.lib import PypeLogger from openpype.lib import PypeLogger
@ -12,7 +11,9 @@ from openpype.lib.plugin_tools import parse_json, get_batch_asset_task_info
from openpype.lib.remote_publish import ( from openpype.lib.remote_publish import (
get_webpublish_conn, get_webpublish_conn,
start_webpublish_log, start_webpublish_log,
publish_and_log publish_and_log,
fail_batch,
find_variant_key
) )
@ -125,10 +126,17 @@ class PypeCommands:
wants to process uploaded .psd file and publish collected layers wants to process uploaded .psd file and publish collected layers
from there. from there.
Checks if no other batches are running (status =='in_progress). If
so, it sleeps for SLEEP (this is separate process),
waits for WAIT_FOR seconds altogether.
Requires installed host application on the machine. Requires installed host application on the machine.
Runs publish process as user would, in automatic fashion. Runs publish process as user would, in automatic fashion.
""" """
SLEEP = 5 # seconds for another loop check for concurrently runs
WAIT_FOR = 300 # seconds to wait for conc. runs
from openpype import install, uninstall from openpype import install, uninstall
from openpype.api import Logger from openpype.api import Logger
@ -141,25 +149,12 @@ class PypeCommands:
from openpype.lib import ApplicationManager from openpype.lib import ApplicationManager
application_manager = ApplicationManager() application_manager = ApplicationManager()
app_group = application_manager.app_groups.get(host) found_variant_key = find_variant_key(application_manager, host)
if not app_group or not app_group.enabled:
raise ValueError("No application {} configured".format(host))
found_variant_key = None
# finds most up-to-date variant if any installed
for variant_key, variant in app_group.variants.items():
for executable in variant.executables:
if executable.exists():
found_variant_key = variant_key
if not found_variant_key:
raise ValueError("No executable for {} found".format(host))
app_name = "{}/{}".format(host, found_variant_key) app_name = "{}/{}".format(host, found_variant_key)
batch_data = None batch_data = None
if batch_dir and os.path.exists(batch_dir): if batch_dir and os.path.exists(batch_dir):
# TODO check if batch manifest is same as tasks manifests
batch_data = parse_json(os.path.join(batch_dir, "manifest.json")) batch_data = parse_json(os.path.join(batch_dir, "manifest.json"))
if not batch_data: if not batch_data:
@ -174,6 +169,27 @@ class PypeCommands:
batch_data["files"][0]) batch_data["files"][0])
print("workfile_path {}".format(workfile_path)) print("workfile_path {}".format(workfile_path))
_, batch_id = os.path.split(batch_dir)
dbcon = get_webpublish_conn()
# safer to start logging here, launch might be broken altogether
_id = start_webpublish_log(dbcon, batch_id, user)
in_progress = True
slept_times = 0
while in_progress:
batches_in_progress = list(dbcon.find({
"status": "in_progress"
}))
if len(batches_in_progress) > 1:
if slept_times * SLEEP >= WAIT_FOR:
fail_batch(_id, batches_in_progress, dbcon)
print("Another batch running, sleeping for a bit")
time.sleep(SLEEP)
slept_times += 1
else:
in_progress = False
# must have for proper launch of app # must have for proper launch of app
env = get_app_environments_for_context( env = get_app_environments_for_context(
project, project,
@ -183,11 +199,6 @@ class PypeCommands:
) )
os.environ.update(env) os.environ.update(env)
_, batch_id = os.path.split(batch_dir)
dbcon = get_webpublish_conn()
# safer to start logging here, launch might be broken altogether
_id = start_webpublish_log(dbcon, batch_id, user)
os.environ["OPENPYPE_PUBLISH_DATA"] = batch_dir os.environ["OPENPYPE_PUBLISH_DATA"] = batch_dir
os.environ["IS_HEADLESS"] = "true" os.environ["IS_HEADLESS"] = "true"
# must pass identifier to update log lines for a batch # must pass identifier to update log lines for a batch