mirror of
https://github.com/ynput/ayon-core.git
synced 2025-12-24 12:54:40 +01:00
353 lines
10 KiB
Python
353 lines
10 KiB
Python
"""Prepares server package from addon repo to upload to server.
|
|
|
|
Requires Python 3.9. (Or at least 3.8+).
|
|
|
|
This script should be called from cloned addon repo.
|
|
|
|
It will produce 'package' subdirectory which could be pasted into server
|
|
addon directory directly (eg. into `ayon-backend/addons`).
|
|
|
|
Format of package folder:
|
|
ADDON_REPO/package/{addon name}/{addon version}
|
|
|
|
You can specify `--output_dir` in arguments to change output directory where
|
|
package will be created. Existing package directory will always be purged if
|
|
already present! This could be used to create package directly in server folder
|
|
if available.
|
|
|
|
Package contains server side files directly,
|
|
client side code zipped in `private` subfolder.
|
|
"""
|
|
|
|
import os
|
|
import sys
|
|
import re
|
|
import shutil
|
|
import argparse
|
|
import platform
|
|
import logging
|
|
import collections
|
|
import zipfile
|
|
import hashlib
|
|
|
|
from typing import Optional
|
|
|
|
CURRENT_DIR = os.path.dirname(os.path.abspath(__file__))
|
|
PACKAGE_PATH = os.path.join(CURRENT_DIR, "package.py")
|
|
package_content = {}
|
|
with open(PACKAGE_PATH, "r") as stream:
|
|
exec(stream.read(), package_content)
|
|
|
|
ADDON_VERSION = package_content["version"]
|
|
ADDON_NAME = package_content["name"]
|
|
ADDON_CLIENT_DIR = package_content["client_dir"]
|
|
CLIENT_VERSION_CONTENT = '''# -*- coding: utf-8 -*-
|
|
"""Package declaring AYON core addon version."""
|
|
__version__ = "{}"
|
|
'''
|
|
|
|
# Patterns of directories to be skipped for server part of addon
|
|
IGNORE_DIR_PATTERNS = [
|
|
re.compile(pattern)
|
|
for pattern in {
|
|
# Skip directories starting with '.'
|
|
r"^\.",
|
|
# Skip any pycache folders
|
|
"^__pycache__$"
|
|
}
|
|
]
|
|
|
|
# Patterns of files to be skipped for server part of addon
|
|
IGNORE_FILE_PATTERNS = [
|
|
re.compile(pattern)
|
|
for pattern in {
|
|
# Skip files starting with '.'
|
|
# NOTE this could be an issue in some cases
|
|
r"^\.",
|
|
# Skip '.pyc' files
|
|
r"\.pyc$"
|
|
}
|
|
]
|
|
|
|
|
|
def calculate_file_checksum(filepath, hash_algorithm, chunk_size=10000):
|
|
func = getattr(hashlib, hash_algorithm)
|
|
hash_obj = func()
|
|
with open(filepath, "rb") as f:
|
|
for chunk in iter(lambda: f.read(chunk_size), b""):
|
|
hash_obj.update(chunk)
|
|
return hash_obj.hexdigest()
|
|
|
|
|
|
class ZipFileLongPaths(zipfile.ZipFile):
|
|
"""Allows longer paths in zip files.
|
|
|
|
Regular DOS paths are limited to MAX_PATH (260) characters, including
|
|
the string's terminating NUL character.
|
|
That limit can be exceeded by using an extended-length path that
|
|
starts with the '\\?\' prefix.
|
|
"""
|
|
_is_windows = platform.system().lower() == "windows"
|
|
|
|
def _extract_member(self, member, tpath, pwd):
|
|
if self._is_windows:
|
|
tpath = os.path.abspath(tpath)
|
|
if tpath.startswith("\\\\"):
|
|
tpath = "\\\\?\\UNC\\" + tpath[2:]
|
|
else:
|
|
tpath = "\\\\?\\" + tpath
|
|
|
|
return super(ZipFileLongPaths, self)._extract_member(
|
|
member, tpath, pwd
|
|
)
|
|
|
|
|
|
def safe_copy_file(src_path, dst_path):
|
|
"""Copy file and make sure destination directory exists.
|
|
|
|
Ignore if destination already contains directories from source.
|
|
|
|
Args:
|
|
src_path (str): File path that will be copied.
|
|
dst_path (str): Path to destination file.
|
|
"""
|
|
|
|
if src_path == dst_path:
|
|
return
|
|
|
|
dst_dir = os.path.dirname(dst_path)
|
|
try:
|
|
os.makedirs(dst_dir)
|
|
except Exception:
|
|
pass
|
|
|
|
shutil.copy2(src_path, dst_path)
|
|
|
|
|
|
def _value_match_regexes(value, regexes):
|
|
for regex in regexes:
|
|
if regex.search(value):
|
|
return True
|
|
return False
|
|
|
|
|
|
def find_files_in_subdir(
|
|
src_path,
|
|
ignore_file_patterns=None,
|
|
ignore_dir_patterns=None
|
|
):
|
|
if ignore_file_patterns is None:
|
|
ignore_file_patterns = IGNORE_FILE_PATTERNS
|
|
|
|
if ignore_dir_patterns is None:
|
|
ignore_dir_patterns = IGNORE_DIR_PATTERNS
|
|
output = []
|
|
|
|
hierarchy_queue = collections.deque()
|
|
hierarchy_queue.append((src_path, []))
|
|
while hierarchy_queue:
|
|
item = hierarchy_queue.popleft()
|
|
dirpath, parents = item
|
|
for name in os.listdir(dirpath):
|
|
path = os.path.join(dirpath, name)
|
|
if os.path.isfile(path):
|
|
if not _value_match_regexes(name, ignore_file_patterns):
|
|
items = list(parents)
|
|
items.append(name)
|
|
output.append((path, os.path.sep.join(items)))
|
|
continue
|
|
|
|
if not _value_match_regexes(name, ignore_dir_patterns):
|
|
items = list(parents)
|
|
items.append(name)
|
|
hierarchy_queue.append((path, items))
|
|
|
|
return output
|
|
|
|
|
|
def copy_server_content(addon_output_dir, current_dir, log):
|
|
"""Copies server side folders to 'addon_package_dir'
|
|
|
|
Args:
|
|
addon_output_dir (str): package dir in addon repo dir
|
|
current_dir (str): addon repo dir
|
|
log (logging.Logger)
|
|
"""
|
|
|
|
log.info("Copying server content")
|
|
|
|
filepaths_to_copy = []
|
|
server_dirpath = os.path.join(current_dir, "server")
|
|
|
|
for item in find_files_in_subdir(server_dirpath):
|
|
src_path, dst_subpath = item
|
|
dst_path = os.path.join(addon_output_dir, "server", dst_subpath)
|
|
filepaths_to_copy.append((src_path, dst_path))
|
|
|
|
# Copy files
|
|
for src_path, dst_path in filepaths_to_copy:
|
|
safe_copy_file(src_path, dst_path)
|
|
|
|
|
|
def _update_client_version(client_addon_dir):
|
|
"""Write version.py file to 'client' directory.
|
|
|
|
Make sure the version in client dir is the same as in package.py.
|
|
|
|
Args:
|
|
client_addon_dir (str): Directory path of client addon.
|
|
"""
|
|
|
|
dst_version_path = os.path.join(client_addon_dir, "version.py")
|
|
with open(dst_version_path, "w") as stream:
|
|
stream.write(CLIENT_VERSION_CONTENT.format(ADDON_VERSION))
|
|
|
|
|
|
def zip_client_side(addon_package_dir, current_dir, log):
|
|
"""Copy and zip `client` content into 'addon_package_dir'.
|
|
|
|
Args:
|
|
addon_package_dir (str): Output package directory path.
|
|
current_dir (str): Directory path of addon source.
|
|
log (logging.Logger): Logger object.
|
|
"""
|
|
|
|
client_dir = os.path.join(current_dir, "client")
|
|
client_addon_dir = os.path.join(client_dir, ADDON_CLIENT_DIR)
|
|
if not os.path.isdir(client_addon_dir):
|
|
raise ValueError(
|
|
f"Failed to find client directory '{client_addon_dir}'"
|
|
)
|
|
|
|
log.info("Preparing client code zip")
|
|
private_dir = os.path.join(addon_package_dir, "private")
|
|
|
|
if not os.path.exists(private_dir):
|
|
os.makedirs(private_dir)
|
|
|
|
_update_client_version(client_addon_dir)
|
|
|
|
zip_filepath = os.path.join(os.path.join(private_dir, "client.zip"))
|
|
with ZipFileLongPaths(zip_filepath, "w", zipfile.ZIP_DEFLATED) as zipf:
|
|
# Add client code content to zip
|
|
for path, sub_path in find_files_in_subdir(client_addon_dir):
|
|
sub_path = os.path.join(ADDON_CLIENT_DIR, sub_path)
|
|
zipf.write(path, sub_path)
|
|
|
|
shutil.copy(os.path.join(client_dir, "pyproject.toml"), private_dir)
|
|
|
|
|
|
def create_server_package(
|
|
output_dir: str,
|
|
addon_output_dir: str,
|
|
log: logging.Logger
|
|
):
|
|
"""Create server package zip file.
|
|
|
|
The zip file can be installed to a server using UI or rest api endpoints.
|
|
|
|
Args:
|
|
output_dir (str): Directory path to output zip file.
|
|
addon_output_dir (str): Directory path to addon output directory.
|
|
log (logging.Logger): Logger object.
|
|
"""
|
|
|
|
log.info("Creating server package")
|
|
output_path = os.path.join(
|
|
output_dir, f"{ADDON_NAME}-{ADDON_VERSION}.zip"
|
|
)
|
|
with ZipFileLongPaths(output_path, "w", zipfile.ZIP_DEFLATED) as zipf:
|
|
# Move addon content to zip into 'addon' directory
|
|
addon_output_dir_offset = len(addon_output_dir) + 1
|
|
for root, _, filenames in os.walk(addon_output_dir):
|
|
if not filenames:
|
|
continue
|
|
|
|
dst_root = None
|
|
if root != addon_output_dir:
|
|
dst_root = root[addon_output_dir_offset:]
|
|
for filename in filenames:
|
|
src_path = os.path.join(root, filename)
|
|
dst_path = filename
|
|
if dst_root:
|
|
dst_path = os.path.join(dst_root, dst_path)
|
|
zipf.write(src_path, dst_path)
|
|
|
|
log.info(f"Output package can be found: {output_path}")
|
|
|
|
|
|
def main(
|
|
output_dir: Optional[str]=None,
|
|
skip_zip: bool=False,
|
|
keep_sources: bool=False
|
|
):
|
|
log = logging.getLogger("create_package")
|
|
log.info("Start creating package")
|
|
|
|
current_dir = os.path.dirname(os.path.abspath(__file__))
|
|
if not output_dir:
|
|
output_dir = os.path.join(current_dir, "package")
|
|
|
|
|
|
new_created_version_dir = os.path.join(
|
|
output_dir, ADDON_NAME, ADDON_VERSION
|
|
)
|
|
if os.path.isdir(new_created_version_dir):
|
|
log.info(f"Purging {new_created_version_dir}")
|
|
shutil.rmtree(output_dir)
|
|
|
|
log.info(f"Preparing package for {ADDON_NAME}-{ADDON_VERSION}")
|
|
|
|
addon_output_root = os.path.join(output_dir, ADDON_NAME)
|
|
addon_output_dir = os.path.join(addon_output_root, ADDON_VERSION)
|
|
if not os.path.exists(addon_output_dir):
|
|
os.makedirs(addon_output_dir)
|
|
|
|
copy_server_content(addon_output_dir, current_dir, log)
|
|
safe_copy_file(
|
|
PACKAGE_PATH,
|
|
os.path.join(addon_output_dir, os.path.basename(PACKAGE_PATH))
|
|
)
|
|
zip_client_side(addon_output_dir, current_dir, log)
|
|
|
|
# Skip server zipping
|
|
if not skip_zip:
|
|
create_server_package(output_dir, addon_output_dir, log)
|
|
# Remove sources only if zip file is created
|
|
if not keep_sources:
|
|
log.info("Removing source files for server package")
|
|
shutil.rmtree(addon_output_root)
|
|
log.info("Package creation finished")
|
|
|
|
|
|
if __name__ == "__main__":
|
|
parser = argparse.ArgumentParser()
|
|
parser.add_argument(
|
|
"--skip-zip",
|
|
dest="skip_zip",
|
|
action="store_true",
|
|
help=(
|
|
"Skip zipping server package and create only"
|
|
" server folder structure."
|
|
)
|
|
)
|
|
parser.add_argument(
|
|
"--keep-sources",
|
|
dest="keep_sources",
|
|
action="store_true",
|
|
help=(
|
|
"Keep folder structure when server package is created."
|
|
)
|
|
)
|
|
parser.add_argument(
|
|
"-o", "--output",
|
|
dest="output_dir",
|
|
default=None,
|
|
help=(
|
|
"Directory path where package will be created"
|
|
" (Will be purged if already exists!)"
|
|
)
|
|
)
|
|
|
|
args = parser.parse_args(sys.argv[1:])
|
|
main(args.output_dir, args.skip_zip, args.keep_sources)
|