Merge pull request #372 from pypeclub/feature/clockify_on_event_server

Feature/clockify on event server
This commit is contained in:
Milan Kolar 2020-07-26 21:35:20 +02:00 committed by GitHub
commit 36bc8d3a4a
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
12 changed files with 422 additions and 164 deletions

View file

@ -1,6 +1,3 @@
from .clockify_api import ClockifyAPI
from .widget_settings import ClockifySettings
from .widget_message import MessageWidget
from .clockify import ClockifyModule
CLASS_DEFINIION = ClockifyModule

View file

@ -3,11 +3,12 @@ import threading
from pype.api import Logger
from avalon import style
from Qt import QtWidgets
from . import ClockifySettings, ClockifyAPI, MessageWidget
from .widgets import ClockifySettings, MessageWidget
from .clockify_api import ClockifyAPI
from .constants import CLOCKIFY_FTRACK_USER_PATH
class ClockifyModule:
workspace_name = None
def __init__(self, main_parent=None, parent=None):
@ -20,7 +21,7 @@ class ClockifyModule:
self.main_parent = main_parent
self.parent = parent
self.clockapi = ClockifyAPI()
self.clockapi = ClockifyAPI(master_parent=self)
self.message_widget = None
self.widget_settings = ClockifySettings(main_parent, self)
self.widget_settings_required = None
@ -31,8 +32,6 @@ class ClockifyModule:
self.bool_api_key_set = False
self.bool_workspace_set = False
self.bool_timer_run = False
self.clockapi.set_master(self)
self.bool_api_key_set = self.clockapi.set_api()
def tray_start(self):
@ -50,14 +49,12 @@ class ClockifyModule:
def process_modules(self, modules):
if 'FtrackModule' in modules:
actions_path = os.path.sep.join([
os.path.dirname(__file__),
'ftrack_actions'
])
current = os.environ.get('FTRACK_ACTIONS_PATH', '')
if current:
current += os.pathsep
os.environ['FTRACK_ACTIONS_PATH'] = current + actions_path
os.environ['FTRACK_ACTIONS_PATH'] = (
current + CLOCKIFY_FTRACK_USER_PATH
)
if 'AvalonApps' in modules:
from launcher import lib
@ -195,9 +192,10 @@ class ClockifyModule:
).format(project_name))
msg = (
"Project <b>\"{}\"</b> is not in Clockify Workspace <b>\"{}\"</b>."
"Project <b>\"{}\"</b> is not"
" in Clockify Workspace <b>\"{}\"</b>."
"<br><br>Please inform your Project Manager."
).format(project_name, str(self.clockapi.workspace))
).format(project_name, str(self.clockapi.workspace_name))
self.message_widget = MessageWidget(
self.main_parent, msg, "Clockify - Info Message"

View file

@ -1,35 +1,39 @@
import os
import re
import time
import requests
import json
import datetime
import appdirs
from .constants import (
CLOCKIFY_ENDPOINT, ADMIN_PERMISSION_NAMES, CREDENTIALS_JSON_PATH
)
class Singleton(type):
_instances = {}
def time_check(obj):
if obj.request_counter < 10:
obj.request_counter += 1
return
def __call__(cls, *args, **kwargs):
if cls not in cls._instances:
cls._instances[cls] = super(
Singleton, cls
).__call__(*args, **kwargs)
return cls._instances[cls]
wait_time = 1 - (time.time() - obj.request_time)
if wait_time > 0:
time.sleep(wait_time)
obj.request_time = time.time()
obj.request_counter = 0
class ClockifyAPI(metaclass=Singleton):
endpoint = "https://api.clockify.me/api/"
headers = {"X-Api-Key": None}
app_dir = os.path.normpath(appdirs.user_data_dir('pype-app', 'pype'))
file_name = 'clockify.json'
fpath = os.path.join(app_dir, file_name)
admin_permission_names = ['WORKSPACE_OWN', 'WORKSPACE_ADMIN']
master_parent = None
workspace = None
workspace_id = None
def set_master(self, master_parent):
class ClockifyAPI:
def __init__(self, api_key=None, master_parent=None):
self.workspace_name = None
self.workspace_id = None
self.master_parent = master_parent
self.api_key = api_key
self.request_counter = 0
self.request_time = time.time()
@property
def headers(self):
return {"X-Api-Key": self.api_key}
def verify_api(self):
for key, value in self.headers.items():
@ -42,7 +46,7 @@ class ClockifyAPI(metaclass=Singleton):
api_key = self.get_api_key()
if api_key is not None and self.validate_api_key(api_key) is True:
self.headers["X-Api-Key"] = api_key
self.api_key = api_key
self.set_workspace()
if self.master_parent:
self.master_parent.signed_in()
@ -52,8 +56,9 @@ class ClockifyAPI(metaclass=Singleton):
def validate_api_key(self, api_key):
test_headers = {'X-Api-Key': api_key}
action_url = 'workspaces/'
time_check(self)
response = requests.get(
self.endpoint + action_url,
CLOCKIFY_ENDPOINT + action_url,
headers=test_headers
)
if response.status_code != 200:
@ -69,25 +74,27 @@ class ClockifyAPI(metaclass=Singleton):
action_url = "/workspaces/{}/users/{}/permissions".format(
workspace_id, user_id
)
time_check(self)
response = requests.get(
self.endpoint + action_url,
CLOCKIFY_ENDPOINT + action_url,
headers=self.headers
)
user_permissions = response.json()
for perm in user_permissions:
if perm['name'] in self.admin_permission_names:
if perm['name'] in ADMIN_PERMISSION_NAMES:
return True
return False
def get_user_id(self):
action_url = 'v1/user/'
time_check(self)
response = requests.get(
self.endpoint + action_url,
CLOCKIFY_ENDPOINT + action_url,
headers=self.headers
)
# this regex is neccessary: UNICODE strings are crashing
# during json serialization
id_regex ='\"{1}id\"{1}\:{1}\"{1}\w+\"{1}'
id_regex = '\"{1}id\"{1}\:{1}\"{1}\w+\"{1}'
result = re.findall(id_regex, str(response.content))
if len(result) != 1:
# replace with log and better message?
@ -98,9 +105,9 @@ class ClockifyAPI(metaclass=Singleton):
def set_workspace(self, name=None):
if name is None:
name = os.environ.get('CLOCKIFY_WORKSPACE', None)
self.workspace = name
self.workspace_name = name
self.workspace_id = None
if self.workspace is None:
if self.workspace_name is None:
return
try:
result = self.validate_workspace()
@ -115,7 +122,7 @@ class ClockifyAPI(metaclass=Singleton):
def validate_workspace(self, name=None):
if name is None:
name = self.workspace
name = self.workspace_name
all_workspaces = self.get_workspaces()
if name in all_workspaces:
return all_workspaces[name]
@ -124,25 +131,26 @@ class ClockifyAPI(metaclass=Singleton):
def get_api_key(self):
api_key = None
try:
file = open(self.fpath, 'r')
file = open(CREDENTIALS_JSON_PATH, 'r')
api_key = json.load(file).get('api_key', None)
if api_key == '':
api_key = None
except Exception:
file = open(self.fpath, 'w')
file = open(CREDENTIALS_JSON_PATH, 'w')
file.close()
return api_key
def save_api_key(self, api_key):
data = {'api_key': api_key}
file = open(self.fpath, 'w')
file = open(CREDENTIALS_JSON_PATH, 'w')
file.write(json.dumps(data))
file.close()
def get_workspaces(self):
action_url = 'workspaces/'
time_check(self)
response = requests.get(
self.endpoint + action_url,
CLOCKIFY_ENDPOINT + action_url,
headers=self.headers
)
return {
@ -153,8 +161,9 @@ class ClockifyAPI(metaclass=Singleton):
if workspace_id is None:
workspace_id = self.workspace_id
action_url = 'workspaces/{}/projects/'.format(workspace_id)
time_check(self)
response = requests.get(
self.endpoint + action_url,
CLOCKIFY_ENDPOINT + action_url,
headers=self.headers
)
@ -168,8 +177,9 @@ class ClockifyAPI(metaclass=Singleton):
action_url = 'workspaces/{}/projects/{}/'.format(
workspace_id, project_id
)
time_check(self)
response = requests.get(
self.endpoint + action_url,
CLOCKIFY_ENDPOINT + action_url,
headers=self.headers
)
@ -179,8 +189,9 @@ class ClockifyAPI(metaclass=Singleton):
if workspace_id is None:
workspace_id = self.workspace_id
action_url = 'workspaces/{}/tags/'.format(workspace_id)
time_check(self)
response = requests.get(
self.endpoint + action_url,
CLOCKIFY_ENDPOINT + action_url,
headers=self.headers
)
@ -194,8 +205,9 @@ class ClockifyAPI(metaclass=Singleton):
action_url = 'workspaces/{}/projects/{}/tasks/'.format(
workspace_id, project_id
)
time_check(self)
response = requests.get(
self.endpoint + action_url,
CLOCKIFY_ENDPOINT + action_url,
headers=self.headers
)
@ -276,8 +288,9 @@ class ClockifyAPI(metaclass=Singleton):
"taskId": task_id,
"tagIds": tag_ids
}
time_check(self)
response = requests.post(
self.endpoint + action_url,
CLOCKIFY_ENDPOINT + action_url,
headers=self.headers,
json=body
)
@ -293,8 +306,9 @@ class ClockifyAPI(metaclass=Singleton):
action_url = 'workspaces/{}/timeEntries/inProgress'.format(
workspace_id
)
time_check(self)
response = requests.get(
self.endpoint + action_url,
CLOCKIFY_ENDPOINT + action_url,
headers=self.headers
)
try:
@ -323,8 +337,9 @@ class ClockifyAPI(metaclass=Singleton):
"tagIds": current["tagIds"],
"end": self.get_current_time()
}
time_check(self)
response = requests.put(
self.endpoint + action_url,
CLOCKIFY_ENDPOINT + action_url,
headers=self.headers,
json=body
)
@ -336,8 +351,9 @@ class ClockifyAPI(metaclass=Singleton):
if workspace_id is None:
workspace_id = self.workspace_id
action_url = 'workspaces/{}/timeEntries/'.format(workspace_id)
time_check(self)
response = requests.get(
self.endpoint + action_url,
CLOCKIFY_ENDPOINT + action_url,
headers=self.headers
)
return response.json()[:quantity]
@ -348,8 +364,9 @@ class ClockifyAPI(metaclass=Singleton):
action_url = 'workspaces/{}/timeEntries/{}'.format(
workspace_id, tid
)
time_check(self)
response = requests.delete(
self.endpoint + action_url,
CLOCKIFY_ENDPOINT + action_url,
headers=self.headers
)
return response.json()
@ -369,8 +386,9 @@ class ClockifyAPI(metaclass=Singleton):
"color": "#f44336",
"billable": "true"
}
time_check(self)
response = requests.post(
self.endpoint + action_url,
CLOCKIFY_ENDPOINT + action_url,
headers=self.headers,
json=body
)
@ -379,8 +397,9 @@ class ClockifyAPI(metaclass=Singleton):
def add_workspace(self, name):
action_url = 'workspaces/'
body = {"name": name}
time_check(self)
response = requests.post(
self.endpoint + action_url,
CLOCKIFY_ENDPOINT + action_url,
headers=self.headers,
json=body
)
@ -398,8 +417,9 @@ class ClockifyAPI(metaclass=Singleton):
"name": name,
"projectId": project_id
}
time_check(self)
response = requests.post(
self.endpoint + action_url,
CLOCKIFY_ENDPOINT + action_url,
headers=self.headers,
json=body
)
@ -412,8 +432,9 @@ class ClockifyAPI(metaclass=Singleton):
body = {
"name": name
}
time_check(self)
response = requests.post(
self.endpoint + action_url,
CLOCKIFY_ENDPOINT + action_url,
headers=self.headers,
json=body
)
@ -427,8 +448,9 @@ class ClockifyAPI(metaclass=Singleton):
action_url = '/workspaces/{}/projects/{}'.format(
workspace_id, project_id
)
time_check(self)
response = requests.delete(
self.endpoint + action_url,
CLOCKIFY_ENDPOINT + action_url,
headers=self.headers,
)
return response.json()

View file

@ -0,0 +1,17 @@
import os
import appdirs
CLOCKIFY_FTRACK_SERVER_PATH = os.path.join(
os.path.dirname(__file__), "ftrack", "server"
)
CLOCKIFY_FTRACK_USER_PATH = os.path.join(
os.path.dirname(__file__), "ftrack", "user"
)
CREDENTIALS_JSON_PATH = os.path.normpath(os.path.join(
appdirs.user_data_dir("pype-app", "pype"),
"clockify.json"
))
ADMIN_PERMISSION_NAMES = ["WORKSPACE_OWN", "WORKSPACE_ADMIN"]
CLOCKIFY_ENDPOINT = "https://api.clockify.me/api/"

View file

@ -0,0 +1,166 @@
import os
import json
from pype.modules.ftrack.lib import BaseAction
from pype.modules.clockify.clockify_api import ClockifyAPI
class SyncClocifyServer(BaseAction):
'''Synchronise project names and task types.'''
identifier = "clockify.sync.server"
label = "Sync To Clockify (server)"
description = "Synchronise data to Clockify workspace"
discover_role_list = ["Pypeclub", "Administrator", "project Manager"]
def __init__(self, *args, **kwargs):
super(SyncClocifyServer, self).__init__(*args, **kwargs)
workspace_name = os.environ.get("CLOCKIFY_WORKSPACE")
api_key = os.environ.get("CLOCKIFY_API_KEY")
self.clockapi = ClockifyAPI(api_key)
self.clockapi.set_workspace(workspace_name)
if api_key is None:
modified_key = "None"
else:
str_len = int(len(api_key) / 2)
start_replace = int(len(api_key) / 4)
modified_key = ""
for idx in range(len(api_key)):
if idx >= start_replace and idx < start_replace + str_len:
replacement = "X"
else:
replacement = api_key[idx]
modified_key += replacement
self.log.info(
"Clockify info. Workspace: \"{}\" API key: \"{}\"".format(
str(workspace_name), str(modified_key)
)
)
def discover(self, session, entities, event):
if (
len(entities) != 1
or entities[0].entity_type.lower() != "project"
):
return False
# Get user and check his roles
user_id = event.get("source", {}).get("user", {}).get("id")
if not user_id:
return False
user = session.query("User where id is \"{}\"".format(user_id)).first()
if not user:
return False
for role in user["user_security_roles"]:
if role["security_role"]["name"] in self.discover_role_list:
return True
return False
def register(self):
self.session.event_hub.subscribe(
"topic=ftrack.action.discover",
self._discover,
priority=self.priority
)
launch_subscription = (
"topic=ftrack.action.launch and data.actionIdentifier={}"
).format(self.identifier)
self.session.event_hub.subscribe(launch_subscription, self._launch)
def launch(self, session, entities, event):
if self.clockapi.workspace_id is None:
return {
"success": False,
"message": "Clockify Workspace or API key are not set!"
}
if self.clockapi.validate_workspace_perm() is False:
return {
"success": False,
"message": "Missing permissions for this action!"
}
# JOB SETTINGS
user_id = event["source"]["user"]["id"]
user = session.query("User where id is " + user_id).one()
job = session.create("Job", {
"user": user,
"status": "running",
"data": json.dumps({"description": "Sync Ftrack to Clockify"})
})
session.commit()
project_entity = entities[0]
if project_entity.entity_type.lower() != "project":
project_entity = self.get_project_from_entity(project_entity)
project_name = project_entity["full_name"]
self.log.info(
"Synchronization of project \"{}\" to clockify begins.".format(
project_name
)
)
task_types = (
project_entity["project_schema"]["_task_type_schema"]["types"]
)
task_type_names = [
task_type["name"] for task_type in task_types
]
try:
clockify_projects = self.clockapi.get_projects()
if project_name not in clockify_projects:
response = self.clockapi.add_project(project_name)
if "id" not in response:
self.log.warning(
"Project \"{}\" can't be created. Response: {}".format(
project_name, response
)
)
return {
"success": False,
"message": (
"Can't create clockify project \"{}\"."
" Unexpected error."
).format(project_name)
}
clockify_workspace_tags = self.clockapi.get_tags()
for task_type_name in task_type_names:
if task_type_name in clockify_workspace_tags:
self.log.debug(
"Task \"{}\" already exist".format(task_type_name)
)
continue
response = self.clockapi.add_tag(task_type_name)
if "id" not in response:
self.log.warning(
"Task \"{}\" can't be created. Response: {}".format(
task_type_name, response
)
)
job["status"] = "done"
except Exception:
self.log.warning(
"Synchronization to clockify failed.",
exc_info=True
)
finally:
if job["status"] != "done":
job["status"] = "failed"
session.commit()
return True
def register(session, **kw):
SyncClocifyServer(session).register()

View file

@ -1,15 +1,15 @@
import json
from pype.modules.ftrack.lib import BaseAction, statics_icon
from pype.modules.clockify import ClockifyAPI
from pype.modules.clockify.clockify_api import ClockifyAPI
class SyncClocify(BaseAction):
class SyncClocifyLocal(BaseAction):
'''Synchronise project names and task types.'''
#: Action identifier.
identifier = 'clockify.sync'
identifier = 'clockify.sync.local'
#: Action label.
label = 'Sync To Clockify'
label = 'Sync To Clockify (local)'
#: Action description.
description = 'Synchronise data to Clockify workspace'
#: roles that are allowed to register this action
@ -119,4 +119,4 @@ class SyncClocify(BaseAction):
def register(session, **kw):
SyncClocify(session).register()
SyncClocifyLocal(session).register()

View file

@ -1,6 +1,6 @@
from avalon import api, io
from pype.api import Logger
from pype.modules.clockify import ClockifyAPI
from pype.modules.clockify.clockify_api import ClockifyAPI
log = Logger().get_logger(__name__, "clockify_start")

View file

@ -1,5 +1,5 @@
from avalon import api, io
from pype.modules.clockify import ClockifyAPI
from pype.modules.clockify.clockify_api import ClockifyAPI
from pype.api import Logger
log = Logger().get_logger(__name__, "clockify_sync")

View file

@ -1,92 +0,0 @@
from Qt import QtCore, QtGui, QtWidgets
from avalon import style
from pype.api import resources
class MessageWidget(QtWidgets.QWidget):
SIZE_W = 300
SIZE_H = 130
closed = QtCore.Signal()
def __init__(self, parent=None, messages=[], title="Message"):
super(MessageWidget, self).__init__()
self._parent = parent
# Icon
if parent and hasattr(parent, 'icon'):
self.setWindowIcon(parent.icon)
else:
icon = QtGui.QIcon(resources.pype_icon_filepath())
self.setWindowIcon(icon)
self.setWindowFlags(
QtCore.Qt.WindowCloseButtonHint |
QtCore.Qt.WindowMinimizeButtonHint
)
# Font
self.font = QtGui.QFont()
self.font.setFamily("DejaVu Sans Condensed")
self.font.setPointSize(9)
self.font.setBold(True)
self.font.setWeight(50)
self.font.setKerning(True)
# Size setting
self.resize(self.SIZE_W, self.SIZE_H)
self.setMinimumSize(QtCore.QSize(self.SIZE_W, self.SIZE_H))
self.setMaximumSize(QtCore.QSize(self.SIZE_W+100, self.SIZE_H+100))
# Style
self.setStyleSheet(style.load_stylesheet())
self.setLayout(self._ui_layout(messages))
self.setWindowTitle(title)
def _ui_layout(self, messages):
if not messages:
messages = ["*Misssing messages (This is a bug)*", ]
elif not isinstance(messages, (tuple, list)):
messages = [messages, ]
main_layout = QtWidgets.QVBoxLayout(self)
labels = []
for message in messages:
label = QtWidgets.QLabel(message)
label.setFont(self.font)
label.setCursor(QtGui.QCursor(QtCore.Qt.ArrowCursor))
label.setTextFormat(QtCore.Qt.RichText)
label.setWordWrap(True)
labels.append(label)
main_layout.addWidget(label)
btn_close = QtWidgets.QPushButton("Close")
btn_close.setToolTip('Close this window')
btn_close.clicked.connect(self.on_close_clicked)
btn_group = QtWidgets.QHBoxLayout()
btn_group.addStretch(1)
btn_group.addWidget(btn_close)
main_layout.addLayout(btn_group)
self.labels = labels
self.btn_group = btn_group
self.btn_close = btn_close
self.main_layout = main_layout
return main_layout
def on_close_clicked(self):
self.close()
def close(self, *args, **kwargs):
self.closed.emit()
super(MessageWidget, self).close(*args, **kwargs)

View file

@ -1,9 +1,97 @@
import os
from Qt import QtCore, QtGui, QtWidgets
from avalon import style
from pype.api import resources
class MessageWidget(QtWidgets.QWidget):
SIZE_W = 300
SIZE_H = 130
closed = QtCore.Signal()
def __init__(self, parent=None, messages=[], title="Message"):
super(MessageWidget, self).__init__()
self._parent = parent
# Icon
if parent and hasattr(parent, 'icon'):
self.setWindowIcon(parent.icon)
else:
icon = QtGui.QIcon(resources.pype_icon_filepath())
self.setWindowIcon(icon)
self.setWindowFlags(
QtCore.Qt.WindowCloseButtonHint |
QtCore.Qt.WindowMinimizeButtonHint
)
# Font
self.font = QtGui.QFont()
self.font.setFamily("DejaVu Sans Condensed")
self.font.setPointSize(9)
self.font.setBold(True)
self.font.setWeight(50)
self.font.setKerning(True)
# Size setting
self.resize(self.SIZE_W, self.SIZE_H)
self.setMinimumSize(QtCore.QSize(self.SIZE_W, self.SIZE_H))
self.setMaximumSize(QtCore.QSize(self.SIZE_W+100, self.SIZE_H+100))
# Style
self.setStyleSheet(style.load_stylesheet())
self.setLayout(self._ui_layout(messages))
self.setWindowTitle(title)
def _ui_layout(self, messages):
if not messages:
messages = ["*Misssing messages (This is a bug)*", ]
elif not isinstance(messages, (tuple, list)):
messages = [messages, ]
main_layout = QtWidgets.QVBoxLayout(self)
labels = []
for message in messages:
label = QtWidgets.QLabel(message)
label.setFont(self.font)
label.setCursor(QtGui.QCursor(QtCore.Qt.ArrowCursor))
label.setTextFormat(QtCore.Qt.RichText)
label.setWordWrap(True)
labels.append(label)
main_layout.addWidget(label)
btn_close = QtWidgets.QPushButton("Close")
btn_close.setToolTip('Close this window')
btn_close.clicked.connect(self.on_close_clicked)
btn_group = QtWidgets.QHBoxLayout()
btn_group.addStretch(1)
btn_group.addWidget(btn_close)
main_layout.addLayout(btn_group)
self.labels = labels
self.btn_group = btn_group
self.btn_close = btn_close
self.main_layout = main_layout
return main_layout
def on_close_clicked(self):
self.close()
def close(self, *args, **kwargs):
self.closed.emit()
super(MessageWidget, self).close(*args, **kwargs)
class ClockifySettings(QtWidgets.QWidget):
SIZE_W = 300

View file

@ -522,6 +522,21 @@ def main(argv):
help="Load creadentials from apps dir",
action="store_true"
)
parser.add_argument(
"-clockifyapikey", type=str,
help=(
"Enter API key for Clockify actions."
" (default from environment: $CLOCKIFY_API_KEY)"
)
)
parser.add_argument(
"-clockifyworkspace", type=str,
help=(
"Enter workspace for Clockify."
" (default from module presets or "
"environment: $CLOCKIFY_WORKSPACE)"
)
)
ftrack_url = os.environ.get('FTRACK_SERVER')
username = os.environ.get('FTRACK_API_USER')
api_key = os.environ.get('FTRACK_API_KEY')
@ -546,6 +561,12 @@ def main(argv):
if kwargs.ftrackapikey:
api_key = kwargs.ftrackapikey
if kwargs.clockifyworkspace:
os.environ["CLOCKIFY_WORKSPACE"] = kwargs.clockifyworkspace
if kwargs.clockifyapikey:
os.environ["CLOCKIFY_API_KEY"] = kwargs.clockifyapikey
legacy = kwargs.legacy
# Check url regex and accessibility
ftrack_url = check_ftrack_url(ftrack_url)

View file

@ -9,7 +9,7 @@ from pype.modules.ftrack.ftrack_server.lib import (
SocketSession, ProcessEventHub, TOPIC_STATUS_SERVER
)
import ftrack_api
from pype.api import Logger
from pype.api import Logger, config
log = Logger().get_logger("Event processor")
@ -55,6 +55,42 @@ def register(session):
)
def clockify_module_registration():
module_name = "Clockify"
menu_items = config.get_presets()["tray"]["menu_items"]
if not menu_items["item_usage"][module_name]:
return
api_key = os.environ.get("CLOCKIFY_API_KEY")
if not api_key:
log.warning("Clockify API key is not set.")
return
workspace_name = os.environ.get("CLOCKIFY_WORKSPACE")
if not workspace_name:
workspace_name = (
menu_items
.get("attributes", {})
.get(module_name, {})
.get("workspace_name", {})
)
if not workspace_name:
log.warning("Clockify Workspace is not set.")
return
os.environ["CLOCKIFY_WORKSPACE"] = workspace_name
from pype.modules.clockify.constants import CLOCKIFY_FTRACK_SERVER_PATH
current = os.environ.get("FTRACK_EVENTS_PATH") or ""
if current:
current += os.pathsep
os.environ["FTRACK_EVENTS_PATH"] = current + CLOCKIFY_FTRACK_SERVER_PATH
return True
def main(args):
port = int(args[-1])
# Create a TCP/IP socket
@ -66,6 +102,11 @@ def main(args):
sock.connect(server_address)
sock.sendall(b"CreatedProcess")
try:
clockify_module_registration()
except Exception:
log.info("Clockify registration failed.", exc_info=True)
try:
session = SocketSession(
auto_connect_event_hub=True, sock=sock, Eventhub=ProcessEventHub