ayon-core/openpype/modules/clockify/clockify_api.py
2023-03-30 13:59:22 +02:00

447 lines
14 KiB
Python

import os
import re
import time
import json
import datetime
import requests
from .constants import (
CLOCKIFY_ENDPOINT,
ADMIN_PERMISSION_NAMES,
)
from openpype.lib.local_settings import OpenPypeSecureRegistry
from openpype.lib import Logger
class ClockifyAPI:
log = Logger.get_logger(__name__)
def __init__(self, api_key=None, master_parent=None):
self.workspace_name = None
self.master_parent = master_parent
self.api_key = api_key
self._workspace_id = None
self._user_id = None
self._secure_registry = None
@property
def secure_registry(self):
if self._secure_registry is None:
self._secure_registry = OpenPypeSecureRegistry("clockify")
return self._secure_registry
@property
def headers(self):
return {"x-api-key": self.api_key}
@property
def workspace_id(self):
return self._workspace_id
@property
def user_id(self):
return self._user_id
def verify_api(self):
for key, value in self.headers.items():
if value is None or value.strip() == "":
return False
return True
def set_api(self, api_key=None):
if api_key is None:
api_key = self.get_api_key()
if api_key is not None and self.validate_api_key(api_key) is True:
self.api_key = api_key
self.set_workspace()
self.set_user_id()
if self.master_parent:
self.master_parent.signed_in()
return True
return False
def validate_api_key(self, api_key):
test_headers = {"x-api-key": api_key}
action_url = "user"
response = requests.get(
CLOCKIFY_ENDPOINT + action_url, headers=test_headers
)
if response.status_code != 200:
return False
return True
def validate_workspace_permissions(self, workspace_id=None, user_id=None):
if user_id is None:
self.log.info("No user_id found during validation")
return False
if workspace_id is None:
workspace_id = self.workspace_id
action_url = f"workspaces/{workspace_id}/users?includeRoles=1"
response = requests.get(
CLOCKIFY_ENDPOINT + action_url, headers=self.headers
)
data = response.json()
for user in data:
if user.get("id") == user_id:
roles_data = user.get("roles")
for entities in roles_data:
if entities.get("role") in ADMIN_PERMISSION_NAMES:
return True
return False
def get_user_id(self):
action_url = "user"
response = requests.get(
CLOCKIFY_ENDPOINT + action_url, headers=self.headers
)
result = response.json()
user_id = result.get("id", None)
return user_id
def set_workspace(self, name=None):
if name is None:
name = os.environ.get("CLOCKIFY_WORKSPACE", None)
self.workspace_name = name
if self.workspace_name is None:
return
try:
result = self.validate_workspace()
except Exception:
result = False
if result is not False:
self._workspace_id = result
if self.master_parent is not None:
self.master_parent.start_timer_check()
return True
return False
def validate_workspace(self, name=None):
if name is None:
name = self.workspace_name
all_workspaces = self.get_workspaces()
if name in all_workspaces:
return all_workspaces[name]
return False
def set_user_id(self):
try:
user_id = self.get_user_id()
except Exception:
user_id = None
if user_id is not None:
self._user_id = user_id
def get_api_key(self):
return self.secure_registry.get_item("api_key", None)
def save_api_key(self, api_key):
self.secure_registry.set_item("api_key", api_key)
def get_workspaces(self):
action_url = "workspaces/"
response = requests.get(
CLOCKIFY_ENDPOINT + action_url, headers=self.headers
)
return {
workspace["name"]: workspace["id"] for workspace in response.json()
}
def get_projects(self, workspace_id=None):
if workspace_id is None:
workspace_id = self.workspace_id
action_url = f"workspaces/{workspace_id}/projects"
response = requests.get(
CLOCKIFY_ENDPOINT + action_url, headers=self.headers
)
if response.status_code != 403:
result = response.json()
return {project["name"]: project["id"] for project in result}
def get_project_by_id(self, project_id, workspace_id=None):
if workspace_id is None:
workspace_id = self.workspace_id
action_url = "workspaces/{}/projects/{}".format(
workspace_id, project_id
)
response = requests.get(
CLOCKIFY_ENDPOINT + action_url, headers=self.headers
)
return response.json()
def get_tags(self, workspace_id=None):
if workspace_id is None:
workspace_id = self.workspace_id
action_url = "workspaces/{}/tags".format(workspace_id)
response = requests.get(
CLOCKIFY_ENDPOINT + action_url, headers=self.headers
)
return {tag["name"]: tag["id"] for tag in response.json()}
def get_tasks(self, project_id, workspace_id=None):
if workspace_id is None:
workspace_id = self.workspace_id
action_url = "workspaces/{}/projects/{}/tasks".format(
workspace_id, project_id
)
response = requests.get(
CLOCKIFY_ENDPOINT + action_url, headers=self.headers
)
return {task["name"]: task["id"] for task in response.json()}
def get_workspace_id(self, workspace_name):
all_workspaces = self.get_workspaces()
if workspace_name not in all_workspaces:
return None
return all_workspaces[workspace_name]
def get_project_id(self, project_name, workspace_id=None):
if workspace_id is None:
workspace_id = self.workspace_id
all_projects = self.get_projects(workspace_id)
if project_name not in all_projects:
return None
return all_projects[project_name]
def get_tag_id(self, tag_name, workspace_id=None):
if workspace_id is None:
workspace_id = self.workspace_id
all_tasks = self.get_tags(workspace_id)
if tag_name not in all_tasks:
return None
return all_tasks[tag_name]
def get_task_id(self, task_name, project_id, workspace_id=None):
if workspace_id is None:
workspace_id = self.workspace_id
all_tasks = self.get_tasks(project_id, workspace_id)
if task_name not in all_tasks:
return None
return all_tasks[task_name]
def get_current_time(self):
return str(datetime.datetime.utcnow().isoformat()) + "Z"
def start_time_entry(
self,
description,
project_id,
task_id=None,
tag_ids=None,
workspace_id=None,
user_id=None,
billable=True,
):
# Workspace
if workspace_id is None:
workspace_id = self.workspace_id
# User ID
if user_id is None:
user_id = self._user_id
# get running timer to check if we need to start it
current_timer = self.get_in_progress()
# Check if is currently run another times and has same values
# DO not restart the timer, if it is already running for current task
if current_timer:
current_timer_hierarchy = current_timer.get("description")
current_project_id = current_timer.get("projectId")
current_task_id = current_timer.get("taskId")
if (
description == current_timer_hierarchy
and project_id == current_project_id
and task_id == current_task_id
):
self.log.info(
"Timer for the current project is already running"
)
self.bool_timer_run = True
return self.bool_timer_run
self.finish_time_entry()
# Convert billable to strings
if billable:
billable = "true"
else:
billable = "false"
# Rest API Action
action_url = "workspaces/{}/user/{}/time-entries".format(
workspace_id, user_id
)
start = self.get_current_time()
body = {
"start": start,
"billable": billable,
"description": description,
"projectId": project_id,
"taskId": task_id,
"tagIds": tag_ids,
}
response = requests.post(
CLOCKIFY_ENDPOINT + action_url, headers=self.headers, json=body
)
if response.status_code < 300:
return True
return False
def _get_current_timer_values(self, response):
if response is None:
return
try:
output = response.json()
except json.decoder.JSONDecodeError:
return None
if output and isinstance(output, list):
return output[0]
return None
def get_in_progress(self, user_id=None, workspace_id=None):
if workspace_id is None:
workspace_id = self.workspace_id
if user_id is None:
user_id = self.user_id
action_url = (
f"workspaces/{workspace_id}/user/"
f"{user_id}/time-entries?in-progress=1"
)
response = requests.get(
CLOCKIFY_ENDPOINT + action_url, headers=self.headers
)
return self._get_current_timer_values(response)
def finish_time_entry(self, workspace_id=None, user_id=None):
if workspace_id is None:
workspace_id = self.workspace_id
if user_id is None:
user_id = self.user_id
current_timer = self.get_in_progress()
if not current_timer:
return
action_url = "workspaces/{}/user/{}/time-entries".format(
workspace_id, user_id
)
body = {"end": self.get_current_time()}
response = requests.patch(
CLOCKIFY_ENDPOINT + action_url, headers=self.headers, json=body
)
return response.json()
def get_time_entries(self, workspace_id=None, user_id=None, quantity=10):
if workspace_id is None:
workspace_id = self.workspace_id
if user_id is None:
user_id = self.user_id
action_url = "workspaces/{}/user/{}/time-entries".format(
workspace_id, user_id
)
response = requests.get(
CLOCKIFY_ENDPOINT + action_url, headers=self.headers
)
return response.json()[:quantity]
def remove_time_entry(self, tid, workspace_id=None, user_id=None):
if workspace_id is None:
workspace_id = self.workspace_id
action_url = "workspaces/{}/user/{}/time-entries/{}".format(
workspace_id, user_id, tid
)
response = requests.delete(
CLOCKIFY_ENDPOINT + action_url, headers=self.headers
)
return response.json()
def add_project(self, name, workspace_id=None):
if workspace_id is None:
workspace_id = self.workspace_id
action_url = "workspaces/{}/projects".format(workspace_id)
body = {
"name": name,
"clientId": "",
"isPublic": "false",
"estimate": {"estimate": 0, "type": "AUTO"},
"color": "#f44336",
"billable": "true",
}
response = requests.post(
CLOCKIFY_ENDPOINT + action_url, headers=self.headers, json=body
)
return response.json()
def add_workspace(self, name):
action_url = "workspaces/"
body = {"name": name}
response = requests.post(
CLOCKIFY_ENDPOINT + action_url, headers=self.headers, json=body
)
return response.json()
def add_task(self, name, project_id, workspace_id=None):
if workspace_id is None:
workspace_id = self.workspace_id
action_url = "workspaces/{}/projects/{}/tasks".format(
workspace_id, project_id
)
body = {"name": name, "projectId": project_id}
response = requests.post(
CLOCKIFY_ENDPOINT + action_url, headers=self.headers, json=body
)
return response.json()
def add_tag(self, name, workspace_id=None):
if workspace_id is None:
workspace_id = self.workspace_id
action_url = "workspaces/{}/tags".format(workspace_id)
body = {"name": name}
response = requests.post(
CLOCKIFY_ENDPOINT + action_url, headers=self.headers, json=body
)
return response.json()
def delete_project(self, project_id, workspace_id=None):
if workspace_id is None:
workspace_id = self.workspace_id
action_url = "/workspaces/{}/projects/{}".format(
workspace_id, project_id
)
response = requests.delete(
CLOCKIFY_ENDPOINT + action_url,
headers=self.headers,
)
return response.json()
def convert_input(
self, entity_id, entity_name, mode="Workspace", project_id=None
):
if entity_id is None:
error = False
error_msg = 'Missing information "{}"'
if mode.lower() == "workspace":
if entity_id is None and entity_name is None:
if self.workspace_id is not None:
entity_id = self.workspace_id
else:
error = True
else:
entity_id = self.get_workspace_id(entity_name)
else:
if entity_id is None and entity_name is None:
error = True
elif mode.lower() == "project":
entity_id = self.get_project_id(entity_name)
elif mode.lower() == "task":
entity_id = self.get_task_id(
task_name=entity_name, project_id=project_id
)
else:
raise TypeError("Unknown type")
# Raise error
if error:
raise ValueError(error_msg.format(mode))
return entity_id