diff --git a/openpype/hosts/webpublisher/plugins/publish/collect_batch_data.py b/openpype/hosts/webpublisher/plugins/publish/collect_batch_data.py index a710fcb3e8..062c5ce0da 100644 --- a/openpype/hosts/webpublisher/plugins/publish/collect_batch_data.py +++ b/openpype/hosts/webpublisher/plugins/publish/collect_batch_data.py @@ -12,7 +12,7 @@ from openpype.lib.plugin_tools import ( parse_json, get_batch_asset_task_info ) -from openpype.lib.remote_publish import get_webpublish_conn +from openpype.lib.remote_publish import get_webpublish_conn, IN_PROGRESS_STATUS class CollectBatchData(pyblish.api.ContextPlugin): @@ -74,7 +74,7 @@ class CollectBatchData(pyblish.api.ContextPlugin): dbcon.update_one( { "batch_id": batch_id, - "status": "in_progress" + "status": IN_PROGRESS_STATUS }, { "$set": { diff --git a/openpype/hosts/webpublisher/webserver_service/webpublish_routes.py b/openpype/hosts/webpublisher/webserver_service/webpublish_routes.py index 30399a6ba7..de09899104 100644 --- a/openpype/hosts/webpublisher/webserver_service/webpublish_routes.py +++ b/openpype/hosts/webpublisher/webserver_service/webpublish_routes.py @@ -11,10 +11,14 @@ from avalon.api import AvalonMongoDB from openpype.lib import OpenPypeMongoConnection from openpype_modules.avalon_apps.rest_api import _RestApiEndpoint -from openpype.lib.remote_publish import get_task_data from openpype.settings import get_project_settings from openpype.lib import PypeLogger +from openpype.lib.remote_publish import ( + get_task_data, + ERROR_STATUS, + REPROCESS_STATUS +) log = PypeLogger.get_logger("WebServer") @@ -61,7 +65,7 @@ class OpenPypeRestApiResource(RestApiResource): self.dbcon = mongo_client[database_name]["webpublishes"] -class WebpublisherProjectsEndpoint(_RestApiEndpoint): +class ProjectsEndpoint(_RestApiEndpoint): """Returns list of dict with project info (id, name).""" async def get(self) -> Response: output = [] @@ -82,7 +86,7 @@ class WebpublisherProjectsEndpoint(_RestApiEndpoint): ) -class WebpublisherHiearchyEndpoint(_RestApiEndpoint): +class HiearchyEndpoint(_RestApiEndpoint): """Returns dictionary with context tree from assets.""" async def get(self, project_name) -> Response: query_projection = { @@ -181,7 +185,7 @@ class TaskNode(Node): self["attributes"] = {} -class WebpublisherBatchPublishEndpoint(_RestApiEndpoint): +class BatchPublishEndpoint(_RestApiEndpoint): """Triggers headless publishing of batch.""" async def post(self, request) -> Response: # Validate existence of openpype executable @@ -190,7 +194,7 @@ class WebpublisherBatchPublishEndpoint(_RestApiEndpoint): msg = "Non existent OpenPype executable {}".format(openpype_app) raise RuntimeError(msg) - log.info("WebpublisherBatchPublishEndpoint called") + log.info("BatchPublishEndpoint called") content = await request.json() # Each filter have extensions which are checked on first task item @@ -286,7 +290,7 @@ class WebpublisherBatchPublishEndpoint(_RestApiEndpoint): ) -class WebpublisherTaskPublishEndpoint(_RestApiEndpoint): +class TaskPublishEndpoint(_RestApiEndpoint): """Prepared endpoint triggered after each task - for future development.""" async def post(self, request) -> Response: return Response( @@ -301,21 +305,37 @@ class BatchStatusEndpoint(_RestApiEndpoint): async def get(self, batch_id) -> Response: output = self.dbcon.find_one({"batch_id": batch_id}) + if output: + status = 200 + else: + output = {"msg": "Batch id {} not found".format(batch_id), + "status": "queued", + "progress": 0} + status = 404 + body = self.resource.encode(output) return Response( - status=200, - body=self.resource.encode(output), + status=status, + body=body, content_type="application/json" ) -class PublishesStatusEndpoint(_RestApiEndpoint): +class UserReportEndpoint(_RestApiEndpoint): """Returns list of dict with batch info for user (email address).""" async def get(self, user) -> Response: - output = list(self.dbcon.find({"user": user})) + output = list(self.dbcon.find({"user": user}, + projection={"log": False})) + + if output: + status = 200 + else: + output = {"msg": "User {} not found".format(user)} + status = 404 + body = self.resource.encode(output) return Response( - status=200, - body=self.resource.encode(output), + status=status, + body=body, content_type="application/json" ) @@ -351,3 +371,28 @@ class ConfiguredExtensionsEndpoint(_RestApiEndpoint): body=self.resource.encode(dict(configured)), content_type="application/json" ) + + +class BatchReprocessEndpoint(_RestApiEndpoint): + """Marks latest 'batch_id' for reprocessing, returns 404 if not found.""" + async def post(self, batch_id) -> Response: + batches = self.dbcon.find({"batch_id": batch_id, + "status": ERROR_STATUS}).sort("_id", -1) + + if batches: + self.dbcon.update_one( + {"_id": batches[0]["_id"]}, + {"$set": {"status": REPROCESS_STATUS}} + ) + output = [{"msg": "Batch id {} set to reprocess".format(batch_id)}] + status = 200 + else: + output = [{"msg": "Batch id {} not found".format(batch_id)}] + status = 404 + body = self.resource.encode(output) + + return Response( + status=status, + body=body, + content_type="application/json" + ) diff --git a/openpype/hosts/webpublisher/webserver_service/webserver_cli.py b/openpype/hosts/webpublisher/webserver_service/webserver_cli.py index a8b1cd11b8..909ea38bc6 100644 --- a/openpype/hosts/webpublisher/webserver_service/webserver_cli.py +++ b/openpype/hosts/webpublisher/webserver_service/webserver_cli.py @@ -11,13 +11,19 @@ from openpype.lib import PypeLogger from .webpublish_routes import ( RestApiResource, OpenPypeRestApiResource, - WebpublisherBatchPublishEndpoint, - WebpublisherTaskPublishEndpoint, - WebpublisherHiearchyEndpoint, - WebpublisherProjectsEndpoint, + HiearchyEndpoint, + ProjectsEndpoint, + ConfiguredExtensionsEndpoint, + BatchPublishEndpoint, + BatchReprocessEndpoint, BatchStatusEndpoint, - PublishesStatusEndpoint, - ConfiguredExtensionsEndpoint + TaskPublishEndpoint, + UserReportEndpoint +) +from openpype.lib.remote_publish import ( + ERROR_STATUS, + REPROCESS_STATUS, + SENT_REPROCESSING_STATUS ) @@ -41,14 +47,14 @@ def run_webserver(*args, **kwargs): upload_dir=kwargs["upload_dir"], executable=kwargs["executable"], studio_task_queue=studio_task_queue) - projects_endpoint = WebpublisherProjectsEndpoint(resource) + projects_endpoint = ProjectsEndpoint(resource) server_manager.add_route( "GET", "/api/projects", projects_endpoint.dispatch ) - hiearchy_endpoint = WebpublisherHiearchyEndpoint(resource) + hiearchy_endpoint = HiearchyEndpoint(resource) server_manager.add_route( "GET", "/api/hierarchy/{project_name}", @@ -64,7 +70,7 @@ def run_webserver(*args, **kwargs): # triggers publish webpublisher_task_publish_endpoint = \ - WebpublisherBatchPublishEndpoint(resource) + BatchPublishEndpoint(resource) server_manager.add_route( "POST", "/api/webpublish/batch", @@ -72,7 +78,7 @@ def run_webserver(*args, **kwargs): ) webpublisher_batch_publish_endpoint = \ - WebpublisherTaskPublishEndpoint(resource) + TaskPublishEndpoint(resource) server_manager.add_route( "POST", "/api/webpublish/task", @@ -88,13 +94,21 @@ def run_webserver(*args, **kwargs): batch_status_endpoint.dispatch ) - user_status_endpoint = PublishesStatusEndpoint(openpype_resource) + user_status_endpoint = UserReportEndpoint(openpype_resource) server_manager.add_route( "GET", "/api/publishes/{user}", user_status_endpoint.dispatch ) + webpublisher_batch_reprocess_endpoint = \ + BatchReprocessEndpoint(openpype_resource) + server_manager.add_route( + "POST", + "/api/webpublish/reprocess/{batch_id}", + webpublisher_batch_reprocess_endpoint.dispatch + ) + server_manager.start_server() last_reprocessed = time.time() while True: @@ -116,8 +130,12 @@ def reprocess_failed(upload_dir, webserver_url): database_name = os.environ["OPENPYPE_DATABASE_NAME"] dbcon = mongo_client[database_name]["webpublishes"] - results = dbcon.find({"status": "reprocess"}) + results = dbcon.find({"status": REPROCESS_STATUS}) + reprocessed_batches = set() for batch in results: + if batch["batch_id"] in reprocessed_batches: + continue + batch_url = os.path.join(upload_dir, batch["batch_id"], "manifest.json") @@ -130,7 +148,7 @@ def reprocess_failed(upload_dir, webserver_url): {"$set": { "finish_date": datetime.now(), - "status": "error", + "status": ERROR_STATUS, "progress": 100, "log": batch.get("log") + msg }} @@ -141,18 +159,24 @@ def reprocess_failed(upload_dir, webserver_url): with open(batch_url) as f: data = json.loads(f.read()) + dbcon.update_many( + { + "batch_id": batch["batch_id"], + "status": {"$in": [ERROR_STATUS, REPROCESS_STATUS]} + }, + { + "$set": { + "finish_date": datetime.now(), + "status": SENT_REPROCESSING_STATUS, + "progress": 100 + } + } + ) + try: r = requests.post(server_url, json=data) log.info("response{}".format(r)) except Exception: log.info("exception", exc_info=True) - dbcon.update_one( - {"_id": batch["_id"]}, - {"$set": - { - "finish_date": datetime.now(), - "status": "sent_for_reprocessing", - "progress": 100 - }} - ) + reprocessed_batches.add(batch["batch_id"]) diff --git a/openpype/lib/remote_publish.py b/openpype/lib/remote_publish.py index dd5a3e2864..9632e63ea0 100644 --- a/openpype/lib/remote_publish.py +++ b/openpype/lib/remote_publish.py @@ -11,6 +11,13 @@ from openpype import uninstall from openpype.lib.mongo import OpenPypeMongoConnection from openpype.lib.plugin_tools import parse_json +ERROR_STATUS = "error" +IN_PROGRESS_STATUS = "in_progress" +REPROCESS_STATUS = "reprocess" +SENT_REPROCESSING_STATUS = "sent_for_reprocessing" +FINISHED_REPROCESS_STATUS = "republishing_finished" +FINISHED_OK_STATUS = "finished_ok" + def headless_publish(log, close_plugin_name=None, is_test=False): """Runs publish in a opened host with a context and closes Python process. @@ -26,7 +33,7 @@ def headless_publish(log, close_plugin_name=None, is_test=False): "batch will be unfinished!") return - publish_and_log(dbcon, _id, log, close_plugin_name) + publish_and_log(dbcon, _id, log, close_plugin_name=close_plugin_name) else: publish(log, close_plugin_name) @@ -52,7 +59,7 @@ def start_webpublish_log(dbcon, batch_id, user): "batch_id": batch_id, "start_date": datetime.now(), "user": user, - "status": "in_progress", + "status": IN_PROGRESS_STATUS, "progress": 0 # integer 0-100, percentage }).inserted_id @@ -84,13 +91,14 @@ def publish(log, close_plugin_name=None): sys.exit(1) -def publish_and_log(dbcon, _id, log, close_plugin_name=None): +def publish_and_log(dbcon, _id, log, close_plugin_name=None, batch_id=None): """Loops through all plugins, logs ok and fails into OP DB. Args: dbcon (OpenPypeMongoConnection) - _id (str) + _id (str) - id of current job in DB log (OpenPypeLogger) + batch_id (str) - id sent from frontend close_plugin_name (str): name of plugin with responsibility to close host app """ @@ -121,7 +129,7 @@ def publish_and_log(dbcon, _id, log, close_plugin_name=None): {"$set": { "finish_date": datetime.now(), - "status": "error", + "status": ERROR_STATUS, "log": os.linesep.join(log_lines) }} @@ -143,15 +151,29 @@ def publish_and_log(dbcon, _id, log, close_plugin_name=None): ) # final update + if batch_id: + dbcon.update_many( + {"batch_id": batch_id, "status": SENT_REPROCESSING_STATUS}, + { + "$set": + { + "finish_date": datetime.now(), + "status": FINISHED_REPROCESS_STATUS, + } + } + ) + dbcon.update_one( {"_id": _id}, - {"$set": - { - "finish_date": datetime.now(), - "status": "finished_ok", - "progress": 100, - "log": os.linesep.join(log_lines) - }} + { + "$set": + { + "finish_date": datetime.now(), + "status": FINISHED_OK_STATUS, + "progress": 100, + "log": os.linesep.join(log_lines) + } + } ) @@ -168,7 +190,7 @@ def fail_batch(_id, batches_in_progress, dbcon): {"$set": { "finish_date": datetime.now(), - "status": "error", + "status": ERROR_STATUS, "log": msg }} diff --git a/openpype/pype_commands.py b/openpype/pype_commands.py index c9612d8915..de0336be2b 100644 --- a/openpype/pype_commands.py +++ b/openpype/pype_commands.py @@ -14,7 +14,8 @@ from openpype.lib.remote_publish import ( publish_and_log, fail_batch, find_variant_key, - get_task_data + get_task_data, + IN_PROGRESS_STATUS ) @@ -209,7 +210,7 @@ class PypeCommands: # safer to start logging here, launch might be broken altogether _id = start_webpublish_log(dbcon, batch_id, user_email) - batches_in_progress = list(dbcon.find({"status": "in_progress"})) + batches_in_progress = list(dbcon.find({"status": IN_PROGRESS_STATUS})) if len(batches_in_progress) > 1: fail_batch(_id, batches_in_progress, dbcon) print("Another batch running, probably stuck, ask admin for help") @@ -314,7 +315,7 @@ class PypeCommands: dbcon = get_webpublish_conn() _id = start_webpublish_log(dbcon, batch_id, user_email) - publish_and_log(dbcon, _id, log) + publish_and_log(dbcon, _id, log, batch_id=batch_id) log.info("Publish finished.")