mirror of
https://github.com/ynput/ayon-core.git
synced 2025-12-25 05:14:40 +01:00
* WIP clockify fix * WIP disable wp validation, make sync work * fix launcher start timer action * fix finish time entry * fix start and stop timers, cleanup, add TODO * show task name and type in description, add TODO * change rate limiter constants * black formatting * remove task type from data * cleanup debug prints * fix hound comments * remove unused import * move ids to property, fix user validation * remove f-strings, rollback description parsing * attempt to fix ftrack actions * check if sync action got some projects * get api data on process * remove unused variable * remove ratelimiter dependency * add response validation * a bit cleanup * Update openpype/modules/clockify/clockify_module.py Co-authored-by: Jakub Trllo <43494761+iLLiCiTiT@users.noreply.github.com> * Update openpype/modules/clockify/clockify_api.py Co-authored-by: Jakub Trllo <43494761+iLLiCiTiT@users.noreply.github.com> * Update openpype/modules/clockify/clockify_api.py Co-authored-by: Jakub Trllo <43494761+iLLiCiTiT@users.noreply.github.com> * Update openpype/modules/clockify/clockify_api.py Co-authored-by: Jakub Trllo <43494761+iLLiCiTiT@users.noreply.github.com> * Update openpype/modules/clockify/ftrack/server/action_clockify_sync_server.py Co-authored-by: Jakub Trllo <43494761+iLLiCiTiT@users.noreply.github.com> * replace dunders with underscores * remove excessive variables * update set_user_id * continue check_running if no timer found * bring back come py2 compatibility * cleanup * get values directly from clockapi * hound * get task type to fill the tag field correctly * add logger, catch some json errors * remove check running timer, add project_id verification module * add current task_id check * remove package entries * make method private, fix typo * get task_type for the idle-restarted timer * remove trailing whitespace * show correct idle countdown values * finx indentation * Update openpype/modules/clockify/clockify_api.py Co-authored-by: Jakub Trllo <43494761+iLLiCiTiT@users.noreply.github.com> * Update openpype/modules/clockify/clockify_module.py Co-authored-by: Jakub Trllo <43494761+iLLiCiTiT@users.noreply.github.com> * revert lock file * remove unused constants and redundant code * import clockify_api inside the method * do not query asset docs double time, add comments * add permissions check fail Exception * rename clockapi to clockify_api * formatting * removed unused variables --------- Co-authored-by: Jakub Trllo <43494761+iLLiCiTiT@users.noreply.github.com> Co-authored-by: Jakub Trllo <jakub.trllo@gmail.com>
447 lines
14 KiB
Python
447 lines
14 KiB
Python
import os
|
|
import re
|
|
import time
|
|
import json
|
|
import datetime
|
|
import requests
|
|
from .constants import (
|
|
CLOCKIFY_ENDPOINT,
|
|
ADMIN_PERMISSION_NAMES,
|
|
)
|
|
|
|
from openpype.lib.local_settings import OpenPypeSecureRegistry
|
|
from openpype.lib import Logger
|
|
|
|
|
|
class ClockifyAPI:
|
|
log = Logger.get_logger(__name__)
|
|
|
|
def __init__(self, api_key=None, master_parent=None):
|
|
self.workspace_name = None
|
|
self.master_parent = master_parent
|
|
self.api_key = api_key
|
|
self._workspace_id = None
|
|
self._user_id = None
|
|
self._secure_registry = None
|
|
|
|
@property
|
|
def secure_registry(self):
|
|
if self._secure_registry is None:
|
|
self._secure_registry = OpenPypeSecureRegistry("clockify")
|
|
return self._secure_registry
|
|
|
|
@property
|
|
def headers(self):
|
|
return {"x-api-key": self.api_key}
|
|
|
|
@property
|
|
def workspace_id(self):
|
|
return self._workspace_id
|
|
|
|
@property
|
|
def user_id(self):
|
|
return self._user_id
|
|
|
|
def verify_api(self):
|
|
for key, value in self.headers.items():
|
|
if value is None or value.strip() == "":
|
|
return False
|
|
return True
|
|
|
|
def set_api(self, api_key=None):
|
|
if api_key is None:
|
|
api_key = self.get_api_key()
|
|
|
|
if api_key is not None and self.validate_api_key(api_key) is True:
|
|
self.api_key = api_key
|
|
self.set_workspace()
|
|
self.set_user_id()
|
|
if self.master_parent:
|
|
self.master_parent.signed_in()
|
|
return True
|
|
return False
|
|
|
|
def validate_api_key(self, api_key):
|
|
test_headers = {"x-api-key": api_key}
|
|
action_url = "user"
|
|
response = requests.get(
|
|
CLOCKIFY_ENDPOINT + action_url, headers=test_headers
|
|
)
|
|
if response.status_code != 200:
|
|
return False
|
|
return True
|
|
|
|
def validate_workspace_permissions(self, workspace_id=None, user_id=None):
|
|
if user_id is None:
|
|
self.log.info("No user_id found during validation")
|
|
return False
|
|
if workspace_id is None:
|
|
workspace_id = self.workspace_id
|
|
action_url = f"workspaces/{workspace_id}/users?includeRoles=1"
|
|
response = requests.get(
|
|
CLOCKIFY_ENDPOINT + action_url, headers=self.headers
|
|
)
|
|
data = response.json()
|
|
for user in data:
|
|
if user.get("id") == user_id:
|
|
roles_data = user.get("roles")
|
|
for entities in roles_data:
|
|
if entities.get("role") in ADMIN_PERMISSION_NAMES:
|
|
return True
|
|
return False
|
|
|
|
def get_user_id(self):
|
|
action_url = "user"
|
|
response = requests.get(
|
|
CLOCKIFY_ENDPOINT + action_url, headers=self.headers
|
|
)
|
|
result = response.json()
|
|
user_id = result.get("id", None)
|
|
|
|
return user_id
|
|
|
|
def set_workspace(self, name=None):
|
|
if name is None:
|
|
name = os.environ.get("CLOCKIFY_WORKSPACE", None)
|
|
self.workspace_name = name
|
|
if self.workspace_name is None:
|
|
return
|
|
try:
|
|
result = self.validate_workspace()
|
|
except Exception:
|
|
result = False
|
|
if result is not False:
|
|
self._workspace_id = result
|
|
if self.master_parent is not None:
|
|
self.master_parent.start_timer_check()
|
|
return True
|
|
return False
|
|
|
|
def validate_workspace(self, name=None):
|
|
if name is None:
|
|
name = self.workspace_name
|
|
all_workspaces = self.get_workspaces()
|
|
if name in all_workspaces:
|
|
return all_workspaces[name]
|
|
return False
|
|
|
|
def set_user_id(self):
|
|
try:
|
|
user_id = self.get_user_id()
|
|
except Exception:
|
|
user_id = None
|
|
if user_id is not None:
|
|
self._user_id = user_id
|
|
|
|
def get_api_key(self):
|
|
return self.secure_registry.get_item("api_key", None)
|
|
|
|
def save_api_key(self, api_key):
|
|
self.secure_registry.set_item("api_key", api_key)
|
|
|
|
def get_workspaces(self):
|
|
action_url = "workspaces/"
|
|
response = requests.get(
|
|
CLOCKIFY_ENDPOINT + action_url, headers=self.headers
|
|
)
|
|
return {
|
|
workspace["name"]: workspace["id"] for workspace in response.json()
|
|
}
|
|
|
|
def get_projects(self, workspace_id=None):
|
|
if workspace_id is None:
|
|
workspace_id = self.workspace_id
|
|
action_url = f"workspaces/{workspace_id}/projects"
|
|
response = requests.get(
|
|
CLOCKIFY_ENDPOINT + action_url, headers=self.headers
|
|
)
|
|
if response.status_code != 403:
|
|
result = response.json()
|
|
return {project["name"]: project["id"] for project in result}
|
|
|
|
def get_project_by_id(self, project_id, workspace_id=None):
|
|
if workspace_id is None:
|
|
workspace_id = self.workspace_id
|
|
action_url = "workspaces/{}/projects/{}".format(
|
|
workspace_id, project_id
|
|
)
|
|
response = requests.get(
|
|
CLOCKIFY_ENDPOINT + action_url, headers=self.headers
|
|
)
|
|
|
|
return response.json()
|
|
|
|
def get_tags(self, workspace_id=None):
|
|
if workspace_id is None:
|
|
workspace_id = self.workspace_id
|
|
action_url = "workspaces/{}/tags".format(workspace_id)
|
|
response = requests.get(
|
|
CLOCKIFY_ENDPOINT + action_url, headers=self.headers
|
|
)
|
|
|
|
return {tag["name"]: tag["id"] for tag in response.json()}
|
|
|
|
def get_tasks(self, project_id, workspace_id=None):
|
|
if workspace_id is None:
|
|
workspace_id = self.workspace_id
|
|
action_url = "workspaces/{}/projects/{}/tasks".format(
|
|
workspace_id, project_id
|
|
)
|
|
response = requests.get(
|
|
CLOCKIFY_ENDPOINT + action_url, headers=self.headers
|
|
)
|
|
|
|
return {task["name"]: task["id"] for task in response.json()}
|
|
|
|
def get_workspace_id(self, workspace_name):
|
|
all_workspaces = self.get_workspaces()
|
|
if workspace_name not in all_workspaces:
|
|
return None
|
|
return all_workspaces[workspace_name]
|
|
|
|
def get_project_id(self, project_name, workspace_id=None):
|
|
if workspace_id is None:
|
|
workspace_id = self.workspace_id
|
|
all_projects = self.get_projects(workspace_id)
|
|
if project_name not in all_projects:
|
|
return None
|
|
return all_projects[project_name]
|
|
|
|
def get_tag_id(self, tag_name, workspace_id=None):
|
|
if workspace_id is None:
|
|
workspace_id = self.workspace_id
|
|
all_tasks = self.get_tags(workspace_id)
|
|
if tag_name not in all_tasks:
|
|
return None
|
|
return all_tasks[tag_name]
|
|
|
|
def get_task_id(self, task_name, project_id, workspace_id=None):
|
|
if workspace_id is None:
|
|
workspace_id = self.workspace_id
|
|
all_tasks = self.get_tasks(project_id, workspace_id)
|
|
if task_name not in all_tasks:
|
|
return None
|
|
return all_tasks[task_name]
|
|
|
|
def get_current_time(self):
|
|
return str(datetime.datetime.utcnow().isoformat()) + "Z"
|
|
|
|
def start_time_entry(
|
|
self,
|
|
description,
|
|
project_id,
|
|
task_id=None,
|
|
tag_ids=None,
|
|
workspace_id=None,
|
|
user_id=None,
|
|
billable=True,
|
|
):
|
|
# Workspace
|
|
if workspace_id is None:
|
|
workspace_id = self.workspace_id
|
|
# User ID
|
|
if user_id is None:
|
|
user_id = self._user_id
|
|
|
|
# get running timer to check if we need to start it
|
|
current_timer = self.get_in_progress()
|
|
|
|
# Check if is currently run another times and has same values
|
|
# DO not restart the timer, if it is already running for curent task
|
|
if current_timer:
|
|
current_timer_hierarchy = current_timer.get("description")
|
|
current_project_id = current_timer.get("projectId")
|
|
current_task_id = current_timer.get("taskId")
|
|
if (
|
|
description == current_timer_hierarchy
|
|
and project_id == current_project_id
|
|
and task_id == current_task_id
|
|
):
|
|
self.log.info(
|
|
"Timer for the current project is already running"
|
|
)
|
|
self.bool_timer_run = True
|
|
return self.bool_timer_run
|
|
self.finish_time_entry()
|
|
|
|
# Convert billable to strings
|
|
if billable:
|
|
billable = "true"
|
|
else:
|
|
billable = "false"
|
|
# Rest API Action
|
|
action_url = "workspaces/{}/user/{}/time-entries".format(
|
|
workspace_id, user_id
|
|
)
|
|
start = self.get_current_time()
|
|
body = {
|
|
"start": start,
|
|
"billable": billable,
|
|
"description": description,
|
|
"projectId": project_id,
|
|
"taskId": task_id,
|
|
"tagIds": tag_ids,
|
|
}
|
|
response = requests.post(
|
|
CLOCKIFY_ENDPOINT + action_url, headers=self.headers, json=body
|
|
)
|
|
if response.status_code < 300:
|
|
return True
|
|
return False
|
|
|
|
def _get_current_timer_values(self, response):
|
|
if response is None:
|
|
return
|
|
try:
|
|
output = response.json()
|
|
except json.decoder.JSONDecodeError:
|
|
return None
|
|
if output and isinstance(output, list):
|
|
return output[0]
|
|
return None
|
|
|
|
def get_in_progress(self, user_id=None, workspace_id=None):
|
|
if workspace_id is None:
|
|
workspace_id = self.workspace_id
|
|
if user_id is None:
|
|
user_id = self.user_id
|
|
|
|
action_url = (
|
|
f"workspaces/{workspace_id}/user/"
|
|
f"{user_id}/time-entries?in-progress=1"
|
|
)
|
|
response = requests.get(
|
|
CLOCKIFY_ENDPOINT + action_url, headers=self.headers
|
|
)
|
|
return self._get_current_timer_values(response)
|
|
|
|
def finish_time_entry(self, workspace_id=None, user_id=None):
|
|
if workspace_id is None:
|
|
workspace_id = self.workspace_id
|
|
if user_id is None:
|
|
user_id = self.user_id
|
|
current_timer = self.get_in_progress()
|
|
if not current_timer:
|
|
return
|
|
action_url = "workspaces/{}/user/{}/time-entries".format(
|
|
workspace_id, user_id
|
|
)
|
|
body = {"end": self.get_current_time()}
|
|
response = requests.patch(
|
|
CLOCKIFY_ENDPOINT + action_url, headers=self.headers, json=body
|
|
)
|
|
return response.json()
|
|
|
|
def get_time_entries(self, workspace_id=None, user_id=None, quantity=10):
|
|
if workspace_id is None:
|
|
workspace_id = self.workspace_id
|
|
if user_id is None:
|
|
user_id = self.user_id
|
|
action_url = "workspaces/{}/user/{}/time-entries".format(
|
|
workspace_id, user_id
|
|
)
|
|
response = requests.get(
|
|
CLOCKIFY_ENDPOINT + action_url, headers=self.headers
|
|
)
|
|
return response.json()[:quantity]
|
|
|
|
def remove_time_entry(self, tid, workspace_id=None, user_id=None):
|
|
if workspace_id is None:
|
|
workspace_id = self.workspace_id
|
|
action_url = "workspaces/{}/user/{}/time-entries/{}".format(
|
|
workspace_id, user_id, tid
|
|
)
|
|
response = requests.delete(
|
|
CLOCKIFY_ENDPOINT + action_url, headers=self.headers
|
|
)
|
|
return response.json()
|
|
|
|
def add_project(self, name, workspace_id=None):
|
|
if workspace_id is None:
|
|
workspace_id = self.workspace_id
|
|
action_url = "workspaces/{}/projects".format(workspace_id)
|
|
body = {
|
|
"name": name,
|
|
"clientId": "",
|
|
"isPublic": "false",
|
|
"estimate": {"estimate": 0, "type": "AUTO"},
|
|
"color": "#f44336",
|
|
"billable": "true",
|
|
}
|
|
response = requests.post(
|
|
CLOCKIFY_ENDPOINT + action_url, headers=self.headers, json=body
|
|
)
|
|
return response.json()
|
|
|
|
def add_workspace(self, name):
|
|
action_url = "workspaces/"
|
|
body = {"name": name}
|
|
response = requests.post(
|
|
CLOCKIFY_ENDPOINT + action_url, headers=self.headers, json=body
|
|
)
|
|
return response.json()
|
|
|
|
def add_task(self, name, project_id, workspace_id=None):
|
|
if workspace_id is None:
|
|
workspace_id = self.workspace_id
|
|
action_url = "workspaces/{}/projects/{}/tasks".format(
|
|
workspace_id, project_id
|
|
)
|
|
body = {"name": name, "projectId": project_id}
|
|
response = requests.post(
|
|
CLOCKIFY_ENDPOINT + action_url, headers=self.headers, json=body
|
|
)
|
|
return response.json()
|
|
|
|
def add_tag(self, name, workspace_id=None):
|
|
if workspace_id is None:
|
|
workspace_id = self.workspace_id
|
|
action_url = "workspaces/{}/tags".format(workspace_id)
|
|
body = {"name": name}
|
|
response = requests.post(
|
|
CLOCKIFY_ENDPOINT + action_url, headers=self.headers, json=body
|
|
)
|
|
return response.json()
|
|
|
|
def delete_project(self, project_id, workspace_id=None):
|
|
if workspace_id is None:
|
|
workspace_id = self.workspace_id
|
|
action_url = "/workspaces/{}/projects/{}".format(
|
|
workspace_id, project_id
|
|
)
|
|
response = requests.delete(
|
|
CLOCKIFY_ENDPOINT + action_url,
|
|
headers=self.headers,
|
|
)
|
|
return response.json()
|
|
|
|
def convert_input(
|
|
self, entity_id, entity_name, mode="Workspace", project_id=None
|
|
):
|
|
if entity_id is None:
|
|
error = False
|
|
error_msg = 'Missing information "{}"'
|
|
if mode.lower() == "workspace":
|
|
if entity_id is None and entity_name is None:
|
|
if self.workspace_id is not None:
|
|
entity_id = self.workspace_id
|
|
else:
|
|
error = True
|
|
else:
|
|
entity_id = self.get_workspace_id(entity_name)
|
|
else:
|
|
if entity_id is None and entity_name is None:
|
|
error = True
|
|
elif mode.lower() == "project":
|
|
entity_id = self.get_project_id(entity_name)
|
|
elif mode.lower() == "task":
|
|
entity_id = self.get_task_id(
|
|
task_name=entity_name, project_id=project_id
|
|
)
|
|
else:
|
|
raise TypeError("Unknown type")
|
|
# Raise error
|
|
if error:
|
|
raise ValueError(error_msg.format(mode))
|
|
|
|
return entity_id
|