updated create package script

This commit is contained in:
Jakub Trllo 2024-10-02 14:40:01 +02:00
parent 616f8eece4
commit db41e53511

View file

@ -1,3 +1,5 @@
#!/usr/bin/env python
"""Prepares server package from addon repo to upload to server.
Requires Python 3.9. (Or at least 3.8+).
@ -22,32 +24,39 @@ client side code zipped in `private` subfolder.
import os
import sys
import re
import io
import shutil
import argparse
import platform
import argparse
import logging
import collections
import zipfile
import hashlib
import subprocess
from typing import Optional, Iterable, Pattern, Union, List, Tuple
from typing import Optional
import package
CURRENT_DIR = os.path.dirname(os.path.abspath(__file__))
PACKAGE_PATH = os.path.join(CURRENT_DIR, "package.py")
package_content = {}
with open(PACKAGE_PATH, "r") as stream:
exec(stream.read(), package_content)
FileMapping = Tuple[Union[str, io.BytesIO], str]
ADDON_NAME: str = package.name
ADDON_VERSION: str = package.version
ADDON_CLIENT_DIR: Union[str, None] = getattr(package, "client_dir", None)
ADDON_VERSION = package_content["version"]
ADDON_NAME = package_content["name"]
ADDON_CLIENT_DIR = package_content["client_dir"]
CLIENT_VERSION_CONTENT = '''# -*- coding: utf-8 -*-
"""Package declaring AYON core addon version."""
__version__ = "{}"
CURRENT_ROOT: str = os.path.dirname(os.path.abspath(__file__))
SERVER_ROOT: str = os.path.join(CURRENT_ROOT, "server")
FRONTEND_ROOT: str = os.path.join(CURRENT_ROOT, "frontend")
FRONTEND_DIST_ROOT: str = os.path.join(FRONTEND_ROOT, "dist")
DST_DIST_DIR: str = os.path.join("frontend", "dist")
PRIVATE_ROOT: str = os.path.join(CURRENT_ROOT, "private")
PUBLIC_ROOT: str = os.path.join(CURRENT_ROOT, "public")
CLIENT_ROOT: str = os.path.join(CURRENT_ROOT, "client")
VERSION_PY_CONTENT = f'''# -*- coding: utf-8 -*-
"""Package declaring AYON addon '{ADDON_NAME}' version."""
__version__ = "{ADDON_VERSION}"
'''
# Patterns of directories to be skipped for server part of addon
IGNORE_DIR_PATTERNS = [
IGNORE_DIR_PATTERNS: List[Pattern] = [
re.compile(pattern)
for pattern in {
# Skip directories starting with '.'
@ -58,7 +67,7 @@ IGNORE_DIR_PATTERNS = [
]
# Patterns of files to be skipped for server part of addon
IGNORE_FILE_PATTERNS = [
IGNORE_FILE_PATTERNS: List[Pattern] = [
re.compile(pattern)
for pattern in {
# Skip files starting with '.'
@ -70,15 +79,6 @@ IGNORE_FILE_PATTERNS = [
]
def calculate_file_checksum(filepath, hash_algorithm, chunk_size=10000):
func = getattr(hashlib, hash_algorithm)
hash_obj = func()
with open(filepath, "rb") as f:
for chunk in iter(lambda: f.read(chunk_size), b""):
hash_obj.update(chunk)
return hash_obj.hexdigest()
class ZipFileLongPaths(zipfile.ZipFile):
"""Allows longer paths in zip files.
@ -97,12 +97,28 @@ class ZipFileLongPaths(zipfile.ZipFile):
else:
tpath = "\\\\?\\" + tpath
return super(ZipFileLongPaths, self)._extract_member(
member, tpath, pwd
)
return super()._extract_member(member, tpath, pwd)
def safe_copy_file(src_path, dst_path):
def _get_yarn_executable() -> Union[str, None]:
cmd = "which"
if platform.system().lower() == "windows":
cmd = "where"
for line in subprocess.check_output(
[cmd, "yarn"], encoding="utf-8"
).splitlines():
if not line or not os.path.exists(line):
continue
try:
subprocess.call([line, "--version"])
return line
except OSError:
continue
return None
def safe_copy_file(src_path: str, dst_path: str):
"""Copy file and make sure destination directory exists.
Ignore if destination already contains directories from source.
@ -115,210 +131,335 @@ def safe_copy_file(src_path, dst_path):
if src_path == dst_path:
return
dst_dir = os.path.dirname(dst_path)
try:
os.makedirs(dst_dir)
except Exception:
pass
dst_dir: str = os.path.dirname(dst_path)
os.makedirs(dst_dir, exist_ok=True)
shutil.copy2(src_path, dst_path)
def _value_match_regexes(value, regexes):
for regex in regexes:
if regex.search(value):
return True
return False
def _value_match_regexes(value: str, regexes: Iterable[Pattern]) -> bool:
return any(
regex.search(value)
for regex in regexes
)
def find_files_in_subdir(
src_path,
ignore_file_patterns=None,
ignore_dir_patterns=None
):
src_path: str,
ignore_file_patterns: Optional[List[Pattern]] = None,
ignore_dir_patterns: Optional[List[Pattern]] = None
) -> List[Tuple[str, str]]:
"""Find all files to copy in subdirectories of given path.
All files that match any of the patterns in 'ignore_file_patterns' will
be skipped and any directories that match any of the patterns in
'ignore_dir_patterns' will be skipped with all subfiles.
Args:
src_path (str): Path to directory to search in.
ignore_file_patterns (Optional[list[Pattern]]): List of regexes
to match files to ignore.
ignore_dir_patterns (Optional[list[Pattern]]): List of regexes
to match directories to ignore.
Returns:
list[tuple[str, str]]: List of tuples with path to file and parent
directories relative to 'src_path'.
"""
if ignore_file_patterns is None:
ignore_file_patterns = IGNORE_FILE_PATTERNS
if ignore_dir_patterns is None:
ignore_dir_patterns = IGNORE_DIR_PATTERNS
output = []
output: List[Tuple[str, str]] = []
if not os.path.exists(src_path):
return output
hierarchy_queue = collections.deque()
hierarchy_queue: collections.deque = collections.deque()
hierarchy_queue.append((src_path, []))
while hierarchy_queue:
item = hierarchy_queue.popleft()
item: Tuple[str, str] = hierarchy_queue.popleft()
dirpath, parents = item
for name in os.listdir(dirpath):
path = os.path.join(dirpath, name)
path: str = os.path.join(dirpath, name)
if os.path.isfile(path):
if not _value_match_regexes(name, ignore_file_patterns):
items = list(parents)
items: List[str] = list(parents)
items.append(name)
output.append((path, os.path.sep.join(items)))
continue
if not _value_match_regexes(name, ignore_dir_patterns):
items = list(parents)
items: List[str] = list(parents)
items.append(name)
hierarchy_queue.append((path, items))
return output
def copy_server_content(addon_output_dir, current_dir, log):
def update_client_version(logger):
"""Update version in client code if version.py is present."""
if not ADDON_CLIENT_DIR:
return
version_path: str = os.path.join(
CLIENT_ROOT, ADDON_CLIENT_DIR, "version.py"
)
if not os.path.exists(version_path):
logger.debug("Did not find version.py in client directory")
return
logger.info("Updating client version")
with open(version_path, "w") as stream:
stream.write(VERSION_PY_CONTENT)
def update_pyproject_toml(logger):
filepath = os.path.join(CURRENT_ROOT, "pyproject.toml")
new_lines = []
with open(filepath, "r") as stream:
version_found = False
for line in stream.readlines():
if not version_found and line.startswith("version ="):
line = f'version = "{ADDON_VERSION}"\n'
version_found = True
new_lines.append(line)
with open(filepath, "w") as stream:
stream.write("".join(new_lines))
def build_frontend():
yarn_executable = _get_yarn_executable()
if yarn_executable is None:
raise RuntimeError("Yarn executable was not found.")
subprocess.run([yarn_executable, "install"], cwd=FRONTEND_ROOT)
subprocess.run([yarn_executable, "build"], cwd=FRONTEND_ROOT)
if not os.path.exists(FRONTEND_DIST_ROOT):
raise RuntimeError(
"Frontend build failed. Did not find 'dist' folder."
)
def get_client_files_mapping() -> List[Tuple[str, str]]:
"""Mapping of source client code files to destination paths.
Example output:
[
(
"C:/addons/MyAddon/version.py",
"my_addon/version.py"
),
(
"C:/addons/MyAddon/client/my_addon/__init__.py",
"my_addon/__init__.py"
)
]
Returns:
list[tuple[str, str]]: List of path mappings to copy. The destination
path is relative to expected output directory.
"""
# Add client code content to zip
client_code_dir: str = os.path.join(CLIENT_ROOT, ADDON_CLIENT_DIR)
mapping = [
(path, os.path.join(ADDON_CLIENT_DIR, sub_path))
for path, sub_path in find_files_in_subdir(client_code_dir)
]
license_path = os.path.join(CURRENT_ROOT, "LICENSE")
if os.path.exists(license_path):
mapping.append((license_path, f"{ADDON_CLIENT_DIR}/LICENSE"))
return mapping
def get_client_zip_content(log) -> io.BytesIO:
log.info("Preparing client code zip")
files_mapping: List[Tuple[str, str]] = get_client_files_mapping()
stream = io.BytesIO()
with ZipFileLongPaths(stream, "w", zipfile.ZIP_DEFLATED) as zipf:
for src_path, subpath in files_mapping:
zipf.write(src_path, subpath)
stream.seek(0)
return stream
def get_base_files_mapping() -> List[FileMapping]:
filepaths_to_copy: List[FileMapping] = [
(
os.path.join(CURRENT_ROOT, "package.py"),
"package.py"
)
]
# Add license file to package if exists
license_path = os.path.join(CURRENT_ROOT, "LICENSE")
if os.path.exists(license_path):
filepaths_to_copy.append((license_path, "LICENSE"))
# Go through server, private and public directories and find all files
for dirpath in (SERVER_ROOT, PRIVATE_ROOT, PUBLIC_ROOT):
if not os.path.exists(dirpath):
continue
dirname = os.path.basename(dirpath)
for src_file, subpath in find_files_in_subdir(dirpath):
dst_subpath = os.path.join(dirname, subpath)
filepaths_to_copy.append((src_file, dst_subpath))
if os.path.exists(FRONTEND_DIST_ROOT):
for src_file, subpath in find_files_in_subdir(FRONTEND_DIST_ROOT):
dst_subpath = os.path.join(DST_DIST_DIR, subpath)
filepaths_to_copy.append((src_file, dst_subpath))
pyproject_toml = os.path.join(CLIENT_ROOT, "pyproject.toml")
if os.path.exists(pyproject_toml):
filepaths_to_copy.append(
(pyproject_toml, "private/pyproject.toml")
)
return filepaths_to_copy
def copy_client_code(output_dir: str, log: logging.Logger):
"""Copies server side folders to 'addon_package_dir'
Args:
addon_output_dir (str): package dir in addon repo dir
current_dir (str): addon repo dir
output_dir (str): Output directory path.
log (logging.Logger)
"""
log.info(f"Copying client for {ADDON_NAME}-{ADDON_VERSION}")
log.info("Copying server content")
full_output_path = os.path.join(
output_dir, f"{ADDON_NAME}_{ADDON_VERSION}"
)
if os.path.exists(full_output_path):
shutil.rmtree(full_output_path)
os.makedirs(full_output_path, exist_ok=True)
filepaths_to_copy = []
server_dirpath = os.path.join(current_dir, "server")
for item in find_files_in_subdir(server_dirpath):
src_path, dst_subpath = item
dst_path = os.path.join(addon_output_dir, "server", dst_subpath)
filepaths_to_copy.append((src_path, dst_path))
# Copy files
for src_path, dst_path in filepaths_to_copy:
for src_path, dst_subpath in get_client_files_mapping():
dst_path = os.path.join(full_output_path, dst_subpath)
safe_copy_file(src_path, dst_path)
def _update_client_version(client_addon_dir):
"""Write version.py file to 'client' directory.
Make sure the version in client dir is the same as in package.py.
Args:
client_addon_dir (str): Directory path of client addon.
"""
dst_version_path = os.path.join(client_addon_dir, "version.py")
with open(dst_version_path, "w") as stream:
stream.write(CLIENT_VERSION_CONTENT.format(ADDON_VERSION))
log.info("Client copy finished")
def zip_client_side(addon_package_dir, current_dir, log):
"""Copy and zip `client` content into 'addon_package_dir'.
Args:
addon_package_dir (str): Output package directory path.
current_dir (str): Directory path of addon source.
log (logging.Logger): Logger object.
"""
client_dir = os.path.join(current_dir, "client")
client_addon_dir = os.path.join(client_dir, ADDON_CLIENT_DIR)
if not os.path.isdir(client_addon_dir):
raise ValueError(
f"Failed to find client directory '{client_addon_dir}'"
)
log.info("Preparing client code zip")
private_dir = os.path.join(addon_package_dir, "private")
if not os.path.exists(private_dir):
os.makedirs(private_dir)
_update_client_version(client_addon_dir)
zip_filepath = os.path.join(os.path.join(private_dir, "client.zip"))
with ZipFileLongPaths(zip_filepath, "w", zipfile.ZIP_DEFLATED) as zipf:
# Add client code content to zip
for path, sub_path in find_files_in_subdir(client_addon_dir):
sub_path = os.path.join(ADDON_CLIENT_DIR, sub_path)
zipf.write(path, sub_path)
shutil.copy(os.path.join(client_dir, "pyproject.toml"), private_dir)
def create_server_package(
def copy_addon_package(
output_dir: str,
addon_output_dir: str,
files_mapping: List[FileMapping],
log: logging.Logger
):
"""Create server package zip file.
The zip file can be installed to a server using UI or rest api endpoints.
"""Copy client code to output directory.
Args:
output_dir (str): Directory path to output zip file.
addon_output_dir (str): Directory path to addon output directory.
output_dir (str): Directory path to output client code.
files_mapping (List[FileMapping]): List of tuples with source file
and destination subpath.
log (logging.Logger): Logger object.
"""
log.info("Creating server package")
"""
log.info(f"Copying package for {ADDON_NAME}-{ADDON_VERSION}")
# Add addon name and version to output directory
addon_output_dir: str = os.path.join(
output_dir, ADDON_NAME, ADDON_VERSION
)
if os.path.isdir(addon_output_dir):
log.info(f"Purging {addon_output_dir}")
shutil.rmtree(addon_output_dir)
os.makedirs(addon_output_dir, exist_ok=True)
# Copy server content
for src_file, dst_subpath in files_mapping:
dst_path: str = os.path.join(addon_output_dir, dst_subpath)
dst_dir: str = os.path.dirname(dst_path)
os.makedirs(dst_dir, exist_ok=True)
if isinstance(src_file, io.BytesIO):
with open(dst_path, "wb") as stream:
stream.write(src_file.getvalue())
else:
safe_copy_file(src_file, dst_path)
log.info("Package copy finished")
def create_addon_package(
output_dir: str,
files_mapping: List[FileMapping],
log: logging.Logger
):
log.info(f"Creating package for {ADDON_NAME}-{ADDON_VERSION}")
os.makedirs(output_dir, exist_ok=True)
output_path = os.path.join(
output_dir, f"{ADDON_NAME}-{ADDON_VERSION}.zip"
)
with ZipFileLongPaths(output_path, "w", zipfile.ZIP_DEFLATED) as zipf:
# Move addon content to zip into 'addon' directory
addon_output_dir_offset = len(addon_output_dir) + 1
for root, _, filenames in os.walk(addon_output_dir):
if not filenames:
continue
# Copy server content
for src_file, dst_subpath in files_mapping:
if isinstance(src_file, io.BytesIO):
zipf.writestr(dst_subpath, src_file.getvalue())
else:
zipf.write(src_file, dst_subpath)
dst_root = None
if root != addon_output_dir:
dst_root = root[addon_output_dir_offset:]
for filename in filenames:
src_path = os.path.join(root, filename)
dst_path = filename
if dst_root:
dst_path = os.path.join(dst_root, dst_path)
zipf.write(src_path, dst_path)
log.info(f"Output package can be found: {output_path}")
log.info("Package created")
def main(
output_dir: Optional[str]=None,
skip_zip: bool=False,
keep_sources: bool=False,
clear_output_dir: bool=False
output_dir: Optional[str] = None,
skip_zip: Optional[bool] = False,
only_client: Optional[bool] = False
):
log = logging.getLogger("create_package")
log.info("Start creating package")
log: logging.Logger = logging.getLogger("create_package")
log.info("Package creation started")
current_dir = os.path.dirname(os.path.abspath(__file__))
if not output_dir:
output_dir = os.path.join(current_dir, "package")
output_dir = os.path.join(CURRENT_ROOT, "package")
has_client_code = bool(ADDON_CLIENT_DIR)
if has_client_code:
client_dir: str = os.path.join(CLIENT_ROOT, ADDON_CLIENT_DIR)
if not os.path.exists(client_dir):
raise RuntimeError(
f"Client directory was not found '{client_dir}'."
" Please check 'client_dir' in 'package.py'."
)
update_client_version(log)
new_created_version_dir = os.path.join(
output_dir, ADDON_NAME, ADDON_VERSION
)
update_pyproject_toml(log)
if os.path.isdir(new_created_version_dir) and clear_output_dir:
log.info(f"Purging {new_created_version_dir}")
shutil.rmtree(output_dir)
if only_client:
if not has_client_code:
raise RuntimeError("Client code is not available. Skipping")
copy_client_code(output_dir, log)
return
log.info(f"Preparing package for {ADDON_NAME}-{ADDON_VERSION}")
addon_output_root = os.path.join(output_dir, ADDON_NAME)
addon_output_dir = os.path.join(addon_output_root, ADDON_VERSION)
if not os.path.exists(addon_output_dir):
os.makedirs(addon_output_dir)
if os.path.exists(FRONTEND_ROOT):
build_frontend()
copy_server_content(addon_output_dir, current_dir, log)
safe_copy_file(
PACKAGE_PATH,
os.path.join(addon_output_dir, os.path.basename(PACKAGE_PATH))
)
zip_client_side(addon_output_dir, current_dir, log)
files_mapping: List[FileMapping] = []
files_mapping.extend(get_base_files_mapping())
if has_client_code:
files_mapping.append(
(get_client_zip_content(log), "private/client.zip")
)
# Skip server zipping
if not skip_zip:
create_server_package(output_dir, addon_output_dir, log)
# Remove sources only if zip file is created
if not keep_sources:
log.info("Removing source files for server package")
shutil.rmtree(addon_output_root)
if skip_zip:
copy_addon_package(output_dir, files_mapping, log)
else:
create_addon_package(output_dir, files_mapping, log)
log.info("Package creation finished")
@ -333,23 +474,6 @@ if __name__ == "__main__":
" server folder structure."
)
)
parser.add_argument(
"--keep-sources",
dest="keep_sources",
action="store_true",
help=(
"Keep folder structure when server package is created."
)
)
parser.add_argument(
"-c", "--clear-output-dir",
dest="clear_output_dir",
action="store_true",
help=(
"Clear output directory before package creation."
)
)
parser.add_argument(
"-o", "--output",
dest="output_dir",
@ -359,11 +483,25 @@ if __name__ == "__main__":
" (Will be purged if already exists!)"
)
)
parser.add_argument(
"--only-client",
dest="only_client",
action="store_true",
help=(
"Extract only client code. This is useful for development."
" Requires '-o', '--output' argument to be filled."
)
)
parser.add_argument(
"--debug",
dest="debug",
action="store_true",
help="Debug log messages."
)
args = parser.parse_args(sys.argv[1:])
main(
args.output_dir,
args.skip_zip,
args.keep_sources,
args.clear_output_dir
)
level = logging.INFO
if args.debug:
level = logging.DEBUG
logging.basicConfig(level=level)
main(args.output_dir, args.skip_zip, args.only_client)