Merge pull request #2555 from pypeclub/feature/OP-2427_Webpublisher-Add-endpoint-to-reprocess-batch-through-UI

Webpublisher: Added endpoint to reprocess batch through UI
This commit is contained in:
Petr Kalis 2022-01-20 16:24:50 +01:00 committed by GitHub
commit a40894fa32
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 144 additions and 52 deletions

View file

@ -12,7 +12,7 @@ from openpype.lib.plugin_tools import (
parse_json,
get_batch_asset_task_info
)
from openpype.lib.remote_publish import get_webpublish_conn
from openpype.lib.remote_publish import get_webpublish_conn, IN_PROGRESS_STATUS
class CollectBatchData(pyblish.api.ContextPlugin):
@ -74,7 +74,7 @@ class CollectBatchData(pyblish.api.ContextPlugin):
dbcon.update_one(
{
"batch_id": batch_id,
"status": "in_progress"
"status": IN_PROGRESS_STATUS
},
{
"$set": {

View file

@ -11,10 +11,14 @@ from avalon.api import AvalonMongoDB
from openpype.lib import OpenPypeMongoConnection
from openpype_modules.avalon_apps.rest_api import _RestApiEndpoint
from openpype.lib.remote_publish import get_task_data
from openpype.settings import get_project_settings
from openpype.lib import PypeLogger
from openpype.lib.remote_publish import (
get_task_data,
ERROR_STATUS,
REPROCESS_STATUS
)
log = PypeLogger.get_logger("WebServer")
@ -61,7 +65,7 @@ class OpenPypeRestApiResource(RestApiResource):
self.dbcon = mongo_client[database_name]["webpublishes"]
class WebpublisherProjectsEndpoint(_RestApiEndpoint):
class ProjectsEndpoint(_RestApiEndpoint):
"""Returns list of dict with project info (id, name)."""
async def get(self) -> Response:
output = []
@ -82,7 +86,7 @@ class WebpublisherProjectsEndpoint(_RestApiEndpoint):
)
class WebpublisherHiearchyEndpoint(_RestApiEndpoint):
class HiearchyEndpoint(_RestApiEndpoint):
"""Returns dictionary with context tree from assets."""
async def get(self, project_name) -> Response:
query_projection = {
@ -181,7 +185,7 @@ class TaskNode(Node):
self["attributes"] = {}
class WebpublisherBatchPublishEndpoint(_RestApiEndpoint):
class BatchPublishEndpoint(_RestApiEndpoint):
"""Triggers headless publishing of batch."""
async def post(self, request) -> Response:
# Validate existence of openpype executable
@ -190,7 +194,7 @@ class WebpublisherBatchPublishEndpoint(_RestApiEndpoint):
msg = "Non existent OpenPype executable {}".format(openpype_app)
raise RuntimeError(msg)
log.info("WebpublisherBatchPublishEndpoint called")
log.info("BatchPublishEndpoint called")
content = await request.json()
# Each filter have extensions which are checked on first task item
@ -286,7 +290,7 @@ class WebpublisherBatchPublishEndpoint(_RestApiEndpoint):
)
class WebpublisherTaskPublishEndpoint(_RestApiEndpoint):
class TaskPublishEndpoint(_RestApiEndpoint):
"""Prepared endpoint triggered after each task - for future development."""
async def post(self, request) -> Response:
return Response(
@ -301,21 +305,37 @@ class BatchStatusEndpoint(_RestApiEndpoint):
async def get(self, batch_id) -> Response:
output = self.dbcon.find_one({"batch_id": batch_id})
if output:
status = 200
else:
output = {"msg": "Batch id {} not found".format(batch_id),
"status": "queued",
"progress": 0}
status = 404
body = self.resource.encode(output)
return Response(
status=200,
body=self.resource.encode(output),
status=status,
body=body,
content_type="application/json"
)
class PublishesStatusEndpoint(_RestApiEndpoint):
class UserReportEndpoint(_RestApiEndpoint):
"""Returns list of dict with batch info for user (email address)."""
async def get(self, user) -> Response:
output = list(self.dbcon.find({"user": user}))
output = list(self.dbcon.find({"user": user},
projection={"log": False}))
if output:
status = 200
else:
output = {"msg": "User {} not found".format(user)}
status = 404
body = self.resource.encode(output)
return Response(
status=200,
body=self.resource.encode(output),
status=status,
body=body,
content_type="application/json"
)
@ -351,3 +371,28 @@ class ConfiguredExtensionsEndpoint(_RestApiEndpoint):
body=self.resource.encode(dict(configured)),
content_type="application/json"
)
class BatchReprocessEndpoint(_RestApiEndpoint):
"""Marks latest 'batch_id' for reprocessing, returns 404 if not found."""
async def post(self, batch_id) -> Response:
batches = self.dbcon.find({"batch_id": batch_id,
"status": ERROR_STATUS}).sort("_id", -1)
if batches:
self.dbcon.update_one(
{"_id": batches[0]["_id"]},
{"$set": {"status": REPROCESS_STATUS}}
)
output = [{"msg": "Batch id {} set to reprocess".format(batch_id)}]
status = 200
else:
output = [{"msg": "Batch id {} not found".format(batch_id)}]
status = 404
body = self.resource.encode(output)
return Response(
status=status,
body=body,
content_type="application/json"
)

View file

@ -11,13 +11,19 @@ from openpype.lib import PypeLogger
from .webpublish_routes import (
RestApiResource,
OpenPypeRestApiResource,
WebpublisherBatchPublishEndpoint,
WebpublisherTaskPublishEndpoint,
WebpublisherHiearchyEndpoint,
WebpublisherProjectsEndpoint,
HiearchyEndpoint,
ProjectsEndpoint,
ConfiguredExtensionsEndpoint,
BatchPublishEndpoint,
BatchReprocessEndpoint,
BatchStatusEndpoint,
PublishesStatusEndpoint,
ConfiguredExtensionsEndpoint
TaskPublishEndpoint,
UserReportEndpoint
)
from openpype.lib.remote_publish import (
ERROR_STATUS,
REPROCESS_STATUS,
SENT_REPROCESSING_STATUS
)
@ -41,14 +47,14 @@ def run_webserver(*args, **kwargs):
upload_dir=kwargs["upload_dir"],
executable=kwargs["executable"],
studio_task_queue=studio_task_queue)
projects_endpoint = WebpublisherProjectsEndpoint(resource)
projects_endpoint = ProjectsEndpoint(resource)
server_manager.add_route(
"GET",
"/api/projects",
projects_endpoint.dispatch
)
hiearchy_endpoint = WebpublisherHiearchyEndpoint(resource)
hiearchy_endpoint = HiearchyEndpoint(resource)
server_manager.add_route(
"GET",
"/api/hierarchy/{project_name}",
@ -64,7 +70,7 @@ def run_webserver(*args, **kwargs):
# triggers publish
webpublisher_task_publish_endpoint = \
WebpublisherBatchPublishEndpoint(resource)
BatchPublishEndpoint(resource)
server_manager.add_route(
"POST",
"/api/webpublish/batch",
@ -72,7 +78,7 @@ def run_webserver(*args, **kwargs):
)
webpublisher_batch_publish_endpoint = \
WebpublisherTaskPublishEndpoint(resource)
TaskPublishEndpoint(resource)
server_manager.add_route(
"POST",
"/api/webpublish/task",
@ -88,13 +94,21 @@ def run_webserver(*args, **kwargs):
batch_status_endpoint.dispatch
)
user_status_endpoint = PublishesStatusEndpoint(openpype_resource)
user_status_endpoint = UserReportEndpoint(openpype_resource)
server_manager.add_route(
"GET",
"/api/publishes/{user}",
user_status_endpoint.dispatch
)
webpublisher_batch_reprocess_endpoint = \
BatchReprocessEndpoint(openpype_resource)
server_manager.add_route(
"POST",
"/api/webpublish/reprocess/{batch_id}",
webpublisher_batch_reprocess_endpoint.dispatch
)
server_manager.start_server()
last_reprocessed = time.time()
while True:
@ -116,8 +130,12 @@ def reprocess_failed(upload_dir, webserver_url):
database_name = os.environ["OPENPYPE_DATABASE_NAME"]
dbcon = mongo_client[database_name]["webpublishes"]
results = dbcon.find({"status": "reprocess"})
results = dbcon.find({"status": REPROCESS_STATUS})
reprocessed_batches = set()
for batch in results:
if batch["batch_id"] in reprocessed_batches:
continue
batch_url = os.path.join(upload_dir,
batch["batch_id"],
"manifest.json")
@ -130,7 +148,7 @@ def reprocess_failed(upload_dir, webserver_url):
{"$set":
{
"finish_date": datetime.now(),
"status": "error",
"status": ERROR_STATUS,
"progress": 100,
"log": batch.get("log") + msg
}}
@ -141,18 +159,24 @@ def reprocess_failed(upload_dir, webserver_url):
with open(batch_url) as f:
data = json.loads(f.read())
dbcon.update_many(
{
"batch_id": batch["batch_id"],
"status": {"$in": [ERROR_STATUS, REPROCESS_STATUS]}
},
{
"$set": {
"finish_date": datetime.now(),
"status": SENT_REPROCESSING_STATUS,
"progress": 100
}
}
)
try:
r = requests.post(server_url, json=data)
log.info("response{}".format(r))
except Exception:
log.info("exception", exc_info=True)
dbcon.update_one(
{"_id": batch["_id"]},
{"$set":
{
"finish_date": datetime.now(),
"status": "sent_for_reprocessing",
"progress": 100
}}
)
reprocessed_batches.add(batch["batch_id"])

View file

@ -11,6 +11,13 @@ from openpype import uninstall
from openpype.lib.mongo import OpenPypeMongoConnection
from openpype.lib.plugin_tools import parse_json
ERROR_STATUS = "error"
IN_PROGRESS_STATUS = "in_progress"
REPROCESS_STATUS = "reprocess"
SENT_REPROCESSING_STATUS = "sent_for_reprocessing"
FINISHED_REPROCESS_STATUS = "republishing_finished"
FINISHED_OK_STATUS = "finished_ok"
def headless_publish(log, close_plugin_name=None, is_test=False):
"""Runs publish in a opened host with a context and closes Python process.
@ -26,7 +33,7 @@ def headless_publish(log, close_plugin_name=None, is_test=False):
"batch will be unfinished!")
return
publish_and_log(dbcon, _id, log, close_plugin_name)
publish_and_log(dbcon, _id, log, close_plugin_name=close_plugin_name)
else:
publish(log, close_plugin_name)
@ -52,7 +59,7 @@ def start_webpublish_log(dbcon, batch_id, user):
"batch_id": batch_id,
"start_date": datetime.now(),
"user": user,
"status": "in_progress",
"status": IN_PROGRESS_STATUS,
"progress": 0 # integer 0-100, percentage
}).inserted_id
@ -84,13 +91,14 @@ def publish(log, close_plugin_name=None):
sys.exit(1)
def publish_and_log(dbcon, _id, log, close_plugin_name=None):
def publish_and_log(dbcon, _id, log, close_plugin_name=None, batch_id=None):
"""Loops through all plugins, logs ok and fails into OP DB.
Args:
dbcon (OpenPypeMongoConnection)
_id (str)
_id (str) - id of current job in DB
log (OpenPypeLogger)
batch_id (str) - id sent from frontend
close_plugin_name (str): name of plugin with responsibility to
close host app
"""
@ -121,7 +129,7 @@ def publish_and_log(dbcon, _id, log, close_plugin_name=None):
{"$set":
{
"finish_date": datetime.now(),
"status": "error",
"status": ERROR_STATUS,
"log": os.linesep.join(log_lines)
}}
@ -143,15 +151,29 @@ def publish_and_log(dbcon, _id, log, close_plugin_name=None):
)
# final update
if batch_id:
dbcon.update_many(
{"batch_id": batch_id, "status": SENT_REPROCESSING_STATUS},
{
"$set":
{
"finish_date": datetime.now(),
"status": FINISHED_REPROCESS_STATUS,
}
}
)
dbcon.update_one(
{"_id": _id},
{"$set":
{
"finish_date": datetime.now(),
"status": "finished_ok",
"progress": 100,
"log": os.linesep.join(log_lines)
}}
{
"$set":
{
"finish_date": datetime.now(),
"status": FINISHED_OK_STATUS,
"progress": 100,
"log": os.linesep.join(log_lines)
}
}
)
@ -168,7 +190,7 @@ def fail_batch(_id, batches_in_progress, dbcon):
{"$set":
{
"finish_date": datetime.now(),
"status": "error",
"status": ERROR_STATUS,
"log": msg
}}

View file

@ -14,7 +14,8 @@ from openpype.lib.remote_publish import (
publish_and_log,
fail_batch,
find_variant_key,
get_task_data
get_task_data,
IN_PROGRESS_STATUS
)
@ -209,7 +210,7 @@ class PypeCommands:
# safer to start logging here, launch might be broken altogether
_id = start_webpublish_log(dbcon, batch_id, user_email)
batches_in_progress = list(dbcon.find({"status": "in_progress"}))
batches_in_progress = list(dbcon.find({"status": IN_PROGRESS_STATUS}))
if len(batches_in_progress) > 1:
fail_batch(_id, batches_in_progress, dbcon)
print("Another batch running, probably stuck, ask admin for help")
@ -314,7 +315,7 @@ class PypeCommands:
dbcon = get_webpublish_conn()
_id = start_webpublish_log(dbcon, batch_id, user_email)
publish_and_log(dbcon, _id, log)
publish_and_log(dbcon, _id, log, batch_id=batch_id)
log.info("Publish finished.")