# ayon-core/openpype/hosts/webpublisher/lib.py

import os
from datetime import datetime
import collections
import json
from bson.objectid import ObjectId
import pyblish.util
import pyblish.api
from openpype.client.mongo import OpenPypeMongoConnection
from openpype.settings import get_project_settings
from openpype.lib import Logger
from openpype.lib.profiles_filtering import filter_profiles
from openpype.pipeline.publish.lib import find_close_plugin

ERROR_STATUS = "error"
IN_PROGRESS_STATUS = "in_progress"
REPROCESS_STATUS = "reprocess"
SENT_REPROCESSING_STATUS = "sent_for_reprocessing"
FINISHED_REPROCESS_STATUS = "republishing_finished"
FINISHED_OK_STATUS = "finished_ok"

log = Logger.get_logger(__name__)


def parse_json(path):
"""Parses json file at 'path' location
Returns:
(dict) or None if unparsable
Raises:
AssertionError if 'path' doesn't exist
"""
path = path.strip('\"')
assert os.path.isfile(path), (
"Path to json file doesn't exist. \"{}\"".format(path)
)
data = None
with open(path, "r") as json_file:
try:
data = json.load(json_file)
except Exception as exc:
log.error(
"Error loading json: {} - Exception: {}".format(path, exc)
)
return data
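

# Usage sketch (path hypothetical): a missing file raises AssertionError,
# while an existing file with invalid JSON logs the error and returns None.
#
#     data = parse_json("/batches/batch_12345/manifest.json")
#     if data is None:
#         pass  # file existed but content could not be decoded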


def get_batch_asset_task_info(ctx):
    """Parses context data from webpublisher's batch metadata.

    Returns:
        (tuple): asset, task_name (Optional), task_type
    """
    task_type = "default_task_type"
    task_name = None
    asset = None

    if ctx["type"] == "task":
        items = ctx["path"].split('/')
        asset = items[-2]
        task_name = ctx["name"]
        task_type = ctx["attributes"]["type"]
    else:
        asset = ctx["name"]

    return asset, task_name, task_type
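

# A minimal sketch of the two context shapes this function expects, inferred
# from the parsing above (field values are hypothetical):
#
#     task_ctx = {
#         "type": "task",
#         "path": "/demo_project/characters/hero/modeling",
#         "name": "modeling",
#         "attributes": {"type": "Modeling"},
#     }
#     # -> ("hero", "modeling", "Modeling")
#
#     asset_ctx = {"type": "asset", "name": "hero"}
#     # -> ("hero", None, "default_task_type")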


def get_webpublish_conn():
    """Get connection to OP 'webpublishes' collection."""
    mongo_client = OpenPypeMongoConnection.get_mongo_client()
    database_name = os.environ["OPENPYPE_DATABASE_NAME"]
    return mongo_client[database_name]["webpublishes"]


def start_webpublish_log(dbcon, batch_id, user):
    """Start new log record for 'batch_id'.

    Args:
        dbcon (OpenPypeMongoConnection)
        batch_id (str)
        user (str)
    Returns:
        (ObjectId) from DB
    """
    return dbcon.insert_one({
        "batch_id": batch_id,
        "start_date": datetime.now(),
        "user": user,
        "status": IN_PROGRESS_STATUS,
        "progress": 0  # integer 0-100, percentage
    }).inserted_id
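

# Usage sketch (values hypothetical): creates the tracking document and
# returns its ObjectId, which publish_and_log() later updates via "_id".
#
#     dbcon = get_webpublish_conn()
#     job_id = start_webpublish_log(dbcon, "batch_12345", "john.doe")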


def publish_and_log(dbcon, _id, log, close_plugin_name=None, batch_id=None):
    """Loops through all plugins, logs ok and fails into OP DB.

    Args:
        dbcon (OpenPypeMongoConnection)
        _id (str) - id of current job in DB
        log (openpype.lib.Logger)
        close_plugin_name (str): name of plugin with responsibility to
            close host app
        batch_id (str) - id sent from frontend
    """
    # Error exit as soon as any error occurs.
    error_format = "Failed {plugin.__name__}: {error} -- {error.traceback}\n"
    error_format += "-" * 80 + "\n"

    close_plugin = find_close_plugin(close_plugin_name, log)

    if isinstance(_id, str):
        _id = ObjectId(_id)

    log_lines = []
    processed = 0
    log_every = 5
    for result in pyblish.util.publish_iter():
        for record in result["records"]:
            log_lines.append("{}: {}".format(
                result["plugin"].label, record.msg))
        processed += 1

        if result["error"]:
            log.error(error_format.format(**result))
            log_lines = [error_format.format(**result)] + log_lines
            dbcon.update_one(
                {"_id": _id},
                {"$set":
                    {
                        "finish_date": datetime.now(),
                        "status": ERROR_STATUS,
                        "log": os.linesep.join(log_lines)
                    }}
            )
            if close_plugin:  # close host app explicitly after error
                context = pyblish.api.Context()
                close_plugin().process(context)
            return
        elif processed % log_every == 0:
            # pyblish returns progress in 0.0 - 2.0
            progress = min(round(result["progress"] / 2 * 100), 99)
            dbcon.update_one(
                {"_id": _id},
                {"$set":
                    {
                        "progress": progress,
                        "log": os.linesep.join(log_lines)
                    }}
            )

    # final update
    if batch_id:
        dbcon.update_many(
            {"batch_id": batch_id, "status": SENT_REPROCESSING_STATUS},
            {
                "$set":
                    {
                        "finish_date": datetime.now(),
                        "status": FINISHED_REPROCESS_STATUS,
                    }
            }
        )

    dbcon.update_one(
        {"_id": _id},
        {
            "$set":
                {
                    "finish_date": datetime.now(),
                    "status": FINISHED_OK_STATUS,
                    "progress": 100,
                    "log": os.linesep.join(log_lines)
                }
        }
    )
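

# Usage sketch (ids hypothetical): meant to run after pyblish plugins and
# host context are registered, since publish_iter() picks them up globally.
# Progress from pyblish spans 0.0 - 2.0 (see comment above), hence the
# "/ 2 * 100" mapping, capped at 99 until the final update writes 100.
#
#     dbcon = get_webpublish_conn()
#     job_id = start_webpublish_log(dbcon, "batch_12345", "john.doe")
#     publish_and_log(dbcon, job_id, log, batch_id="batch_12345")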


def fail_batch(_id, dbcon, msg):
    """Set current batch as failed as there is some problem.

    Raises:
        ValueError
    """
    dbcon.update_one(
        {"_id": _id},
        {"$set":
            {
                "finish_date": datetime.now(),
                "status": ERROR_STATUS,
                "log": msg
            }}
    )
    raise ValueError(msg)


def find_variant_key(application_manager, host):
    """Searches for latest installed variant for 'host'.

    Args:
        application_manager (ApplicationManager)
        host (str)
    Returns:
        (str): Variant key of the latest installed variant.
    Raises:
        (ValueError) if no variant found
    """
    app_group = application_manager.app_groups.get(host)
    if not app_group or not app_group.enabled:
        raise ValueError("No application {} configured".format(host))

    found_variant_key = None
    # finds most up-to-date variant if any installed
    sorted_variants = collections.OrderedDict(
        sorted(app_group.variants.items()))
    for variant_key, variant in sorted_variants.items():
        for executable in variant.executables:
            if executable.exists():
                found_variant_key = variant_key

    if not found_variant_key:
        raise ValueError("No executable for {} found".format(host))

    return found_variant_key
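

# A minimal sketch of the lookup (variant keys hypothetical): variants are
# sorted lexicographically by key and every variant with an existing
# executable overwrites the previous match, so the highest-sorting
# installed variant wins.
#
#     find_variant_key(application_manager, "photoshop")
#     # -> "2022" when variants "2020", "2021", "2022" are configured
#     #    and executables exist for "2020" and "2022"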


def get_task_data(batch_dir):
    """Return parsed data from first task manifest.json.

    Used for `remotepublishfromapp` command where batch contains only
    single task with publishable workfile.

    Returns:
        (dict)
    Raises:
        (ValueError) if batch or task manifest not found or broken
    """
    batch_data = parse_json(os.path.join(batch_dir, "manifest.json"))
    if not batch_data:
        raise ValueError(
            "Cannot parse batch meta in {} folder".format(batch_dir))

    task_dir_name = batch_data["tasks"][0]
    task_data = parse_json(os.path.join(batch_dir, task_dir_name,
                                        "manifest.json"))
    if not task_data:
        raise ValueError(
            "Cannot parse task meta in {} folder".format(
                os.path.join(batch_dir, task_dir_name)))

    return task_data
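

# Assumed on-disk layout (folder names hypothetical, inferred from the
# lookups above): the batch manifest lists task folder names, each holding
# its own manifest.
#
#     <batch_dir>/
#         manifest.json            # {"tasks": ["task_abc123"], ...}
#         task_abc123/
#             manifest.json        # returned by get_task_data()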


def get_timeout(project_name, host_name, task_type):
    """Returns timeout (in seconds) from Settings profile."""
    filter_data = {
        "task_types": task_type,
        "hosts": host_name
    }
    timeout_profiles = (get_project_settings(project_name)["webpublisher"]
                        ["timeout_profiles"])
    matching_item = filter_profiles(timeout_profiles, filter_data)
    timeout = 3600
    if matching_item:
        timeout = matching_item["timeout"]

    return timeout
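

# The settings profile shape is assumed to look like this (values
# hypothetical); filter_profiles() returns the profile matching the task
# type and host, otherwise the 3600 s default applies:
#
#     timeout_profiles = [
#         {"task_types": ["Animation"], "hosts": ["aftereffects"],
#          "timeout": 7200}
#     ]
#     # get_timeout("demo_project", "aftereffects", "Animation") -> 7200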