Job Handler¶
The job handler is a class that is responsible for managing the job instances. It is a simple class that should be instantiated with a job type and will look into all the job instances of this type and run them if they are in the correct state.
The main usage of this is in a airflow DAG that runs periodically, checking for new jobs to run. The job handler will be instantiated with the job type that the DAG is responsible for and will run all the job instances of this type that are in the Queued state.
Class Definition¶
JobHandler(job_type, n_jobs=None, job_ids=None)
¶
Class for handling jobs in the performance database.
Parameters:
-
(job_type¶str) –Type of the job that will be handled. It must match the name of the job type in the
job_typestable in the database. Currently only "Alarm Calculation" is supported. -
(n_jobs¶int | None, default:None) –Number of job instances that this handler will process. If None, all job instances of the given type will be processed. Default is None.
-
(job_ids¶list[int] | None, default:None) –IDs of the job instances that will be processed. If None, the job instances will be fetched from the database.
If this argument is used,
n_jobswill be ignored. Default is None.
Source code in echo_energycalc/job_handler.py
@validate_call
def __init__(
self,
job_type: str,
n_jobs: int | None = None,
job_ids: list[int] | None = None,
) -> None:
"""
Class for handling jobs in the performance database.
Parameters
----------
job_type : str
Type of the job that will be handled. It must match the name of the job type in the `job_types` table in the database. Currently only "Alarm Calculation" is supported.
n_jobs : int | None, optional
Number of job instances that this handler will process. If None, all job instances of the given type will be processed. Default is None.
job_ids : list[int] | None, optional
IDs of the job instances that will be processed. If None, the job instances will be fetched from the database.
If this argument is used, `n_jobs` will be ignored. Default is None.
"""
# creating structure that will be used to connect to performance_db
self._job_type = job_type
self._perfdb = PerfDB(application_name=f"{self}")
# checking if job_type is in JOB_TYPE_CLASS_MAPPING
if job_type not in JOB_TYPE_CLASS_MAPPING:
raise ValueError(f"Job type {job_type} is not supported. Supported types are {list(JOB_TYPE_CLASS_MAPPING.keys())}.")
self._job_type_class: type[JobInstance] = JOB_TYPE_CLASS_MAPPING[job_type]
# checking if job type exists in performance_db
job_types = self._perfdb.jobs.types.get_ids()
if job_type not in job_types:
raise ValueError(f"Job type {job_type} does not exist in performance_db.")
self._job_type_id = job_types[job_type]
# setting number of jobs
self._n_jobs = n_jobs
# list of job instances that will be processed
self._job_ids = job_ids
self._job_ids: list[int] = self._get_jobs()
job_ids
property
¶
IDs of the job instances that will be processed.
Returns:
-
list[int] | None–IDs of the job instances that will be processed.
job_type
property
¶
Type of the job that will be handled. It matches the name of the job type in the job_types table in the database.
Returns:
-
str–Type of the job that will be handled.
job_type_id
property
¶
ID of the job type in the database.
Returns:
-
int–ID of the job type in the database.
run_jobs(skip_test=True)
¶
Runs the job instances that this handler will process.
Parameters:
-
(skip_test¶bool, default:True) –If True, jobs that have "test" in the description will be skipped. This is useful for allowing creation of test jobs in automatic tests. Default is True.
Returns:
-
list[int]–List of job instance ids that failed to run
Source code in echo_energycalc/job_handler.py
def run_jobs(self, skip_test: bool = True) -> list[int]:
"""Runs the job instances that this handler will process.
Parameters
----------
skip_test : bool, optional
If True, jobs that have "test" in the description will be skipped. This is useful for allowing creation of test jobs in automatic tests. Default is True.
Returns
-------
list[int]
List of job instance ids that failed to run
"""
failed_jobs = []
for job_id in self.job_ids:
try:
job_instance = self._job_type_class(job_id, skip_test=skip_test)
job_instance.run()
except Exception:
logger.exception(f"Error running job {job_id}.")
failed_jobs.append(job_id)
self._perfdb.jobs.instances.update(
jobs={job_id: {"job_state_name": "Failed", "comment": "Job failed due to an unknown error. Check logs."}},
)
return failed_jobs