"""
    Helper class for automatic testing, provides dump and restore via command
    line utilities.

    Expect mongodump, mongoimport and mongorestore present at PATH
"""
import os
import shlex
import subprocess

import pymongo


class DBHandler:
    """Wraps a MongoDB connection plus the mongodump/mongorestore/mongoimport
    command line tools for preparing and cleaning test databases.
    """

    def __init__(self, uri=None, host=None, port=None,
                 user=None, password=None):
        """'uri' or rest of separate credentials

        Args:
            uri (str): full MongoDB connection string
            host (str): host name, used to compose the uri when provided
                (takes precedence over 'uri' if both are given)
            port (int): port for 'host', 27017 if empty
            user (str): user name, used only together with 'password'
            password (str): password, used only together with 'user'

        Raises:
            ValueError: if neither 'uri' nor 'host' is provided
        """
        # always define the attribute so the validation below cannot fail
        # with AttributeError
        self.uri = uri or None
        if host:
            if all([user, password]):
                host = "{}:{}@{}".format(user, password, host)
            self.uri = 'mongodb://{}:{}'.format(host, port or 27017)

        if not self.uri:
            raise ValueError("Must have uri to MongoDB")

        # connect with the resolved uri, the raw 'uri' argument is empty
        # when the connection is described by 'host'
        self.client = pymongo.MongoClient(self.uri)
        self.db = None

    def setup_empty(self, name):
        """Points 'self.db' to database 'name'."""
        # not much sense
        self.db = self.client[name]

    def setup_from_sql(self, db_name, sql_dir, collection=None,
                       drop=True, mode=None):
        """
            Restores 'db_name' from 'sql_dir'.

            Works with directory with .json files,
            if 'collection' arg is empty, name of .json file is used as name
            of target collection.

            Every file found under 'sql_dir' (recursively) is passed
            to mongoimport.

        Args:
            db_name (str): source DB name
            sql_dir (str): folder with json files
            collection (str): if all sql files are meant for single coll.
            drop (bool): True if drop whole collection
            mode (str): "insert" - fails on duplicates
                        "upsert" - modifies existing
                        "merge" - updates existing
                        "delete" - removes in DB present if file

        Raises:
            RuntimeError: if 'sql_dir' doesn't exist
        """
        if not os.path.exists(sql_dir):
            raise RuntimeError(
                "Backup folder {} doesn't exist".format(sql_dir))

        for (dirpath, _dirnames, filenames) in os.walk(sql_dir):
            for file_name in filenames:
                sql_url = os.path.join(dirpath, file_name)
                query = self._import_query(self.uri, sql_url,
                                           db_name=db_name,
                                           collection=collection,
                                           drop=drop,
                                           mode=mode)

                print("mongoimport query:: {}".format(query))
                self._run_query(query)

    def setup_from_sql_file(self, db_name, sql_url,
                            collection=None, drop=True, mode=None):
        """
            Restores 'db_name' from 'sql_url'.

            Works with single .json file.
            If 'collection' arg is empty, name of .json file is used as name
            of target collection.

        Args:
            db_name (str): source DB name
            sql_url (str): path to single json file
            collection (str): name of target collection
            drop (bool): True if drop collection
            mode (str): "insert" - fails on duplicates
                        "upsert" - modifies existing
                        "merge" - updates existing
                        "delete" - removes in DB present if file

        Raises:
            RuntimeError: if 'sql_url' doesn't exist
        """
        if not os.path.exists(sql_url):
            raise RuntimeError(
                "Sql file {} doesn't exist".format(sql_url))

        query = self._import_query(self.uri, sql_url,
                                   db_name=db_name,
                                   collection=collection,
                                   drop=drop,
                                   mode=mode)

        print("mongoimport query:: {}".format(query))
        self._run_query(query)

    def setup_from_dump(self, db_name, dump_dir, overwrite=False,
                        collection=None, db_name_out=None):
        """
            Restores 'db_name' from 'dump_dir'.

            Works with BSON folders exported by mongodump

        Args:
            db_name (str): source DB name
            dump_dir (str): folder with dumped subfolders
            overwrite (bool): True if overwrite target
            collection (str): name of source project
            db_name_out (str): name of target DB, if empty restores to
                source 'db_name'

        Raises:
            RuntimeError: if target DB exists and 'overwrite' is False,
                or if '<dump_dir>/<db_name>' doesn't exist
        """
        db_name_out = db_name_out or db_name
        target_exists = self._db_exists(db_name_out)
        if target_exists and not overwrite:
            raise RuntimeError("DB {} already exists. ".format(db_name_out) +
                               "Run with overwrite=True")

        # validate the backup before anything gets dropped, otherwise
        # a wrong path would destroy the target and restore nothing
        dir_path = os.path.join(dump_dir, db_name)
        if not os.path.exists(dir_path):
            raise RuntimeError(
                "Backup folder {} doesn't exist".format(dir_path))

        if target_exists:
            if collection:
                # dropping a collection which is not present is harmless
                self.client[db_name_out][collection].drop()
            else:
                self.teardown(db_name_out)

        query = self._restore_query(self.uri, dump_dir,
                                    db_name=db_name,
                                    db_name_out=db_name_out,
                                    collection=collection)
        print("mongorestore query:: {}".format(query))
        self._run_query(query)

    def teardown(self, db_name):
        """Drops 'db_name' if exists."""
        if not self._db_exists(db_name):
            print("{} doesn't exist".format(db_name))
            return

        print("Dropping {} database".format(db_name))
        self.client.drop_database(db_name)

    def backup_to_dump(self, db_name, dump_dir, overwrite=False,
                       collection=None):
        """
            Helper method for running mongodump for specific 'db_name'

        Args:
            db_name (str): DB to be dumped
            dump_dir (str): folder where '<db_name>' subfolder is created
            overwrite (bool): True to replace already existing backup
            collection (str): dump only this collection if provided

        Raises:
            RuntimeError: if 'db_name' doesn't exist or the backup folder
                already exists (both only when 'overwrite' is False)
        """
        # NOTE(review): missing source DB is tolerated when overwrite=True,
        #   kept as is - confirm this is intended
        if not self._db_exists(db_name) and not overwrite:
            raise RuntimeError("DB {} doesn't exists".format(db_name))

        dir_path = os.path.join(dump_dir, db_name)
        if os.path.exists(dir_path) and not overwrite:
            raise RuntimeError("Backup already exists, "
                               "run with overwrite=True")

        query = self._dump_query(self.uri, dump_dir,
                                 db_name=db_name, collection=collection)
        print("Mongodump query:: {}".format(query))
        self._run_query(query)

    def _db_exists(self, db_name):
        """Returns True if 'db_name' is listed by connected server."""
        return db_name in self.client.list_database_names()

    def _run_query(self, query):
        """Runs command line 'query' prepared by '_*_query' methods.

        Args:
            query (str): whole command line as single string

        Returns:
            (subprocess.CompletedProcess)
        """
        if os.name == "nt":
            # Windows accepts whole command line as a single string
            args = query
        else:
            # on POSIX a plain string would be taken as name of executable,
            # it must be split into list of arguments
            args = shlex.split(query)
        return subprocess.run(args)

    def _dump_query(self, uri, output_path, db_name=None, collection=None):
        """Prepares dump query based on 'db_name' or 'collection'.

        Raises:
            ValueError: if 'collection' is provided without 'db_name'
        """
        db_part = coll_part = ""
        if db_name:
            db_part = "--db={}".format(db_name)
        if collection:
            if not db_name:
                raise ValueError("db_name must be present")
            coll_part = "--collection={}".format(collection)
        query = "\"{}\" --uri=\"{}\" --out={} {} {}".format(
            "mongodump", uri, output_path, db_part, coll_part
        )

        return query

    def _restore_query(self, uri, dump_dir,
                       db_name=None, db_name_out=None,
                       collection=None, drop=True):
        """Prepares query for mongorestore base on arguments"""
        # 'coll_part' stays empty, collection is selected via namespaces
        db_part = coll_part = drop_part = ""
        if db_name:
            db_part = "--nsInclude={}.* --nsFrom={}.*".format(db_name, db_name)
        if collection:
            assert db_name, "Must provide db name too"
            db_part = "--nsInclude={}.{} --nsFrom={}.{}".format(db_name,
                                                                collection,
                                                                db_name,
                                                                collection)
        if drop:
            drop_part = "--drop"

        if db_name_out:
            collection_str = collection or '*'
            db_part += " --nsTo={}.{}".format(db_name_out, collection_str)

        query = "\"{}\" --uri=\"{}\" --dir=\"{}\" {} {} {}".format(
            "mongorestore", uri, dump_dir, db_part, coll_part, drop_part
        )

        return query

    def _import_query(self, uri, sql_url,
                      db_name=None,
                      collection=None, drop=True, mode=None):
        """Prepares query for mongoimport based on arguments"""
        db_part = coll_part = drop_part = mode_part = ""
        if db_name:
            db_part = "--db {}".format(db_name)
        if collection:
            assert db_name, "Must provide db name too"
            coll_part = "--collection {}".format(collection)
        if drop:
            drop_part = "--drop"
        if mode:
            mode_part = "--mode {}".format(mode)

        query = \
            "\"{}\" --legacy --uri=\"{}\" --file=\"{}\" {} {} {} {}".format(
                "mongoimport", uri, sql_url,
                db_part, coll_part, drop_part, mode_part)

        return query

# Examples
# handler = DBHandler(uri="mongodb://localhost:27017")
#
# backup_dir = "c:\\projects\\test_nuke_publish\\input\\dumps"
#
# handler.backup_to_dump("avalon", backup_dir, True,
#                        collection="test_project")
# handler.setup_from_dump("test_db", backup_dir, True,
#                         db_name_out="avalon", collection="test_project")
# handler.setup_from_sql_file("test_db", "c:\\projects\\sql\\item.sql",
#                             collection="test_project",
#                             drop=False, mode="upsert")
# handler.setup_from_sql("test_db", "c:\\projects\\sql",
#                        collection="test_project",
#                        drop=False, mode="upsert")