create/copy files only if needed

This commit is contained in:
Jakub Trllo 2024-06-05 19:15:34 +02:00
parent 2e2fe06145
commit 84cac2a146

View file

@ -1,3 +1,4 @@
import io
import os import os
import sys import sys
import re import re
@ -9,7 +10,7 @@ import importlib.machinery
import platform import platform
import collections import collections
from pathlib import Path from pathlib import Path
from typing import Any, Optional, Iterable, Pattern, List, Tuple from typing import Optional, Iterable, Pattern, List, Tuple
# Patterns of directories to be skipped for server part of addon # Patterns of directories to be skipped for server part of addon
IGNORE_DIR_PATTERNS: List[Pattern] = [ IGNORE_DIR_PATTERNS: List[Pattern] = [
@ -70,9 +71,7 @@ class ZipFileLongPaths(zipfile.ZipFile):
else: else:
tpath = "\\\\?\\" + tpath tpath = "\\\\?\\" + tpath
return super(ZipFileLongPaths, self)._extract_member( return super()._extract_member(member, tpath, pwd)
member, tpath, pwd
)
def _value_match_regexes(value: str, regexes: Iterable[Pattern]) -> bool: def _value_match_regexes(value: str, regexes: Iterable[Pattern]) -> bool:
@ -86,7 +85,7 @@ def find_files_in_subdir(
src_path: str, src_path: str,
ignore_file_patterns: Optional[List[Pattern]] = None, ignore_file_patterns: Optional[List[Pattern]] = None,
ignore_dir_patterns: Optional[List[Pattern]] = None, ignore_dir_patterns: Optional[List[Pattern]] = None,
ignore_subdirs: Optional[Iterable[Tuple[str]]] = None include_empty_dirs: bool = True
): ):
"""Find all files to copy in subdirectories of given path. """Find all files to copy in subdirectories of given path.
@ -100,8 +99,7 @@ def find_files_in_subdir(
to match files to ignore. to match files to ignore.
ignore_dir_patterns (Optional[List[Pattern]]): List of regexes ignore_dir_patterns (Optional[List[Pattern]]): List of regexes
to match directories to ignore. to match directories to ignore.
ignore_subdirs (Optional[Iterable[Tuple[str]]]): List of include_empty_dirs (Optional[bool]): Do not skip empty directories.
subdirectories to ignore.
Returns: Returns:
List[Tuple[str, str]]: List of tuples with path to file and parent List[Tuple[str, str]]: List of tuples with path to file and parent
@ -113,16 +111,18 @@ def find_files_in_subdir(
if ignore_dir_patterns is None: if ignore_dir_patterns is None:
ignore_dir_patterns = IGNORE_DIR_PATTERNS ignore_dir_patterns = IGNORE_DIR_PATTERNS
output: list[tuple[str, str]] = [] output: List[Tuple[str, str]] = []
hierarchy_queue = collections.deque() hierarchy_queue = collections.deque()
hierarchy_queue.append((src_path, [])) hierarchy_queue.append((src_path, []))
while hierarchy_queue: while hierarchy_queue:
item: tuple[str, str] = hierarchy_queue.popleft() item: Tuple[str, List[str]] = hierarchy_queue.popleft()
dirpath, parents = item dirpath, parents = item
if ignore_subdirs and parents in ignore_subdirs: subnames = list(os.listdir(dirpath))
continue if not subnames and include_empty_dirs:
for name in os.listdir(dirpath): output.append((dirpath, os.path.sep.join(parents)))
for name in subnames:
path = os.path.join(dirpath, name) path = os.path.join(dirpath, name)
if os.path.isfile(path): if os.path.isfile(path):
if not _value_match_regexes(name, ignore_file_patterns): if not _value_match_regexes(name, ignore_file_patterns):
@ -139,89 +139,54 @@ def find_files_in_subdir(
return output return output
def read_addon_version(version_path: Path) -> str:
# Read version
version_content: dict[str, Any] = {}
with open(str(version_path), "r") as stream:
exec(stream.read(), version_content)
return version_content["__version__"]
def get_addon_version(addon_dir: Path) -> str:
return read_addon_version(addon_dir / "server" / "version.py")
def create_addon_zip( def create_addon_zip(
output_dir: Path, output_dir: Path,
addon_name: str, addon_name: str,
addon_version: str, addon_version: str,
keep_source: bool, files_mapping: List[Tuple[str, str]],
client_zip_content: io.BytesIO
): ):
zip_filepath = output_dir / f"{addon_name}-{addon_version}.zip" zip_filepath = output_dir / f"{addon_name}-{addon_version}.zip"
addon_output_dir = output_dir / addon_name / addon_version
with ZipFileLongPaths(zip_filepath, "w", zipfile.ZIP_DEFLATED) as zipf: with ZipFileLongPaths(zip_filepath, "w", zipfile.ZIP_DEFLATED) as zipf:
# Add client code content to zip for src_path, dst_subpath in files_mapping:
src_root = os.path.normpath(str(addon_output_dir.absolute())) zipf.write(src_path, dst_subpath)
src_root_offset = len(src_root) + 1
for root, _, filenames in os.walk(str(addon_output_dir)):
rel_root = ""
if root != src_root:
rel_root = root[src_root_offset:]
for filename in filenames: if client_zip_content is not None:
src_path = os.path.join(root, filename) zipf.writestr("private/client.zip", client_zip_content.getvalue())
if rel_root:
dst_path = os.path.join(rel_root, filename)
else:
dst_path = filename
zipf.write(src_path, dst_path)
if not keep_source:
shutil.rmtree(str(output_dir / addon_name))
def prepare_client_code( def prepare_client_zip(
addon_name: str,
addon_dir: Path, addon_dir: Path,
addon_output_dir: Path, addon_name: str,
addon_version: str addon_version: str,
client_dir: str
): ):
client_dir = addon_dir / "client" if not client_dir:
if not client_dir.exists(): return None
return client_dir_obj = addon_dir / "client" / client_dir
if not client_dir_obj.exists():
return None
# Prepare private dir in output # Update version.py with server version if 'version.py' is available
private_dir = addon_output_dir / "private" version_path = client_dir_obj / "version.py"
private_dir.mkdir(parents=True, exist_ok=True) if version_path.exists():
with open(version_path, "w") as stream:
stream.write(
CLIENT_VERSION_CONTENT.format(addon_name, addon_version)
)
# Copy pyproject toml if available zip_content = io.BytesIO()
pyproject_toml = client_dir / "pyproject.toml" with ZipFileLongPaths(zip_content, "a", zipfile.ZIP_DEFLATED) as zipf:
if pyproject_toml.exists(): # Add client code content to zip
shutil.copy(pyproject_toml, private_dir) for path, sub_path in find_files_in_subdir(
str(client_dir_obj), include_empty_dirs=False
):
sub_path = os.path.join(client_dir, sub_path)
zipf.write(path, sub_path)
for subpath in client_dir.iterdir(): zip_content.seek(0)
if subpath.name == "pyproject.toml": return zip_content
continue
if subpath.is_file():
continue
# Update version.py with server version if 'version.py' is available
version_path = subpath / "version.py"
if version_path.exists():
with open(version_path, "w") as stream:
stream.write(
CLIENT_VERSION_CONTENT.format(addon_name, addon_version)
)
zip_filepath = private_dir / "client.zip"
with ZipFileLongPaths(zip_filepath, "w", zipfile.ZIP_DEFLATED) as zipf:
# Add client code content to zip
for path, sub_path in find_files_in_subdir(str(subpath)):
sub_path = os.path.join(subpath.name, sub_path)
zipf.write(path, sub_path)
def import_filepath(path: Path, module_name: Optional[str] = None): def import_filepath(path: Path, module_name: Optional[str] = None):
@ -241,44 +206,73 @@ def import_filepath(path: Path, module_name: Optional[str] = None):
return module return module
def _get_server_mapping(
addon_dir: Path, addon_version: str
) -> List[Tuple[str, str]]:
server_dir = addon_dir / "server"
src_package_py = addon_dir / "package.py"
pyproject_toml = addon_dir / "client" / "pyproject.toml"
mapping: List[Tuple[str, str]] = [
(src_path, f"server/{sub_path}")
for src_path, sub_path in find_files_in_subdir(str(server_dir))
]
mapping.append((src_package_py.as_posix(), "package.py"))
if pyproject_toml.exists():
mapping.append((pyproject_toml.as_posix(), "private/pyproject.toml"))
return mapping
def create_addon_package( def create_addon_package(
addon_dir: Path, addon_dir: Path,
output_dir: Path, output_dir: Path,
create_zip: bool, create_zip: bool,
keep_source: bool,
): ):
src_package_py = addon_dir / "package.py" src_package_py = addon_dir / "package.py"
package = import_filepath(src_package_py) package = import_filepath(src_package_py)
addon_name = package.name
addon_version = package.version addon_version = package.version
addon_output_dir = output_dir / addon_dir.name / addon_version files_mapping = _get_server_mapping(addon_dir, addon_version)
if addon_output_dir.exists():
shutil.rmtree(str(addon_output_dir))
addon_output_dir.mkdir(parents=True)
# Copy server content client_dir = getattr(package, "client_dir", None)
dst_package_py = addon_output_dir / "package.py" client_zip_content = prepare_client_zip(
shutil.copy(src_package_py, dst_package_py) addon_dir, addon_name, addon_version, client_dir
server_dir = addon_dir / "server"
shutil.copytree(
server_dir, addon_output_dir / "server", dirs_exist_ok=True
)
prepare_client_code(
package.name, addon_dir, addon_output_dir, addon_version
) )
if create_zip: if create_zip:
create_addon_zip( create_addon_zip(
output_dir, addon_dir.name, addon_version, keep_source output_dir,
addon_name,
addon_version,
files_mapping,
client_zip_content
) )
else:
addon_output_dir = output_dir / addon_dir.name / addon_version
if addon_output_dir.exists():
shutil.rmtree(str(addon_output_dir))
addon_output_dir.mkdir(parents=True, exist_ok=True)
for src_path, dst_subpath in files_mapping:
dst_path = addon_output_dir / dst_subpath
dst_path.parent.mkdir(parents=True, exist_ok=True)
shutil.copy2(src_path, dst_path)
if client_zip_content is not None:
private_dir = addon_output_dir / "private"
private_dir.mkdir(parents=True, exist_ok=True)
with open(private_dir / "client.zip", "wb") as stream:
stream.write(client_zip_content.read())
def main( def main(
output_dir=None, output_dir=None,
skip_zip=True, skip_zip=True,
keep_source=False,
clear_output_dir=False, clear_output_dir=False,
addons=None, addons=None,
): ):
@ -313,9 +307,7 @@ def main(
if not server_dir.exists(): if not server_dir.exists():
continue continue
create_addon_package( create_addon_package(addon_dir, output_dir, create_zip)
addon_dir, output_dir, create_zip, keep_source
)
print(f"- package '{addon_dir.name}' created") print(f"- package '{addon_dir.name}' created")
print(f"Package creation finished. Output directory: {output_dir}") print(f"Package creation finished. Output directory: {output_dir}")
@ -366,10 +358,12 @@ if __name__ == "__main__":
) )
args = parser.parse_args(sys.argv[1:]) args = parser.parse_args(sys.argv[1:])
if args.keep_sources:
print("Keeping sources is not supported anymore!")
main( main(
args.output_dir, args.output_dir,
args.skip_zip, args.skip_zip,
args.keep_sources,
args.clear_output_dir, args.clear_output_dir,
args.addons, args.addons,
) )