From 663d2bf8d377d0d54865621301ac3a58e646750e Mon Sep 17 00:00:00 2001 From: iLLiCiTiT Date: Fri, 8 Oct 2021 18:40:17 +0200 Subject: [PATCH] base of jobs queue --- .../hosts_job_server/job_server/jobs.py | 53 +++++++++++++++++++ 1 file changed, 53 insertions(+) diff --git a/openpype/modules/default_modules/hosts_job_server/job_server/jobs.py b/openpype/modules/default_modules/hosts_job_server/job_server/jobs.py index dfd5f3f4ec..7782d84f31 100644 --- a/openpype/modules/default_modules/hosts_job_server/job_server/jobs.py +++ b/openpype/modules/default_modules/hosts_job_server/job_server/jobs.py @@ -96,3 +96,56 @@ class Job: output["state"] = state return output + + +class JobQueue: + """Queue holds jobs that should be done and workers that can do them. + + Also asign jobs to a worker. + """ + old_jobs_check_minutes_interval = 30 + + def __init__(self): + self._last_old_jobs_check = datetime.datetime.now() + self._jobs_by_id = {} + self._job_queue_by_host_name = collections.defaultdict( + collections.deque + ) + + def get_job(self, job_id): + """Job by it's id.""" + return self._jobs_by_id.get(job_id) + + def create_job(self, host_name, job_data): + """Create new job from passed data and add it to queue.""" + job = Job(host_name, job_data) + self._jobs_by_id[job.id] = job + self._job_queue_by_host_name[host_name].append(job) + return job + + def _remove_old_jobs(self): + """Once in specific time look if should remove old finished jobs.""" + delta = datetime.datetime.now() - self._last_old_jobs_check + if delta.seconds < self.old_jobs_check_minutes_interval: + return + + for job_id in tuple(self._jobs_by_id.keys()): + job = self._jobs_by_id[job_id] + if not job.keep_in_memory(): + self._jobs_by_id.pop(job_id) + + def remove_job(self, job_id): + """Delete job and eventually stop it.""" + job = self._jobs_by_id.get(job_id) + if job is None: + return + + job.set_deleted() + self._jobs_by_id.pop(job.id) + + def get_job_status(self, job_id): + """Job's status based on id.""" + job = self._jobs_by_id.get(job_id) + if job is None: + return {} + return job.status()