work on sphinx

This commit is contained in:
Ondrej Samohel 2020-10-09 16:05:14 +02:00
parent f9c9214e35
commit da3c0d73f4
No known key found for this signature in database
GPG key ID: 8A29C663C672C2B7
105 changed files with 3112 additions and 1022 deletions

View file

@ -8,52 +8,51 @@ import shutil
import tempfile
from typing import Union, Callable, Dict
from zipfile import ZipFile
from pathlib import Path
from appdirs import user_data_dir
from pype.version import __version__
from pype.lib import PypeSettingsRegistry
from igniter.tools import load_environments
class BootstrapRepos:
"""Class for bootstrapping local Pype installation.
Attributes:
data_dir (str): local Pype installation directory.
live_repo_dir (str): path to repos directory if running live,
data_dir (Path): local Pype installation directory.
live_repo_dir (Path): path to repos directory if running live,
otherwise `None`.
"""
_vendor = "pypeclub"
_app = "pype"
def __init__(self):
# vendor and app used to construct user data dir
self._vendor = "pypeclub"
self._app = "pype"
self._log = log.getLogger(str(__class__))
self.data_dir = user_data_dir(self._app, self._vendor)
if getattr(sys, 'frozen', False):
self.live_repo_dir = os.path.join(
os.path.dirname(sys.executable),
"repos"
)
self.data_dir = Path(user_data_dir(self._app, self._vendor))
self.registry = PypeSettingsRegistry()
if getattr(sys, "frozen", False):
self.live_repo_dir = Path(sys.executable).parent / "repos"
else:
self.live_repo_dir = os.path.abspath(
os.path.join(
os.path.dirname(__file__),
"..",
"repos"
)
)
self.live_repo_dir = Path(Path(__file__).parent / ".." / "repos")
@staticmethod
def get_local_version() -> str:
"""Get version of local Pype."""
return __version__
def install_live_repos(self, progress_callback=None) -> Union[str, None]:
def install_live_repos(self, progress_callback=None) -> Union[Path, None]:
"""Copy zip created from local repositories to user data dir.
Args:
progress_callback (callable): Optional callback method to report
progress.
Returns:
str: path of installed repository file.
Path: path of installed repository file.
"""
# dummy progress reporter
def empty_progress(x: int):
@ -64,7 +63,6 @@ class BootstrapRepos:
# create zip from repositories
local_version = self.get_local_version()
repo_dir = self.live_repo_dir
# create destination directory
try:
@ -72,45 +70,42 @@ class BootstrapRepos:
except OSError:
self._log.error("directory already exists")
with tempfile.TemporaryDirectory() as temp_dir:
temp_zip = os.path.join(
temp_dir,
f"pype-repositories-v{local_version}.zip"
)
temp_zip = \
Path(temp_dir) / f"pype-repositories-v{local_version}.zip"
self._log.info(f"creating zip: {temp_zip}")
BootstrapRepos._create_pype_zip(
temp_zip, repo_dir, progress_callback=progress_callback)
temp_zip, self.live_repo_dir, progress_callback=progress_callback)
if not os.path.exists(temp_zip):
self._log.error("make archive failed.")
return None
destination = os.path.join(
self.data_dir, os.path.basename(temp_zip))
destination = self.data_dir / temp_zip.name
if os.path.exists(destination):
if destination.exists():
self._log.warning(
f"Destination file {destination} exists, removing.")
try:
os.remove(destination)
destination.unlink()
except Exception as e:
self._log.error(e)
return None
try:
shutil.move(temp_zip, self.data_dir)
shutil.move(temp_zip.as_posix(), self.data_dir.as_posix())
except shutil.Error as e:
self._log.error(e)
return None
return os.path.join(self.data_dir, os.path.basename(temp_zip))
return self.data_dir / temp_zip.name
@staticmethod
def _create_pype_zip(
zip_path: str, include_dir: str,
zip_path: Path, include_dir: Path,
progress_callback: Callable, include_pype: bool = True) -> None:
"""Pack repositories and Pype into zip.
We are using `zipfile` instead :meth:`shutil.make_archive` to later
implement file filter to skip git related stuff to make it into
archive.
We are using :mod:`zipfile` instead :meth:`shutil.make_archive`
to later implement file filter to skip git related stuff to make
it into archive.
Todo:
Implement file filter
@ -118,9 +113,11 @@ class BootstrapRepos:
Args:
zip_path (str): path to zip file.
include_dir: repo directories to include.
progress_callback (Callable): callback to report progress back to
UI progress bar.
progress_callback (Callable(progress: int): callback to
report progress back to UI progress bar. It takes progress
percents as argument.
include_pype (bool): add Pype module itself.
"""
repo_files = sum(len(files) for _, _, files in os.walk(include_dir))
assert repo_files != 0, f"No repositories to include in {include_dir}"
@ -161,8 +158,8 @@ class BootstrapRepos:
progress_callback(100)
@staticmethod
def add_paths_from_archive(archive: str) -> None:
"""Add first-level directories as paths to sys.path.
def add_paths_from_archive(archive: Path) -> None:
"""Add first-level directories as paths to :mod:`sys.path`.
This will enable Python to import modules is second-level directories
in zip file.
@ -188,26 +185,74 @@ class BootstrapRepos:
os.environ["PYTHONPATH"] = os.pathsep.join(paths)
def find_pype(self) -> Union[Dict, None]:
def find_pype(self) -> Union[Dict[str, Path], None]:
"""Get ordered dict of detected Pype version.
Resolution order for Pype is following:
1) First we test for ``PYPE_PATH`` environment variable
2) We try to find ``pypePath`` in registry setting
3) We use user data directory
Returns:
dict: Dictionary of detected Pype version. Key is version, value
is path to zip file.
dict of Path: Dictionary of detected Pype version.
Key is version, value is path to zip file.
None: if Pype is not found.
"""
dir_to_search = self.data_dir
if os.getenv("PYPE_PATH"):
if Path(os.getenv("PYPE_PATH")).exists():
dir_to_search = Path(os.getenv("PYPE_PATH"))
else:
try:
registry_dir = Path(self.registry.get_item("pypePath"))
if registry_dir.exists():
dir_to_search = registry_dir
except ValueError:
# nothing found in registry, we'll use data dir
pass
# pype installation dir doesn't exists
if not os.path.exists(self.data_dir):
if not dir_to_search.exists():
return None
# f"pype-repositories-v{local_version}.zip"
files = os.listdir(self.data_dir)
_pype_versions = {}
for file in files:
for file in dir_to_search.iterdir():
m = re.match(
r"^pype-repositories-v(?P<version>\d+\.\d+\.\d+).zip$", file)
r"^pype-repositories-v(?P<version>\d+\.\d+\.\d+).zip$",
file.name)
if m:
_pype_versions[m.group("version")] = os.path.join(
self.data_dir, file)
_pype_versions[m.group("version")] = file
return dict(sorted(_pype_versions.items()))
@staticmethod
def _get_pype_from_mongo(mongo_url: str) -> Union[Path, None]:
"""Get path from Mongo database.
This sets environment variable ``AVALON_MONGO`` for
:mod:`pype.settings` to be able to read data from database.
It will then retrieve environment variables and among them
must be ``PYPE_ROOT``.
Args:
mongo_url (str): mongodb connection url
Returns:
Path: if path from ``PYPE_ROOT`` is found.
None: if not.
"""
os.environ["AVALON_MONGO"] = mongo_url
env = load_environments()
if not env.get("PYPE_ROOT"):
return None
return Path(env.get("PYPE_ROOT"))
def process_entered_path(self, location: str) -> str:
pype_path = None
if location.startswith("mongodb"):
self._get_pype_from_mongo(location)