few minor modifications and changes

This commit is contained in:
Jakub Trllo 2022-07-29 19:20:47 +02:00
parent c429a41188
commit cc5abb1514

View file

@ -15,9 +15,14 @@ REMOVED_VALUE = object()
class AbstractOperation(object):
"""Base operation class."""
def __init__(self, entity_type):
def __init__(self, project_name, entity_type):
self._project_name = project_name
self._entity_type = entity_type
self._id = uuid.uuid4()
self._id = str(uuid.uuid4())
@property
def project_name(self):
return self._project_name
@property
def id(self):
@ -27,14 +32,28 @@ class AbstractOperation(object):
def entity_type(self):
return self._entity_type
@abstractproperty
def operation_name(self):
pass
@abstractmethod
def to_mongo_operation(self):
pass
def to_data(self):
return {
"id": self._id,
"entity_type": self.entity_type,
"project_name": self.project_name,
"operation": self.operation_name
}
class CreateOperation(AbstractOperation):
operation_name = "create"
def __init__(self, project_name, entity_type, data):
super(CreateOperation, self).__init__(entity_type)
super(CreateOperation, self).__init__(project_name, entity_type)
if not data:
data = {}
@ -73,32 +92,32 @@ class CreateOperation(AbstractOperation):
return InsertOne(copy.deepcopy(self._data))
def to_data(self):
return {
"operation": "create",
"entity_type": self.entity_type,
"data": copy.deepcopy(self.data)
}
output = super(CreateOperation, self).to_data()
output["data"] = copy.deepcopy(self.data)
return output
class UpdateOperation(AbstractOperation):
def __init__(self, project_name, entity_type, entity_id, update_fields):
super(CreateOperation, self).__init__(entity_type)
operation_name = "update"
def __init__(self, project_name, entity_type, entity_id, update_data):
super(UpdateOperation, self).__init__(project_name, entity_type)
self._entity_id = ObjectId(entity_id)
self._update_fields = update_fields
self._update_data = update_data
@property
def entity_id(self):
return self._entity_id
@property
def update_fields(self):
return self._update_fields
def update_data(self):
return self._update_data
def to_mongo_operation(self):
unset_data = {}
set_data = {}
for key, value in self._update_fields.items():
for key, value in self._update_data.items():
if value is REMOVED_VALUE:
unset_data[key] = value
else:
@ -120,22 +139,24 @@ class UpdateOperation(AbstractOperation):
def to_data(self):
fields = {}
for key, value in self._update_fields.items():
for key, value in self._update_data.items():
if value is REMOVED_VALUE:
value = None
fields[key] = value
return {
"operation": "update",
"entity_type": self.entity_type,
output = super(UpdateOperation, self).to_data()
output.update({
"entity_id": str(self.entity_id),
"fields": fields
}
})
return output
class DeleteOperation(AbstractOperation):
def __init__(self, entity_type, entity_id):
super(DeleteOperation, self).__init__(entity_type)
operation_name = "delete"
def __init__(self, project_name, entity_type, entity_id):
super(DeleteOperation, self).__init__(project_name, entity_type)
self._entity_id = ObjectId(entity_id)
@ -147,11 +168,9 @@ class DeleteOperation(AbstractOperation):
return DeleteOne({"_id": self.entity_id})
def to_data(self):
return {
"operation": "delete",
"entity_type": self.entity_type,
"entity_id": str(self.entity_id)
}
output = super(DeleteOperation, self).to_data()
output["entity_id"] = self.entity_id
return output
class OperationsSession(object):
@ -168,14 +187,9 @@ class OperationsSession(object):
project_name (str): Project name to which are operations related.
"""
def __init__(self, project_name):
self._project_name = project_name
def __init__(self):
self._operations = []
@property
def project_name(self):
return self._project_name
def add(self, operation):
"""Add operation to be processed.
@ -223,13 +237,10 @@ class OperationsSession(object):
self._operations = []
def to_data(self):
return {
"project_name": self.project_name,
"operations": [
operation.to_data()
for operation in self._operations
]
}
return [
operation.to_data()
for operation in self._operations
]
def commit(self):
"""Commit session operations."""
@ -238,12 +249,34 @@ class OperationsSession(object):
if not operations:
return
bulk_writes = []
operations_by_project = collections.defaultdict(list)
for operation in operations:
mongo_op = operation.to_mongo_operation()
if mongo_op is not None:
bulk_writes.append(mongo_op)
operations_by_project[operation.project_name].append(operation)
if bulk_writes:
collection = get_project_connection(self.project_name)
collection.bulk_write(bulk_writes)
for project_name, operations in operations_by_project.items():
bulk_writes = []
for operation in operations:
mongo_op = operation.to_mongo_operation()
if mongo_op is not None:
bulk_writes.append(mongo_op)
if bulk_writes:
collection = get_project_connection(project_name)
collection.bulk_write(bulk_writes)
def create_entity(self, project_name, entity_type, data):
operation = CreateOperation(project_name, entity_type, data)
self.add(operation)
return operation
def update_entity(self, project_name, entity_type, entity_id, update_data):
operation = UpdateOperation(
project_name, entity_type, entity_id, update_data
)
self.add(operation)
return operation
def delete_entity(self, project_name, entity_type, entity_id):
operation = DeleteOperation(project_name, entity_type, entity_id)
self.add(operation)
return operation