Add shotgrid module

This commit is contained in:
Vic Bartel 2022-03-18 17:20:58 +01:00
parent 5de8185ded
commit 0e96071996
34 changed files with 1433 additions and 40 deletions

19
.editorconfig Normal file
View file

@ -0,0 +1,19 @@
root = true
[*]
end_of_line = lf
insert_final_newline = true
[*.{js,py}]
charset = utf-8
[*.py]
indent_style = space
indent_size = 4
[*.yml]
indent_style = space
indent_size = 2
[Makefile]
indent_style = tab

26
.pre-commit-config.yaml Normal file
View file

@ -0,0 +1,26 @@
repos:
- repo: https://github.com/pre-commit/pre-commit-hooks
rev: v3.2.0
hooks:
- id: trailing-whitespace
- id: end-of-file-fixer
- id: check-yaml
- id: check-added-large-files
- repo: https://github.com/ambv/black
rev: 21.4b0
hooks:
- id: black
language_version: "3"
args:
- "--config"
- "./pyproject.toml"
- repo: https://github.com/pycqa/flake8
rev: "3.9.2" # pick a git hash / tag to point to
hooks:
- id: flake8
- repo: https://github.com/pre-commit/mirrors-mypy
rev: 'v0.902'
hooks:
- id: mypy
args: [--no-strict-optional, --ignore-missing-imports]
additional_dependencies: [tokenize-rt==3.2.0]

5
mypy.ini Normal file
View file

@ -0,0 +1,5 @@
[mypy]
python_version = 3.7
ignore_missing_imports = false
check_untyped_defs = true
follow_imports = silent

View file

@ -476,6 +476,7 @@ class MayaSubmitDeadline(pyblish.api.InstancePlugin):
"FTRACK_API_KEY",
"FTRACK_API_USER",
"FTRACK_SERVER",
"OPENPYPE_SG_USER",
"AVALON_PROJECT",
"AVALON_ASSET",
"AVALON_TASK",

View file

@ -113,6 +113,7 @@ class ProcessSubmittedJobOnFarm(pyblish.api.InstancePlugin):
"celaction": [r".*"]}
enviro_filter = [
"OPENPYPE_SG_USER",
"FTRACK_API_USER",
"FTRACK_API_KEY",
"FTRACK_SERVER",

View file

@ -0,0 +1,19 @@
## Shotgrid Module
### Pre-requisites
Install and launch a [shotgrid leecher](https://github.com/Ellipsanime/shotgrid-leecher) server
### Quickstart
The goal of this tutorial is to synchronize an already existing shotgrid project with OpenPype.
- Activate the shotgrid module in the **system settings** and inform the shotgrid leecher server API url
- Create a new OpenPype project with the **project manager**
- Inform the shotgrid authentication infos (url, script name, api key) and the shotgrid project ID related to this OpenPype project in the **project settings**
- Use the batch interface (Tray > shotgrid > Launch batch), select your project and click "batch"
- You can now access your shotgrid entities within the **avalon launcher** and publish informations to shotgrid with **pyblish**

View file

@ -0,0 +1,5 @@
from .shotgrid_module import (
ShotgridModule,
)
__all__ = ("ShotgridModule",)

View file

@ -0,0 +1,50 @@
from copy import copy
from typing import Any, Iterator, Dict, Set
from avalon.api import AvalonMongoDB
from openpype.api import Logger
from openpype.modules.shotgrid.lib import (
credentials,
settings,
server,
)
_LOG = Logger().get_logger("ShotgridModule.patch")
def _patched_projects(
self: Any, projection: Any = None, only_active: bool = True
) -> Iterator[Dict[str, Any]]:
all_projects = list(self._prev_projects(projection, only_active))
if (
not credentials.get_local_login()
or not settings.filter_projects_by_login()
):
return all_projects
try:
linked_names = _fetch_linked_project_names() or set()
return [x for x in all_projects if _upper(x["name"]) in linked_names]
except Exception as e:
print(e)
return all_projects
def _upper(x: Any) -> str:
return str(x).strip().upper()
def _fetch_linked_project_names() -> Set[str]:
return {
_upper(x["project_name"])
for x in server.find_linked_projects(credentials.get_local_login())
}
def patch_avalon_db() -> None:
_LOG.debug("Run avalon patching")
if AvalonMongoDB.projects is _patched_projects:
return None
_LOG.debug("Patch Avalon.projects method")
AvalonMongoDB._prev_projects = copy(AvalonMongoDB.projects)
AvalonMongoDB.projects = _patched_projects

View file

@ -0,0 +1,9 @@
from openpype.lib import PostLaunchHook
class PostShotgridHook(PostLaunchHook):
order = None
def execute(self, *args, **kwargs):
print(args, kwargs)
pass

View file

@ -0,0 +1 @@
MODULE_NAME = "shotgrid"

View file

@ -0,0 +1,125 @@
from typing import Tuple, Optional
from urllib.parse import urlparse
import shotgun_api3
from shotgun_api3.shotgun import AuthenticationFault
from openpype.lib import OpenPypeSecureRegistry, OpenPypeSettingsRegistry
from openpype.modules.shotgrid.lib.record import Credentials
def _get_shotgrid_secure_key(hostname: str, key: str) -> str:
"""Secure item key for entered hostname."""
return f"shotgrid/{hostname}/{key}"
def _get_secure_value_and_registry(
hostname: str,
name: str,
) -> Tuple[str, OpenPypeSecureRegistry]:
key = _get_shotgrid_secure_key(hostname, name)
registry = OpenPypeSecureRegistry(key)
return registry.get_item(name, None), registry
def get_shotgrid_hostname(shotgrid_url: str) -> str:
if not shotgrid_url:
raise Exception("Shotgrid url cannot be a null")
valid_shotgrid_url = (
f"//{shotgrid_url}" if "//" not in shotgrid_url else shotgrid_url
)
return urlparse(valid_shotgrid_url).hostname
# Credentials storing function (using keyring)
def get_credentials(shotgrid_url: str) -> Optional[Credentials]:
hostname = get_shotgrid_hostname(shotgrid_url)
if not hostname:
return None
login_value, _ = _get_secure_value_and_registry(
hostname,
Credentials.login_key_prefix(),
)
password_value, _ = _get_secure_value_and_registry(
hostname,
Credentials.password_key_prefix(),
)
return Credentials(login_value, password_value)
def save_credentials(login: str, password: str, shotgrid_url: str):
hostname = get_shotgrid_hostname(shotgrid_url)
_, login_registry = _get_secure_value_and_registry(
hostname,
Credentials.login_key_prefix(),
)
_, password_registry = _get_secure_value_and_registry(
hostname,
Credentials.password_key_prefix(),
)
clear_credentials(shotgrid_url)
login_registry.set_item(Credentials.login_key_prefix(), login)
password_registry.set_item(Credentials.password_key_prefix(), password)
def clear_credentials(shotgrid_url: str):
hostname = get_shotgrid_hostname(shotgrid_url)
login_value, login_registry = _get_secure_value_and_registry(
hostname,
Credentials.login_key_prefix(),
)
password_value, password_registry = _get_secure_value_and_registry(
hostname,
Credentials.password_key_prefix(),
)
if login_value is not None:
login_registry.delete_item(Credentials.login_key_prefix())
if password_value is not None:
password_registry.delete_item(Credentials.password_key_prefix())
# Login storing function (using json)
def get_local_login() -> Optional[str]:
reg = OpenPypeSettingsRegistry()
try:
return str(reg.get_item("shotgrid_login"))
except Exception:
return None
def save_local_login(login: str):
reg = OpenPypeSettingsRegistry()
reg.set_item("shotgrid_login", login)
def clear_local_login():
reg = OpenPypeSettingsRegistry()
reg.delete_item("shotgrid_login")
def check_credentials(
login: str,
password: str,
shotgrid_url: str,
) -> bool:
if not shotgrid_url or not login or not password:
return False
try:
session = shotgun_api3.Shotgun(
shotgrid_url,
login=login,
password=password,
)
session.preferences_read()
session.close()
except AuthenticationFault:
return False
return True

View file

@ -0,0 +1,18 @@
from dataclasses import dataclass
@dataclass(frozen=True)
class Credentials:
login: str
password: str
def is_empty(self) -> bool:
return not (self.login and self.password)
@staticmethod
def login_key_prefix() -> str:
return "login"
@staticmethod
def password_key_prefix() -> str:
return "password"

View file

@ -0,0 +1,27 @@
import traceback
import requests
from typing import Dict, Any, List
from openpype.api import Logger
from openpype.modules.shotgrid.lib import (
settings as settings_lib,
)
_LOG = Logger().get_logger("ShotgridModule.server")
def find_linked_projects(email: str) -> List[Dict[str, Any]]:
url = "".join(
[
settings_lib.get_leecher_backend_url(),
"/user/",
email,
"/project-user-links",
]
)
try:
return requests.get(url).json()
except requests.exceptions.RequestException as e:
_LOG.error(e)
traceback.print_stack()

View file

@ -0,0 +1,39 @@
import os
from typing import Tuple, Dict, List, Any
from pymongo import MongoClient
from openpype.api import get_system_settings, get_project_settings
from openpype.modules.shotgrid.lib.const import MODULE_NAME
def get_project_list() -> List[str]:
mongo_url = os.getenv("OPENPYPE_MONGO")
client = MongoClient(mongo_url)
db = client['avalon']
return db.list_collection_names()
def get_shotgrid_project_settings(project: str) -> Dict[str, Any]:
return get_project_settings(project).get(MODULE_NAME, {})
def get_shotgrid_settings() -> Dict[str, Any]:
return get_system_settings().get("modules", {}).get(MODULE_NAME, {})
def get_shotgrid_servers() -> Dict[str, Any]:
return get_shotgrid_settings().get("shotgrid_settings", {})
def get_leecher_backend_url() -> str:
return get_shotgrid_settings().get("leecher_backend_url")
def filter_projects_by_login() -> bool:
return bool(get_shotgrid_settings().get("filter_projects_by_login", False))
def get_shotgrid_event_mongo_info() -> Tuple[str, str]:
database_name = os.environ["OPENPYPE_DATABASE_NAME"]
collection_name = "shotgrid_events"
return database_name, collection_name

View file

@ -0,0 +1,103 @@
import os
import pyblish.api
from pymongo import MongoClient
from openpype.api import get_project_settings
class CollectShotgridEntities(pyblish.api.ContextPlugin):
"""Collect shotgrid entities according to the current context"""
order = pyblish.api.CollectorOrder + 0.499
label = "Shotgrid entities"
def process(self, context):
avalon_project = context.data.get("projectEntity")
avalon_asset = context.data.get("assetEntity")
avalon_task_name = os.getenv("AVALON_TASK")
self.log.info(avalon_project)
self.log.info(avalon_asset)
sg_project = _get_shotgrid_project(avalon_project)
sg_task = _get_shotgrid_task(
avalon_project, avalon_asset, avalon_task_name
)
sg_entity = _get_shotgrid_entity(avalon_project, avalon_asset)
if sg_project:
context.data["shotgridProject"] = sg_project
self.log.info(
"Collected correspondig shotgrid project : {}".format(
sg_project
)
)
if sg_task:
context.data["shotgridTask"] = sg_task
self.log.info(
"Collected correspondig shotgrid task : {}".format(sg_task)
)
if sg_entity:
context.data["shotgridEntity"] = sg_entity
self.log.info(
"Collected correspondig shotgrid entity : {}".format(sg_entity)
)
def _find_existing_version(self, code, context):
filters = [
["project", "is", context.data.get("shotgridProject")],
["sg_task", "is", context.data.get("shotgridTask")],
["entity", "is", context.data.get("shotgridEntity")],
["code", "is", code],
]
sg = context.data.get("shotgridSession")
return sg.find_one("Version", filters, [])
def _get_shotgrid_collection(project):
mongo_url = os.getenv("OPENPYPE_MONGO")
client = MongoClient(mongo_url)
return client.get_database("shotgrid_openpype").get_collection(project)
def _get_shotgrid_project_settings(project):
return get_project_settings(project).get("shotgrid", {})
def _get_shotgrid_project(avalon_project):
proj_settings = _get_shotgrid_project_settings(avalon_project["name"])
shotgrid_project_id = proj_settings.get("shotgrid_project_id")
if shotgrid_project_id:
return {"type": "Project", "id": shotgrid_project_id}
return {}
def _get_shotgrid_task(avalon_project, avalon_asset, avalon_task):
sg_col = _get_shotgrid_collection(avalon_project["name"])
shotgrid_task_hierarchy_row = sg_col.find_one(
{
"type": "Task",
"_id": {"$regex": "^" + avalon_task + "_[0-9]*"},
"parent": {"$regex": ".*," + avalon_asset["name"] + ","},
}
)
if shotgrid_task_hierarchy_row:
return {"type": "Task", "id": shotgrid_task_hierarchy_row["src_id"]}
return {}
def _get_shotgrid_entity(avalon_project, avalon_asset):
sg_col = _get_shotgrid_collection(avalon_project["name"])
shotgrid_entity_hierarchy_row = sg_col.find_one(
{"_id": avalon_asset["name"]}
)
if shotgrid_entity_hierarchy_row:
return {
"type": shotgrid_entity_hierarchy_row["type"],
"id": shotgrid_entity_hierarchy_row["src_id"],
}
return {}

View file

@ -0,0 +1,132 @@
import os
import sys
import pyblish.api
import shotgun_api3
from shotgun_api3.shotgun import AuthenticationFault
from openpype.lib import OpenPypeSettingsRegistry
from openpype.api import get_project_settings, get_system_settings
class CollectShotgridSession(pyblish.api.ContextPlugin):
"""Collect shotgrid session using user credentials"""
order = pyblish.api.CollectorOrder
label = "Shotgrid user session"
def process(self, context):
certificate_path = os.getenv("SHOTGUN_API_CACERTS")
if certificate_path is None or not os.path.exists(certificate_path):
self.log.info(
"SHOTGUN_API_CACERTS does not contains a valid \
path: {}".format(
certificate_path
)
)
certificate_path = get_shotgrid_certificate()
self.log.info("Get Certificate from shotgrid_api")
if not os.path.exists(certificate_path):
self.log.error(
"Could not find certificate in shotgun_api3: \
{}".format(
certificate_path
)
)
return
set_shotgrid_certificate(certificate_path)
self.log.info("Set Certificate: {}".format(certificate_path))
avalon_project = os.getenv("AVALON_PROJECT")
shotgrid_settings = get_shotgrid_settings(avalon_project)
self.log.info("shotgrid settings: {}".format(shotgrid_settings))
shotgrid_servers_settings = get_shotgrid_servers()
self.log.info(
"shotgrid_servers_settings: {}".format(shotgrid_servers_settings)
)
shotgrid_server = shotgrid_settings.get("shotgrid_server", "")
if not shotgrid_server:
self.log.error(
"No Shotgrid server found, please choose a credential"
"in script name and script key in OpenPype settings"
)
shotgrid_server_setting = shotgrid_servers_settings.get(
shotgrid_server, {}
)
shotgrid_url = shotgrid_server_setting.get("shotgrid_url", "")
shotgrid_script_name = shotgrid_server_setting.get(
"shotgrid_script_name", ""
)
shotgrid_script_key = shotgrid_server_setting.get(
"shotgrid_script_key", ""
)
if not shotgrid_script_name and not shotgrid_script_key:
self.log.error(
"No Shotgrid api credential found, please enter "
"script name and script key in OpenPype settings"
)
login = get_login() or os.getenv("OPENPYPE_SG_USER")
if not login:
self.log.error(
"No Shotgrid login found, please "
"login to shotgrid withing openpype Tray"
)
session = shotgun_api3.Shotgun(
base_url=shotgrid_url,
script_name=shotgrid_script_name,
api_key=shotgrid_script_key,
sudo_as_login=login,
)
try:
session.preferences_read()
except AuthenticationFault:
raise ValueError(
"Could not connect to shotgrid {} with user {}".format(
shotgrid_url, login
)
)
self.log.info(
"Logged to shotgrid {} with user {}".format(shotgrid_url, login)
)
context.data["shotgridSession"] = session
context.data["shotgridUser"] = login
def get_shotgrid_certificate():
shotgun_api_path = os.path.dirname(shotgun_api3.__file__)
return os.path.join(shotgun_api_path, "lib", "certifi", "cacert.pem")
def set_shotgrid_certificate(certificate):
os.environ["SHOTGUN_API_CACERTS"] = certificate
def get_shotgrid_settings(project):
return get_project_settings(project).get("shotgrid", {})
def get_shotgrid_servers():
return (
get_system_settings()
.get("modules", {})
.get("shotgrid", {})
.get("shotgrid_settings", {})
)
def get_login():
reg = OpenPypeSettingsRegistry()
try:
return str(reg.get_item("shotgrid_login"))
except Exception as e:
return None

View file

@ -0,0 +1,77 @@
import os
import pyblish.api
class IntegrateShotgridPublish(pyblish.api.InstancePlugin):
"""
Create published Files from representations and add it to version. If
representation is tagged add shotgrid review, it will add it in
path to movie for a movie file or path to frame for an image sequence.
"""
order = pyblish.api.IntegratorOrder + 0.499
label = "Shotgrid Published Files"
def process(self, instance):
context = instance.context
self.sg = context.data.get("shotgridSession")
shotgrid_version = instance.data.get("shotgridVersion")
for representation in instance.data.get("representations", []):
local_path = representation.get("published_path")
code = os.path.basename(local_path)
if representation.get("tags", []):
continue
published_file = self._find_existing_publish(
code, context, shotgrid_version
)
published_file_data = {
"project": context.data.get("shotgridProject"),
"code": code,
"entity": context.data.get("shotgridEntity"),
"task": context.data.get("shotgridTask"),
"version": shotgrid_version,
"path": {"local_path": local_path},
}
if not published_file:
published_file = self._create_published(published_file_data)
self.log.info(
"Create Shotgrid PublishedFile: {}".format(published_file)
)
else:
self.sg.update(
published_file["type"],
published_file["id"],
published_file_data,
)
self.log.info(
"Update Shotgrid PublishedFile: {}".format(published_file)
)
if instance.data["family"] == "image":
self.sg.upload_thumbnail(
published_file["type"], published_file["id"], local_path
)
instance.data["shotgridPublishedFile"] = published_file
def _find_existing_publish(self, code, context, shotgrid_version):
filters = [
["project", "is", context.data.get("shotgridProject")],
["task", "is", context.data.get("shotgridTask")],
["entity", "is", context.data.get("shotgridEntity")],
["version", "is", shotgrid_version],
["code", "is", code],
]
return self.sg.find_one("PublishedFile", filters, [])
def _create_published(self, published_file_data):
return self.sg.create("PublishedFile", published_file_data)

View file

@ -0,0 +1,92 @@
import os
import pyblish.api
class IntegrateShotgridVersion(pyblish.api.InstancePlugin):
"""Integrate Shotgrid Version"""
order = pyblish.api.IntegratorOrder + 0.497
label = "Shotgrid Version"
sg = None
def process(self, instance):
context = instance.context
self.sg = context.data.get("shotgridSession")
# TODO: Use path template solver to build version code from settings
anatomy = instance.data.get("anatomyData", {})
code = "_".join(
[
anatomy["project"]["code"],
anatomy["parent"],
anatomy["asset"],
anatomy["task"]["name"],
"v{:03}".format(int(anatomy["version"])),
]
)
version = self._find_existing_version(code, context)
if not version:
version = self._create_version(code, context)
self.log.info("Create Shotgrid version: {}".format(version))
else:
self.log.info("Use existing Shotgrid version: {}".format(version))
data_to_update = {}
status = context.data.get("intent", {}).get("value")
if status:
data_to_update["sg_status_list"] = status
for representation in instance.data.get("representations", []):
local_path = representation.get("published_path")
code = os.path.basename(local_path)
if "shotgridreview" in representation.get("tags", []):
if representation["ext"] in ["mov", "avi"]:
self.log.info(
"Upload review: {} for version shotgrid {}".format(
local_path, version.get("id")
)
)
self.sg.upload(
"Version",
version.get("id"),
local_path,
field_name="sg_uploaded_movie",
)
data_to_update["sg_path_to_movie"] = local_path
elif representation["ext"] in ["jpg", "png", "exr", "tga"]:
path_to_frame = local_path.replace("0000", "#")
data_to_update["sg_path_to_frames"] = path_to_frame
self.log.info("Update Shotgrid version with {}".format(data_to_update))
self.sg.update("Version", version["id"], data_to_update)
instance.data["shotgridVersion"] = version
def _find_existing_version(self, code, context):
filters = [
["project", "is", context.data.get("shotgridProject")],
["sg_task", "is", context.data.get("shotgridTask")],
["entity", "is", context.data.get("shotgridEntity")],
["code", "is", code],
]
return self.sg.find_one("Version", filters, [])
def _create_version(self, code, context):
version_data = {
"project": context.data.get("shotgridProject"),
"sg_task": context.data.get("shotgridTask"),
"entity": context.data.get("shotgridEntity"),
"code": code,
}
return self.sg.create("Version", version_data)

View file

@ -0,0 +1,36 @@
import pyblish.api
import openpype.api
class ValidateShotgridUser(pyblish.api.ContextPlugin):
"""
Check if user is valid and have access to the project.
"""
label = "Validate Shotgrid User"
order = openpype.api.ValidateContentsOrder
def process(self, context):
sg = context.data.get('shotgridSession')
login = context.data.get('shotgridUser')
self.log.info("Login shotgrid set in OpenPype is {}".format(login))
project = context.data.get("shotgridProject")
self.log.info("Current shotgun project is {}".format(project))
if not (login and sg and project):
raise KeyError()
user = sg.find_one(
"HumanUser",
[["login", "is", login]],
["projects"]
)
self.log.info(user)
self.log.info(login)
user_projects_id = [p["id"] for p in user.get("projects", [])]
if not project.get('id') in user_projects_id:
raise PermissionError("Login {} don't have access to the project {}".format(login, project))
self.log.info("Login {} have access to the project {}".format(login, project))

View file

@ -0,0 +1,5 @@
### Shotgrid server
Please refer to the external project that covers Openpype/Shotgrid communication:
- https://github.com/Ellipsanime/shotgrid-leecher

View file

@ -0,0 +1,62 @@
import os
import threading
from typing import Optional, Dict, Any
from openpype_interfaces import (
ITrayModule,
IPluginPaths,
ILaunchHookPaths,
)
from openpype.modules import OpenPypeModule
from .aop.patch import patch_avalon_db
from .tray.shotgrid_tray import (
ShotgridTrayWrapper,
)
SHOTGRID_MODULE_DIR = os.path.dirname(os.path.abspath(__file__))
class ShotgridModule(
OpenPypeModule, ITrayModule, IPluginPaths, ILaunchHookPaths
):
leecher_manager_url: str
name: str = "shotgrid"
enabled: bool = False
project_id: Optional[str] = None
tray_wrapper: ShotgridTrayWrapper
def initialize(self, modules_settings: Dict[str, Any]):
patch_avalon_db()
threading.Timer(10.0, patch_avalon_db).start()
shotgrid_settings = modules_settings.get(self.name, dict())
self.enabled = shotgrid_settings.get("enabled", False)
self.leecher_manager_url = shotgrid_settings.get(
"leecher_manager_url", ""
)
def connect_with_modules(self, enabled_modules):
pass
def get_global_environments(self) -> Dict[str, Any]:
return {"PROJECT_ID": self.project_id}
def get_plugin_paths(self) -> Dict[str, Any]:
return {
"publish": [os.path.join(SHOTGRID_MODULE_DIR, "plugins", "publish")]
}
def get_launch_hook_paths(self) -> str:
return os.path.join(SHOTGRID_MODULE_DIR, "hooks")
def tray_init(self):
self.tray_wrapper = ShotgridTrayWrapper(self)
def tray_start(self):
return self.tray_wrapper.validate()
def tray_exit(self, *args, **kwargs):
return self.tray_wrapper
def tray_menu(self, tray_menu):
return self.tray_wrapper.tray_menu(tray_menu)

View file

@ -0,0 +1,34 @@
import pytest
from assertpy import assert_that
import openpype.modules.shotgrid.lib.credentials as sut
def test_missing_shotgrid_url():
with pytest.raises(Exception) as ex:
# arrange
url = ""
# act
sut.get_shotgrid_hostname(url)
# assert
assert_that(ex).is_equal_to("Shotgrid url cannot be a null")
def test_full_shotgrid_url():
# arrange
url = "https://shotgrid.com/myinstance"
# act
actual = sut.get_shotgrid_hostname(url)
# assert
assert_that(actual).is_not_empty()
assert_that(actual).is_equal_to("shotgrid.com")
def test_incomplete_shotgrid_url():
# arrange
url = "shotgrid.com/myinstance"
# act
actual = sut.get_shotgrid_hostname(url)
# assert
assert_that(actual).is_not_empty()
assert_that(actual).is_equal_to("shotgrid.com")

View file

@ -0,0 +1,202 @@
import os
from typing import Any
from Qt import QtCore, QtWidgets, QtGui
from openpype import style
from openpype import resources
from openpype.modules.shotgrid.lib import settings, credentials
class CredentialsDialog(QtWidgets.QDialog):
SIZE_W = 450
SIZE_H = 200
_module: Any = None
_is_logged: bool = False
url_label: QtWidgets.QLabel
login_label: QtWidgets.QLabel
password_label: QtWidgets.QLabel
url_input: QtWidgets.QComboBox
login_input: QtWidgets.QLineEdit
password_input: QtWidgets.QLineEdit
input_layout: QtWidgets.QFormLayout
login_button: QtWidgets.QPushButton
buttons_layout: QtWidgets.QHBoxLayout
main_widget: QtWidgets.QVBoxLayout
login_changed: QtCore.Signal = QtCore.Signal()
def __init__(self, module, parent=None):
super(CredentialsDialog, self).__init__(parent)
self._module = module
self._is_logged = False
self.setWindowTitle("OpenPype - Shotgrid Login")
icon = QtGui.QIcon(resources.get_openpype_icon_filepath())
self.setWindowIcon(icon)
self.setWindowFlags(
QtCore.Qt.WindowCloseButtonHint
| QtCore.Qt.WindowMinimizeButtonHint
)
self.setMinimumSize(QtCore.QSize(self.SIZE_W, self.SIZE_H))
self.setMaximumSize(QtCore.QSize(self.SIZE_W + 100, self.SIZE_H + 100))
self.setStyleSheet(style.load_stylesheet())
self.ui_init()
def ui_init(self):
self.url_label = QtWidgets.QLabel("Shotgrid server:")
self.login_label = QtWidgets.QLabel("Login:")
self.password_label = QtWidgets.QLabel("Password:")
self.url_input = QtWidgets.QComboBox()
# self.url_input.setReadOnly(True)
self.login_input = QtWidgets.QLineEdit()
self.login_input.setPlaceholderText("login")
self.password_input = QtWidgets.QLineEdit()
self.password_input.setPlaceholderText("password")
self.password_input.setEchoMode(QtWidgets.QLineEdit.Password)
self.error_label = QtWidgets.QLabel("")
self.error_label.setStyleSheet("color: red;")
self.error_label.setWordWrap(True)
self.error_label.hide()
self.input_layout = QtWidgets.QFormLayout()
self.input_layout.setContentsMargins(10, 15, 10, 5)
self.input_layout.addRow(self.url_label, self.url_input)
self.input_layout.addRow(self.login_label, self.login_input)
self.input_layout.addRow(self.password_label, self.password_input)
self.input_layout.addRow(self.error_label)
self.login_button = QtWidgets.QPushButton("Login")
self.login_button.setToolTip("Log in shotgrid instance")
self.login_button.clicked.connect(self._on_shotgrid_login_clicked)
self.logout_button = QtWidgets.QPushButton("Logout")
self.logout_button.setToolTip("Log out shotgrid instance")
self.logout_button.clicked.connect(self._on_shotgrid_logout_clicked)
self.buttons_layout = QtWidgets.QHBoxLayout()
self.buttons_layout.addWidget(self.logout_button)
self.buttons_layout.addWidget(self.login_button)
self.main_widget = QtWidgets.QVBoxLayout(self)
self.main_widget.addLayout(self.input_layout)
self.main_widget.addLayout(self.buttons_layout)
self.setLayout(self.main_widget)
def show(self, *args, **kwargs):
super(CredentialsDialog, self).show(*args, **kwargs)
self._fill_shotgrid_url()
self._fill_shotgrid_login()
def _fill_shotgrid_url(self):
servers = settings.get_shotgrid_servers()
if servers:
for _, v in servers.items():
self.url_input.addItem("{}".format(v.get('shotgrid_url')))
self._valid_input(self.url_input)
self.login_button.show()
self.logout_button.show()
enabled = True
else:
self.set_error("Ask your admin to add shotgrid server in settings")
self._invalid_input(self.url_input)
self.login_button.hide()
self.logout_button.hide()
enabled = False
self.login_input.setEnabled(enabled)
self.password_input.setEnabled(enabled)
def _fill_shotgrid_login(self):
login = credentials.get_local_login()
if login:
self.login_input.setText(login)
def _clear_shotgrid_login(self):
self.login_input.setText("")
self.password_input.setText("")
def _on_shotgrid_login_clicked(self):
login = self.login_input.text().strip()
password = self.password_input.text().strip()
missing = []
if login == "":
missing.append("login")
self._invalid_input(self.login_input)
if password == "":
missing.append("password")
self._invalid_input(self.password_input)
url = self.url_input.currentText()
if url == "":
missing.append("url")
self._invalid_input(self.url_input)
if len(missing) > 0:
self.set_error("You didn't enter {}".format(" and ".join(missing)))
return
# if credentials.check_credentials(
# login=login,
# password=password,
# shotgrid_url=url,
# ):
credentials.save_local_login(
login=login
)
os.environ['OPENPYPE_SG_USER'] = login
self._on_login()
self.set_error("CANT LOGIN")
def _on_shotgrid_logout_clicked(self):
credentials.clear_local_login()
del os.environ['OPENPYPE_SG_USER']
self._clear_shotgrid_login()
self._on_logout()
def set_error(self, msg: str):
self.error_label.setText(msg)
self.error_label.show()
def _on_login(self):
self._is_logged = True
self.login_changed.emit()
self._close_widget()
def _on_logout(self):
self._is_logged = False
self.login_changed.emit()
def _close_widget(self):
self.hide()
def _valid_input(self, input_widget: QtWidgets.QLineEdit):
input_widget.setStyleSheet("")
def _invalid_input(self, input_widget: QtWidgets.QLineEdit):
input_widget.setStyleSheet("border: 1px solid red;")
def login_with_credentials(
self, url: str, login: str, password: str
) -> bool:
verification = credentials.check_credentials(url, login, password)
if verification:
credentials.save_credentials(login, password, False)
self._module.set_credentials_to_env(login, password)
self.set_credentials(login, password)
self.login_changed.emit()
return verification

View file

@ -0,0 +1,76 @@
import os
import webbrowser
from typing import Any
from Qt import QtWidgets
from openpype.modules.shotgrid.lib import credentials
from openpype.modules.shotgrid.tray.credential_dialog import (
CredentialsDialog,
)
class ShotgridTrayWrapper:
module: Any
credentials_dialog: CredentialsDialog
logged_user_label: QtWidgets.QAction
def __init__(self, module) -> None:
self.module = module
self.credentials_dialog = CredentialsDialog(module)
self.credentials_dialog.login_changed.connect(self.set_login_label)
self.logged_user_label = QtWidgets.QAction("")
self.logged_user_label.setDisabled(True)
self.set_login_label()
def show_batch_dialog(self):
if self.module.leecher_manager_url:
webbrowser.open(self.module.leecher_manager_url)
def show_connect_dialog(self):
self.show_credential_dialog()
def show_credential_dialog(self):
self.credentials_dialog.show()
self.credentials_dialog.activateWindow()
self.credentials_dialog.raise_()
def set_login_label(self):
login = credentials.get_local_login()
if login:
self.logged_user_label.setText("{}".format(login))
else:
self.logged_user_label.setText(
"No User logged in {0}".format(login)
)
def tray_menu(self, tray_menu):
# Add login to user menu
menu = QtWidgets.QMenu("Shotgrid", tray_menu)
show_connect_action = QtWidgets.QAction("Connect to Shotgrid", menu)
show_connect_action.triggered.connect(self.show_connect_dialog)
menu.addAction(self.logged_user_label)
menu.addSeparator()
menu.addAction(show_connect_action)
tray_menu.addMenu(menu)
# Add manager to Admin menu
for m in tray_menu.findChildren(QtWidgets.QMenu):
if m.title() == "Admin":
shotgrid_manager_action = QtWidgets.QAction(
"Shotgrid manager", menu
)
shotgrid_manager_action.triggered.connect(
self.show_batch_dialog
)
m.addAction(shotgrid_manager_action)
def validate(self) -> bool:
login = credentials.get_local_login()
if not login:
self.show_credential_dialog()
else:
os.environ["OPENPYPE_SG_USER"] = login
return True

Binary file not shown.

After

Width:  |  Height:  |  Size: 45 KiB

View file

@ -0,0 +1,22 @@
{
"shotgrid_project_id": 0,
"shotgrid_server": "",
"event": {
"enabled": false
},
"fields": {
"asset": {
"type": "sg_asset_type"
},
"sequence": {
"episode_link": "episode"
},
"shot": {
"episode_link": "sg_episode",
"sequence_link": "sg_sequence"
},
"task": {
"step": "step"
}
}
}

View file

@ -137,6 +137,13 @@
}
}
},
"shotgrid": {
"enabled": true,
"filter_projects_by_login": true,
"leecher_manager_url": "http://127.0.0.1:3000",
"leecher_backend_url": "http://127.0.0.1:8090",
"shotgrid_settings": {}
},
"timers_manager": {
"enabled": true,
"auto_stop": true,
@ -205,4 +212,4 @@
"linux": ""
}
}
}
}

View file

@ -107,6 +107,7 @@ from .enum_entity import (
TaskTypeEnumEntity,
DeadlineUrlEnumEntity,
AnatomyTemplatesEnumEntity,
ShotgridUrlEnumEntity
)
from .list_entity import ListEntity
@ -171,6 +172,7 @@ __all__ = (
"ToolsEnumEntity",
"TaskTypeEnumEntity",
"DeadlineUrlEnumEntity",
"ShotgridUrlEnumEntity",
"AnatomyTemplatesEnumEntity",
"ListEntity",

View file

@ -1,10 +1,7 @@
import copy
from .input_entities import InputEntity
from .exceptions import EntitySchemaError
from .lib import (
NOT_SET,
STRING_TYPE
)
from .lib import NOT_SET, STRING_TYPE
class BaseEnumEntity(InputEntity):
@ -26,15 +23,13 @@ class BaseEnumEntity(InputEntity):
for item in self.enum_items:
key = tuple(item.keys())[0]
if key in enum_keys:
reason = "Key \"{}\" is more than once in enum items.".format(
key
)
reason = 'Key "{}" is more than once in enum items.'.format(key)
raise EntitySchemaError(self, reason)
enum_keys.add(key)
if not isinstance(key, STRING_TYPE):
reason = "Key \"{}\" has invalid type {}, expected {}.".format(
reason = 'Key "{}" has invalid type {}, expected {}.'.format(
key, type(key), STRING_TYPE
)
raise EntitySchemaError(self, reason)
@ -59,7 +54,7 @@ class BaseEnumEntity(InputEntity):
for item in check_values:
if item not in self.valid_keys:
raise ValueError(
"{} Invalid value \"{}\". Expected one of: {}".format(
'{} Invalid value "{}". Expected one of: {}'.format(
self.path, item, self.valid_keys
)
)
@ -84,7 +79,7 @@ class EnumEntity(BaseEnumEntity):
self.valid_keys = set(all_keys)
if self.multiselection:
self.valid_value_types = (list, )
self.valid_value_types = (list,)
value_on_not_set = []
if enum_default:
if not isinstance(enum_default, list):
@ -109,7 +104,7 @@ class EnumEntity(BaseEnumEntity):
self.value_on_not_set = key
break
self.valid_value_types = (STRING_TYPE, )
self.valid_value_types = (STRING_TYPE,)
# GUI attribute
self.placeholder = self.schema_data.get("placeholder")
@ -152,6 +147,7 @@ class HostsEnumEntity(BaseEnumEntity):
Host name is not the same as application name. Host name defines
implementation instead of application name.
"""
schema_types = ["hosts-enum"]
all_host_names = [
"aftereffects",
@ -169,7 +165,7 @@ class HostsEnumEntity(BaseEnumEntity):
"tvpaint",
"unreal",
"standalonepublisher",
"webpublisher"
"webpublisher",
]
def _item_initialization(self):
@ -210,7 +206,7 @@ class HostsEnumEntity(BaseEnumEntity):
self.valid_keys = valid_keys
if self.multiselection:
self.valid_value_types = (list, )
self.valid_value_types = (list,)
self.value_on_not_set = []
else:
for key in valid_keys:
@ -218,7 +214,7 @@ class HostsEnumEntity(BaseEnumEntity):
self.value_on_not_set = key
break
self.valid_value_types = (STRING_TYPE, )
self.valid_value_types = (STRING_TYPE,)
# GUI attribute
self.placeholder = self.schema_data.get("placeholder")
@ -226,14 +222,10 @@ class HostsEnumEntity(BaseEnumEntity):
def schema_validations(self):
if self.hosts_filter:
enum_len = len(self.enum_items)
if (
enum_len == 0
or (enum_len == 1 and self.use_empty_value)
):
joined_filters = ", ".join([
'"{}"'.format(item)
for item in self.hosts_filter
])
if enum_len == 0 or (enum_len == 1 and self.use_empty_value):
joined_filters = ", ".join(
['"{}"'.format(item) for item in self.hosts_filter]
)
reason = (
"All host names were removed after applying"
" host filters. {}"
@ -246,24 +238,25 @@ class HostsEnumEntity(BaseEnumEntity):
invalid_filters.add(item)
if invalid_filters:
joined_filters = ", ".join([
'"{}"'.format(item)
for item in self.hosts_filter
])
expected_hosts = ", ".join([
'"{}"'.format(item)
for item in self.all_host_names
])
self.log.warning((
"Host filters containt invalid host names:"
" \"{}\" Expected values are {}"
).format(joined_filters, expected_hosts))
joined_filters = ", ".join(
['"{}"'.format(item) for item in self.hosts_filter]
)
expected_hosts = ", ".join(
['"{}"'.format(item) for item in self.all_host_names]
)
self.log.warning(
(
"Host filters containt invalid host names:"
' "{}" Expected values are {}'
).format(joined_filters, expected_hosts)
)
super(HostsEnumEntity, self).schema_validations()
class AppsEnumEntity(BaseEnumEntity):
"""Enum of applications for project anatomy attributes."""
schema_types = ["apps-enum"]
def _item_initialization(self):
@ -271,7 +264,7 @@ class AppsEnumEntity(BaseEnumEntity):
self.value_on_not_set = []
self.enum_items = []
self.valid_keys = set()
self.valid_value_types = (list, )
self.valid_value_types = (list,)
self.placeholder = None
def _get_enum_values(self):
@ -352,7 +345,7 @@ class ToolsEnumEntity(BaseEnumEntity):
self.value_on_not_set = []
self.enum_items = []
self.valid_keys = set()
self.valid_value_types = (list, )
self.valid_value_types = (list,)
self.placeholder = None
def _get_enum_values(self):
@ -409,10 +402,10 @@ class TaskTypeEnumEntity(BaseEnumEntity):
def _item_initialization(self):
self.multiselection = self.schema_data.get("multiselection", True)
if self.multiselection:
self.valid_value_types = (list, )
self.valid_value_types = (list,)
self.value_on_not_set = []
else:
self.valid_value_types = (STRING_TYPE, )
self.valid_value_types = (STRING_TYPE,)
self.value_on_not_set = ""
self.enum_items = []
@ -507,7 +500,8 @@ class DeadlineUrlEnumEntity(BaseEnumEntity):
enum_items_list = []
for server_name, url_entity in deadline_urls_entity.items():
enum_items_list.append(
{server_name: "{}: {}".format(server_name, url_entity.value)})
{server_name: "{}: {}".format(server_name, url_entity.value)}
)
valid_keys.add(server_name)
return enum_items_list, valid_keys
@ -530,6 +524,50 @@ class DeadlineUrlEnumEntity(BaseEnumEntity):
self._current_value = tuple(self.valid_keys)[0]
class ShotgridUrlEnumEntity(BaseEnumEntity):
schema_types = ["shotgrid_url-enum"]
def _item_initialization(self):
self.multiselection = False
self.enum_items = []
self.valid_keys = set()
self.valid_value_types = (STRING_TYPE,)
self.value_on_not_set = ""
# GUI attribute
self.placeholder = self.schema_data.get("placeholder")
def _get_enum_values(self):
shotgrid_settings = self.get_entity_from_path(
"system_settings/modules/shotgrid/shotgrid_settings"
)
valid_keys = set()
enum_items_list = []
for server_name, settings in shotgrid_settings.items():
enum_items_list.append(
{
server_name: "{}: {}".format(
server_name, settings["shotgrid_url"].value
)
}
)
valid_keys.add(server_name)
return enum_items_list, valid_keys
def set_override_state(self, *args, **kwargs):
super(ShotgridUrlEnumEntity, self).set_override_state(*args, **kwargs)
self.enum_items, self.valid_keys = self._get_enum_values()
if not self.valid_keys:
self._current_value = ""
elif self._current_value not in self.valid_keys:
self._current_value = tuple(self.valid_keys)[0]
class AnatomyTemplatesEnumEntity(BaseEnumEntity):
schema_types = ["anatomy-templates-enum"]

View file

@ -62,6 +62,10 @@
"type": "schema",
"name": "schema_project_ftrack"
},
{
"type": "schema",
"name": "schema_project_shotgrid"
},
{
"type": "schema",
"name": "schema_project_deadline"

View file

@ -0,0 +1,98 @@
{
"type": "dict",
"key": "shotgrid",
"label": "Shotgrid",
"collapsible": true,
"is_file": true,
"children": [
{
"type": "number",
"key": "shotgrid_project_id",
"label": "Shotgrid project id"
},
{
"type": "shotgrid_url-enum",
"key": "shotgrid_server",
"label": "Shotgrid Server"
},
{
"type": "dict",
"key": "event",
"label": "Event Handler",
"collapsible": true,
"checkbox_key": "enabled",
"children": [
{
"type": "boolean",
"key": "enabled",
"label": "Enabled"
}
]
},
{
"type": "dict",
"key": "fields",
"label": "Fields Template",
"collapsible": true,
"children": [
{
"type": "dict",
"key": "asset",
"label": "Asset",
"collapsible": true,
"children": [
{
"type": "text",
"key": "type",
"label": "Asset Type"
}
]
},
{
"type": "dict",
"key": "sequence",
"label": "Sequence",
"collapsible": true,
"children": [
{
"type": "text",
"key": "episode_link",
"label": "Episode link"
}
]
},
{
"type": "dict",
"key": "shot",
"label": "Shot",
"collapsible": true,
"children": [
{
"type": "text",
"key": "episode_link",
"label": "Episode link"
},
{
"type": "text",
"key": "sequence_link",
"label": "Sequence link"
}
]
},
{
"type": "dict",
"key": "task",
"label": "Task",
"collapsible": true,
"children": [
{
"type": "text",
"key": "step",
"label": "Step link"
}
]
}
]
}
]
}

View file

@ -13,6 +13,9 @@
{
"ftrackreview": "Add review to Ftrack"
},
{
"shotgridreview": "Add review to Shotgrid"
},
{
"delete": "Delete output"
},

View file

@ -44,6 +44,60 @@
"type": "schema",
"name": "schema_ftrack"
},
{
"type": "dict",
"key": "shotgrid",
"label": "Shotgrid",
"collapsible": true,
"checkbox_key": "enabled",
"children": [
{
"type": "boolean",
"key": "enabled",
"label": "Enabled"
},
{
"type": "text",
"key": "leecher_manager_url",
"label": "Shotgrid Leecher Manager URL"
},
{
"type": "text",
"key": "leecher_backend_url",
"label": "Shotgrid Leecher Backend URL"
},
{
"type": "boolean",
"key": "filter_projects_by_login",
"label": "Filter projects by SG login"
},
{
"type": "dict-modifiable",
"key": "shotgrid_settings",
"label": "Shotgrid Servers",
"object_type": {
"type": "dict",
"children": [
{
"key": "shotgrid_url",
"label": "Server URL",
"type": "text"
},
{
"key": "shotgrid_script_name",
"label": "Script Name",
"type": "text"
},
{
"key": "shotgrid_script_key",
"label": "Script api key",
"type": "text"
}
]
}
}
]
},
{
"type": "dict",
"key": "timers_manager",

View file

@ -40,6 +40,7 @@ clique = "1.6.*"
Click = "^7"
dnspython = "^2.1.0"
ftrack-python-api = "2.0.*"
shotgun_api3 = {git = "https://github.com/shotgunsoftware/python-api.git", rev = "v3.3.3"}
google-api-python-client = "^1.12.8" # sync server google support (should be separate?)
jsonschema = "^2.6.0"
keyring = "^22.0.1"