Initial commit of synchronization server for cloud destinations

WIP - implemented GDrive upload and update of records in MongoDB.
credentials.json file omitted for security reasons; it is yet to be decided
which auth flow to use and how credentials should be stored.
This commit is contained in:
petr.kalis 2020-07-16 20:24:52 +02:00
parent 19dadf5b0b
commit c3219554c0
5 changed files with 716 additions and 0 deletions

@@ -0,0 +1,5 @@
from .sync_server import SyncServer
def tray_init(tray_widget, main_widget):
return SyncServer()
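
For context, a minimal sketch of how this entry point is expected to be consumed by a tray host (the import path and host objects below are assumptions, not part of this commit; only the returned SyncServer instance matters):

# illustrative only - hypothetical tray host wiring
from pype.modules.sync_server import tray_init

module = tray_init(tray_widget=None, main_widget=None)  # returns SyncServer
module.tray_start()  # spawns SynchServerThread with its own asyncio loop
# ... tray application keeps running ...
module.tray_exit()   # stops the background sync loop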

@@ -0,0 +1,428 @@
from __future__ import print_function
import pickle
import os.path
from googleapiclient.discovery import build
from google_auth_oauthlib.flow import InstalledAppFlow
from google.auth.transport.requests import Request
import random
from googleapiclient.http import MediaFileUpload
# If modifying these scopes, delete the file token.pickle.
SCOPES = ['https://www.googleapis.com/auth/drive.metadata.readonly',
'https://www.googleapis.com/auth/drive.file']  # for write|delete
# hardcoded test file used only by the __main__ smoke test below
files = [
'c:\\projects\\Test\\Assets\\Cylinder\\publish\\look\\lookMain\\v001\\test_Cylinder_lookMain_v001.ma']
class GDriveHandler():
FOLDER_STR = 'application/vnd.google-apps.folder'
def __init__(self):
self.service = self._get_gd_service()
self.root = self.service.files().get(fileId='root').execute()
self.tree = self._build_tree(self.list_folders())
def _get_gd_service(self):
"""
Authorizes the client with 'credentials.json' and stores the token
into 'token.pickle'.
Produces a service object that communicates with the GDrive API.
:return: service object for the GDrive API
"""
creds = None
# The file token.pickle stores the user's access and refresh tokens, and is
# created automatically when the authorization flow completes for the first
# time.
if os.path.exists('token.pickle'):
with open('token.pickle', 'rb') as token:
creds = pickle.load(token)
# If there are no (valid) credentials available, let the user log in.
if not creds or not creds.valid:
if creds and creds.expired and creds.refresh_token:
creds.refresh(Request())
else:
flow = InstalledAppFlow.from_client_secrets_file(
os.path.dirname(__file__) + '/credentials.json', SCOPES)
creds = flow.run_local_server(port=0)
# Save the credentials for the next run
with open('token.pickle', 'wb') as token:
pickle.dump(creds, token)
service = build('drive', 'v3', credentials=creds)
return service
def _build_tree(self, folders):
"""
Create in-memory structure resolving paths to folder ids, as recursive
querying might be slower.
Built at class initialization time.
Maybe it should be persisted.
:param folders: list of dictionaries with folder metadata
:return: <dictionary> - path as a key, folder id as a value
"""
tree = {}
tree["/"] = {"id": self.root["id"]}
ending_by = {}
ending_by[self.root["id"]] = "/" + self.root["name"]
not_changed_times = 0
folders_cnt = len(folders) * 5
# safety limit: exit the loop and raise ValueError below if some folders never resolve
while folders and not_changed_times < folders_cnt:
folder = folders.pop(0)
parents = folder.get("parents", [])
# weird cases (shared folders etc.): treat the folder as a child of root
if not parents:
parent = self.root["id"]
else:
parent = parents[0]
if folder["id"] == self.root["id"]: # do not process root
continue
if parent in ending_by:
path_key = ending_by[parent] + "/" + folder["name"]
ending_by[folder["id"]] = path_key
tree[path_key] = {"id": folder["id"]}
else:
not_changed_times += 1
if not_changed_times % 10 == 0: # try to reshuffle deadlocks
random.shuffle(folders)
folders.append(folder) # parent not known yet, wait until it shows up
if len(folders) > 0:
raise ValueError("Some folders path are not resolved {}"
.format(folders))
return tree
def get_root_name(self):
"""
Return name of the root folder. Needs to be used as the beginning of
an absolute GDrive path.
:return: <string> - plain name, no '/'
"""
return self.root["name"]
def create_folder(self, path):
"""
Create all nonexistent folders and subfolders in 'path'.
Updates self.tree structure with new paths
:param path: absolute path, starts with GDrive root
:return: <string> folder id of lowest subfolder from 'path'
"""
folder_id = self.folder_path_exists(path)
if folder_id:
return folder_id
parts = path.split('/')
folders_to_create = []
while parts:
folders_to_create.append(parts.pop())
path = '/'.join(parts)
folder_id = self.folder_path_exists(path) # lowest common path
if folder_id:
while folders_to_create:
new_folder_name = folders_to_create.pop()
folder_metadata = {
'name': new_folder_name,
'mimeType': 'application/vnd.google-apps.folder',
'parents': [folder_id]
}
folder = self.service.files().create(body=folder_metadata,
fields='id').execute()
folder_id = folder["id"]
new_path_key = path + '/' + new_folder_name
self.tree[new_path_key] = {"id": folder_id}
path = new_path_key
return folder_id
def upload_file(self, source_path, path, overwrite=False):
"""
Uploads single file from 'source_path' to destination 'path'.
Creates all folders on the path if they do not exist.
:param source_path: absolute path to the local source file
:param path: absolute path with or without name of the file
:param overwrite: replace existing file
:return: <string> file_id of created/modified file
"""
if not os.path.isfile(source_path):
raise ValueError("Source file {} doesn't exist.".format(source_path))
root, ext = os.path.splitext(path)
if ext:
# 'path' already includes the file name
target_name = os.path.basename(path)
path = os.path.dirname(path)
else:
target_name = os.path.basename(source_path)
file = self.file_path_exists(path + "/" + target_name)
if file and not overwrite:
raise ValueError("File already exists, "
"use 'overwrite' argument")
folder_id = self.create_folder(path)
file_metadata = {
'name': target_name
}
media = MediaFileUpload(source_path,
mimetype='application/octet-stream',
resumable=True)
if not file:
file_metadata['parents'] = [folder_id] # 'update' call doesn't accept parents
file = self.service.files().create(body=file_metadata,
media_body=media,
fields='id').execute()
else:
file = self.service.files().update(fileId=file["id"],
body=file_metadata,
media_body=media,
fields='id').execute()
return file["id"]
def delete_folder(self, path, force=False):
"""
Deletes a folder on GDrive. Checks if the folder contains any files or
subfolders; in that case it raises an error, which can be overridden
by the 'force' argument.
With 'force' the folder on 'path' and all its children are deleted.
:param path: absolute path on GDrive
:param force: delete even if the folder has children
:return: None
"""
folder_id = self.folder_path_exists(path)
if not folder_id:
raise ValueError("Not valid folder path {}".format(path))
fields = 'nextPageToken, files(id, name, parents)'
q = self._handle_q("'{}' in parents ".format(folder_id))
response = self.service.files().list(
q=q,
spaces='drive',
pageSize='1',
fields=fields).execute()
children = response.get('files', [])
if children and not force:
raise ValueError("Folder {} is not empty, use 'force'".format(path))
self.service.files().delete(fileId=folder_id).execute()
def delete_file(self, path):
"""
Deletes file from 'path'. Expects path to specific file.
:param path: absolute path to particular file
:return: None
"""
file = self.file_path_exists(path)
if not file:
raise ValueError("File {} doesn't exist")
self.service.files().delete(fileId=file["id"]).execute()
def _get_folder_metadata(self, path):
"""
Get info about folder with 'path'
:param path: <string> absolute folder path
:return: <dictionary> with metadata or raises ValueError
"""
try:
return self.tree[path]
except KeyError:
raise ValueError("Unknown folder path {}".format(path))
def list_folders(self):
""" Lists all folders in GDrive.
Used to build the in-memory structure mapping paths to folder ids.
:return: list of dictionaries('id', 'name', [parents])
"""
folders = []
page_token = None
fields = 'nextPageToken, files(id, name, parents)'
while True:
q = self._handle_q("mimeType='application/vnd.google-apps.folder'")
response = self.service.files().list(q=q,
spaces='drive',
fields=fields,
pageToken=page_token).execute()
folders.extend(response.get('files', []))
page_token = response.get('nextPageToken', None)
if page_token is None:
break
return folders
def list_files(self):
""" Lists all files in GDrive
Loops through possibly multiple pages. The result could be large;
if that becomes a problem, change it to a generator.
:return: list of dictionaries('id', 'name', [parents])
"""
files = []
page_token = None
fields = 'nextPageToken, files(id, name, parents)'
while True:
q = self._handle_q("")
response = self.service.files().\
list(q=q,
spaces='drive',
fields=fields,
pageToken=page_token).execute()
files.extend(response.get('files', []))
page_token = response.get('nextPageToken', None)
if page_token is None:
break
return files
def folder_path_exists(self, file_path):
"""
Checks if path from 'file_path' exists. If so, return its folder id.
:param file_path: gdrive path with / as a separator
:return: <string> folder id or False
"""
if not file_path:
return False
root, ext = os.path.splitext(file_path)
if not ext:
file_path += '/'
dir_path = os.path.dirname(file_path)
path = self.tree.get(dir_path, None)
if path:
return path["id"]
return False
def file_path_exists(self, file_path):
"""
Checks if 'file_path' exists on GDrive
:param file_path: separated by '/', from root, with file name
:return: file metadata | False if not found
"""
folder_id = self.folder_path_exists(file_path)
if folder_id:
return self.file_exists(os.path.basename(file_path), folder_id)
return False
def file_exists(self, file_name, folder_id):
"""
Checks if 'file_name' exists in 'folder_id'
:param file_name:
:param folder_id: google drive folder id
:return: file metadata, False if not found
"""
q = self._handle_q("name = '{}' and '{}' in parents"
.format(file_name, folder_id)
)
response = self.service.files().list(
q=q,
spaces='drive',
fields='nextPageToken, files(id, name, parents, '
'mimeType, modifiedTime,size,md5Checksum)').execute()
if len(response.get('files')) > 1:
raise ValueError("Too many files returned")
file = response.get('files', [])
if not file:
return False
return file[0]
def _handle_q(self, q, trashed=False, hidden=False):
""" API list call contain trashed and hidden files/folder by default.
Usually we dont want those, must be included in query explicitly.
:param q: <string> query portion
:param trashed: False|True
:param hidden: False|True
:return: <string>
"""
parts = [q]
if not trashed:
parts.append(" trashed = false ")
# if not hidden:
# parts.append(" hidden = false ")
return " and ".join(parts)
def _iterfiles(self, name=None, is_folder=None, parent=None,
order_by='folder,name,createdTime'):
"""
Lists resources in folders, used by _walk.
:param name: filter by exact name (optional)
:param is_folder: True - folders only, False - files only, None - both
:param parent: folder id whose children should be listed (optional)
:param order_by: sort order for the API call
:return: <generator> of file metadata dictionaries
"""
q = []
if name is not None:
q.append("name = '%s'" % name.replace("'", "\\'"))
if is_folder is not None:
q.append("mimeType %s '%s'" % (
'=' if is_folder else '!=', self.FOLDER_STR))
if parent is not None:
q.append("'%s' in parents" % parent.replace("'", "\\'"))
params = {'pageToken': None, 'orderBy': order_by}
if q:
params['q'] = ' and '.join(q)
while True:
response = self.service.files().list(**params).execute()
for f in response['files']:
yield f
try:
params['pageToken'] = response['nextPageToken']
except KeyError:
return
def _walk(self, top='root', by_name=False):
"""
Recursively walks through folders; could be expensive in API requests.
:param top: <string> folder id to start walking from, 'root' is the drive root
:param by_name: if True, 'top' is treated as a folder name instead of an id
:return: <generator>
"""
if by_name:
top, = self._iterfiles(name=top, is_folder=True)
else:
top = self.service.files().get(fileId=top).execute()
if top['mimeType'] != self.FOLDER_STR:
raise ValueError('not a folder: %r' % top)
stack = [((top['name'],), top)]
while stack:
path, top = stack.pop()
dirs, files = is_file = [], []  # is_file[False] -> dirs, is_file[True] -> files
for f in self._iterfiles(parent=top['id']):
is_file[f['mimeType'] != self.FOLDER_STR].append(f)
yield path, top, dirs, files
if dirs:
stack.extend((path + (d['name'],), d) for d in reversed(dirs))
if __name__ == '__main__':
gd = GDriveHandler()
# print(gd.list_folders())
# print(gd.walk())
# print(len(gd.list_folders()))
# print((gd.list_folders()[0]))
# print(gd.get_folder('d'))  # get_folder is not implemented
print(gd.root)
#print(gd.get_subfolders('Test'))
# print(gd.get_folder('2017454654645'))
print(gd.tree)
# print(gd.folder_path_exists('/My Drive/Test'))
# print(gd.file_path_exists('/My Drive/Clover/clouser.txt'))
#print(gd.create_folder('/My Drive/Test/new/new/new/new'))
print(gd.upload_file(files[0], '/My Drive/Test/new/new/new/new_file.ma', overwrite=True))
print(gd.delete_file('/My Drive/Test/new/new/new/new_file.ma'))
print(gd.delete_folder('/My Drive/Test/new/new/new/'))
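
For illustration, the in-memory `tree` built by `_build_tree` maps absolute GDrive paths to folder ids; with the root folder named 'My Drive' it would look roughly like this (the ids below are made up):

# hypothetical snapshot of GDriveHandler().tree
tree = {
    "/": {"id": "0AbCdRootId"},
    "/My Drive/Test": {"id": "1EfGhFolderId"},
    "/My Drive/Test/Assets": {"id": "2IjKlFolderId"}
}
# folder_path_exists('/My Drive/Test/scene.ma') takes the dirname
# '/My Drive/Test' and resolves it against this mapping to '1EfGhFolderId'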

@@ -0,0 +1,27 @@
from enum import Enum
from .gdrive import GDriveHandler
class Providers(Enum):
GDRIVE = 'gdrive'
class ProviderFactory:
"""
Factory class creating handlers for multiple cloud destinations.
Each new implementation needs to be registered here and added to the
Providers enum.
"""
def __init__(self):
self.providers = {}
def register_provider(self, provider, creator):
self.providers[provider] = creator
def get_provider(self, provider):
creator = self.providers.get(provider)
if not creator:
raise ValueError("provider")
return creator()
factory = ProviderFactory()
factory.register_provider('gdrive', GDriveHandler)
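
A minimal usage sketch of the factory (illustrative only; the key matches Providers.GDRIVE.value):

handler = factory.get_provider(Providers.GDRIVE.value)  # -> GDriveHandler instance
print(handler.get_root_name())

Note that each get_provider call builds a fresh handler, which re-runs the credentials/token check in GDriveHandler.__init__; caching created handlers might be worth considering.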

@@ -0,0 +1,251 @@
from pype.api import config, Logger
from avalon import io
import threading
from aiohttp import web
import asyncio
from enum import Enum
import datetime
from .providers import providers
log = Logger().get_logger("SyncServer")
# test object 5eeb25e411e06a16209ab78e
class SyncStatus(Enum):
DO_NOTHING = 0
DO_UPLOAD = 1
DO_DOWNLOAD = 2
class SyncServer():
"""
WIP
Synchronization server that syncs published files from local storage to
any of the implemented providers (GDrive, S3 etc.).
Runs in the background, checks all representations and looks for files
that are marked to be in a different location than 'studio' (temporary),
then checks if the 'created_dt' field is present, denoting a successful
sync with the provider destination.
''' - example of the file test_Cylinder_lookMain_v010.ma synced to GDrive
"files" : [
{
"path" : "{root}/Test/Assets/Cylinder/publish/look/lookMain/v010/
test_Cylinder_lookMain_v010.ma",
"_id" : ObjectId("5eeb25e411e06a16209ab78f"),
"hash" : "test_Cylinder_lookMain_v010,ma|1592468963,24|4822",
"size" : NumberLong(4822),
"sites" : {
"studio" : {
"created_dt" : ISODate("2020-05-22T08:05:44.000Z")
},
"gdrive" : {
"id" : ObjectId("5eeb25e411e06a16209ab78f"),
"created_dt" : ISODate("2020-07-16T17:54:35.833Z")
}
}
},
'''
It is expected that multiple providers will be implemented in separate
classes and registered in 'providers.py'.
"""
def __init__(self):
self.qaction = None
self.failed_icon = None
self._is_running = False
self.presets = None
if not io.Session:
io.install()
io.Session['AVALON_PROJECT'] = 'Test'  # TODO temporary, hardcoded test project
try:
self.presets = config.get_presets()["services"]["sync_server"]
except Exception:
log.debug(
"No presets found for SyncServer."
" No credentials provided, no syncing possible."
)
self.sync_server_thread = SynchServerThread(self)
def get_sync_representations(self):
"""
Get representations.
TODO: filter out representations that shouldn't be synced
:return: <list>
"""
representations = io.find({
"type": "representation"
})
return representations
def check_status(self, file, representation, provider):
"""
Check synchronization status for single 'file' of single
'representation' by single 'provider'.
(E.g. check if 'scene.ma' of lookdev.v10 should be synced to GDrive.)
:param file: <dictionary> of file from representation in Mongo
:param representation: <dictionary> of representation
:param provider: <string> - gdrive, gdc etc.
:return: <string> - one of SyncStatus
"""
sites = file.get("sites") or {}
if isinstance(sites, list): # temporary, old format of 'sites'
return SyncStatus.DO_NOTHING
site_rec = sites.get(provider) or {}  # renamed to avoid shadowing the argument
if site_rec:
created_dt = site_rec.get("created_dt")
if not created_dt:
return SyncStatus.DO_UPLOAD
return SyncStatus.DO_NOTHING
async def upload(self, file, representation, provider):
"""
Upload single 'file' of a 'representation' to 'provider'.
Source url is taken from the 'file' portion, where the {root} placeholder
is replaced by 'representation.context.root'.
Provider could be one of those implemented in providers.py.
Updates MongoDB, fills in the id of the file from the provider (i.e.
file_id from GDrive) and 'created_dt' - time of upload.
:param file: <dictionary> of file from representation in Mongo
:param representation: <dictionary> of representation
:param provider: <string> - gdrive, gdc etc.
:return:
"""
await asyncio.sleep(0.1)
handler = providers.factory.get_provider(provider)
local_root = representation.get("context", {}).get("root")
if not local_root:
raise ValueError("Unknown local root for file {}")
source_file = file.get("path", "").replace('{root}', local_root)
target_root = '/{}'.format(handler.get_root_name())
target_file = file.get("path", "").replace('{root}', target_root)
new_file_id = handler.upload_file(source_file,
target_file,
overwrite=True)
if new_file_id:
representation_id = representation.get("_id")
file_id = file.get("_id")
query = {  # renamed to avoid shadowing the built-in 'filter'
"_id": representation_id,
"files._id": file_id
}
io.update_many(
query,
{"$set": {"files.$.sites.gdrive.id": new_file_id,
"files.$.sites.gdrive.created_dt":
datetime.datetime.utcnow()}}
)
log.info("file {} uploaded {}".format(source_file, new_file_id))
async def download(self, file, representation, provider):
pass
def tray_start(self):
self.sync_server_thread.start()
def tray_exit(self):
self.stop()
@property
def is_running(self):
return self.sync_server_thread.is_running
def stop(self):
if not self.is_running:
return
try:
log.debug("Stopping synch server server")
self.sync_server_thread.is_running = False
self.sync_server_thread.stop()
except Exception:
log.warning(
"Error has happened during Killing synchserver server",
exc_info=True
)
def thread_stopped(self):
self._is_running = False
class SynchServerThread(threading.Thread):
def __init__(self, module):
super(SynchServerThread, self).__init__()
self.module = module
self.loop = None
def run(self):
self.is_running = True
try:
log.info("Starting synchserver server")
self.loop = asyncio.new_event_loop() # create new loop for thread
asyncio.set_event_loop(self.loop)
asyncio.ensure_future(self.check_shutdown(), loop=self.loop)
asyncio.ensure_future(self.sync_loop(), loop=self.loop)
self.loop.run_forever()
except Exception:
log.warning(
"Synch Server service has failed", exc_info=True
)
finally:
self.loop.close() # optional
async def sync_loop(self):
while self.is_running:
sync_representations = self.module.get_sync_representations()
for provider in providers.factory.providers: # TODO clumsy
for sync in sync_representations:
files = sync.get("files") or {}
if files:
for file in files:
status = self.module.check_status(file, sync,
provider)
if status == SyncStatus.DO_UPLOAD:
await self.module.upload(file, sync, provider)
if status == SyncStatus.DO_DOWNLOAD:
await self.module.download(file, sync, provider)
await asyncio.sleep(60)
def stop(self):
"""Sets is_running flag to false, 'check_shutdown' shuts server down"""
self.is_running = False
async def check_shutdown(self):
""" Future that is running and checks if server should be running
periodically.
"""
while self.is_running:
await asyncio.sleep(0.5)
tasks = [task for task in asyncio.all_tasks() if
task is not asyncio.current_task()]
list(map(lambda task: task.cancel(), tasks)) # cancel all the tasks
results = await asyncio.gather(*tasks, return_exceptions=True)
log.debug(f'Finished awaiting cancelled tasks, results: {results}...')
await self.loop.shutdown_asyncgens()
# to really make sure everything else has time to stop
await asyncio.sleep(0.07)
self.loop.stop()
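
For reference, the representation update issued in `upload` relies on MongoDB's positional `$` operator to modify only the matched element of the 'files' array. A standalone sketch with pymongo (the connection, database and collection names are assumptions) would be:

# illustrative sketch, not part of the module
import datetime
from pymongo import MongoClient
from bson.objectid import ObjectId

coll = MongoClient("mongodb://localhost:27017")["avalon"]["Test"]
representation_id = ObjectId("5eeb25e411e06a16209ab78e")  # test object noted above
file_id = ObjectId("5eeb25e411e06a16209ab78f")
new_file_id = "1AbCdEfGh"  # id returned by GDriveHandler.upload_file

coll.update_many(
    {"_id": representation_id, "files._id": file_id},  # match repre + file entry
    {"$set": {
        "files.$.sites.gdrive.id": new_file_id,
        "files.$.sites.gdrive.created_dt": datetime.datetime.utcnow()
    }}
)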

@@ -54,5 +54,10 @@
"type": "module",
"import_path": "pype.modules.adobe_communicator",
"fromlist": ["pype", "modules"]
}, {
"title": "Sync Server",
"type": "module",
"import_path": "pype.modules.sync_server",
"fromlist": ["pype","modules"]
}
]