Merge pull request #3989 from pypeclub/feature/OP-3426_Add-support-for-Deadline-for-automatic-tests

This commit is contained in:
Ondřej Samohel 2022-12-02 18:30:44 +01:00 committed by GitHub
commit 0ef117214b
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
42 changed files with 955 additions and 120 deletions

View file

@ -153,7 +153,8 @@ def get_openpype_global_settings(url: str) -> dict:
# Create mongo connection
client = MongoClient(url, **kwargs)
# Access settings collection
col = client["openpype"]["settings"]
openpype_db = os.environ.get("OPENPYPE_DATABASE_NAME") or "openpype"
col = client[openpype_db]["settings"]
# Query global settings
global_settings = col.find_one({"type": "global_settings"}) or {}
# Close Mongo connection

View file

@ -22,7 +22,9 @@ from .pype_commands import PypeCommands
@click.option("--debug", is_flag=True, expose_value=False,
help="Enable debug")
@click.option("--verbose", expose_value=False,
help="Change OpenPype log level (debug - critical or 0-50)")
help=("Change OpenPype log level (debug - critical or 0-50)"))
@click.option("--automatic-tests", is_flag=True, expose_value=False,
help=("Run in automatic tests mode"))
def main(ctx):
"""Pype is main command serving as entry point to pipeline system.

View file

@ -13,6 +13,7 @@ from openpype.pipeline import install_host
from openpype.modules import ModulesManager
from openpype.tools.utils import host_tools
from openpype.tests.lib import is_in_tests
from .launch_logic import ProcessLauncher, get_stub
log = logging.getLogger(__name__)
@ -47,7 +48,7 @@ def main(*subprocess_args):
webpublisher_addon.headless_publish,
log,
"CloseAE",
os.environ.get("IS_TEST")
is_in_tests()
)
)

View file

@ -427,6 +427,8 @@ class ExporterReviewMov(ExporterReview):
# create nk path
path = os.path.splitext(self.path)[0] + ".nk"
# save file to the path
if not os.path.exists(os.path.dirname(path)):
os.makedirs(os.path.dirname(path))
shutil.copyfile(self.instance.context.data["currentFile"], path)
self.log.info("Nodes exported...")

View file

@ -9,6 +9,7 @@ from openpype.lib import env_value_to_bool, Logger
from openpype.modules import ModulesManager
from openpype.pipeline import install_host
from openpype.tools.utils import host_tools
from openpype.tests.lib import is_in_tests
from .launch_logic import ProcessLauncher, stub
@ -42,7 +43,7 @@ def main(*subprocess_args):
webpublisher_addon.headless_publish,
log,
"ClosePS",
os.environ.get("IS_TEST")
is_in_tests()
)
elif env_value_to_bool("AVALON_PHOTOSHOP_WORKFILES_ON_LAUNCH",
default=True):

View file

@ -22,6 +22,7 @@ from openpype_modules.webpublisher.lib import (
get_batch_asset_task_info,
parse_json
)
from openpype.tests.lib import is_in_tests
class CollectBatchData(pyblish.api.ContextPlugin):
@ -39,7 +40,7 @@ class CollectBatchData(pyblish.api.ContextPlugin):
def process(self, context):
self.log.info("CollectBatchData")
batch_dir = os.environ.get("OPENPYPE_PUBLISH_DATA")
if os.environ.get("IS_TEST"):
if is_in_tests():
self.log.debug("Automatic testing, no batch data, skipping")
return

View file

@ -6,6 +6,7 @@ import pyblish.api
from openpype.lib import prepare_template_data
from openpype.hosts.photoshop import api as photoshop
from openpype.settings import get_project_settings
from openpype.tests.lib import is_in_tests
class CollectColorCodedInstances(pyblish.api.ContextPlugin):
@ -46,7 +47,7 @@ class CollectColorCodedInstances(pyblish.api.ContextPlugin):
def process(self, context):
self.log.info("CollectColorCodedInstances")
batch_dir = os.environ.get("OPENPYPE_PUBLISH_DATA")
if (os.environ.get("IS_TEST") and
if (is_in_tests() and
(not batch_dir or not os.path.exists(batch_dir))):
self.log.debug("Automatic testing, no batch data, skipping")
return

View file

@ -2,6 +2,7 @@ import os
import attr
import getpass
import pyblish.api
from datetime import datetime
from openpype.lib import (
env_value_to_bool,
@ -10,6 +11,7 @@ from openpype.lib import (
from openpype.pipeline import legacy_io
from openpype_modules.deadline import abstract_submit_deadline
from openpype_modules.deadline.abstract_submit_deadline import DeadlineJobInfo
from openpype.tests.lib import is_in_tests
@attr.s
@ -48,9 +50,11 @@ class AfterEffectsSubmitDeadline(
context = self._instance.context
batch_name = os.path.basename(self._instance.data["source"])
if is_in_tests():
batch_name += datetime.now().strftime("%d%m%Y%H%M%S")
dln_job_info.Name = self._instance.data["name"]
dln_job_info.BatchName = os.path.basename(self._instance.
data["source"])
dln_job_info.BatchName = batch_name
dln_job_info.Plugin = "AfterEffects"
dln_job_info.UserName = context.data.get(
"deadlineUser", getpass.getuser())
@ -83,7 +87,8 @@ class AfterEffectsSubmitDeadline(
"AVALON_APP_NAME",
"OPENPYPE_DEV",
"OPENPYPE_LOG_NO_COLORS",
"OPENPYPE_VERSION"
"OPENPYPE_VERSION",
"IS_TEST"
]
# Add mongo url if it's enabled
if self._instance.context.data.get("deadlinePassMongoUrl"):

View file

@ -5,6 +5,7 @@ from pathlib import Path
from collections import OrderedDict
from zipfile import ZipFile, is_zipfile
import re
from datetime import datetime
import attr
import pyblish.api
@ -12,6 +13,7 @@ import pyblish.api
from openpype.pipeline import legacy_io
from openpype_modules.deadline import abstract_submit_deadline
from openpype_modules.deadline.abstract_submit_deadline import DeadlineJobInfo
from openpype.tests.lib import is_in_tests
class _ZipFile(ZipFile):
@ -261,7 +263,10 @@ class HarmonySubmitDeadline(
job_info.Pool = self._instance.data.get("primaryPool")
job_info.SecondaryPool = self._instance.data.get("secondaryPool")
job_info.ChunkSize = self.chunk_size
job_info.BatchName = os.path.basename(self._instance.data["source"])
batch_name = os.path.basename(self._instance.data["source"])
if is_in_tests:
batch_name += datetime.now().strftime("%d%m%Y%H%M%S")
job_info.BatchName = batch_name
job_info.Department = self.department
job_info.Group = self.group
@ -275,7 +280,8 @@ class HarmonySubmitDeadline(
"AVALON_APP_NAME",
"OPENPYPE_DEV",
"OPENPYPE_LOG_NO_COLORS",
"OPENPYPE_VERSION"
"OPENPYPE_VERSION",
"IS_TEST"
]
# Add mongo url if it's enabled
if self._instance.context.data.get("deadlinePassMongoUrl"):

View file

@ -1,5 +1,6 @@
import os
import json
from datetime import datetime
import requests
import hou
@ -7,6 +8,7 @@ import hou
import pyblish.api
from openpype.pipeline import legacy_io
from openpype.tests.lib import is_in_tests
class HoudiniSubmitPublishDeadline(pyblish.api.ContextPlugin):
@ -60,6 +62,8 @@ class HoudiniSubmitPublishDeadline(pyblish.api.ContextPlugin):
job_name = "{scene} [PUBLISH]".format(scene=scenename)
batch_name = "{code} - {scene}".format(code=code, scene=scenename)
if is_in_tests():
batch_name += datetime.now().strftime("%d%m%Y%H%M%S")
deadline_user = "roy" # todo: get deadline user dynamically
# Get only major.minor version of Houdini, ignore patch version

View file

@ -1,6 +1,7 @@
import os
import json
import getpass
from datetime import datetime
import requests
import pyblish.api
@ -8,6 +9,7 @@ import pyblish.api
# import hou ???
from openpype.pipeline import legacy_io
from openpype.tests.lib import is_in_tests
class HoudiniSubmitRenderDeadline(pyblish.api.InstancePlugin):
@ -45,6 +47,9 @@ class HoudiniSubmitRenderDeadline(pyblish.api.InstancePlugin):
if code:
batch_name = "{0} - {1}".format(code, batch_name)
if is_in_tests():
batch_name += datetime.now().strftime("%d%m%Y%H%M%S")
# Output driver to render
driver = instance[0]

View file

@ -37,6 +37,7 @@ from openpype.hosts.maya.api.lib import get_attr_in_layer
from openpype_modules.deadline import abstract_submit_deadline
from openpype_modules.deadline.abstract_submit_deadline import DeadlineJobInfo
from openpype.tests.lib import is_in_tests
def _validate_deadline_bool_value(instance, attribute, value):
@ -121,6 +122,9 @@ class MayaSubmitDeadline(abstract_submit_deadline.AbstractSubmitDeadline):
src_filepath = context.data["currentFile"]
src_filename = os.path.basename(src_filepath)
if is_in_tests():
src_filename += datetime.now().strftime("%d%m%Y%H%M%S")
job_info.Name = "%s - %s" % (src_filename, instance.name)
job_info.BatchName = src_filename
job_info.Plugin = instance.data.get("mayaRenderPlugin", "MayaBatch")
@ -161,7 +165,8 @@ class MayaSubmitDeadline(abstract_submit_deadline.AbstractSubmitDeadline):
"AVALON_TASK",
"AVALON_APP_NAME",
"OPENPYPE_DEV",
"OPENPYPE_VERSION"
"OPENPYPE_VERSION",
"IS_TEST"
]
# Add mongo url if it's enabled
if self._instance.context.data.get("deadlinePassMongoUrl"):

View file

@ -1,10 +1,12 @@
import os
import requests
from datetime import datetime
from maya import cmds
from openpype.pipeline import legacy_io, PublishXmlValidationError
from openpype.settings import get_project_settings
from openpype.tests.lib import is_in_tests
import pyblish.api
@ -57,6 +59,8 @@ class MayaSubmitRemotePublishDeadline(pyblish.api.InstancePlugin):
job_name = "{scene} [PUBLISH]".format(scene=scenename)
batch_name = "{code} - {scene}".format(code=project_name,
scene=scenename)
if is_in_tests():
batch_name += datetime.now().strftime("%d%m%Y%H%M%S")
# Generate the payload for Deadline submission
payload = {

View file

@ -2,12 +2,14 @@ import os
import re
import json
import getpass
from datetime import datetime
import requests
import pyblish.api
import nuke
from openpype.pipeline import legacy_io
from openpype.tests.lib import is_in_tests
class NukeSubmitDeadline(pyblish.api.InstancePlugin):
@ -141,8 +143,11 @@ class NukeSubmitDeadline(pyblish.api.InstancePlugin):
responce_data=None
):
render_dir = os.path.normpath(os.path.dirname(render_path))
script_name = os.path.basename(script_path)
jobname = "%s - %s" % (script_name, instance.name)
batch_name = os.path.basename(script_path)
jobname = "%s - %s" % (batch_name, instance.name)
if is_in_tests():
batch_name += datetime.now().strftime("%d%m%Y%H%M%S")
output_filename_0 = self.preview_fname(render_path)
@ -176,7 +181,7 @@ class NukeSubmitDeadline(pyblish.api.InstancePlugin):
payload = {
"JobInfo": {
# Top-level group name
"BatchName": script_name,
"BatchName": batch_name,
# Asset dependency to wait for at least the scene file to sync.
# "AssetDependency0": script_path,

View file

@ -18,6 +18,7 @@ from openpype.pipeline import (
get_representation_path,
legacy_io,
)
from openpype.tests.lib import is_in_tests
from openpype.pipeline.farm.patterning import match_aov_pattern
@ -142,7 +143,9 @@ class ProcessSubmittedJobOnFarm(pyblish.api.InstancePlugin):
"OPENPYPE_RENDER_JOB",
"OPENPYPE_PUBLISH_JOB",
"OPENPYPE_MONGO",
"OPENPYPE_VERSION"
"OPENPYPE_VERSION",
"IS_TEST"
]
# custom deadline attributes
@ -212,6 +215,8 @@ class ProcessSubmittedJobOnFarm(pyblish.api.InstancePlugin):
more universal code. Muster post job is sent directly by Muster
submitter, so this type of code isn't necessary for it.
Returns:
(str): deadline_publish_job_id
"""
data = instance.data.copy()
subset = data["subset"]
@ -245,6 +250,7 @@ class ProcessSubmittedJobOnFarm(pyblish.api.InstancePlugin):
environment["OPENPYPE_USERNAME"] = instance.context.data["user"]
environment["OPENPYPE_PUBLISH_JOB"] = "1"
environment["OPENPYPE_RENDER_JOB"] = "0"
environment["IS_TEST"] = is_in_tests()
# Add mongo url if it's enabled
if instance.context.data.get("deadlinePassMongoUrl"):
mongo_url = os.environ.get("OPENPYPE_MONGO")
@ -261,6 +267,9 @@ class ProcessSubmittedJobOnFarm(pyblish.api.InstancePlugin):
"--targets", "farm"
]
if is_in_tests():
args.append("--automatic-tests")
# Generate the payload for Deadline submission
payload = {
"JobInfo": {
@ -331,6 +340,10 @@ class ProcessSubmittedJobOnFarm(pyblish.api.InstancePlugin):
if not response.ok:
raise Exception(response.text)
deadline_publish_job_id = response.json()["_id"]
return deadline_publish_job_id
def _copy_extend_frames(self, instance, representation):
"""Copy existing frames from latest version.
@ -991,7 +1004,8 @@ class ProcessSubmittedJobOnFarm(pyblish.api.InstancePlugin):
self.deadline_url = instance.data.get("deadlineUrl")
assert self.deadline_url, "Requires Deadline Webservice URL"
self._submit_deadline_post_job(instance, render_job, instances)
deadline_publish_job_id = \
self._submit_deadline_post_job(instance, render_job, instances)
# publish job file
publish_job = {
@ -1009,6 +1023,9 @@ class ProcessSubmittedJobOnFarm(pyblish.api.InstancePlugin):
"instances": instances
}
if deadline_publish_job_id:
publish_job["deadline_publish_job_id"] = deadline_publish_job_id
# add audio to metadata file if available
audio_file = context.data.get("audioFile")
if audio_file and os.path.isfile(audio_file):

View file

@ -333,10 +333,13 @@ def inject_openpype_environment(deadlinePlugin):
"app": job.GetJobEnvironmentKeyValue("AVALON_APP_NAME"),
"envgroup": "farm"
}
if job.GetJobEnvironmentKeyValue('IS_TEST'):
args.append("--automatic-tests")
if all(add_kwargs.values()):
for key, value in add_kwargs.items():
args.extend(["--{}".format(key), value])
else:
raise RuntimeError((
"Missing required env vars: AVALON_PROJECT, AVALON_ASSET,"

View file

@ -5,6 +5,8 @@ import shutil
import pyblish.api
import re
from openpype.tests.lib import is_in_tests
class CleanUp(pyblish.api.InstancePlugin):
"""Cleans up the staging directory after a successful publish.
@ -44,6 +46,9 @@ class CleanUp(pyblish.api.InstancePlugin):
def process(self, instance):
"""Plugin entry point."""
if is_in_tests():
# let automatic test process clean up temporary data
return
# Get the errored instances
failed = []
for result in instance.context.data["results"]:

View file

@ -4,7 +4,9 @@
import os
import pyblish.api
from openpype.pipeline import legacy_io
from openpype.host import IPublishHost
from openpype.pipeline import legacy_io, registered_host
from openpype.pipeline.create import CreateContext
class CollectFromCreateContext(pyblish.api.ContextPlugin):
@ -15,7 +17,11 @@ class CollectFromCreateContext(pyblish.api.ContextPlugin):
def process(self, context):
create_context = context.data.pop("create_context", None)
# Skip if create context is not available
if not create_context:
host = registered_host()
if isinstance(host, IPublishHost):
create_context = CreateContext(host)
if not create_context:
return
@ -31,6 +37,7 @@ class CollectFromCreateContext(pyblish.api.ContextPlugin):
context.data["projectName"] = project_name
for created_instance in create_context.instances:
self.log.info(f"created_instance:: {created_instance}")
instance_data = created_instance.data_to_store()
if instance_data["active"]:
thumbnail_path = thumbnail_paths_by_instance_id.get(

View file

@ -2,6 +2,7 @@ import os
import pyblish.api
from openpype.lib import get_version_from_path
from openpype.tests.lib import is_in_tests
class CollectSceneVersion(pyblish.api.ContextPlugin):
@ -36,7 +37,7 @@ class CollectSceneVersion(pyblish.api.ContextPlugin):
# tests should be close to regular publish as possible
if (
os.environ.get("HEADLESS_PUBLISH")
and not os.environ.get("IS_TEST")
and not is_in_tests()
and context.data["hostName"] in self.skip_hosts_headless_publish):
self.log.debug("Skipping for headless publishing")
return

View file

@ -299,7 +299,7 @@ class PypeCommands:
if pyargs:
args.extend(["--pyargs", pyargs])
if persist:
if test_data_folder:
args.extend(["--test_data_folder", test_data_folder])
if persist:

View file

@ -78,3 +78,12 @@ def tempdir():
yield tempdir
finally:
shutil.rmtree(tempdir)
def is_in_tests():
"""Returns if process is running in automatic tests mode.
In tests mode different source DB is used, some plugins might be disabled
etc.
"""
return os.environ.get("IS_TEST") == '1'

View file

@ -242,6 +242,10 @@ if "--debug" in sys.argv:
sys.argv.remove("--debug")
os.environ["OPENPYPE_DEBUG"] = "1"
if "--automatic-tests" in sys.argv:
sys.argv.remove("--automatic-tests")
os.environ["IS_TEST"] = "1"
if "--use-staging" in sys.argv:
sys.argv.remove("--use-staging")
os.environ["OPENPYPE_USE_STAGING"] = "1"
@ -987,6 +991,14 @@ def boot():
os.environ["OPENPYPE_DATABASE_NAME"] = \
os.environ.get("OPENPYPE_DATABASE_NAME") or "openpype"
if os.environ.get("IS_TEST") == "1":
# change source DBs to predefined ones set for automatic testing
if "_tests" not in os.environ["OPENPYPE_DATABASE_NAME"]:
os.environ["OPENPYPE_DATABASE_NAME"] += "_tests"
avalon_db = os.environ.get("AVALON_DB") or "avalon"
if "_tests" not in avalon_db:
os.environ["AVALON_DB"] = avalon_db + "_tests"
global_settings = get_openpype_global_settings(openpype_mongo)
_print(">>> run disk mapping command ...")

View file

@ -1,5 +1,15 @@
Automatic tests for OpenPype
============================
Requirements:
============
Tests are recreating fresh DB for each run, so `mongorestore`, `mongodump` and `mongoimport` command line tools must be installed and on Path.
You can find intallers here: https://www.mongodb.com/docs/database-tools/installation/installation/
You can test that `mongorestore` is available by running this in console, or cmd:
```mongorestore --version```
Structure:
- integration - end to end tests, slow (see README.md in the integration folder for more info)
- openpype/modules/MODULE_NAME - structure follow directory structure in code base

View file

@ -43,3 +43,15 @@ def app_variant(request):
@pytest.fixture(scope="module")
def timeout(request):
return request.config.getoption("--timeout")
@pytest.hookimpl(tryfirst=True, hookwrapper=True)
def pytest_runtest_makereport(item, call):
# execute all other hooks to obtain the report object
outcome = yield
rep = outcome.get_result()
# set a report attribute for each phase of a call, which can
# be "setup", "call", "teardown"
setattr(item, "rep_" + rep.when, rep)

View file

@ -2,10 +2,14 @@ import os
import pytest
import shutil
from tests.lib.testing_classes import HostFixtures
from tests.lib.testing_classes import (
HostFixtures,
PublishTest,
DeadlinePublishTest
)
class AfterEffectsTestClass(HostFixtures):
class AEHostFixtures(HostFixtures):
@pytest.fixture(scope="module")
def last_workfile_path(self, download_test_data, output_folder_url):
"""Get last_workfile_path from source data.
@ -15,15 +19,15 @@ class AfterEffectsTestClass(HostFixtures):
src_path = os.path.join(download_test_data,
"input",
"workfile",
"test_project_test_asset_TestTask_v001.aep")
dest_folder = os.path.join(download_test_data,
"test_project_test_asset_test_task_v001.aep")
dest_folder = os.path.join(output_folder_url,
self.PROJECT,
self.ASSET,
"work",
self.TASK)
os.makedirs(dest_folder)
dest_path = os.path.join(dest_folder,
"test_project_test_asset_TestTask_v001.aep")
"test_project_test_asset_test_task_v001.aep")
shutil.copy(src_path, dest_path)
yield dest_path
@ -32,3 +36,16 @@ class AfterEffectsTestClass(HostFixtures):
def startup_scripts(self, monkeypatch_session, download_test_data):
"""Points Maya to userSetup file from input data"""
pass
@pytest.fixture(scope="module")
def skip_compare_folders(self):
# skip folder that contain "Logs", these come only from Deadline
return ["Logs", "Auto-Save"]
class AELocalPublishTestClass(AEHostFixtures, PublishTest):
"""Testing class for local publishes."""
class AEDeadlinePublishTestClass(AEHostFixtures, DeadlinePublishTest):
"""Testing class for Deadline publishes."""

View file

@ -0,0 +1,93 @@
import logging
from tests.lib.assert_classes import DBAssert
from tests.integration.hosts.aftereffects.lib import AEDeadlinePublishTestClass
log = logging.getLogger("test_publish_in_aftereffects")
class TestDeadlinePublishInAfterEffects(AEDeadlinePublishTestClass):
"""Basic test case for DL publishing in AfterEffects
Uses generic TestCase to prepare fixtures for test data, testing DBs,
env vars.
Opens AfterEffects, run DL publish on prepared workile.
Test zip file sets 3 required env vars:
- HEADLESS_PUBLISH - this triggers publish immediately app is open
- IS_TEST - this differentiate between regular webpublish
- PYBLISH_TARGETS
Waits for publish job on DL is finished.
Then checks content of DB (if subset, version, representations were
created.
Checks tmp folder if all expected files were published.
"""
PERSIST = False
TEST_FILES = [
("1xhd2ij2ixyjCyTjZgcJEPAIiBHLU1FEY",
"test_aftereffects_publish.zip",
"")
]
APP_GROUP = "aftereffects"
APP_VARIANT = ""
APP_NAME = "{}/{}".format(APP_GROUP, APP_VARIANT)
TIMEOUT = 120 # publish timeout
def test_db_asserts(self, dbcon, publish_finished):
"""Host and input data dependent expected results in DB."""
print("test_db_asserts")
failures = []
failures.append(DBAssert.count_of_types(dbcon, "version", 2))
failures.append(
DBAssert.count_of_types(dbcon, "version", 0, name={"$ne": 1}))
failures.append(
DBAssert.count_of_types(dbcon, "subset", 1,
name="workfileTest_task"))
failures.append(
DBAssert.count_of_types(dbcon, "subset", 1,
name="renderTest_taskMain"))
failures.append(
DBAssert.count_of_types(dbcon, "representation", 4))
additional_args = {"context.subset": "renderTest_taskMain",
"context.ext": "aep"}
failures.append(
DBAssert.count_of_types(dbcon, "representation", 1,
additional_args=additional_args))
additional_args = {"context.subset": "renderTest_taskMain",
"context.ext": "png"}
failures.append(
DBAssert.count_of_types(dbcon, "representation", 1,
additional_args=additional_args))
additional_args = {"context.subset": "renderTest_taskMain",
"name": "thumbnail"}
failures.append(
DBAssert.count_of_types(dbcon, "representation", 1,
additional_args=additional_args))
additional_args = {"context.subset": "renderTest_taskMain",
"name": "png_png"}
failures.append(
DBAssert.count_of_types(dbcon, "representation", 1,
additional_args=additional_args))
assert not any(failures)
if __name__ == "__main__":
test_case = TestDeadlinePublishInAfterEffects()

View file

@ -0,0 +1,121 @@
import logging
from tests.lib.assert_classes import DBAssert
from tests.integration.hosts.aftereffects.lib import AEDeadlinePublishTestClass
log = logging.getLogger("test_publish_in_aftereffects")
class TestDeadlinePublishInAfterEffectsMultiComposition(AEDeadlinePublishTestClass): # noqa
"""est case for DL publishing in AfterEffects with multiple compositions.
Uses generic TestCase to prepare fixtures for test data, testing DBs,
env vars.
Opens AfterEffects, run DL publish on prepared workile.
Test zip file sets 3 required env vars:
- HEADLESS_PUBLISH - this triggers publish immediately app is open
- IS_TEST - this differentiate between regular webpublish
- PYBLISH_TARGETS
As there are multiple render and publish jobs, it waits for publish job
of later render job. Depends on date created of metadata.json.
Then checks content of DB (if subset, version, representations were
created.
Checks tmp folder if all expected files were published.
"""
PERSIST = False
TEST_FILES = [
("16xIm3U5P7WQJXpa9E06jWebMK9QKUATN",
"test_aftereffects_deadline_publish_multicomposition.zip",
"")
]
APP_GROUP = "aftereffects"
APP_VARIANT = ""
APP_NAME = "{}/{}".format(APP_GROUP, APP_VARIANT)
TIMEOUT = 120 # publish timeout
def test_db_asserts(self, dbcon, publish_finished):
"""Host and input data dependent expected results in DB."""
print("test_db_asserts")
failures = []
failures.append(DBAssert.count_of_types(dbcon, "version", 2))
failures.append(
DBAssert.count_of_types(dbcon, "version", 0, name={"$ne": 1}))
failures.append(
DBAssert.count_of_types(dbcon, "subset", 3))
failures.append(
DBAssert.count_of_types(dbcon, "subset", 1,
name="workfileTest_task"))
failures.append(
DBAssert.count_of_types(dbcon, "subset", 1,
name="renderTest_taskMain"))
failures.append(
DBAssert.count_of_types(dbcon, "subset", 1,
name="renderTest_taskMain2"))
failures.append(
DBAssert.count_of_types(dbcon, "representation", 7))
additional_args = {"context.subset": "workfileTest_task",
"context.ext": "aep"}
failures.append(
DBAssert.count_of_types(dbcon, "representation", 1,
additional_args=additional_args))
# renderTest_taskMain
additional_args = {"context.subset": "renderTest_taskMain",
"context.ext": "png"}
failures.append(
DBAssert.count_of_types(dbcon, "representation", 1,
additional_args=additional_args))
additional_args = {"context.subset": "renderTest_taskMain",
"name": "thumbnail"}
failures.append(
DBAssert.count_of_types(dbcon, "representation", 1,
additional_args=additional_args))
additional_args = {"context.subset": "renderTest_taskMain",
"name": "png_png"}
failures.append(
DBAssert.count_of_types(dbcon, "representation", 1,
additional_args=additional_args))
# renderTest_taskMain2
additional_args = {"context.subset": "renderTest_taskMain2",
"context.ext": "exr"}
failures.append(
DBAssert.count_of_types(dbcon, "representation", 1,
additional_args=additional_args))
additional_args = {"context.subset": "renderTest_taskMain2",
"name": "thumbnail"}
failures.append(
DBAssert.count_of_types(dbcon, "representation", 1,
additional_args=additional_args))
additional_args = {"context.subset": "renderTest_taskMain2",
"name": "png_exr"}
failures.append(
DBAssert.count_of_types(dbcon, "representation", 1,
additional_args=additional_args))
assert not any(failures)
if __name__ == "__main__":
test_case = TestDeadlinePublishInAfterEffectsMultiComposition()

View file

@ -1,12 +1,12 @@
import logging
from tests.lib.assert_classes import DBAssert
from tests.integration.hosts.aftereffects.lib import AfterEffectsTestClass
from tests.integration.hosts.aftereffects.lib import AELocalPublishTestClass
log = logging.getLogger("test_publish_in_aftereffects")
class TestPublishInAfterEffects(AfterEffectsTestClass):
class TestPublishInAfterEffects(AELocalPublishTestClass):
"""Basic test case for publishing in AfterEffects
Uses generic TestCase to prepare fixtures for test data, testing DBs,
@ -32,10 +32,10 @@ class TestPublishInAfterEffects(AfterEffectsTestClass):
"")
]
APP = "aftereffects"
APP_GROUP = "aftereffects"
APP_VARIANT = ""
APP_NAME = "{}/{}".format(APP, APP_VARIANT)
APP_NAME = "{}/{}".format(APP_GROUP, APP_VARIANT)
TIMEOUT = 120 # publish timeout
@ -49,27 +49,41 @@ class TestPublishInAfterEffects(AfterEffectsTestClass):
failures.append(
DBAssert.count_of_types(dbcon, "version", 0, name={"$ne": 1}))
failures.append(
DBAssert.count_of_types(dbcon, "subset", 1,
name="imageMainBackgroundcopy"))
failures.append(
DBAssert.count_of_types(dbcon, "subset", 1,
name="workfileTest_task"))
failures.append(
DBAssert.count_of_types(dbcon, "subset", 1,
name="reviewTesttask"))
name="renderTest_taskMain"))
failures.append(
DBAssert.count_of_types(dbcon, "representation", 4))
additional_args = {"context.subset": "renderTestTaskDefault",
additional_args = {"context.subset": "renderTest_taskMain",
"context.ext": "aep"}
failures.append(
DBAssert.count_of_types(dbcon, "representation", 1,
additional_args=additional_args))
additional_args = {"context.subset": "renderTest_taskMain",
"context.ext": "png"}
failures.append(
DBAssert.count_of_types(dbcon, "representation", 1,
additional_args=additional_args))
additional_args = {"context.subset": "renderTest_taskMain",
"name": "thumbnail"}
failures.append(
DBAssert.count_of_types(dbcon, "representation", 1,
additional_args=additional_args))
additional_args = {"context.subset": "renderTest_taskMain",
"name": "png_png"}
failures.append(
DBAssert.count_of_types(dbcon, "representation", 1,
additional_args=additional_args))
assert not any(failures)

View file

@ -0,0 +1,93 @@
import logging
from tests.lib.assert_classes import DBAssert
from tests.integration.hosts.aftereffects.lib import AELocalPublishTestClass
log = logging.getLogger("test_publish_in_aftereffects")
class TestPublishInAfterEffects(AELocalPublishTestClass):
"""Basic test case for publishing in AfterEffects
Uses old Pyblish schema of created instances.
Uses generic TestCase to prepare fixtures for test data, testing DBs,
env vars.
Opens AfterEffects, run publish on prepared workile.
Test zip file sets 3 required env vars:
- HEADLESS_PUBLISH - this triggers publish immediately app is open
- IS_TEST - this differentiate between regular webpublish
- PYBLISH_TARGETS
Then checks content of DB (if subset, version, representations were
created.
Checks tmp folder if all expected files were published.
"""
PERSIST = False
TEST_FILES = [
("1jqI_uG2NusKFvZZF7C0ScHjxFJrlc9F-",
"test_aftereffects_publish_legacy.zip",
"")
]
APP_GROUP = "aftereffects"
APP_VARIANT = ""
APP_NAME = "{}/{}".format(APP_GROUP, APP_VARIANT)
TIMEOUT = 120 # publish timeout
def test_db_asserts(self, dbcon, publish_finished):
"""Host and input data dependent expected results in DB."""
print("test_db_asserts")
failures = []
failures.append(DBAssert.count_of_types(dbcon, "version", 2))
failures.append(
DBAssert.count_of_types(dbcon, "version", 0, name={"$ne": 1}))
failures.append(
DBAssert.count_of_types(dbcon, "subset", 1,
name="workfileTest_task"))
failures.append(
DBAssert.count_of_types(dbcon, "subset", 1,
name="renderTest_taskMain"))
failures.append(
DBAssert.count_of_types(dbcon, "representation", 4))
additional_args = {"context.subset": "renderTest_taskMain",
"context.ext": "aep"}
failures.append(
DBAssert.count_of_types(dbcon, "representation", 1,
additional_args=additional_args))
additional_args = {"context.subset": "renderTest_taskMain",
"context.ext": "png"}
failures.append(
DBAssert.count_of_types(dbcon, "representation", 1,
additional_args=additional_args))
additional_args = {"context.subset": "renderTest_taskMain",
"name": "thumbnail"}
failures.append(
DBAssert.count_of_types(dbcon, "representation", 1,
additional_args=additional_args))
additional_args = {"context.subset": "renderTest_taskMain",
"name": "png_png"}
failures.append(
DBAssert.count_of_types(dbcon, "representation", 1,
additional_args=additional_args))
assert not any(failures)
if __name__ == "__main__":
test_case = TestPublishInAfterEffects()

View file

@ -1,15 +1,15 @@
import logging
from tests.lib.assert_classes import DBAssert
from tests.integration.hosts.aftereffects.lib import AfterEffectsTestClass
from tests.integration.hosts.aftereffects.lib import AELocalPublishTestClass
log = logging.getLogger("test_publish_in_aftereffects")
class TestPublishInAfterEffects(AfterEffectsTestClass):
class TestPublishInAfterEffects(AELocalPublishTestClass):
"""Basic test case for publishing in AfterEffects
Should publish 5 frames
Should publish 10 frames
"""
PERSIST = True
@ -19,10 +19,10 @@ class TestPublishInAfterEffects(AfterEffectsTestClass):
"")
]
APP = "aftereffects"
APP_GROUP = "aftereffects"
APP_VARIANT = ""
APP_NAME = "{}/{}".format(APP, APP_VARIANT)
APP_NAME = "{}/{}".format(APP_GROUP, APP_VARIANT)
TIMEOUT = 120 # publish timeout
@ -36,27 +36,41 @@ class TestPublishInAfterEffects(AfterEffectsTestClass):
failures.append(
DBAssert.count_of_types(dbcon, "version", 0, name={"$ne": 1}))
failures.append(
DBAssert.count_of_types(dbcon, "subset", 1,
name="imageMainBackgroundcopy"))
failures.append(
DBAssert.count_of_types(dbcon, "subset", 1,
name="workfileTest_task"))
failures.append(
DBAssert.count_of_types(dbcon, "subset", 1,
name="reviewTesttask"))
name="renderTest_taskMain"))
failures.append(
DBAssert.count_of_types(dbcon, "representation", 4))
additional_args = {"context.subset": "renderTestTaskDefault",
additional_args = {"context.subset": "renderTest_taskMain",
"context.ext": "aep"}
failures.append(
DBAssert.count_of_types(dbcon, "representation", 1,
additional_args=additional_args))
additional_args = {"context.subset": "renderTest_taskMain",
"context.ext": "png"}
failures.append(
DBAssert.count_of_types(dbcon, "representation", 1,
additional_args=additional_args))
additional_args = {"context.subset": "renderTest_taskMain",
"name": "thumbnail"}
failures.append(
DBAssert.count_of_types(dbcon, "representation", 1,
additional_args=additional_args))
additional_args = {"context.subset": "renderTest_taskMain",
"name": "h264_png"}
failures.append(
DBAssert.count_of_types(dbcon, "representation", 1,
additional_args=additional_args))
assert not any(failures)

View file

@ -2,10 +2,14 @@ import os
import pytest
import shutil
from tests.lib.testing_classes import HostFixtures
from tests.lib.testing_classes import (
HostFixtures,
PublishTest,
DeadlinePublishTest
)
class MayaTestClass(HostFixtures):
class MayaHostFixtures(HostFixtures):
@pytest.fixture(scope="module")
def last_workfile_path(self, download_test_data, output_folder_url):
"""Get last_workfile_path from source data.
@ -15,7 +19,7 @@ class MayaTestClass(HostFixtures):
src_path = os.path.join(download_test_data,
"input",
"workfile",
"test_project_test_asset_TestTask_v001.mb")
"test_project_test_asset_test_task_v001.mb")
dest_folder = os.path.join(output_folder_url,
self.PROJECT,
self.ASSET,
@ -23,7 +27,7 @@ class MayaTestClass(HostFixtures):
self.TASK)
os.makedirs(dest_folder)
dest_path = os.path.join(dest_folder,
"test_project_test_asset_TestTask_v001.mb")
"test_project_test_asset_test_task_v001.mb")
shutil.copy(src_path, dest_path)
yield dest_path
@ -39,3 +43,15 @@ class MayaTestClass(HostFixtures):
"{}{}{}".format(startup_path,
os.pathsep,
original_pythonpath))
@pytest.fixture(scope="module")
def skip_compare_folders(self):
yield []
class MayaLocalPublishTestClass(MayaHostFixtures, PublishTest):
"""Testing class for local publishes."""
class MayaDeadlinePublishTestClass(MayaHostFixtures, DeadlinePublishTest):
"""Testing class for Deadline publishes."""

View file

@ -0,0 +1,102 @@
from tests.lib.assert_classes import DBAssert
from tests.integration.hosts.maya.lib import MayaDeadlinePublishTestClass
class TestDeadlinePublishInMaya(MayaDeadlinePublishTestClass):
"""Basic test case for publishing in Maya
Always pulls and uses test data from GDrive!
Opens Maya, runs publish on prepared workile.
Sends file to be rendered on Deadline.
Then checks content of DB (if subset, version, representations were
created.
Checks tmp folder if all expected files were published.
How to run:
(in cmd with activated {OPENPYPE_ROOT}/.venv)
{OPENPYPE_ROOT}/.venv/Scripts/python.exe {OPENPYPE_ROOT}/start.py runtests ../tests/integration/hosts/maya # noqa: E501
"""
PERSIST = True
TEST_FILES = [
("1dDY7CbdFXfRksGVoiuwjhnPoTRCCf5ea",
"test_maya_deadline_publish.zip", "")
]
APP_GROUP = "maya"
# keep empty to locate latest installed variant or explicit
APP_VARIANT = ""
TIMEOUT = 120 # publish timeout
def test_db_asserts(self, dbcon, publish_finished):
"""Host and input data dependent expected results in DB."""
print("test_db_asserts")
failures = []
failures.append(DBAssert.count_of_types(dbcon, "version", 3))
failures.append(
DBAssert.count_of_types(dbcon, "version", 0, name={"$ne": 1}))
failures.append(
DBAssert.count_of_types(dbcon, "subset", 1,
name="modelMain"))
failures.append(
DBAssert.count_of_types(dbcon, "subset", 1,
name="renderTest_taskMain_beauty"))
failures.append(
DBAssert.count_of_types(dbcon, "subset", 1,
name="workfileTest_task"))
failures.append(DBAssert.count_of_types(dbcon, "representation", 8))
# hero included
additional_args = {"context.subset": "modelMain",
"context.ext": "abc"}
failures.append(
DBAssert.count_of_types(dbcon, "representation", 2,
additional_args=additional_args))
# hero included
additional_args = {"context.subset": "modelMain",
"context.ext": "ma"}
failures.append(
DBAssert.count_of_types(dbcon, "representation", 2,
additional_args=additional_args))
additional_args = {"context.subset": "modelMain",
"context.ext": "mb"}
failures.append(
DBAssert.count_of_types(dbcon, "representation", 0,
additional_args=additional_args))
additional_args = {"context.subset": "renderTest_taskMain_beauty",
"context.ext": "exr"}
failures.append(
DBAssert.count_of_types(dbcon, "representation", 1,
additional_args=additional_args))
additional_args = {"context.subset": "renderTest_taskMain_beauty",
"context.ext": "jpg"}
failures.append(
DBAssert.count_of_types(dbcon, "representation", 1,
additional_args=additional_args))
additional_args = {"context.subset": "renderTest_taskMain_beauty",
"context.ext": "png"}
failures.append(
DBAssert.count_of_types(dbcon, "representation", 1,
additional_args=additional_args))
assert not any(failures)
if __name__ == "__main__":
test_case = TestDeadlinePublishInMaya()

View file

@ -1,7 +1,8 @@
from tests.integration.hosts.maya.lib import MayaTestClass
from tests.lib.assert_classes import DBAssert
from tests.integration.hosts.maya.lib import MayaLocalPublishTestClass
class TestPublishInMaya(MayaTestClass):
class TestPublishInMaya(MayaLocalPublishTestClass):
"""Basic test case for publishing in Maya
Shouldnt be running standalone only via 'runtests' pype command! (??)
@ -28,7 +29,7 @@ class TestPublishInMaya(MayaTestClass):
("1BTSIIULJTuDc8VvXseuiJV_fL6-Bu7FP", "test_maya_publish.zip", "")
]
APP = "maya"
APP_GROUP = "maya"
# keep empty to locate latest installed variant or explicit
APP_VARIANT = ""
@ -37,33 +38,41 @@ class TestPublishInMaya(MayaTestClass):
def test_db_asserts(self, dbcon, publish_finished):
"""Host and input data dependent expected results in DB."""
print("test_db_asserts")
assert 5 == dbcon.count_documents({"type": "version"}), \
"Not expected no of versions"
failures = []
failures.append(DBAssert.count_of_types(dbcon, "version", 2))
assert 0 == dbcon.count_documents({"type": "version",
"name": {"$ne": 1}}), \
"Only versions with 1 expected"
failures.append(
DBAssert.count_of_types(dbcon, "version", 0, name={"$ne": 1}))
assert 1 == dbcon.count_documents({"type": "subset",
"name": "modelMain"}), \
"modelMain subset must be present"
failures.append(
DBAssert.count_of_types(dbcon, "subset", 1,
name="modelMain"))
assert 1 == dbcon.count_documents({"type": "subset",
"name": "workfileTest_task"}), \
"workfileTest_task subset must be present"
failures.append(
DBAssert.count_of_types(dbcon, "subset", 1,
name="workfileTest_task"))
assert 11 == dbcon.count_documents({"type": "representation"}), \
"Not expected no of representations"
failures.append(DBAssert.count_of_types(dbcon, "representation", 5))
assert 2 == dbcon.count_documents({"type": "representation",
"context.subset": "modelMain",
"context.ext": "abc"}), \
"Not expected no of representations with ext 'abc'"
additional_args = {"context.subset": "modelMain",
"context.ext": "abc"}
failures.append(
DBAssert.count_of_types(dbcon, "representation", 2,
additional_args=additional_args))
assert 2 == dbcon.count_documents({"type": "representation",
"context.subset": "modelMain",
"context.ext": "ma"}), \
"Not expected no of representations with ext 'abc'"
additional_args = {"context.subset": "modelMain",
"context.ext": "ma"}
failures.append(
DBAssert.count_of_types(dbcon, "representation", 2,
additional_args=additional_args))
additional_args = {"context.subset": "workfileTest_task",
"context.ext": "mb"}
failures.append(
DBAssert.count_of_types(dbcon, "representation", 1,
additional_args=additional_args))
assert not any(failures)
if __name__ == "__main__":

View file

@ -1,17 +1,21 @@
import os
import pytest
import shutil
import re
from tests.lib.testing_classes import HostFixtures
from tests.lib.testing_classes import (
HostFixtures,
PublishTest,
DeadlinePublishTest
)
class NukeTestClass(HostFixtures):
class NukeHostFixtures(HostFixtures):
@pytest.fixture(scope="module")
def last_workfile_path(self, download_test_data, output_folder_url):
"""Get last_workfile_path from source data.
"""
source_file_name = "test_project_test_asset_CompositingInNuke_v001.nk"
source_file_name = "test_project_test_asset_test_task_v001.nk"
src_path = os.path.join(download_test_data,
"input",
"workfile",
@ -27,7 +31,16 @@ class NukeTestClass(HostFixtures):
dest_path = os.path.join(dest_folder,
source_file_name)
shutil.copy(src_path, dest_path)
# rewrite old root with temporary file
# TODO - using only C:/projects seems wrong - but where to get root ?
replace_pattern = re.compile(re.escape("C:/projects"), re.IGNORECASE)
with open(src_path, "r") as fp:
updated = fp.read()
updated = replace_pattern.sub(output_folder_url.replace("\\", '/'),
updated)
with open(dest_path, "w") as fp:
fp.write(updated)
yield dest_path
@ -41,4 +54,15 @@ class NukeTestClass(HostFixtures):
monkeypatch_session.setenv("NUKE_PATH",
"{}{}{}".format(startup_path,
os.pathsep,
original_nuke_path))
original_nuke_path))
@pytest.fixture(scope="module")
def skip_compare_folders(self):
yield []
class NukeLocalPublishTestClass(NukeHostFixtures, PublishTest):
"""Testing class for local publishes."""
class NukeDeadlinePublishTestClass(NukeHostFixtures, DeadlinePublishTest):
"""Testing class for Deadline publishes."""

View file

@ -0,0 +1,84 @@
import logging
from tests.lib.assert_classes import DBAssert
from tests.integration.hosts.nuke.lib import NukeDeadlinePublishTestClass
log = logging.getLogger("test_publish_in_nuke")
class TestDeadlinePublishInNuke(NukeDeadlinePublishTestClass):
"""Basic test case for publishing in Nuke
Uses generic TestCase to prepare fixtures for test data, testing DBs,
env vars.
!!!
It expects modified path in WriteNode,
use '[python {nuke.script_directory()}]' instead of regular root
dir (eg. instead of `c:/projects`).
Access file path by selecting WriteNode group, CTRL+Enter, update file
input
!!!
Opens Nuke, run publish on prepared workile.
Then checks content of DB (if subset, version, representations were
created.
Checks tmp folder if all expected files were published.
How to run:
(in cmd with activated {OPENPYPE_ROOT}/.venv)
{OPENPYPE_ROOT}/.venv/Scripts/python.exe {OPENPYPE_ROOT}/start.py
runtests ../tests/integration/hosts/nuke # noqa: E501
To check log/errors from launched app's publish process keep PERSIST
to True and check `test_openpype.logs` collection.
"""
# https://drive.google.com/file/d/1SUurHj2aiQ21ZIMJfGVBI2KjR8kIjBGI/view?usp=sharing # noqa: E501
TEST_FILES = [
("1SeWprClKhWMv2xVC9AcnekIJFExxnp_b",
"test_nuke_deadline_publish.zip", "")
]
APP_GROUP = "nuke"
TIMEOUT = 180 # publish timeout
# could be overwritten by command line arguments
# keep empty to locate latest installed variant or explicit
APP_VARIANT = ""
PERSIST = False # True - keep test_db, test_openpype, outputted test files
TEST_DATA_FOLDER = None
def test_db_asserts(self, dbcon, publish_finished):
"""Host and input data dependent expected results in DB."""
print("test_db_asserts")
failures = []
failures.append(DBAssert.count_of_types(dbcon, "version", 2))
failures.append(
DBAssert.count_of_types(dbcon, "version", 0, name={"$ne": 1}))
failures.append(
DBAssert.count_of_types(dbcon, "subset", 1,
name="renderTest_taskMain"))
failures.append(
DBAssert.count_of_types(dbcon, "subset", 1,
name="workfileTest_task"))
failures.append(
DBAssert.count_of_types(dbcon, "representation", 4))
additional_args = {"context.subset": "renderTest_taskMain",
"context.ext": "exr"}
failures.append(
DBAssert.count_of_types(dbcon, "representation", 1,
additional_args=additional_args))
assert not any(failures)
if __name__ == "__main__":
test_case = TestDeadlinePublishInNuke()

View file

@ -1,17 +1,25 @@
import logging
from tests.lib.assert_classes import DBAssert
from tests.integration.hosts.nuke.lib import NukeTestClass
from tests.integration.hosts.nuke.lib import NukeLocalPublishTestClass
log = logging.getLogger("test_publish_in_nuke")
class TestPublishInNuke(NukeTestClass):
class TestPublishInNuke(NukeLocalPublishTestClass):
"""Basic test case for publishing in Nuke
Uses generic TestCase to prepare fixtures for test data, testing DBs,
env vars.
!!!
It expects modified path in WriteNode,
use '[python {nuke.script_directory()}]' instead of regular root
dir (eg. instead of `c:/projects/test_project/test_asset/test_task`).
Access file path by selecting WriteNode group, CTRL+Enter, update file
input
!!!
Opens Nuke, run publish on prepared workile.
Then checks content of DB (if subset, version, representations were
@ -20,7 +28,8 @@ class TestPublishInNuke(NukeTestClass):
How to run:
(in cmd with activated {OPENPYPE_ROOT}/.venv)
{OPENPYPE_ROOT}/.venv/Scripts/python.exe {OPENPYPE_ROOT}/start.py runtests ../tests/integration/hosts/nuke # noqa: E501
{OPENPYPE_ROOT}/.venv/Scripts/python.exe {OPENPYPE_ROOT}/start.py
runtests ../tests/integration/hosts/nuke # noqa: E501
To check log/errors from launched app's publish process keep PERSIST
to True and check `test_openpype.logs` collection.
@ -30,14 +39,14 @@ class TestPublishInNuke(NukeTestClass):
("1SUurHj2aiQ21ZIMJfGVBI2KjR8kIjBGI", "test_Nuke_publish.zip", "")
]
APP = "nuke"
APP_GROUP = "nuke"
TIMEOUT = 120 # publish timeout
TIMEOUT = 50 # publish timeout
# could be overwritten by command line arguments
# keep empty to locate latest installed variant or explicit
APP_VARIANT = ""
PERSIST = True # True - keep test_db, test_openpype, outputted test files
PERSIST = False # True - keep test_db, test_openpype, outputted test files
TEST_DATA_FOLDER = None
def test_db_asserts(self, dbcon, publish_finished):
@ -52,7 +61,7 @@ class TestPublishInNuke(NukeTestClass):
failures.append(
DBAssert.count_of_types(dbcon, "subset", 1,
name="renderCompositingInNukeMain"))
name="renderTest_taskMain"))
failures.append(
DBAssert.count_of_types(dbcon, "subset", 1,
@ -61,7 +70,7 @@ class TestPublishInNuke(NukeTestClass):
failures.append(
DBAssert.count_of_types(dbcon, "representation", 4))
additional_args = {"context.subset": "renderCompositingInNukeMain",
additional_args = {"context.subset": "renderTest_taskMain",
"context.ext": "exr"}
failures.append(
DBAssert.count_of_types(dbcon, "representation", 1,

View file

@ -2,10 +2,13 @@ import os
import pytest
import shutil
from tests.lib.testing_classes import HostFixtures
from tests.lib.testing_classes import (
HostFixtures,
PublishTest
)
class PhotoshopTestClass(HostFixtures):
class PhotoshopTestClass(HostFixtures, PublishTest):
@pytest.fixture(scope="module")
def last_workfile_path(self, download_test_data, output_folder_url):
"""Get last_workfile_path from source data.
@ -32,3 +35,7 @@ class PhotoshopTestClass(HostFixtures):
def startup_scripts(self, monkeypatch_session, download_test_data):
"""Points Maya to userSetup file from input data"""
pass
@pytest.fixture(scope="module")
def skip_compare_folders(self):
yield []

View file

@ -41,11 +41,11 @@ class TestPublishInPhotoshop(PhotoshopTestClass):
("1zD2v5cBgkyOm_xIgKz3WKn8aFB_j8qC-", "test_photoshop_publish.zip", "")
]
APP = "photoshop"
APP_GROUP = "photoshop"
# keep empty to locate latest installed variant or explicit
APP_VARIANT = ""
APP_NAME = "{}/{}".format(APP, APP_VARIANT)
APP_NAME = "{}/{}".format(APP_GROUP, APP_VARIANT)
TIMEOUT = 120 # publish timeout
@ -72,7 +72,7 @@ class TestPublishInPhotoshop(PhotoshopTestClass):
name="workfileTest_task"))
failures.append(
DBAssert.count_of_types(dbcon, "representation", 8))
DBAssert.count_of_types(dbcon, "representation", 6))
additional_args = {"context.subset": "imageMainForeground",
"context.ext": "png"}

View file

@ -118,9 +118,8 @@ class DBHandler:
"Run with overwrite=True")
else:
if collection:
coll = self.client[db_name_out].get(collection)
if coll:
coll.drop()
if collection in self.client[db_name_out].list_collection_names(): # noqa
self.client[db_name_out][collection].drop()
else:
self.teardown(db_name_out)
@ -133,7 +132,11 @@ class DBHandler:
db_name=db_name, db_name_out=db_name_out,
collection=collection)
print("mongorestore query:: {}".format(query))
subprocess.run(query)
try:
subprocess.run(query)
except FileNotFoundError:
raise RuntimeError("'mongorestore' utility must be on path."
"Please install it.")
def teardown(self, db_name):
"""Drops 'db_name' if exists."""
@ -231,13 +234,15 @@ class DBHandler:
# Examples
# handler = DBHandler(uri="mongodb://localhost:27017")
# #
# backup_dir = "c:\\projects\\test_nuke_publish\\input\\dumps"
# backup_dir = "c:\\projects\\test_zips\\test_nuke_deadline_publish\\input\\dumps" # noqa
# # #
# handler.backup_to_dump("avalon", backup_dir, True, collection="test_project")
# handler.setup_from_dump("test_db", backup_dir, True, db_name_out="avalon", collection="test_project")
# handler.setup_from_sql_file("test_db", "c:\\projects\\sql\\item.sql",
# handler.backup_to_dump("avalon_tests", backup_dir, True, collection="test_project") # noqa
#handler.backup_to_dump("openpype_tests", backup_dir, True, collection="settings") # noqa
# handler.setup_from_dump("avalon_tests", backup_dir, True, db_name_out="avalon_tests", collection="test_project") # noqa
# handler.setup_from_sql_file("avalon_tests", "c:\\projects\\sql\\item.sql",
# collection="test_project",
# drop=False, mode="upsert")
# handler.setup_from_sql("test_db", "c:\\projects\\sql",
# handler.setup_from_sql("avalon_tests", "c:\\projects\\sql",
# collection="test_project",
# drop=False, mode="upsert")

View file

@ -8,9 +8,12 @@ import tempfile
import shutil
import glob
import platform
import requests
import re
from tests.lib.db_handler import DBHandler
from common.openpype_common.distribution.file_handler import RemoteFileHandler
from openpype.modules import ModulesManager
class BaseTest:
@ -36,9 +39,9 @@ class ModuleUnitTest(BaseTest):
PERSIST = False # True to not purge temporary folder nor test DB
TEST_OPENPYPE_MONGO = "mongodb://localhost:27017"
TEST_DB_NAME = "test_db"
TEST_DB_NAME = "avalon_tests"
TEST_PROJECT_NAME = "test_project"
TEST_OPENPYPE_NAME = "test_openpype"
TEST_OPENPYPE_NAME = "openpype_tests"
TEST_FILES = []
@ -57,7 +60,7 @@ class ModuleUnitTest(BaseTest):
m.undo()
@pytest.fixture(scope="module")
def download_test_data(self, test_data_folder, persist=False):
def download_test_data(self, test_data_folder, persist, request):
test_data_folder = test_data_folder or self.TEST_DATA_FOLDER
if test_data_folder:
print("Using existing folder {}".format(test_data_folder))
@ -78,7 +81,8 @@ class ModuleUnitTest(BaseTest):
print("Temporary folder created:: {}".format(tmpdir))
yield tmpdir
persist = persist or self.PERSIST
persist = (persist or self.PERSIST or
self.is_test_failed(request))
if not persist:
print("Removing {}".format(tmpdir))
shutil.rmtree(tmpdir)
@ -125,7 +129,8 @@ class ModuleUnitTest(BaseTest):
monkeypatch_session.setenv("TEST_SOURCE_FOLDER", download_test_data)
@pytest.fixture(scope="module")
def db_setup(self, download_test_data, env_var, monkeypatch_session):
def db_setup(self, download_test_data, env_var, monkeypatch_session,
request):
"""Restore prepared MongoDB dumps into selected DB."""
backup_dir = os.path.join(download_test_data, "input", "dumps")
@ -135,13 +140,14 @@ class ModuleUnitTest(BaseTest):
overwrite=True,
db_name_out=self.TEST_DB_NAME)
db_handler.setup_from_dump("openpype", backup_dir,
db_handler.setup_from_dump(self.TEST_OPENPYPE_NAME, backup_dir,
overwrite=True,
db_name_out=self.TEST_OPENPYPE_NAME)
yield db_handler
if not self.PERSIST:
persist = self.PERSIST or self.is_test_failed(request)
if not persist:
db_handler.teardown(self.TEST_DB_NAME)
db_handler.teardown(self.TEST_OPENPYPE_NAME)
@ -166,6 +172,13 @@ class ModuleUnitTest(BaseTest):
mongo_client = OpenPypeMongoConnection.get_mongo_client()
yield mongo_client[self.TEST_OPENPYPE_NAME]["settings"]
def is_test_failed(self, request):
# if request.node doesn't have rep_call, something failed
try:
return request.node.rep_call.failed
except AttributeError:
return True
class PublishTest(ModuleUnitTest):
"""Test class for publishing in hosts.
@ -188,7 +201,7 @@ class PublishTest(ModuleUnitTest):
TODO: implement test on file size, file content
"""
APP = ""
APP_GROUP = ""
TIMEOUT = 120 # publish timeout
@ -210,10 +223,10 @@ class PublishTest(ModuleUnitTest):
if not app_variant:
variant = (
application_manager.find_latest_available_variant_for_group(
self.APP))
self.APP_GROUP))
app_variant = variant.name
yield "{}/{}".format(self.APP, app_variant)
yield "{}/{}".format(self.APP_GROUP, app_variant)
@pytest.fixture(scope="module")
def output_folder_url(self, download_test_data):
@ -310,7 +323,8 @@ class PublishTest(ModuleUnitTest):
yield True
def test_folder_structure_same(self, dbcon, publish_finished,
download_test_data, output_folder_url):
download_test_data, output_folder_url,
skip_compare_folders):
"""Check if expected and published subfolders contain same files.
Compares only presence, not size nor content!
@ -328,12 +342,96 @@ class PublishTest(ModuleUnitTest):
glob.glob(expected_dir_base + "\\**", recursive=True)
if f != expected_dir_base and os.path.exists(f))
not_matched = expected.symmetric_difference(published)
assert not not_matched, "Missing {} files".format(
"\n".join(sorted(not_matched)))
filtered_published = self._filter_files(published,
skip_compare_folders)
# filter out temp files also in expected
# could be polluted by accident by copying 'output' to zip file
filtered_expected = self._filter_files(expected, skip_compare_folders)
not_mtched = filtered_expected.symmetric_difference(filtered_published)
if not_mtched:
raise AssertionError("Missing {} files".format(
"\n".join(sorted(not_mtched))))
def _filter_files(self, source_files, skip_compare_folders):
"""Filter list of files according to regex pattern."""
filtered = set()
for file_path in source_files:
if skip_compare_folders:
if not any([re.search(val, file_path)
for val in skip_compare_folders]):
filtered.add(file_path)
else:
filtered.add(file_path)
return filtered
class HostFixtures(PublishTest):
class DeadlinePublishTest(PublishTest):
@pytest.fixture(scope="module")
def publish_finished(self, dbcon, launched_app, download_test_data,
timeout):
"""Dummy fixture waiting for publish to finish"""
import time
time_start = time.time()
timeout = timeout or self.TIMEOUT
timeout = float(timeout)
while launched_app.poll() is None:
time.sleep(0.5)
if time.time() - time_start > timeout:
launched_app.terminate()
raise ValueError("Timeout reached")
metadata_json = glob.glob(os.path.join(download_test_data,
"output",
"**/*_metadata.json"),
recursive=True)
if not metadata_json:
raise RuntimeError("No metadata file found. No job id.")
if len(metadata_json) > 1:
# depends on creation order of published jobs
metadata_json.sort(key=os.path.getmtime, reverse=True)
with open(metadata_json[0]) as fp:
job_info = json.load(fp)
deadline_job_id = job_info["deadline_publish_job_id"]
manager = ModulesManager()
deadline_module = manager.modules_by_name["deadline"]
deadline_url = deadline_module.deadline_urls["default"]
if not deadline_url:
raise ValueError("Must have default deadline url.")
url = "{}/api/jobs?JobId={}".format(deadline_url, deadline_job_id)
valid_date_finished = None
time_start = time.time()
while not valid_date_finished:
time.sleep(0.5)
if time.time() - time_start > timeout:
raise ValueError("Timeout for DL finish reached")
response = requests.get(url, timeout=10)
if not response.ok:
msg = "Couldn't connect to {}".format(deadline_url)
raise RuntimeError(msg)
if not response.json():
raise ValueError("Couldn't find {}".format(deadline_job_id))
# '0001-...' returned until job is finished
valid_date_finished = response.json()[0]["DateComp"][:4] != "0001"
# some clean exit test possible?
print("Publish finished")
yield True
class HostFixtures():
"""Host specific fixtures. Should be implemented once per host."""
@pytest.fixture(scope="module")
def last_workfile_path(self, download_test_data, output_folder_url):
@ -344,3 +442,8 @@ class HostFixtures(PublishTest):
def startup_scripts(self, monkeypatch_session, download_test_data):
""""Adds init scripts (like userSetup) to expected location"""
raise NotImplementedError
@pytest.fixture(scope="module")
def skip_compare_folders(self):
"""Use list of regexs to filter out published folders from comparing"""
raise NotImplementedError

Binary file not shown.

View file

@ -14,6 +14,11 @@ But many tests should yet be created!
- installed DCC you want to test
- `mongorestore` on a PATH
You could check that `mongorestore` is available by running this in console (or cmd), it shouldn't fail and you should see version of utility:
```commandline
mongorestore --version
```
If you would like just to experiment with provided integration tests, and have particular DCC installed on your machine, you could run test for this host by:
- From source:
@ -23,7 +28,7 @@ If you would like just to experiment with provided integration tests, and have p
```
- From build:
```
- ${OPENPYPE_BUILD}/openpype_console run {ABSOLUTE_PATH_OPENPYPE_ROOT}/tests/integration/hosts/nuke`
- ${OPENPYPE_BUILD}/openpype_console runtests {ABSOLUTE_PATH_OPENPYPE_ROOT}/tests/integration/hosts/nuke`
```
Modify tests path argument to limit which tests should be run (`../tests/integration` will run all implemented integration tests).