ayon-core/tests/lib/db_handler.py
2021-07-13 15:33:51 +02:00

242 lines
9 KiB
Python

"""
Helper class for automatic testing, provides dump and restore via command
line utilities.
Expect mongodump and mongorestore present at MONGODB_UTILS_DIR
"""
import os
import pymongo
import subprocess
class DBHandler():
# vendorize ??
MONGODB_UTILS_DIR = "c:\\Program Files\\MongoDB\\Server\\4.4\\bin"
def __init__(self, uri=None, host=None, port=None,
user=None, password=None):
"""'uri' or rest of separate credentials"""
if uri:
self.uri = uri
if host:
if all([user, password]):
host = "{}:{}@{}".format(user, password, host)
uri = 'mongodb://{}:{}'.format(host, port or 27017)
assert uri, "Must have uri to MongoDB"
self.client = pymongo.MongoClient(uri)
self.db = None
def setup_empty(self, name):
# not much sense
self.db = self.client[name]
def setup_from_sql(self, db_name, sql_dir, collection=None,
drop=True, mode=None):
"""
Restores 'db_name' from 'sql_url'.
Works with directory with .json files,
if 'collection' arg is empty, name
of .json file is used as name of target collection.
Args:
db_name (str): source DB name
sql_dir (str): folder with json files
collection (str): if all sql files are meant for single coll.
drop (bool): True if drop whole collection
mode (str): "insert" - fails on duplicates
"upsert" - modifies existing
"merge" - updates existing
"delete" - removes in DB present if file
"""
if not os.path.exists(sql_dir):
raise RuntimeError(
"Backup folder {} doesn't exist".format(sql_dir))
for (dirpath, dirnames, filenames) in os.walk(sql_dir):
for file_name in filenames:
sql_url = os.path.join(dirpath, file_name)
query = self._import_query(self.uri, sql_url,
db_name=db_name,
collection=collection,
drop=drop,
mode=mode)
print("mongoimport query:: {}".format(query))
subprocess.run(query)
def setup_from_sql_file(self, db_name, sql_url,
collection=None, drop=True, mode=None):
"""
Restores 'db_name' from 'sql_url'.
Works with single .json file.
If 'collection' arg is empty, name
of .json file is used as name of target collection.
Args:
db_name (str): source DB name
sql_file (str): folder with json files
collection (str): name of target collection
drop (bool): True if drop collection
mode (str): "insert" - fails on duplicates
"upsert" - modifies existing
"merge" - updates existing
"delete" - removes in DB present if file
"""
if not os.path.exists(sql_url):
raise RuntimeError(
"Sql file {} doesn't exist".format(sql_url))
query = self._import_query(self.uri, sql_url,
db_name=db_name,
collection=collection,
drop=drop,
mode=mode)
print("mongoimport query:: {}".format(query))
subprocess.run(query)
def setup_from_dump(self, db_name, dump_dir, overwrite=False,
collection=None, db_name_out=None):
"""
Restores 'db_name' from 'dump_dir'.
Works with BSON folders exported by mongodump
Args:
db_name (str): source DB name
dump_dir (str): folder with dumped subfolders
overwrite (bool): True if overwrite target
collection (str): name of source project
db_name_out (str): name of target DB, if empty restores to
source 'db_name'
"""
db_name_out = db_name_out or db_name
if self._db_exists(db_name) and not overwrite:
raise RuntimeError("DB {} already exists".format(db_name_out) +
"Run with overwrite=True")
dir_path = os.path.join(dump_dir, db_name)
if not os.path.exists(dir_path):
raise RuntimeError(
"Backup folder {} doesn't exist".format(dir_path))
query = self._restore_query(self.uri, dump_dir,
db_name=db_name, db_name_out=db_name_out,
collection=collection)
print("mongorestore query:: {}".format(query))
subprocess.run(query)
def teardown(self, db_name):
"""Drops 'db_name' if exists."""
if not self._db_exists(db_name):
print("{} doesn't exist".format(db_name))
return
print("Dropping {} database".format(db_name))
self.client.drop_database(db_name)
def backup_to_dump(self, db_name, dump_dir, overwrite=False):
"""
Helper class for running mongodump for specific 'db_name'
"""
if not self._db_exists(db_name) and not overwrite:
raise RuntimeError("DB {} doesn't exists".format(db_name))
dir_path = os.path.join(dump_dir, db_name)
if os.path.exists(dir_path) and not overwrite:
raise RuntimeError("Backup already exists, "
"run with overwrite=True")
query = self._dump_query(self.uri, dump_dir, db_name=db_name)
print("Mongodump query:: {}".format(query))
subprocess.run(query)
def _db_exists(self, db_name):
return db_name in self.client.list_database_names()
def _dump_query(self, uri,
output_path,
db_name=None, collection=None):
utility_path = os.path.join(self.MONGODB_UTILS_DIR, "mongodump")
db_part = coll_part = ""
if db_name:
db_part = "--db={}".format(db_name)
if collection:
if not db_name:
raise ValueError("db_name must be present")
coll_part = "--nsInclude={}.{}".format(db_name, collection)
query = "\"{}\" --uri=\"{}\" --out={} {} {}".format(
utility_path, uri, output_path, db_part, coll_part
)
return query
def _restore_query(self, uri, dump_dir,
db_name=None, db_name_out=None,
collection=None, drop=True):
utility_path = os.path.join(self.MONGODB_UTILS_DIR, "mongorestore")
db_part = coll_part = drop_part = ""
if db_name:
db_part = "--nsInclude={}.* --nsFrom={}.*".format(db_name, db_name)
if collection:
assert db_name, "Must provide db name too"
db_part = "--nsInclude={}.{} --nsFrom={}.{}".format(db_name,
collection,
db_name,
collection)
if drop:
drop_part = "--drop"
if db_name_out:
db_part += " --nsTo={}.*".format(db_name_out)
query = "\"{}\" --uri=\"{}\" --dir=\"{}\" {} {} {}".format(
utility_path, uri, dump_dir, db_part, coll_part, drop_part
)
return query
def _import_query(self, uri, sql_url,
db_name=None,
collection=None, drop=True, mode=None):
utility_path = os.path.join(self.MONGODB_UTILS_DIR, "mongoimport")
db_part = coll_part = drop_part = mode_part = ""
if db_name:
db_part = "--db {}".format(db_name)
if collection:
assert db_name, "Must provide db name too"
coll_part = "--collection {}".format(collection)
if drop:
drop_part = "--drop"
if mode:
mode_part = "--mode {}".format(mode)
query = \
"\"{}\" --legacy --uri=\"{}\" --file=\"{}\" {} {} {} {}".format(
utility_path, uri, sql_url,
db_part, coll_part, drop_part, mode_part)
return query
# handler = DBHandler(uri="mongodb://localhost:27017")
#
# backup_dir = "c:\\projects\\dumps"
# #
# # handler.backup_to_dump("openpype", backup_dir, True)
# # handler.setup_from_dump("test_db", backup_dir, True)
# # handler.setup_from_sql_file("test_db", "c:\\projects\\sql\\item.sql",
# # collection="test_project",
# # drop=False, mode="upsert")
# handler.setup_from_sql("test_db", "c:\\projects\\sql",
# collection="test_project",
# drop=False, mode="upsert")