
Job Handler

The job handler is a small class that manages job instances. It is instantiated with a job type, looks up the job instances of that type, and runs those that are ready to run.

Its main use is in an Airflow DAG that runs periodically to check for new jobs. The DAG instantiates the handler with the job type it is responsible for, and the handler runs every job instance of that type that is in the Queued state.
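The selection rule described above can be illustrated with plain Python data. This is a hypothetical sketch: `JobRecord`, `queued_jobs_of_type`, and the "Other Type" job type are illustrative stand-ins for rows in the performance database, not part of echo_energycalc.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass(frozen=True)
class JobRecord:
    """Hypothetical in-memory stand-in for one row of the job instances table."""

    job_id: int
    job_type: str
    state: str


def queued_jobs_of_type(jobs: list[JobRecord], job_type: str) -> list[int]:
    """Return the IDs of the jobs of ``job_type`` that are in the Queued state."""
    return [job.job_id for job in jobs if job.job_type == job_type and job.state == "Queued"]


jobs = [
    JobRecord(1, "Alarm Calculation", "Queued"),
    JobRecord(2, "Alarm Calculation", "Failed"),  # wrong state: skipped
    JobRecord(3, "Other Type", "Queued"),  # different (hypothetical) type: skipped
    JobRecord(4, "Alarm Calculation", "Queued"),
]
print(queued_jobs_of_type(jobs, "Alarm Calculation"))  # [1, 4]
```

In the DAG itself, the periodic task only needs to build a `JobHandler` for its job type, for example `JobHandler("Alarm Calculation")`, and call `run_jobs()` on it.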

Class Definition

JobHandler(job_type, n_jobs=None, job_ids=None)

Class for handling jobs in the performance database.

Parameters:

  • job_type

    (str) –

    Type of the job that will be handled. It must match the name of the job type in the job_types table in the database. Currently only "Alarm Calculation" is supported; any other value raises ValueError.

  • n_jobs

    (int | None, default: None) –

    Number of job instances that this handler will process. If None, all job instances of the given type will be processed. Default is None.

  • job_ids

    (list[int] | None, default: None) –

    IDs of the job instances that will be processed. If None, the job instances will be fetched from the database.

    If this argument is used, n_jobs will be ignored. Default is None.
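The precedence between `job_ids` and `n_jobs` described above can be summarized in a short function. This is a hypothetical sketch of the documented behavior only: `select_job_ids` and `queued_ids` are illustrative names, the implementation of the real `_get_jobs` method is not shown on this page, and the order in which the database returns jobs (and therefore which ones an `n_jobs` limit keeps) is an assumption.

```python
from __future__ import annotations


def select_job_ids(
    queued_ids: list[int],
    n_jobs: int | None = None,
    job_ids: list[int] | None = None,
) -> list[int]:
    """Hypothetical sketch of the documented precedence between job_ids and n_jobs."""
    if job_ids is not None:
        # Explicit IDs take precedence and n_jobs is ignored.
        return list(job_ids)
    if n_jobs is None:
        # No limit: every job instance fetched from the database is processed.
        return list(queued_ids)
    # Otherwise keep at most n_jobs IDs (assumes queued_ids is already in the desired order).
    return queued_ids[:n_jobs]


queued = [11, 12, 13]  # made-up IDs standing in for the database query result
print(select_job_ids(queued))  # [11, 12, 13]
print(select_job_ids(queued, n_jobs=2))  # [11, 12]
print(select_job_ids(queued, n_jobs=2, job_ids=[42]))  # [42]
```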

Source code in echo_energycalc/job_handler.py
Python
@validate_call
def __init__(
    self,
    job_type: str,
    n_jobs: int | None = None,
    job_ids: list[int] | None = None,
) -> None:
    """
    Class for handling jobs in the performance database.

    Parameters
    ----------
    job_type : str
        Type of the job that will be handled. It must match the name of the job type in the `job_types` table in the database. Currently only "Alarm Calculation" is supported.
    n_jobs : int | None, optional
        Number of job instances that this handler will process. If None, all job instances of the given type will be processed. Default is None.
    job_ids : list[int] | None, optional
        IDs of the job instances that will be processed. If None, the job instances will be fetched from the database.

        If this argument is used, `n_jobs` will be ignored. Default is None.
    """
    # creating structure that will be used to connect to performance_db
    self._job_type = job_type
    self._perfdb = PerfDB(application_name=f"{self}")

    # checking if job_type is in JOB_TYPE_CLASS_MAPPING
    if job_type not in JOB_TYPE_CLASS_MAPPING:
        raise ValueError(f"Job type {job_type} is not supported. Supported types are {list(JOB_TYPE_CLASS_MAPPING.keys())}.")
    self._job_type_class: type[JobInstance] = JOB_TYPE_CLASS_MAPPING[job_type]

    # checking if job type exists in performance_db
    job_types = self._perfdb.jobs.types.get_ids()
    if job_type not in job_types:
        raise ValueError(f"Job type {job_type} does not exist in performance_db.")
    self._job_type_id = job_types[job_type]

    self._n_jobs = n_jobs
    self._job_ids: list[int] = self._get_jobs(job_ids)
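The constructor performs two separate checks before it accepts a job type: the type must have a Python class in `JOB_TYPE_CLASS_MAPPING`, and it must exist among the job types in the database. The sketch below isolates that logic. `AlarmCalculationJob`, `resolve_job_type`, the mapping contents, and the ID 7 are hypothetical; the dictionary argument plays the role of the name-to-ID mapping returned by `self._perfdb.jobs.types.get_ids()` in the source.

```python
from __future__ import annotations


class AlarmCalculationJob:
    """Hypothetical stand-in for the JobInstance subclass that runs alarm calculations."""


# Hypothetical contents; the real mapping is defined in echo_energycalc.
JOB_TYPE_CLASS_MAPPING: dict[str, type] = {"Alarm Calculation": AlarmCalculationJob}


def resolve_job_type(job_type: str, db_job_type_ids: dict[str, int]) -> tuple[type, int]:
    """Return (job class, job type ID), mirroring the two checks in __init__."""
    # Check 1: the code must know which class runs this job type.
    if job_type not in JOB_TYPE_CLASS_MAPPING:
        raise ValueError(
            f"Job type {job_type} is not supported. "
            f"Supported types are {list(JOB_TYPE_CLASS_MAPPING.keys())}."
        )
    # Check 2: the job type must also be registered in the database.
    if job_type not in db_job_type_ids:
        raise ValueError(f"Job type {job_type} does not exist in performance_db.")
    return JOB_TYPE_CLASS_MAPPING[job_type], db_job_type_ids[job_type]


cls, type_id = resolve_job_type("Alarm Calculation", {"Alarm Calculation": 7})
print(cls.__name__, type_id)  # AlarmCalculationJob 7
```

Because the mapping check comes first, an unsupported type is rejected without querying the database's job types.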

job_ids property

IDs of the job instances that will be processed.

Returns:

  • list[int]

    IDs of the job instances that will be processed.

job_type property

Type of the job that will be handled. It matches the name of the job type in the job_types table in the database.

Returns:

  • str

    Type of the job that will be handled.

job_type_id property

ID of the job type in the database.

Returns:

  • int

    ID of the job type in the database.
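The three properties above expose values the constructor stores. A minimal sketch, assuming each property just returns the private attribute the constructor assigns (`_job_type`, `_job_type_id`, `_job_ids`) and defines no setter; the property source is not shown on this page, and `JobHandlerSketch` is a hypothetical stand-in.

```python
from __future__ import annotations


class JobHandlerSketch:
    """Hypothetical minimal stand-in for JobHandler's read-only properties."""

    def __init__(self, job_type: str, job_type_id: int, job_ids: list[int]) -> None:
        # Same private attribute names the real constructor assigns.
        self._job_type = job_type
        self._job_type_id = job_type_id
        self._job_ids = job_ids

    @property
    def job_type(self) -> str:
        return self._job_type

    @property
    def job_type_id(self) -> int:
        return self._job_type_id

    @property
    def job_ids(self) -> list[int]:
        return self._job_ids


handler = JobHandlerSketch("Alarm Calculation", 7, [101, 102])
print(handler.job_type, handler.job_type_id, handler.job_ids)
try:
    handler.job_type = "Something else"  # a property without a setter rejects assignment
except AttributeError:
    print("job_type is read-only")
```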

run_jobs(skip_test=True, max_workers=2)

Runs the job instances that this handler will process.

Parameters:

  • skip_test

    (bool, default: True) –

    If True, jobs that have "test" in the description will be skipped. Default is True.

  • max_workers

    (int, default: 2) –

    Number of jobs to execute concurrently using threads. Most of each job's time is spent in DB/network I/O, during which the GIL is released, so threads let several jobs make progress at once. Each job opens its own PerfDB connection, so set this no higher than your database's connection limit allows. A value of 1 runs the jobs sequentially without a thread pool. Default is 2.

Returns:

  • list[int]

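    IDs of the job instances that failed to run. When max_workers is greater than 1, the IDs are returned in completion order, not in the order of job_ids.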
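The concurrency claim for `max_workers` can be checked with a small timing experiment. In this sketch `time.sleep` stands in for a job's blocking DB/network I/O (an assumption; the real jobs' I/O profile is not shown here). Like most blocking I/O calls, it releases the GIL, so four sleeping threads overlap instead of running one after another. `fake_io_job` and `timed_batch` are illustrative names.

```python
from __future__ import annotations

import time
from concurrent.futures import ThreadPoolExecutor


def fake_io_job(job_id: int) -> int:
    """Stand-in for a job dominated by blocking I/O; time.sleep releases the GIL."""
    time.sleep(0.2)
    return job_id


def timed_batch(job_ids: list[int], max_workers: int) -> float:
    """Run every job through a thread pool and return the wall-clock time in seconds."""
    start = time.perf_counter()
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        list(executor.map(fake_io_job, job_ids))  # consume the results so every job completes
    return time.perf_counter() - start


ids = [1, 2, 3, 4]
one_worker = timed_batch(ids, max_workers=1)  # about 4 x 0.2 s
four_workers = timed_batch(ids, max_workers=4)  # about 0.2 s: the sleeps overlap
print(f"1 worker: {one_worker:.2f} s, 4 workers: {four_workers:.2f} s")
```

Pure-Python CPU-bound work holds the GIL and would not speed up this way. In `run_jobs`, each running job also opens its own PerfDB connection, so a larger `max_workers` trades throughput for open connections.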

Source code in echo_energycalc/job_handler.py
Python
def run_jobs(self, skip_test: bool = True, max_workers: int = 2) -> list[int]:
    """Runs the job instances that this handler will process.

    Parameters
    ----------
    skip_test : bool, optional
        If True, jobs that have "test" in the description will be skipped. Default is True.
    max_workers : int, optional
        Number of jobs to execute concurrently using threads.  The GIL is
        released during all DB/network I/O (the dominant cost of each job),
        so threads provide real parallelism here.  Each job opens its own
        ``PerfDB`` connection — set this no higher than your database's
        connection limit allows.  Default is 2.

    Returns
    -------
    list[int]
        List of job instance ids that failed to run.
    """

    def _run_one(job_id: int) -> int | None:
        """Run a single job; return job_id on failure, None on success."""
        try:
            job_instance = self._job_type_class(job_id, skip_test=skip_test)
            job_instance.run()
            return None
        except Exception:
            logger.exception(f"Error running job {job_id}.")
            PerfDB(application_name=f"{self}").jobs.instances.update(
                jobs={job_id: {"job_state_name": "Failed", "comment": "Job failed due to an unknown error. Check logs."}},
            )
            return job_id

    if max_workers == 1:
        # Sequential path — no executor overhead.
        return [jid for jid in (_run_one(j) for j in self.job_ids) if jid is not None]

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = {executor.submit(_run_one, job_id): job_id for job_id in self.job_ids}
        return [r for r in (fut.result() for fut in as_completed(futures)) if r is not None]
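The failure-collection pattern in `run_jobs` can be exercised without a database. The sketch below keeps the same structure (a nested `_run_one` that returns the job ID on failure, a sequential path for `max_workers == 1`, and `as_completed` for the threaded path) but swaps in hypothetical stand-ins: `run_job`, `FAILING_IDS`, and `run_all` are illustrative names, and the database update that marks a job as Failed is omitted.

```python
from __future__ import annotations

import logging
from concurrent.futures import ThreadPoolExecutor, as_completed

logger = logging.getLogger(__name__)

# Hypothetical: the job IDs whose run will raise.
FAILING_IDS = {2, 5}


def run_job(job_id: int) -> None:
    """Hypothetical stand-in for building a job instance and calling run()."""
    if job_id in FAILING_IDS:
        raise RuntimeError(f"job {job_id} failed")


def run_all(job_ids: list[int], max_workers: int = 2) -> list[int]:
    """Run every job and return the IDs of the ones that raised."""

    def _run_one(job_id: int) -> int | None:
        # Return the job ID on failure and None on success, like run_jobs.
        try:
            run_job(job_id)
            return None
        except Exception:
            logger.exception("Error running job %s.", job_id)
            return job_id

    if max_workers == 1:
        # Sequential path: failures come back in input order.
        return [jid for jid in map(_run_one, job_ids) if jid is not None]

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = [executor.submit(_run_one, jid) for jid in job_ids]
        # as_completed yields futures as they finish, so order is not guaranteed.
        return [r for r in (fut.result() for fut in as_completed(futures)) if r is not None]


print(sorted(run_all([1, 2, 3, 4, 5], max_workers=3)))  # [2, 5]
```

Because the threaded path returns failures in completion order, sort the result if order matters. Note also that in the real method, if the Failed-state update inside the `except` block itself raises, that exception escapes `_run_one` and is re-raised by `fut.result()` (or directly, on the sequential path), so `run_jobs` raises instead of returning the list of failed IDs.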