removed tests in client codebase

This commit is contained in:
Jakub Trllo 2024-02-12 15:25:05 +01:00
parent c88a4bb8fa
commit 0057e4509f
7 changed files with 0 additions and 508 deletions

View file

@ -1,4 +0,0 @@
Tests for Pype
--------------
Trigger by:
`pype test --pype`

View file

@ -1,88 +0,0 @@
import os
import sys
import shutil
import tempfile
import contextlib
import pyblish
import pyblish.plugin
from pyblish.vendor import six
# Setup
HOST = 'python'
FAMILY = 'test.family'
REGISTERED = pyblish.plugin.registered_paths()
PACKAGEPATH = pyblish.lib.main_package_path()
ENVIRONMENT = os.environ.get("PYBLISHPLUGINPATH", "")
PLUGINPATH = os.path.join(PACKAGEPATH, '..', 'tests', 'plugins')
def setup():
pyblish.plugin.deregister_all_paths()
def setup_empty():
"""Disable all plug-ins"""
setup()
pyblish.plugin.deregister_all_plugins()
pyblish.plugin.deregister_all_paths()
pyblish.plugin.deregister_all_hosts()
pyblish.plugin.deregister_all_callbacks()
pyblish.plugin.deregister_all_targets()
pyblish.api.deregister_all_discovery_filters()
def teardown():
"""Restore previously REGISTERED paths"""
pyblish.plugin.deregister_all_paths()
for path in REGISTERED:
pyblish.plugin.register_plugin_path(path)
os.environ["PYBLISHPLUGINPATH"] = ENVIRONMENT
pyblish.api.deregister_all_plugins()
pyblish.api.deregister_all_hosts()
pyblish.api.deregister_all_discovery_filters()
pyblish.api.deregister_test()
pyblish.api.__init__()
@contextlib.contextmanager
def captured_stdout():
"""Temporarily reassign stdout to a local variable"""
try:
sys.stdout = six.StringIO()
yield sys.stdout
finally:
sys.stdout = sys.__stdout__
@contextlib.contextmanager
def captured_stderr():
"""Temporarily reassign stderr to a local variable"""
try:
sys.stderr = six.StringIO()
yield sys.stderr
finally:
sys.stderr = sys.__stderr__
@contextlib.contextmanager
def tempdir():
"""Provide path to temporary directory"""
try:
tempdir = tempfile.mkdtemp()
yield tempdir
finally:
shutil.rmtree(tempdir)
def is_in_tests():
"""Returns if process is running in automatic tests mode.
In tests mode different source DB is used, some plugins might be disabled
etc.
"""
return os.environ.get("IS_TEST") == '1'

View file

@ -1,288 +0,0 @@
import pymongo
import bson
import random
from datetime import datetime
import os
class TestPerformance():
'''
Class for testing performance of representation and their 'files'
parts.
Discussion is if embedded array:
'files' : [ {'_id': '1111', 'path':'....},
{'_id'...}]
OR documents:
'files' : {
'1111': {'path':'....'},
'2222': {'path':'...'}
}
is faster.
Current results:
without additional partial index documents is 3x faster
With index is array 50x faster then document
Partial index something like:
db.getCollection('performance_test').createIndex
({'files._id': 1},
{partialFilterExpresion: {'files': {'$exists': true}}})
!DIDNT work for me, had to create manually in Compass
'''
MONGO_URL = 'mongodb://localhost:27017'
MONGO_DB = 'performance_test'
MONGO_COLLECTION = 'performance_test'
MAX_FILE_SIZE_B = 5000
MAX_NUMBER_OF_SITES = 50
ROOT_DIR = "C:/projects"
inserted_ids = []
def __init__(self, version='array'):
'''
It creates and fills collection, based on value of 'version'.
:param version: 'array' - files as embedded array,
'doc' - as document
'''
self.client = pymongo.MongoClient(self.MONGO_URL)
self.db = self.client[self.MONGO_DB]
self.collection_name = self.MONGO_COLLECTION
self.version = version
if self.version != 'array':
self.collection_name = self.MONGO_COLLECTION + '_doc'
self.collection = self.db[self.collection_name]
self.ids = [] # for testing
self.inserted_ids = []
def prepare(self, no_of_records=100000, create_files=False):
'''
Produce 'no_of_records' of representations with 'files' segment.
It depends on 'version' value in constructor, 'arrray' or 'doc'
:return:
'''
print('Purging {} collection'.format(self.collection_name))
self.collection.delete_many({})
id = bson.objectid.ObjectId()
insert_recs = []
for i in range(no_of_records):
file_id = bson.objectid.ObjectId()
file_id2 = bson.objectid.ObjectId()
file_id3 = bson.objectid.ObjectId()
self.inserted_ids.extend([file_id, file_id2, file_id3])
version_str = "v{:03d}".format(i + 1)
file_name = "test_Cylinder_workfileLookdev_{}.mb".\
format(version_str)
document = {"files": self.get_files(self.version, i + 1,
file_id, file_id2, file_id3,
create_files)
,
"context": {
"subset": "workfileLookdev",
"username": "petrk",
"task": "lookdev",
"family": "workfile",
"hierarchy": "Assets",
"project": {"code": "test", "name": "Test"},
"version": i + 1,
"asset": "Cylinder",
"representation": "mb",
"root": self.ROOT_DIR
},
"dependencies": [],
"name": "mb",
"parent": {"oid": '{}'.format(id)},
"data": {
"path": "C:\\projects\\test_performance\\Assets\\Cylinder\\publish\\workfile\\workfileLookdev\\{}\\{}".format(version_str, file_name), # noqa: E501
"template": "{root[work]}\\{project[name]}\\{hierarchy}\\{asset}\\publish\\{family}\\{subset}\\v{version:0>3}\\{project[code]}_{asset}_{subset}_v{version:0>3}<_{output}><.{frame:0>4}>.{representation}" # noqa: E501
},
"type": "representation",
"schema": "openpype:representation-2.0"
}
insert_recs.append(document)
print('Prepared {} records in {} collection'.
format(no_of_records, self.collection_name))
self.collection.insert_many(insert_recs)
# TODO refactore to produce real array and not needeing ugly regex
self.collection.insert_one({"inserted_id": self.inserted_ids})
print('-' * 50)
def run(self, queries=1000, loops=3):
'''
Run X'queries' that are searching collection Y'loops' times
:param queries: how many times do ..find(...)
:param loops: loop of testing X queries
:return: None
'''
print('Testing version {} on {}'.format(self.version,
self.collection_name))
print('Queries rung {} in {} loops'.format(queries, loops))
inserted_ids = list(self.collection.
find({"inserted_id": {"$exists": True}}))
import re
self.ids = re.findall("'[0-9a-z]*'", str(inserted_ids))
import time
found_cnt = 0
for _ in range(loops):
print('Starting loop {}'.format(_))
start = time.time()
for _ in range(queries):
# val = random.choice(self.ids)
# val = val.replace("'", '')
val = random.randint(0, 50)
print(val)
if (self.version == 'array'):
# prepared for partial index, without 'files': exists
# wont engage
found = self.collection.\
find({'files': {"$exists": True},
'files.sites.name': "local_{}".format(val)}).\
count()
else:
key = "files.{}".format(val)
found = self.collection.find_one({key: {"$exists": True}})
print("found {} records".format(found))
# if found:
# found_cnt += len(list(found))
end = time.time()
print('duration per loop {}'.format(end - start))
print("found_cnt {}".format(found_cnt))
def get_files(self, mode, i, file_id, file_id2, file_id3,
create_files=False):
'''
Wrapper to decide if 'array' or document version should be used
:param mode: 'array'|'doc'
:param i: step number
:param file_id: ObjectId of first dummy file
:param file_id2: ..
:param file_id3: ..
:return:
'''
if mode == 'array':
return self.get_files_array(i, file_id, file_id2, file_id3,
create_files)
else:
return self.get_files_doc(i, file_id, file_id2, file_id3)
def get_files_array(self, i, file_id, file_id2, file_id3,
create_files=False):
ret = [
{
"path": "{root[work]}" + "{root[work]}/test_performance/Assets/Cylinder/publish/workfile/workfileLookdev/v{:03d}/test_Cylinder_A_workfileLookdev_v{:03d}.dat".format(i, i), # noqa: E501
"_id": '{}'.format(file_id),
"hash": "temphash",
"sites": self.get_sites(self.MAX_NUMBER_OF_SITES),
"size": random.randint(0, self.MAX_FILE_SIZE_B)
},
{
"path": "{root[work]}" + "/test_performance/Assets/Cylinder/publish/workfile/workfileLookdev/v{:03d}/test_Cylinder_B_workfileLookdev_v{:03d}.dat".format(i, i), # noqa: E501
"_id": '{}'.format(file_id2),
"hash": "temphash",
"sites": self.get_sites(self.MAX_NUMBER_OF_SITES),
"size": random.randint(0, self.MAX_FILE_SIZE_B)
},
{
"path": "{root[work]}" + "/test_performance/Assets/Cylinder/publish/workfile/workfileLookdev/v{:03d}/test_Cylinder_C_workfileLookdev_v{:03d}.dat".format(i, i), # noqa: E501
"_id": '{}'.format(file_id3),
"hash": "temphash",
"sites": self.get_sites(self.MAX_NUMBER_OF_SITES),
"size": random.randint(0, self.MAX_FILE_SIZE_B)
}
]
if create_files:
for f in ret:
path = f.get("path").replace("{root[work]}", self.ROOT_DIR)
os.makedirs(os.path.dirname(path), exist_ok=True)
with open(path, 'wb') as fp:
fp.write(os.urandom(f.get("size")))
return ret
def get_files_doc(self, i, file_id, file_id2, file_id3):
ret = {}
ret['{}'.format(file_id)] = {
"path": "{root[work]}" +
"/test_performance/Assets/Cylinder/publish/workfile/workfileLookdev/" # noqa: E501
"v{:03d}/test_CylinderA_workfileLookdev_v{:03d}.mb".format(i, i), # noqa: E501
"hash": "temphash",
"sites": ["studio"],
"size": 87236
}
ret['{}'.format(file_id2)] = {
"path": "{root[work]}" +
"/test_performance/Assets/Cylinder/publish/workfile/workfileLookdev/" # noqa: E501
"v{:03d}/test_CylinderB_workfileLookdev_v{:03d}.mb".format(i, i), # noqa: E501
"hash": "temphash",
"sites": ["studio"],
"size": 87236
}
ret['{}'.format(file_id3)] = {
"path": "{root[work]}" +
"/test_performance/Assets/Cylinder/publish/workfile/workfileLookdev/" # noqa: E501
"v{:03d}/test_CylinderC_workfileLookdev_v{:03d}.mb".format(i, i), # noqa: E501
"hash": "temphash",
"sites": ["studio"],
"size": 87236
}
return ret
def get_sites(self, number_of_sites=50):
"""
Return array of sites declaration.
Currently on 1st site has "created_dt" fillled, which should
trigger upload to 'gdrive' site.
'gdrive' site is appended, its destination for syncing for
Sync Server
Args:
number_of_sites:
Returns:
"""
sites = []
for i in range(number_of_sites):
site = {'name': "local_{}".format(i)}
# do not create null 'created_dt' field, Mongo doesnt like it
if i == 0:
site['created_dt'] = datetime.now()
sites.append(site)
sites.append({'name': "gdrive"})
return sites
if __name__ == '__main__':
tp = TestPerformance('array')
tp.prepare(no_of_records=10000, create_files=True)
# tp.run(10, 3)
# print('-'*50)
#
# tp = TestPerformance('doc')
# tp.prepare() # enable to prepare data
# tp.run(1000, 3)

View file

@ -1,43 +0,0 @@
from ayon_core.pipeline import (
install_host,
LegacyCreator,
register_creator_plugin,
discover_creator_plugins,
)
class MyTestCreator(LegacyCreator):
my_test_property = "A"
def __init__(self, name, asset, options=None, data=None):
super(MyTestCreator, self).__init__(self, name, asset,
options=None, data=None)
# this is hack like no other - we need to inject our own avalon host
# and bypass all its validation. Avalon hosts are modules that needs
# `ls` callable as attribute. Voila:
class Test:
__name__ = "test"
ls = len
@staticmethod
def install():
register_creator_plugin(MyTestCreator)
def test_avalon_plugin_presets(monkeypatch, printer):
install_host(Test)
plugins = discover_creator_plugins()
printer("Test if we got our test plugin")
assert MyTestCreator in plugins
for p in plugins:
if p.__name__ == "MyTestCreator":
printer("Test if we have overridden existing property")
assert p.my_test_property == "B"
printer("Test if we have overridden superclass property")
assert p.active is False
printer("Test if we have added new property")
assert p.new_property == "new"

View file

@ -1,25 +0,0 @@
# Test for backward compatibility of restructure of lib.py into lib library
# Contains simple imports that should still work
def test_backward_compatibility(printer):
printer("Test if imports still work")
try:
from ayon_core.lib import execute_hook
from ayon_core.lib import PypeHook
from ayon_core.lib import ApplicationLaunchFailed
from ayon_core.lib import get_ffmpeg_tool_path
from ayon_core.lib import get_last_version_from_path
from ayon_core.lib import get_paths_from_environ
from ayon_core.lib import get_version_from_path
from ayon_core.lib import version_up
from ayon_core.lib import get_ffprobe_streams
from ayon_core.lib import source_hash
from ayon_core.lib import run_subprocess
except ImportError as e:
raise

View file

@ -1,60 +0,0 @@
import os
import pyblish.api
import pyblish.util
import pyblish.plugin
from ayon_core.pipeline.publish.lib import filter_pyblish_plugins
from . import lib
def test_pyblish_plugin_filter_modifier(printer, monkeypatch):
"""
Test if pyblish filter can filter and modify plugins on-the-fly.
"""
lib.setup_empty()
monkeypatch.setitem(os.environ, 'PYBLISHPLUGINPATH', '')
plugins = pyblish.api.registered_plugins()
printer("Test if we have no registered plugins")
assert len(plugins) == 0
paths = pyblish.api.registered_paths()
printer("Test if we have no registered plugin paths")
assert len(paths) == 0
class MyTestPlugin(pyblish.api.InstancePlugin):
my_test_property = 1
label = "Collect Renderable Camera(s)"
hosts = ["test"]
families = ["default"]
pyblish.api.register_host("test")
pyblish.api.register_plugin(MyTestPlugin)
pyblish.api.register_discovery_filter(filter_pyblish_plugins)
plugins = pyblish.api.discover()
printer("Test if only one plugin was discovered")
assert len(plugins) == 1
printer("Test if properties are modified correctly")
assert plugins[0].label == "loaded from preset"
assert plugins[0].families == ["changed", "by", "preset"]
assert plugins[0].optional is True
lib.teardown()
def test_pyblish_plugin_filter_removal(monkeypatch):
""" Test that plugin can be removed by filter """
lib.setup_empty()
monkeypatch.setitem(os.environ, 'PYBLISHPLUGINPATH', '')
plugins = pyblish.api.registered_plugins()
class MyTestRemovedPlugin(pyblish.api.InstancePlugin):
my_test_property = 1
label = "Collect Renderable Camera(s)"
hosts = ["test"]
families = ["default"]
pyblish.api.register_host("test")
pyblish.api.register_plugin(MyTestRemovedPlugin)
pyblish.api.register_discovery_filter(filter_pyblish_plugins)
plugins = pyblish.api.discover()
assert len(plugins) == 0