From c3219554c027688d72e5db28f0ba00ddcc7d0000 Mon Sep 17 00:00:00 2001
From: "petr.kalis"
Date: Thu, 16 Jul 2020 20:24:52 +0200
Subject: [PATCH] Initial commit of synchronization server for cloud
 destination

WIP - implemented GDrive upload and update in MongoDB.
credentials.json file omitted for security reasons; it is still to be
decided which flow should be used and how credentials should be stored.
---
 pype/modules/sync_server/__init__.py          |   5 +
 pype/modules/sync_server/providers/gdrive.py  | 428 ++++++++++++++++++
 .../sync_server/providers/providers.py        |  27 ++
 pype/modules/sync_server/sync_server.py       | 251 ++++++++++
 pype/tools/tray/modules_imports.json          |   5 +
 5 files changed, 716 insertions(+)
 create mode 100644 pype/modules/sync_server/__init__.py
 create mode 100644 pype/modules/sync_server/providers/gdrive.py
 create mode 100644 pype/modules/sync_server/providers/providers.py
 create mode 100644 pype/modules/sync_server/sync_server.py

diff --git a/pype/modules/sync_server/__init__.py b/pype/modules/sync_server/__init__.py
new file mode 100644
index 0000000000..22a68bed9a
--- /dev/null
+++ b/pype/modules/sync_server/__init__.py
@@ -0,0 +1,5 @@
+from .sync_server import SyncServer
+
+
+def tray_init(tray_widget, main_widget):
+    return SyncServer()
\ No newline at end of file
diff --git a/pype/modules/sync_server/providers/gdrive.py b/pype/modules/sync_server/providers/gdrive.py
new file mode 100644
index 0000000000..20f304ee15
--- /dev/null
+++ b/pype/modules/sync_server/providers/gdrive.py
@@ -0,0 +1,428 @@
+from __future__ import print_function
+import os.path
+import pickle
+import random
+
+from googleapiclient.discovery import build
+from googleapiclient.http import MediaFileUpload
+from google_auth_oauthlib.flow import InstalledAppFlow
+from google.auth.transport.requests import Request
+
+# If modifying these scopes, delete the file token.pickle.
+SCOPES = ['https://www.googleapis.com/auth/drive.metadata.readonly',
+          'https://www.googleapis.com/auth/drive.file']  # for write|delete
+
+files = [
+    'c:\\projects\\Test\\Assets\\Cylinder\\publish\\look\\lookMain\\v001\\test_Cylinder_lookMain_v001.ma']
+
+
+class GDriveHandler():
+    FOLDER_STR = 'application/vnd.google-apps.folder'
+
+    def __init__(self):
+        self.service = self._get_gd_service()
+        self.root = self.service.files().get(fileId='root').execute()
+        self.tree = self._build_tree(self.list_folders())
+
+    def _get_gd_service(self):
+        """
+        Authorize the client with 'credentials.json' and store the token
+        into 'token.pickle'.
+        Produces a service object that communicates with the GDrive API.
+        :return: authorized service object
+        """
+        creds = None
+        # The file token.pickle stores the user's access and refresh
+        # tokens, and is created automatically when the authorization
+        # flow completes for the first time.
+        if os.path.exists('token.pickle'):
+            with open('token.pickle', 'rb') as token:
+                creds = pickle.load(token)
+        # If there are no (valid) credentials available, let the user log in.
+        if not creds or not creds.valid:
+            if creds and creds.expired and creds.refresh_token:
+                creds.refresh(Request())
+            else:
+                flow = InstalledAppFlow.from_client_secrets_file(
+                    os.path.dirname(__file__) + '/credentials.json', SCOPES)
+                creds = flow.run_local_server(port=0)
+            # Save the credentials for the next run
+            with open('token.pickle', 'wb') as token:
+                pickle.dump(creds, token)
+        service = build('drive', 'v3', credentials=creds)
+        return service
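+
+    # NOTE (illustration, not part of the API): 'credentials.json' is
+    # expected next to this module (see from_client_secrets_file above)
+    # and must stay out of VCS. If SCOPES changes, the grant cached in
+    # token.pickle becomes stale; a hypothetical way to force re-consent:
+    #   os.remove('token.pickle')  # next _get_gd_service() re-runs the flow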
+
+    def _build_tree(self, folders):
+        """
+        Create an in-memory structure resolving paths to folder ids, as
+        recursive querying might be slower.
+        Initialized at class construction time.
+        Maybe should be persisted.
+        :param folders: list of dictionaries with folder metadata
+        :return: dictionary - path as a key, folder id as a value
+        """
+        tree = {}
+        tree["/"] = {"id": self.root["id"]}
+        ending_by = {}
+        ending_by[self.root["id"]] = "/" + self.root["name"]
+        not_changed_times = 0
+        folders_cnt = len(folders) * 5
+        # exit loop for weird unresolved folders, raise ValueError, safety
+        while folders and not_changed_times < folders_cnt:
+            folder = folders.pop(0)
+            parents = folder.get("parents", [])
+            # weird cases, shared folders, etc., parent under root
+            if not parents:
+                parent = self.root["id"]
+            else:
+                parent = parents[0]
+
+            if folder["id"] == self.root["id"]:  # do not process root
+                continue
+
+            if parent in ending_by:
+                path_key = ending_by[parent] + "/" + folder["name"]
+                ending_by[folder["id"]] = path_key
+                tree[path_key] = {"id": folder["id"]}
+            else:
+                not_changed_times += 1
+                if not_changed_times % 10 == 0:  # try to reshuffle deadlocks
+                    random.shuffle(folders)
+                folders.append(folder)  # parent unknown yet, wait for it
+
+        if len(folders) > 0:
+            raise ValueError("Some folder paths are not resolved {}"
+                             .format(folders))
+
+        return tree
+
+    def get_root_name(self):
+        """
+        Return the name of the root folder. Needs to be used as the
+        beginning of an absolute GDrive path.
+        :return: plain name, no '/'
+        """
+        return self.root["name"]
+
+    def create_folder(self, path):
+        """
+        Create all nonexistent folders and subfolders in 'path'.
+        Updates the self.tree structure with new paths.
+
+        :param path: absolute path, starts with GDrive root
+        :return: folder id of the lowest subfolder from 'path'
+        """
+        folder_id = self.folder_path_exists(path)
+        if folder_id:
+            return folder_id
+
+        parts = path.split('/')
+        folders_to_create = []
+        while parts:
+            folders_to_create.append(parts.pop())
+            path = '/'.join(parts)
+
+            folder_id = self.folder_path_exists(path)  # lowest common path
+            if folder_id:
+                while folders_to_create:
+                    new_folder_name = folders_to_create.pop()
+                    folder_metadata = {
+                        'name': new_folder_name,
+                        'mimeType': 'application/vnd.google-apps.folder',
+                        'parents': [folder_id]
+                    }
+                    folder = self.service.files().create(
+                        body=folder_metadata,
+                        fields='id').execute()
+                    folder_id = folder["id"]
+
+                    new_path_key = path + '/' + new_folder_name
+                    self.tree[new_path_key] = {"id": folder_id}
+
+                    path = new_path_key
+
+                return folder_id
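+
+    # Illustration (hypothetical ids): with self.tree containing
+    #   {"/": {"id": "root"}, "/My Drive/Test": {"id": "abc"}},
+    # create_folder('/My Drive/Test/new/sub') walks up to the lowest
+    # existing ancestor '/My Drive/Test', creates 'new' under id "abc",
+    # then 'sub' under 'new', caches both new paths in self.tree and
+    # returns the id of 'sub'.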
+
+    def upload_file(self, source_path, path, overwrite=False):
+        """
+        Uploads a single file from 'source_path' to destination 'path'.
+        It creates all folders on the path if they don't exist.
+
+        :param source_path: absolute path to the local source file
+        :param path: absolute path with or without name of the file
+        :param overwrite: replace existing file
+        :return: file_id of created/modified file
+        """
+        if not os.path.isfile(source_path):
+            raise ValueError("Source file {} doesn't exist."
+                             .format(source_path))
+
+        root, ext = os.path.splitext(path)
+
+        if ext:
+            # full path
+            target_name = os.path.basename(path)
+            path = os.path.dirname(path)
+        else:
+            target_name = os.path.basename(source_path)
+
+        file = self.file_path_exists(path + "/" + target_name)
+        if file and not overwrite:
+            raise ValueError("File already exists, "
+                             "use 'overwrite' argument")
+
+        folder_id = self.create_folder(path)
+        file_metadata = {
+            'name': target_name
+        }
+        media = MediaFileUpload(source_path,
+                                mimetype='application/octet-stream',
+                                resumable=True)
+        if not file:
+            file_metadata['parents'] = [folder_id]  # update doesn't like parent
+            file = self.service.files().create(body=file_metadata,
+                                               media_body=media,
+                                               fields='id').execute()
+        else:
+            file = self.service.files().update(fileId=file["id"],
+                                               body=file_metadata,
+                                               media_body=media,
+                                               fields='id').execute()
+
+        return file["id"]
+
+    def delete_folder(self, path, force=False):
+        """
+        Deletes a folder on GDrive. Checks if the folder contains any files
+        or subfolders; in that case it raises an error, which can be
+        overridden by the 'force' argument.
+        With 'force' it deletes the folder on 'path' and all its children.
+
+        :param path: absolute path on GDrive
+        :param force: delete even if there are children in the folder
+        :return: None
+        """
+        folder_id = self.folder_path_exists(path)
+        if not folder_id:
+            raise ValueError("Not valid folder path {}".format(path))
+
+        fields = 'nextPageToken, files(id, name, parents)'
+        q = self._handle_q("'{}' in parents ".format(folder_id))
+        response = self.service.files().list(
+            q=q,
+            spaces='drive',
+            pageSize=1,
+            fields=fields).execute()
+        children = response.get('files', [])
+        if children and not force:
+            raise ValueError("Folder {} is not empty, use 'force'"
+                             .format(path))
+
+        self.service.files().delete(fileId=folder_id).execute()
+
+    def delete_file(self, path):
+        """
+        Deletes a file from 'path'. Expects a path to a specific file.
+        :param path: absolute path to particular file
+        :return: None
+        """
+        file = self.file_path_exists(path)
+        if not file:
+            raise ValueError("File {} doesn't exist".format(path))
+        self.service.files().delete(fileId=file["id"]).execute()
+
+    def _get_folder_metadata(self, path):
+        """
+        Get info about the folder with 'path'.
+        :param path: absolute path to folder
+        :return: dictionary with metadata or raises ValueError
+        """
+        try:
+            return self.tree[path]
+        except KeyError:
+            raise ValueError("Unknown folder path {}".format(path))
+
+    def list_folders(self):
+        """ Lists all folders in GDrive.
+        Used to build the in-memory path-to-folder-id structure.
+        :return: list of dictionaries ('id', 'name', [parents])
+        """
+        folders = []
+        page_token = None
+        fields = 'nextPageToken, files(id, name, parents)'
+        while True:
+            q = self._handle_q("mimeType='application/vnd.google-apps.folder'")
+            response = self.service.files().list(
+                q=q,
+                spaces='drive',
+                fields=fields,
+                pageToken=page_token).execute()
+            folders.extend(response.get('files', []))
+            page_token = response.get('nextPageToken', None)
+            if page_token is None:
+                break
+
+        return folders
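+
+    # Record shape illustration (hypothetical values): each returned folder
+    # carries only the requested fields, e.g.
+    #   {'id': '1X2yZ...', 'name': 'Test', 'parents': ['0A1bC...']}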
+
+    def list_files(self):
+        """ Lists all files in GDrive.
+        Runs a loop through possibly multiple pages. The result could be
+        large; if that becomes a problem, change it to a generator.
+        :return: list of dictionaries ('id', 'name', [parents])
+        """
+        files = []
+        page_token = None
+        fields = 'nextPageToken, files(id, name, parents)'
+        while True:
+            q = self._handle_q("")
+            response = self.service.files().\
+                list(q=q,
+                     spaces='drive',
+                     fields=fields,
+                     pageToken=page_token).execute()
+            files.extend(response.get('files', []))
+            page_token = response.get('nextPageToken', None)
+            if page_token is None:
+                break
+
+        return files
+
+    def folder_path_exists(self, file_path):
+        """
+        Checks if the folder part of 'file_path' exists. If so, returns its
+        folder id.
+        :param file_path: gdrive path with '/' as a separator
+        :return: folder id or False
+        """
+        if not file_path:
+            return False
+
+        root, ext = os.path.splitext(file_path)
+        if not ext:
+            file_path += '/'
+
+        dir_path = os.path.dirname(file_path)
+
+        path = self.tree.get(dir_path, None)
+        if path:
+            return path["id"]
+
+        return False
+
+    def file_path_exists(self, file_path):
+        """
+        Checks if 'file_path' exists on GDrive.
+        :param file_path: separated by '/', from root, with file name
+        :return: file metadata | False if not found
+        """
+        folder_id = self.folder_path_exists(file_path)
+        if folder_id:
+            return self.file_exists(os.path.basename(file_path), folder_id)
+        return False
+
+    def file_exists(self, file_name, folder_id):
+        """
+        Checks if 'file_name' exists in 'folder_id'.
+        :param file_name: name of the file
+        :param folder_id: google drive folder id
+        :return: file metadata, False if not found
+        """
+        q = self._handle_q("name = '{}' and '{}' in parents"
+                           .format(file_name, folder_id)
+                           )
+        response = self.service.files().list(
+            q=q,
+            spaces='drive',
+            fields='nextPageToken, files(id, name, parents, '
+                   'mimeType, modifiedTime, size, md5Checksum)').execute()
+        if len(response.get('files')) > 1:
+            raise ValueError("Too many files returned")
+
+        file = response.get('files', [])
+        if not file:
+            return False
+        return file[0]
+
+    def _handle_q(self, q, trashed=False, hidden=False):
+        """ API list calls contain trashed and hidden files/folders by
+        default. Usually we don't want those; they must be excluded in the
+        query explicitly.
+        :param q: query portion
+        :param trashed: False|True
+        :param hidden: False|True
+        :return: extended query string
+        """
+        parts = [q] if q else []  # skip empty portion, avoid leading ' and '
+        if not trashed:
+            parts.append(" trashed = false ")
+        # if not hidden:
+        #     parts.append(" hidden = false ")
+
+        return " and ".join(parts)
+
+    def _iterfiles(self, name=None, is_folder=None, parent=None,
+                   order_by='folder,name,createdTime'):
+        """
+        Generator listing resources in folders, used by _walk.
+        :param name: name of resource to look for
+        :param is_folder: True|False - limit to (non)folders only
+        :param parent: folder id to search in
+        :param order_by: sorting criteria
+        :return: generator of file dictionaries
+        """
+        q = []
+        if name is not None:
+            q.append("name = '%s'" % name.replace("'", "\\'"))
+        if is_folder is not None:
+            q.append("mimeType %s '%s'" % (
+                '=' if is_folder else '!=', self.FOLDER_STR))
+        if parent is not None:
+            q.append("'%s' in parents" % parent.replace("'", "\\'"))
+        params = {'pageToken': None, 'orderBy': order_by}
+        if q:
+            params['q'] = ' and '.join(q)
+        while True:
+            response = self.service.files().list(**params).execute()
+            for f in response['files']:
+                yield f
+            try:
+                params['pageToken'] = response['nextPageToken']
+            except KeyError:
+                return
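+
+    # Query illustration (hypothetical values):
+    #   _iterfiles(name='scene.ma', parent='abc') sends
+    #   q = "name = 'scene.ma' and 'abc' in parents";
+    # unlike _handle_q it does not filter out trashed items.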
+
+    def _walk(self, top='root', by_name=False):
+        """
+        Recursively walk through folders; could be expensive in API
+        requests.
+        :param top: folder id to start walking from, 'root' is the total root
+        :param by_name: look 'top' up by name instead of by id
+        :return: generator of (path, folder, dirs, files) tuples
+        """
+        if by_name:
+            top, = self._iterfiles(name=top, is_folder=True)
+        else:
+            top = self.service.files().get(fileId=top).execute()
+            if top['mimeType'] != self.FOLDER_STR:
+                raise ValueError('not a folder: %r' % top)
+        stack = [((top['name'],), top)]
+        while stack:
+            path, top = stack.pop()
+            # is_file[False] collects folders, is_file[True] collects files
+            dirs, files = is_file = [], []
+            for f in self._iterfiles(parent=top['id']):
+                is_file[f['mimeType'] != self.FOLDER_STR].append(f)
+            yield path, top, dirs, files
+            if dirs:
+                stack.extend((path + (d['name'],), d) for d in reversed(dirs))
+
+
+if __name__ == '__main__':
+    gd = GDriveHandler()
+    # print(gd.list_folders())
+    # print(len(gd.list_folders()))
+    # print(gd.list_folders()[0])
+    # print(gd.get_folder('d'))  # get_folder is not implemented yet
+    print(gd.root)
+    # print(gd.get_subfolders('Test'))  # not implemented yet
+    # print(gd.get_folder('2017454654645'))  # not implemented yet
+    print(gd.tree)
+    # print(gd.folder_path_exists('/My Drive/Test'))
+    # print(gd.file_path_exists('/My Drive/Clover/clouser.txt'))
+    # print(gd.create_folder('/My Drive/Test/new/new/new/new'))
+    print(gd.upload_file(files[0],
+                         '/My Drive/Test/new/new/new/new_file.ma',
+                         overwrite=True))
+
+    print(gd.delete_file('/My Drive/Test/new/new/new/new_file.ma'))
+    print(gd.delete_folder('/My Drive/Test/new/new/new/'))
diff --git a/pype/modules/sync_server/providers/providers.py b/pype/modules/sync_server/providers/providers.py
new file mode 100644
index 0000000000..c537730f75
--- /dev/null
+++ b/pype/modules/sync_server/providers/providers.py
@@ -0,0 +1,27 @@
+from enum import Enum
+
+from .gdrive import GDriveHandler
+
+
+class Providers(Enum):
+    GDRIVE = 'gdrive'
+
+
+class ProviderFactory:
+    """
+    Factory class as a creator of multiple cloud destinations.
+    Each new implementation needs to be registered and added to the
+    Providers enum.
+    """
+    def __init__(self):
+        self.providers = {}
+
+    def register_provider(self, provider, creator):
+        self.providers[provider] = creator
+
+    def get_provider(self, provider):
+        creator = self.providers.get(provider)
+        if not creator:
+            raise ValueError("Provider '{}' not registered".format(provider))
+
+        return creator()
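+
+    # Usage sketch (module-level 'factory' is defined below):
+    #   handler = factory.get_provider(Providers.GDRIVE.value)
+    #   handler.upload_file(source, target)  # any GDriveHandler method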
+
+
+factory = ProviderFactory()
+factory.register_provider('gdrive', GDriveHandler)
\ No newline at end of file
diff --git a/pype/modules/sync_server/sync_server.py b/pype/modules/sync_server/sync_server.py
new file mode 100644
index 0000000000..5aac98d83b
--- /dev/null
+++ b/pype/modules/sync_server/sync_server.py
@@ -0,0 +1,251 @@
+import asyncio
+import datetime
+import threading
+from enum import Enum
+
+from avalon import io
+
+from pype.api import config, Logger
+
+from .providers import providers
+
+log = Logger().get_logger("SyncServer")
+
+# test object 5eeb25e411e06a16209ab78e
+
+
+class SyncStatus(Enum):
+    DO_NOTHING = 0
+    DO_UPLOAD = 1
+    DO_DOWNLOAD = 2
+
+
+class SyncServer():
+    """
+    WIP
+    Synchronization server that is syncing published files from local to
+    any of the implemented providers (like GDrive, S3 etc.).
+    Runs in the background and checks all representations, looks for files
+    that are marked to be in a different location than 'studio'
+    (temporary), and checks if the 'created_dt' field is present, denoting
+    a successful sync with the provider destination.
+
+    ''' - example of file test_Cylinder_lookMain_v010.ma synced to GDrive
+    "files" : [
+      {
+          "path" : "{root}/Test/Assets/Cylinder/publish/look/lookMain/v010/
+                     test_Cylinder_lookMain_v010.ma",
+          "_id" : ObjectId("5eeb25e411e06a16209ab78f"),
+          "hash" : "test_Cylinder_lookMain_v010,ma|1592468963,24|4822",
+          "size" : NumberLong(4822),
+          "sites" : {
+              "studio" : {
+                  "created_dt" : ISODate("2020-05-22T08:05:44.000Z")
+              },
+              "gdrive" : {
+                  "id" : ObjectId("5eeb25e411e06a16209ab78f"),
+                  "created_dt" : ISODate("2020-07-16T17:54:35.833Z")
+              }
+          }
+      },
+    '''
+    It is expected that multiple providers will be implemented in separate
+    classes and registered in 'providers.py'.
+
+    """
+
+    def __init__(self):
+        self.qaction = None
+        self.failed_icon = None
+        self._is_running = False
+        self.presets = None
+
+        if not io.Session:
+            io.install()
+
+        io.Session['AVALON_PROJECT'] = 'Test'
+        try:
+            self.presets = config.get_presets()["services"]["sync_server"]
+        except Exception:
+            log.debug(
+                "No presets set for SyncServer. "
+                "No credentials provided, no syncing possible"
+            )
+
+        self.sync_server_thread = SyncServerThread(self)
+
+    def get_sync_representations(self):
+        """
+        Get representations.
+        TODO: filter out representations that shouldn't be synced
+        :return: cursor of representation documents
+        """
+        representations = io.find({
+            "type": "representation"
+        })
+
+        return representations
+
+    def check_status(self, file, representation, provider):
+        """
+        Check synchronization status for a single 'file' of a single
+        'representation' by a single 'provider'.
+        (Eg. check if 'scene.ma' of lookdev.v10 should be synced to GDrive.)
+        :param file: dictionary of file from representation in Mongo
+        :param representation: dictionary of representation
+        :param provider: name of provider - 'gdrive' etc.
+        :return: one of SyncStatus
+        """
+        sites = file.get("sites") or {}
+        if isinstance(sites, list):  # temporary, old format of 'sites'
+            return SyncStatus.DO_NOTHING
+        provider_info = sites.get(provider) or {}
+        if provider_info:
+            created_dt = provider_info.get("created_dt")
+            if not created_dt:
+                return SyncStatus.DO_UPLOAD
+
+        return SyncStatus.DO_NOTHING
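+
+    # Status illustration: for the example document in the class docstring,
+    # 'gdrive' has 'created_dt', so check_status returns DO_NOTHING; a site
+    # record that exists but lacks 'created_dt', e.g. {"gdrive": {"id": ...}},
+    # returns DO_UPLOAD, while a missing/empty record means nothing to sync.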
+
+    async def upload(self, file, representation, provider):
+        """
+        Upload a single 'file' of a 'representation' to 'provider'.
+        Source url is taken from the 'file' portion, where the {root}
+        placeholder is replaced by 'representation.context.root'.
+        Provider could be any of those implemented in providers.py.
+
+        Updates MongoDB, fills in the id of the file from the provider
+        (ie. file_id from GDrive) and 'created_dt' - the time of upload.
+
+        :param file: dictionary of file from representation in Mongo
+        :param representation: dictionary of representation
+        :param provider: name of provider - 'gdrive' etc.
+        :return: None
+        """
+        await asyncio.sleep(0.1)
+        handler = providers.factory.get_provider(provider)
+        local_root = representation.get("context", {}).get("root")
+        if not local_root:
+            raise ValueError("Unknown local root for file {}"
+                             .format(file.get("path", "")))
+        source_file = file.get("path", "").replace('{root}', local_root)
+        target_root = '/{}'.format(handler.get_root_name())
+        target_file = file.get("path", "").replace('{root}', target_root)
+
+        new_file_id = handler.upload_file(source_file,
+                                          target_file,
+                                          overwrite=True)
+        if new_file_id:
+            representation_id = representation.get("_id")
+            file_id = file.get("_id")
+            query = {
+                "_id": representation_id,
+                "files._id": file_id
+            }
+
+            # TODO: site key is hardcoded to 'gdrive', should use 'provider'
+            io.update_many(
+                query,
+                {"$set": {"files.$.sites.gdrive.id": new_file_id,
+                          "files.$.sites.gdrive.created_dt":
+                              datetime.datetime.utcnow()}}
+            )
+
+            log.info("file {} uploaded {}".format(source_file, new_file_id))
+
+    async def download(self, file, representation, provider):
+        pass
+
+    def tray_start(self):
+        self.sync_server_thread.start()
+
+    def tray_exit(self):
+        self.stop()
+
+    @property
+    def is_running(self):
+        return self.sync_server_thread.is_running
+
+    def stop(self):
+        if not self.is_running:
+            return
+        try:
+            log.debug("Stopping sync server")
+            self.sync_server_thread.is_running = False
+            self.sync_server_thread.stop()
+        except Exception:
+            log.warning(
+                "Error happened while stopping sync server",
+                exc_info=True
+            )
+
+    def thread_stopped(self):
+        self._is_running = False
+
+
+class SyncServerThread(threading.Thread):
+
+    def __init__(self, module):
+        super(SyncServerThread, self).__init__()
+        self.module = module
+        self.loop = None
+        self.is_running = False
+
+    def run(self):
+        self.is_running = True
+
+        try:
+            log.info("Starting sync server")
+            self.loop = asyncio.new_event_loop()  # create new loop for thread
+            asyncio.set_event_loop(self.loop)
+
+            asyncio.ensure_future(self.check_shutdown(), loop=self.loop)
+            asyncio.ensure_future(self.sync_loop(), loop=self.loop)
+            self.loop.run_forever()
+        except Exception:
+            log.warning(
+                "Sync Server service has failed", exc_info=True
+            )
+        finally:
+            self.loop.close()  # optional
+
+    async def sync_loop(self):
+        while self.is_running:
+            sync_representations = self.module.get_sync_representations()
+
+            for provider in providers.factory.providers:  # TODO clumsy
+                for sync in sync_representations:
+                    files = sync.get("files") or {}
+                    if files:
+                        for file in files:
+                            status = self.module.check_status(file, sync,
+                                                              provider)
+
+                            if status == SyncStatus.DO_UPLOAD:
+                                await self.module.upload(file, sync,
+                                                         provider)
+                            if status == SyncStatus.DO_DOWNLOAD:
+                                await self.module.download(file, sync,
+                                                           provider)
+
+            await asyncio.sleep(60)
+
+    def stop(self):
+        """Sets the is_running flag to False; 'check_shutdown' then shuts
+        the server down."""
+        self.is_running = False
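+
+    # Shutdown sequence sketch (hypothetical caller):
+    #   server = SyncServer()
+    #   server.tray_start()   # spawns this thread and its event loop
+    #   ...
+    #   server.stop()         # flips is_running; check_shutdown below then
+    #                         # cancels pending tasks and stops the loop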
+
+    async def check_shutdown(self):
+        """ Future that runs and periodically checks if the server should
+        still be running.
+        """
+        while self.is_running:
+            await asyncio.sleep(0.5)
+
+        tasks = [task for task in asyncio.all_tasks() if
+                 task is not asyncio.current_task()]
+        list(map(lambda task: task.cancel(), tasks))  # cancel all the tasks
+        results = await asyncio.gather(*tasks, return_exceptions=True)
+        log.debug(f'Finished awaiting cancelled tasks, results: {results}...')
+        await self.loop.shutdown_asyncgens()
+        # to really make sure everything else has time to stop
+        await asyncio.sleep(0.07)
+        self.loop.stop()
\ No newline at end of file
diff --git a/pype/tools/tray/modules_imports.json b/pype/tools/tray/modules_imports.json
index e9526dcddb..e7bdeda0d2 100644
--- a/pype/tools/tray/modules_imports.json
+++ b/pype/tools/tray/modules_imports.json
@@ -54,5 +54,10 @@
         "type": "module",
         "import_path": "pype.modules.adobe_communicator",
         "fromlist": ["pype", "modules"]
+    }, {
+        "title": "Sync Server",
+        "type": "module",
+        "import_path": "pype.modules.sync_server",
+        "fromlist": ["pype", "modules"]
     }
 ]
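
A minimal manual-run sketch (assumes a configured Avalon/Mongo session and a
credentials.json next to gdrive.py; names as introduced in this patch):

    from pype.modules.sync_server.sync_server import SyncServer

    server = SyncServer()   # connects io, reads presets, builds the thread
    server.tray_start()     # background loop: check_status -> upload
    # ... later ...
    server.stop()           # graceful shutdown via check_shutdown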