From 98acfd8dfcbbe7d5d4fe4eee0ec74d0539d3be92 Mon Sep 17 00:00:00 2001 From: Jakub Trllo <43494761+iLLiCiTiT@users.noreply.github.com> Date: Mon, 28 Apr 2025 17:03:21 +0200 Subject: [PATCH] updated entities model --- .../tools/workfiles/models/workfiles.py | 194 ++++++++++++------ 1 file changed, 126 insertions(+), 68 deletions(-) diff --git a/client/ayon_core/tools/workfiles/models/workfiles.py b/client/ayon_core/tools/workfiles/models/workfiles.py index 6fc76ac458..e4d555261e 100644 --- a/client/ayon_core/tools/workfiles/models/workfiles.py +++ b/client/ayon_core/tools/workfiles/models/workfiles.py @@ -456,107 +456,145 @@ class WorkfileEntitiesModel: """Workfile entities model. Args: - control (AbstractWorkfileController): Controller object. - """ + controller (AbstractWorkfileController): Controller object. + """ def __init__(self, controller): self._controller = controller - self._cache = {} - self._items = {} + self._workfile_entities_by_task_id = {} self._current_username = _NOT_SET def reset(self): - self._cache = {} - self._items = {} + self._workfile_entities_by_task_id = {} - def get_workfile_info( - self, folder_id, task_id, filepath, rootless_path=None + def get_workfile_entities(self, task_id: str): + if not task_id: + return [] + workfile_entities = self._workfile_entities_by_task_id.get(task_id) + if workfile_entities is None: + workfile_entities = list(ayon_api.get_workfiles_info( + self._controller.get_current_project_name(), + task_ids=[task_id], + )) + self._workfile_entities_by_task_id[task_id] = workfile_entities + return workfile_entities + + def save_workfile_info( + self, + task_id: str, + rootless_path: str, + version: Optional[int], + comment: Optional[str], + description: Optional[str], ): - if not folder_id or not task_id or not filepath: - return None - - if rootless_path is None: - rootless_path = self._get_rootless_path(filepath) - - identifier = self._get_workfile_info_identifier( - folder_id, task_id, rootless_path) - item = self._items.get(identifier) - if item is None: - workfile_info = self._get_workfile_info( - folder_id, task_id, identifier - ) - item = self._prepare_workfile_info_item( - folder_id, task_id, workfile_info, filepath - ) - self._items[identifier] = item - return item - - def save_workfile_info(self, folder_id, task_id, filepath, note): - rootless_path = self._get_rootless_path(filepath) - identifier = self._get_workfile_info_identifier( - folder_id, task_id, rootless_path + # TODO create pipeline function for this + workfile_entities = self.get_workfile_entities(task_id) + workfile_entity = next( + ( + _ent + for _ent in workfile_entities + if _ent["path"] == rootless_path + ), + None ) - workfile_info = self._get_workfile_info( - folder_id, task_id, identifier - ) - if not workfile_info: - self._cache[identifier] = self._create_workfile_info_entity( - task_id, rootless_path, note or "") - self._items.pop(identifier, None) + if not workfile_entity: + workfile_entity = self._create_workfile_info_entity( + task_id, + rootless_path, + version, + comment, + description, + ) + workfile_entities.append(workfile_entity) return - old_note = workfile_info.get("attrib", {}).get("note") + data = {} + for key, value in ( + ("host_name", self._controller.get_host_name()), + ("version", version), + ("comment", comment), + ): + if value is not None: + data[key] = value + + old_data = workfile_entity["data"] + + changed_data = {} + for key, value in data.items(): + if key not in old_data or old_data[key] != value: + changed_data[key] = value - new_workfile_info = copy.deepcopy(workfile_info) update_data = {} - if note is not None and old_note != note: - update_data["attrib"] = {"description": note} - attrib = new_workfile_info.setdefault("attrib", {}) - attrib["description"] = note + if changed_data: + update_data["data"] = changed_data + + old_description = workfile_entity["attrib"].get("description") + if description is not None and old_description != description: + update_data["attrib"] = {"description": description} + workfile_entity["attrib"]["description"] = description username = self._get_current_username() # Automatically fix 'createdBy' and 'updatedBy' fields # NOTE both fields were not automatically filled by server # until 1.1.3 release. - if workfile_info.get("createdBy") is None: + if workfile_entity.get("createdBy") is None: update_data["createdBy"] = username - new_workfile_info["createdBy"] = username + workfile_entity["createdBy"] = username - if workfile_info.get("updatedBy") != username: + if workfile_entity.get("updatedBy") != username: update_data["updatedBy"] = username - new_workfile_info["updatedBy"] = username + workfile_entity["updatedBy"] = username if not update_data: return - self._cache[identifier] = new_workfile_info - self._items.pop(identifier, None) - project_name = self._controller.get_current_project_name() session = OperationsSession() session.update_entity( project_name, "workfile", - workfile_info["id"], + workfile_entity["id"], update_data, ) session.commit() - def _create_workfile_info_entity(self, task_id, rootless_path, note): + def _create_workfile_info_entity( + self, + task_id: str, + rootless_path: str, + version: Optional[int], + comment: Optional[str], + description: str, + ) -> dict[str, Any]: extension = os.path.splitext(rootless_path)[1] project_name = self._controller.get_current_project_name() + attrib = {} + for key, value in ( + ("extension", extension), + ("description", description), + ): + if value is not None: + attrib[key] = value + + data = {} + for key, value in ( + ("host_name", self._controller.get_host_name()), + ("version", version), + ("comment", comment), + ): + if value is not None: + data[key] = value + username = self._get_current_username() workfile_info = { "id": uuid.uuid4().hex, "path": rootless_path, "taskId": task_id, - "attrib": { - "extension": extension, - "description": note - }, + "attrib": attrib, + "data": data, # TODO remove 'createdBy' and 'updatedBy' fields when server is # or above 1.1.3 . "createdBy": username, @@ -568,7 +606,7 @@ class WorkfileEntitiesModel: session.commit() return workfile_info - def _get_current_username(self): + def _get_current_username(self) -> str: if self._current_username is _NOT_SET: self._current_username = get_ayon_username() return self._current_username @@ -709,18 +747,38 @@ class WorkfilesModel: self._published_model = PublishWorkfilesModel(controller) def reset(self): - self._entities_model.reset() self._workarea_model.reset() + self._entities_model.reset() - def get_workfile_info(self, folder_id, task_id, filepath): - return self._entities_model.get_workfile_info( - folder_id, task_id, filepath + def reset_workarea_file_items(self, task_id): + self._workarea_model.reset_file_items(task_id) + + def get_workfile_info(self, folder_id, task_id, rootless_path): + return self._workarea_model.get_workfile_info( + folder_id, task_id, rootless_path ) - def save_workfile_info(self, folder_id, task_id, filepath, note): + def save_workfile_info( + self, + task_id, + rootless_path, + version, + comment, + description, + ): self._entities_model.save_workfile_info( - folder_id, task_id, filepath, note + task_id, + rootless_path, + version, + comment, + description, ) + self._workarea_model.update_file_description( + task_id, rootless_path, description + ) + + def get_workfile_entities(self, task_id): + return self._entities_model.get_workfile_entities(task_id) def get_workarea_dir_by_context(self, folder_id, task_id): """Workarea dir for passed context. @@ -738,20 +796,20 @@ class WorkfilesModel: return self._workarea_model.get_workarea_dir_by_context( folder_id, task_id) - def get_workarea_file_items(self, folder_id, task_id, task_name): + def get_workarea_file_items(self, folder_id, task_id): """Workfile items for passed context from workarea. Args: folder_id (Union[str, None]): Folder id. task_id (Union[str, None]): Task id. - task_name (Union[str, None]): Task name. Returns: - list[FileItem]: List of file items matching workarea of passed + list[WorkfileInfo]: List of file items matching workarea of passed context. + """ return self._workarea_model.get_file_items( - folder_id, task_id, task_name + folder_id, task_id ) def get_workarea_save_as_data(self, folder_id, task_id):