Merge pull request #876 from pypeclub/feature/817-syncserver-gui

Feature #817 syncserver gui
This commit is contained in:
Milan Kolar 2021-01-20 11:17:33 +01:00 committed by GitHub
commit bbbeb522d6
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
7 changed files with 1756 additions and 65 deletions

View file

@ -8,6 +8,7 @@ from googleapiclient.http import MediaFileUpload, MediaIoBaseDownload
from pype.api import Logger
from pype.api import get_system_settings
from ..utils import time_function
import time
SCOPES = ['https://www.googleapis.com/auth/drive.metadata.readonly',
'https://www.googleapis.com/auth/drive.file',
@ -42,6 +43,7 @@ class GDriveHandler(AbstractProvider):
"""
FOLDER_STR = 'application/vnd.google-apps.folder'
MY_DRIVE_STR = 'My Drive' # name of root folder of regular Google drive
CHUNK_SIZE = 2097152 # must be divisible by 256!
def __init__(self, site_name, tree=None, presets=None):
self.presets = None
@ -277,7 +279,9 @@ class GDriveHandler(AbstractProvider):
path = new_path_key
return folder_id
def upload_file(self, source_path, path, overwrite=False):
def upload_file(self, source_path, path,
server, collection, file, representation, site,
overwrite=False):
"""
Uploads single file from 'source_path' to destination 'path'.
It creates all folders on the path if are not existing.
@ -287,6 +291,13 @@ class GDriveHandler(AbstractProvider):
path (string): absolute path with or without name of the file
overwrite (boolean): replace existing file
arguments for saving progress:
server (SyncServer): server instance to call update_db on
collection (str): name of collection
file (dict): info about uploaded file (matches structure from db)
representation (dict): complete repre containing 'file'
site (str): site name
Returns:
(string) file_id of created/modified file ,
throws FileExistsError, FileNotFoundError exceptions
@ -302,8 +313,8 @@ class GDriveHandler(AbstractProvider):
path = os.path.dirname(path)
else:
target_name = os.path.basename(source_path)
file = self.file_path_exists(path + "/" + target_name)
if file and not overwrite:
target_file = self.file_path_exists(path + "/" + target_name)
if target_file and not overwrite:
raise FileExistsError("File already exists, "
"use 'overwrite' argument")
@ -316,23 +327,45 @@ class GDriveHandler(AbstractProvider):
}
media = MediaFileUpload(source_path,
mimetype='application/octet-stream',
chunksize=self.CHUNK_SIZE,
resumable=True)
try:
if not file:
if not target_file:
# update doesnt like parent
file_metadata['parents'] = [folder_id]
file = self.service.files().create(body=file_metadata,
supportsAllDrives=True,
media_body=media,
fields='id').execute()
request = self.service.files().create(body=file_metadata,
supportsAllDrives=True,
media_body=media,
fields='id')
else:
file = self.service.files().update(fileId=file["id"],
body=file_metadata,
supportsAllDrives=True,
media_body=media,
fields='id').execute()
request = self.service.files().update(fileId=target_file["id"],
body=file_metadata,
supportsAllDrives=True,
media_body=media,
fields='id')
media.stream()
log.debug("Start Upload! {}".format(source_path))
last_tick = status = response = None
status_val = 0
while response is None:
if status:
status_val = float(status.progress())
if not last_tick or \
time.time() - last_tick >= server.LOG_PROGRESS_SEC:
last_tick = time.time()
log.debug("Uploaded %d%%." %
int(status_val * 100))
server.update_db(collection=collection,
new_file_id=None,
file=file,
representation=representation,
site=site,
progress=status_val
)
status, response = request.next_chunk()
except errors.HttpError as ex:
if ex.resp['status'] == '404':
@ -344,13 +377,14 @@ class GDriveHandler(AbstractProvider):
log.warning("Forbidden received, hit quota. "
"Injecting 60s delay.")
import time
time.sleep(60)
return False
raise
return file["id"]
return response['id']
def download_file(self, source_path, local_path, overwrite=False):
def download_file(self, source_path, local_path,
server, collection, file, representation, site,
overwrite=False):
"""
Downloads single file from 'source_path' (remote) to 'local_path'.
It creates all folders on the local_path if are not existing.
@ -361,6 +395,13 @@ class GDriveHandler(AbstractProvider):
local_path (string): absolute path with or without name of the file
overwrite (boolean): replace existing file
arguments for saving progress:
server (SyncServer): server instance to call update_db on
collection (str): name of collection
file (dict): info about uploaded file (matches structure from db)
representation (dict): complete repre containing 'file'
site (str): site name
Returns:
(string) file_id of created/modified file ,
throws FileExistsError, FileNotFoundError exceptions
@ -378,9 +419,9 @@ class GDriveHandler(AbstractProvider):
else: # just folder, get file name from source
target_name = os.path.basename(source_path)
file = os.path.isfile(local_path + "/" + target_name)
local_file = os.path.isfile(local_path + "/" + target_name)
if file and not overwrite:
if local_file and not overwrite:
raise FileExistsError("File already exists, "
"use 'overwrite' argument")
@ -389,9 +430,24 @@ class GDriveHandler(AbstractProvider):
with open(local_path + "/" + target_name, "wb") as fh:
downloader = MediaIoBaseDownload(fh, request)
done = False
while done is False:
status, done = downloader.next_chunk()
last_tick = status = response = None
status_val = 0
while response is None:
if status:
status_val = float(status.progress())
if not last_tick or \
time.time() - last_tick >= server.LOG_PROGRESS_SEC:
last_tick = time.time()
log.debug("Downloaded %d%%." %
int(status_val * 100))
server.update_db(collection=collection,
new_file_id=None,
file=file,
representation=representation,
site=site,
progress=status_val
)
status, response = downloader.next_chunk()
return target_name

Binary file not shown.

After

Width:  |  Height:  |  Size: 975 B

Binary file not shown.

After

Width:  |  Height:  |  Size: 557 B

View file

@ -18,7 +18,7 @@ from .utils import time_function
import six
from pype.lib import PypeLogger
from .. import PypeModule, ITrayService
from .. import PypeModule, ITrayModule
if six.PY2:
web = asyncio = STATIC_DIR = WebSocketAsync = None
@ -34,7 +34,7 @@ class SyncStatus(Enum):
DO_DOWNLOAD = 2
class SyncServer(PypeModule, ITrayService):
class SyncServer(PypeModule, ITrayModule):
"""
Synchronization server that is syncing published files from local to
any of implemented providers (like GDrive, S3 etc.)
@ -92,6 +92,7 @@ class SyncServer(PypeModule, ITrayService):
# set 0 to no limit
REPRESENTATION_LIMIT = 100
DEFAULT_SITE = 'studio'
LOG_PROGRESS_SEC = 5 # how often log progress to DB
name = "sync_server"
label = "Sync Server"
@ -116,6 +117,8 @@ class SyncServer(PypeModule, ITrayService):
self.presets = None # settings for all enabled projects for sync
self.sync_server_thread = None # asyncio requires new thread
self.action_show_widget = None
def connect_with_modules(self, *_a, **kw):
return
@ -131,21 +134,26 @@ class SyncServer(PypeModule, ITrayService):
self.presets = None
self.lock = threading.Lock()
self.connection = AvalonMongoDB()
self.connection.install()
try:
self.presets = self.get_synced_presets()
self.set_active_sites(self.presets)
self.sync_server_thread = SyncServerThread(self)
from .tray.app import SyncServerWindow
self.widget = SyncServerWindow(self)
except ValueError:
log.info("No system setting for sync. Not syncing.")
log.info("No system setting for sync. Not syncing.", exc_info=True)
self.enabled = False
except KeyError:
log.info((
"There are not set presets for SyncServer OR "
"Credentials provided are invalid, "
"no syncing possible").
format(str(self.presets)), exc_info=True)
self.enabled = False
def tray_start(self):
"""
@ -185,6 +193,19 @@ class SyncServer(PypeModule, ITrayService):
exc_info=True
)
def tray_menu(self, parent_menu):
if not self.enabled:
return
from Qt import QtWidgets
"""Add menu or action to Tray(or parent)'s menu"""
action = QtWidgets.QAction("SyncServer", parent_menu)
action.triggered.connect(self.show_widget)
parent_menu.addAction(action)
parent_menu.addSeparator()
self.action_show_widget = action
@property
def is_running(self):
return self.sync_server_thread.is_running
@ -245,7 +266,8 @@ class SyncServer(PypeModule, ITrayService):
settings = get_project_settings(project_name)
sync_settings = settings.get("global")["sync_server"]
if not sync_settings:
log.info("No project setting for Sync Server, not syncing.")
log.info("No project setting for {}, not syncing.".
format(project_name))
return {}
if sync_settings.get("enabled"):
return sync_settings
@ -406,8 +428,8 @@ class SyncServer(PypeModule, ITrayService):
return SyncStatus.DO_NOTHING
async def upload(self, file, representation, provider_name, site_name,
tree=None, preset=None):
async def upload(self, collection, file, representation, provider_name,
site_name, tree=None, preset=None):
"""
Upload single 'file' of a 'representation' to 'provider'.
Source url is taken from 'file' portion, where {root} placeholder
@ -418,6 +440,7 @@ class SyncServer(PypeModule, ITrayService):
from GDrive), 'created_dt' - time of upload
Args:
collection (str): source collection
file (dictionary): of file from representation in Mongo
representation (dictionary): of representation
provider_name (string): gdrive, gdc etc.
@ -447,21 +470,28 @@ class SyncServer(PypeModule, ITrayService):
err = "Folder {} wasn't created. Check permissions.".\
format(target_folder)
raise NotADirectoryError(err)
_, remote_site = self.get_sites_for_project(collection)
loop = asyncio.get_running_loop()
file_id = await loop.run_in_executor(None,
handler.upload_file,
local_file,
remote_file,
True)
self,
collection,
file,
representation,
remote_site,
True
)
return file_id
async def download(self, file, representation, provider_name,
async def download(self, collection, file, representation, provider_name,
site_name, tree=None, preset=None):
"""
Downloads file to local folder denoted in representation.Context.
Args:
collection (str): source collection
file (dictionary) : info about processed file
representation (dictionary): repr that 'file' belongs to
provider_name (string): 'gdrive' etc
@ -485,26 +515,37 @@ class SyncServer(PypeModule, ITrayService):
local_folder = os.path.dirname(local_file)
os.makedirs(local_folder, exist_ok=True)
local_site, _ = self.get_sites_for_project(collection)
loop = asyncio.get_running_loop()
file_id = await loop.run_in_executor(None,
handler.download_file,
remote_file,
local_file,
False)
False,
self,
collection,
file,
representation,
local_site
)
return file_id
def update_db(self, new_file_id, file, representation, provider_name,
error=None):
def update_db(self, collection, new_file_id, file, representation,
site, error=None, progress=None):
"""
Update 'provider' portion of records in DB with success (file_id)
or error (exception)
Args:
collection (string): name of project - force to db connection as
each file might come from different collection
new_file_id (string):
file (dictionary): info about processed file (pulled from DB)
representation (dictionary): parent repr of file (from DB)
provider_name (string): label ('gdrive', 'S3')
site (string): label ('gdrive', 'S3')
error (string): exception message
progress (float): 0-1 of progress of upload/download
Returns:
None
@ -518,26 +559,33 @@ class SyncServer(PypeModule, ITrayService):
file_index, _ = self._get_file_info(representation.get('files', []),
file_id)
site_index, _ = self._get_provider_rec(file.get('sites', []),
provider_name)
site)
update = {}
if new_file_id:
update["$set"] = self._get_success_dict(file_index, site_index,
new_file_id)
# reset previous errors if any
update["$unset"] = self._get_error_dict(file_index, site_index,
"", "")
"", "", "")
elif progress is not None:
update["$set"] = self._get_progress_dict(file_index, site_index,
progress)
else:
tries = self._get_tries_count(file, provider_name)
tries = self._get_tries_count(file, site)
tries += 1
update["$set"] = self._get_error_dict(file_index, site_index,
error, tries)
self.connection.Session["AVALON_PROJECT"] = collection
self.connection.update_one(
query,
update
)
if progress is not None:
return
status = 'failed'
error_str = 'with error {}'.format(error)
if new_file_id:
@ -553,7 +601,7 @@ class SyncServer(PypeModule, ITrayService):
def _get_file_info(self, files, _id):
"""
Return record from list of records which name matches to 'provider'
Could be possibly refactored with '_get_file_info' together.
Could be possibly refactored with '_get_provider_rec' together.
Args:
files (list): of dictionaries with info about published files
@ -590,7 +638,7 @@ class SyncServer(PypeModule, ITrayService):
return -1, None
def reset_provider_for_file(self, collection, representation_id,
file_id, site_name):
file_id, side):
"""
Reset information about synchronization for particular 'file_id'
and provider.
@ -599,7 +647,7 @@ class SyncServer(PypeModule, ITrayService):
collection (string): name of project (eg. collection) in DB
representation_id(string): _id of representation
file_id (string): file _id in representation
site_name (string): 'gdrive', 'S3' etc
side (string): local or remote side
Returns:
None
"""
@ -607,19 +655,25 @@ class SyncServer(PypeModule, ITrayService):
query = {
"_id": ObjectId(representation_id)
}
self.connection.Session["AVALON_PROJECT"] = collection
representation = list(self.connection.find(query))
representation = list(self.connection.database[collection].find(query))
if not representation:
raise ValueError("Representation {} not found in {}".
format(representation_id, collection))
local_site, remote_site = self.get_sites_for_project(collection)
if side == 'local':
site_name = local_site
else:
site_name = remote_site
files = representation[0].get('files', [])
file_index, _ = self._get_file_info(files,
file_id)
site_index, _ = self._get_provider_rec(files[file_index].
get('sites', []),
site_name)
if file_index > 0 and site_index > 0:
if file_index >= 0 and site_index >= 0:
elem = {"name": site_name}
update = {
"$set": {"files.{}.sites.{}".format(file_index, site_index):
@ -627,7 +681,7 @@ class SyncServer(PypeModule, ITrayService):
}
}
self.connection.update_one(
self.connection.database[collection].update_one(
query,
update
)
@ -641,6 +695,10 @@ class SyncServer(PypeModule, ITrayService):
"""
return int(self.presets[project_name]["config"]["loop_delay"])
def show_widget(self):
"""Show dialog to enter credentials"""
self.widget.show()
def _get_success_dict(self, file_index, site_index, new_file_id):
"""
Provide success metadata ("id", "created_dt") to be stored in Db.
@ -660,7 +718,8 @@ class SyncServer(PypeModule, ITrayService):
datetime.utcnow()}
return val
def _get_error_dict(self, file_index, site_index, error="", tries=""):
def _get_error_dict(self, file_index, site_index,
error="", tries="", progress=""):
"""
Provide error metadata to be stored in Db.
Used for set (error and tries provided) or unset mode.
@ -675,7 +734,9 @@ class SyncServer(PypeModule, ITrayService):
val = {"files.{}.sites.{}.last_failed_dt".
format(file_index, site_index): datetime.utcnow(),
"files.{}.sites.{}.error".format(file_index, site_index): error,
"files.{}.sites.{}.tries".format(file_index, site_index): tries
"files.{}.sites.{}.tries".format(file_index, site_index): tries,
"files.{}.sites.{}.progress".format(file_index, site_index):
progress
}
return val
@ -703,6 +764,22 @@ class SyncServer(PypeModule, ITrayService):
_, rec = self._get_provider_rec(file.get("sites", []), provider)
return rec.get("tries", 0)
def _get_progress_dict(self, file_index, site_index, progress):
"""
Provide progress metadata to be stored in Db.
Used during upload/download for GUI to show.
Args:
file_index: (int) - index of modified file
site_index: (int) - index of modified site of modified file
progress: (float) - 0-1 progress of upload/download
Returns:
(dictionary)
"""
val = {"files.{}.sites.{}.progress".
format(file_index, site_index): progress
}
return val
def _get_local_file_path(self, file, local_root):
"""
Auxiliary function for replacing rootless path with real path
@ -848,23 +925,27 @@ class SyncServerThread(threading.Thread):
tree = handler.get_tree()
limit -= 1
task = asyncio.create_task(
self.module.upload(file,
self.module.upload(collection,
file,
sync,
provider,
site,
tree,
site_preset))
task_files_to_process.append(task)
# store info for exception handling
# store info for exception handlingy
files_processed_info.append((file,
sync,
site))
site,
collection
))
processed_file_path.add(file_path)
if status == SyncStatus.DO_DOWNLOAD:
tree = handler.get_tree()
limit -= 1
task = asyncio.create_task(
self.module.download(file,
self.module.download(collection,
file,
sync,
provider,
site,
@ -874,7 +955,9 @@ class SyncServerThread(threading.Thread):
files_processed_info.append((file,
sync,
local))
local,
collection
))
processed_file_path.add(file_path)
log.debug("Sync tasks count {}".
@ -884,12 +967,13 @@ class SyncServerThread(threading.Thread):
return_exceptions=True)
for file_id, info in zip(files_created,
files_processed_info):
file, representation, site = info
file, representation, site, collection = info
error = None
if isinstance(file_id, BaseException):
error = str(file_id)
file_id = None
self.module.update_db(file_id,
self.module.update_db(collection,
file_id,
file,
representation,
site,

File diff suppressed because it is too large Load diff

View file

@ -908,7 +908,8 @@ class IntegrateAssetNew(pyblish.api.InstancePlugin):
file_info = self.prepare_file_info(path,
integrated_file_sizes[dest],
file_hash)
file_hash,
instance=instance)
output_resources.append(file_info)
return output_resources
@ -928,7 +929,8 @@ class IntegrateAssetNew(pyblish.api.InstancePlugin):
dest += '.{}'.format(self.TMP_FILE_EXT)
return dest
def prepare_file_info(self, path, size=None, file_hash=None, sites=None):
def prepare_file_info(self, path, size=None, file_hash=None,
sites=None, instance=None):
""" Prepare information for one file (asset or resource)
Arguments:
@ -938,6 +940,7 @@ class IntegrateAssetNew(pyblish.api.InstancePlugin):
sites(optional): array of published locations,
[ {'name':'studio', 'created_dt':date} by default
keys expected ['studio', 'site1', 'gdrive1']
instance(dict, optional): to get collected settings
Returns:
rec: dictionary with filled info
"""
@ -945,15 +948,18 @@ class IntegrateAssetNew(pyblish.api.InstancePlugin):
remote_site = None
sync_server_presets = None
# manager = ModulesManager()
# sync_server = manager.modules_by_name["sync_server"]
# try:
# if sync_server.enabled:
# local_site, remote_site = sync_server.get_sites_for_project()
# except ValueError:
# log.debug(("There are not set presets for SyncServer."
# " No credentials provided, no synching possible").
# format(str(sync_server_presets)))
if (instance.context.data["system_settings"]
["modules"]
["sync_server"]
["enabled"]):
sync_server_presets = (instance.context.data["project_settings"]
["global"]
["sync_server"])
if sync_server_presets["enabled"]:
local_site = sync_server_presets["config"].\
get("active_site", "studio").strip()
remote_site = sync_server_presets["config"].get("remote_site")
rec = {
"_id": io.ObjectId(),

View file

@ -353,3 +353,39 @@ QScrollBar::up-arrow:vertical, QScrollBar::down-arrow:vertical {
QScrollBar::add-page:vertical, QScrollBar::sub-page:vertical {
background: none;
}
QTableView
{
border: 1px solid #444;
gridline-color: #6c6c6c;
background-color: #201F1F;
alternate-background-color:#21252B;
}
QHeaderView
{
border: 1px transparent;
border-radius: 2px;
margin: 0px;
padding: 0px;
}
QHeaderView::section {
background-color: #21252B;
/*color: silver;*/
padding: 4px;
border: 1px solid #6c6c6c;
border-radius: 0px;
text-align: center;
color: #969b9e;
font-weight: bold;
}
QTableView::item:pressed, QListView::item:pressed, QTreeView::item:pressed {
background: #78879b;
color: #FFFFFF;
}
QTableView::item:selected:active, QTreeView::item:selected:active, QListView::item:selected:active {
background: #3d8ec9;
}