Job Handler

The job handler is a class that manages job instances. It is instantiated with a job type, looks up the job instances of that type, and runs those that are in the correct state.

Its main use is in an Airflow DAG that runs periodically and checks for new jobs to run. The DAG instantiates the job handler with the job type it is responsible for, and the handler runs all job instances of that type that are in the Queued state.
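The body of such a periodic task might look like the sketch below. The real `JobHandler` needs a database connection, so a hypothetical `StubHandler` stands in for it here; `StubHandler` and `run_queued_jobs` are illustrative names, not part of `echo_energycalc`. In an actual DAG the function would be used as the task callable and would receive `JobHandler("Alarm Calculation")` instead of the stub.

```python
class StubHandler:
    """Hypothetical stand-in exposing the same run_jobs() contract as JobHandler."""

    def __init__(self, failing_ids: list[int]) -> None:
        self._failing_ids = failing_ids

    def run_jobs(self, skip_test: bool = True) -> list[int]:
        # The real handler runs each job instance and returns the IDs that failed.
        return list(self._failing_ids)


def run_queued_jobs(handler) -> None:
    """Task body: run every job the handler holds and fail loudly if any failed."""
    failed = handler.run_jobs()
    if failed:
        # Raising lets the scheduler mark the task run as failed.
        raise RuntimeError(f"{len(failed)} job instance(s) failed: {failed}")


# Completes silently when no job instance failed.
run_queued_jobs(StubHandler(failing_ids=[]))
```

Raising on a non-empty result is one possible design choice; a DAG could equally log the failed IDs and succeed, since each failed job instance is already marked as Failed in the database.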

Class Definition

JobHandler(job_type, n_jobs=None, job_ids=None)

Class for handling jobs in the performance database.

Parameters:

  • job_type

    (str) –

    Type of the job that will be handled. It must match the name of the job type in the job_types table in the database. Currently only "Alarm Calculation" is supported.

  • n_jobs

    (int | None, default: None ) –

    Number of job instances that this handler will process. If None, all job instances of the given type will be processed. Default is None.

  • job_ids

    (list[int] | None, default: None ) –

    IDs of the job instances that will be processed. If None, the job instances will be fetched from the database.

    If this argument is used, n_jobs will be ignored. Default is None.

Source code in echo_energycalc/job_handler.py
@validate_call
def __init__(
    self,
    job_type: str,
    n_jobs: int | None = None,
    job_ids: list[int] | None = None,
) -> None:
    """
    Class for handling jobs in the performance database.

    Parameters
    ----------
    job_type : str
        Type of the job that will be handled. It must match the name of the job type in the `job_types` table in the database. Currently only "Alarm Calculation" is supported.
    n_jobs : int | None, optional
        Number of job instances that this handler will process. If None, all job instances of the given type will be processed. Default is None.
    job_ids : list[int] | None, optional
        IDs of the job instances that will be processed. If None, the job instances will be fetched from the database.

        If this argument is used, `n_jobs` will be ignored. Default is None.
    """
    # creating structure that will be used to connect to performance_db
    self._job_type = job_type
    self._perfdb = PerfDB(application_name=f"{self}")

    # checking if job_type is in JOB_TYPE_CLASS_MAPPING
    if job_type not in JOB_TYPE_CLASS_MAPPING:
        raise ValueError(f"Job type {job_type} is not supported. Supported types are {list(JOB_TYPE_CLASS_MAPPING.keys())}.")
    self._job_type_class: type[JobInstance] = JOB_TYPE_CLASS_MAPPING[job_type]

    # checking if job type exists in performance_db
    job_types = self._perfdb.jobs.types.get_ids()
    if job_type not in job_types:
        raise ValueError(f"Job type {job_type} does not exist in performance_db.")
    self._job_type_id = job_types[job_type]

    # setting number of jobs
    self._n_jobs = n_jobs

    # list of job instances that will be processed
    self._job_ids = job_ids
    self._job_ids: list[int] = self._get_jobs()

job_ids property

IDs of the job instances that will be processed.

Returns:

  • list[int] | None

    IDs of the job instances that will be processed.

job_type property

Type of the job that will be handled. It matches the name of the job type in the job_types table in the database.

Returns:

  • str

    Type of the job that will be handled.

job_type_id property

ID of the job type in the database.

Returns:

  • int

    ID of the job type in the database.

run_jobs(skip_test=True)

Runs the job instances that this handler will process.

Parameters:

  • skip_test

    (bool, default: True ) –

    If True, jobs that have "test" in the description will be skipped. This is useful for allowing creation of test jobs in automatic tests. Default is True.

Returns:

  • list[int]

    List of job instance ids that failed to run

Source code in echo_energycalc/job_handler.py
def run_jobs(self, skip_test: bool = True) -> list[int]:
    """Runs the job instances that this handler will process.

    Parameters
    ----------
    skip_test : bool, optional
        If True, jobs that have "test" in the description will be skipped. This is useful for allowing creation of test jobs in automatic tests. Default is True.

    Returns
    -------
    list[int]
        List of job instance ids that failed to run
    """
    failed_jobs = []
    for job_id in self.job_ids:
        try:
            job_instance = self._job_type_class(job_id, skip_test=skip_test)
            job_instance.run()
        except Exception:
            logger.exception(f"Error running job {job_id}.")
            failed_jobs.append(job_id)
            self._perfdb.jobs.instances.update(
                jobs={job_id: {"job_state_name": "Failed", "comment": "Job failed due to an unknown error. Check logs."}},
            )

    return failed_jobs
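
The loop in `run_jobs` isolates failures: an exception in one job is logged, its ID is recorded, and the remaining jobs still run. The self-contained sketch below illustrates that pattern only. The `run_all` function is hypothetical, plain callables stand in for job instances, and the database update to the Failed state is left out.

```python
import logging
from collections.abc import Callable

logger = logging.getLogger(__name__)


def run_all(jobs: dict[int, Callable[[], None]]) -> list[int]:
    """Run every job, collecting the IDs of those that raised."""
    failed: list[int] = []
    for job_id, job in jobs.items():
        try:
            job()
        except Exception:
            # Log the traceback and continue with the next job.
            logger.exception("Error running job %s.", job_id)
            failed.append(job_id)
    return failed


def ok() -> None:
    pass


def broken() -> None:
    raise ValueError("bad input")


failed = run_all({1: ok, 2: broken, 3: ok})
print(failed)  # [2]
```

Job 3 still runs even though job 2 raised, which is why the caller receives a list of failed IDs rather than an exception.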