Job Instances

Jobs are periodic or on-demand tasks that run in the Performance Server. They are stored in performance_db as rows in the job_instances table, along with their parameters (JSON) and execution state. A pool of workers (JobHandler) polls the database for pending jobs and dispatches them.


Database Structure

| Table | Purpose |
|---|---|
| job_types | Defines available job types. Each type has a name (string key linking to a Python class) and a parameters_schema (JSON Schema for parameter validation). |
| job_states | Enum of possible states: Created, Queued, Running, Finished, Failed. |
| job_instances | Stores individual job runs with type, state, parameters (JSON), timestamps, and a comment/log field. |
| job_steps | Stores the individual units of work within a job. Each step has its own state and error message. |

Job Instance vs. Job Step: A job instance is a single request to do some work (e.g., "calculate alarms for LAN-01, LAN-02 with alarms 111, 112"). The job creates multiple steps (one per object-alarm pair). This provides fine-grained progress tracking and partial failure recovery.
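As an illustration of the "one step per object-alarm pair" idea, the sketch below expands the example request into its steps. The function name `build_steps` and the dictionary keys are assumptions made for this example; the actual columns of job_steps are not shown on this page.

```python
from itertools import product


def build_steps(object_names: list[str], alarm_ids: list[int]) -> list[dict]:
    """Return one step description per (object, alarm) pair.

    The keys used here are hypothetical; the real job_steps schema may differ.
    """
    return [
        {"object_name": obj, "alarm_id": alarm}
        for obj, alarm in product(object_names, alarm_ids)
    ]


# Two objects and two alarms give four independent steps.
steps = build_steps(["LAN-01", "LAN-02"], [111, 112])
```

Because each pair is its own step, a failure for one pair is recorded on that step alone and does not hide the result of the others.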


State Machine

[Created]  →  [Queued]  →  [Running]  →  [Finished]
                                    └──→  [Failed]
  • Created → set by whoever creates the job_instances row.
  • Queued → set by JobInstance.__init__ after successfully creating all steps.
  • Running → set by JobInstance.run() when execution starts.
  • Finished / Failed → set by JobInstance.run() when all steps complete.

Steps transition independently through Created → Running → Finished / Failed. A failed step does not stop the job: the remaining steps still run, and the parent job is marked Failed once every step has been attempted.
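The job-level transitions can be written down as a small lookup table. This is only a sketch for reasoning about the diagram: `JOB_TRANSITIONS`, `can_transition`, and `job_state_after_run` are hypothetical names, not part of the package.

```python
# Allowed job-instance transitions, taken from the diagram above.
JOB_TRANSITIONS: dict[str, set[str]] = {
    "Created": {"Queued"},
    "Queued": {"Running"},
    "Running": {"Finished", "Failed"},
    "Finished": set(),
    "Failed": set(),
}


def can_transition(current: str, new: str) -> bool:
    """Return True if a job may move from `current` to `new`."""
    return new in JOB_TRANSITIONS.get(current, set())


def job_state_after_run(step_states: list[str]) -> str:
    """A job ends as Failed if at least one of its steps failed."""
    return "Failed" if "Failed" in step_states else "Finished"
```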


Job Instance Base Class

JobInstance provides:

  • __init__: Validates the DB row exists and is "Created", checks the type matches _name, creates steps (unless delay_steps_creation=True).
  • run(): Iterates through all step IDs, calls _run_step(step_id), updates states.
  • Auto-registration via __init_subclass__ so the JobHandler can map type names to classes.
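The auto-registration mentioned in the last bullet can be pictured with a minimal registry built on `__init_subclass__`. This is a sketch of the general pattern, not the package's code: `JobBase`, `_registry`, and `class_for` are names invented for the example.

```python
class JobBase:
    """Hypothetical base class that records every subclass by its _name."""

    _registry: dict[str, type["JobBase"]] = {}

    def __init_subclass__(cls, **kwargs) -> None:
        super().__init_subclass__(**kwargs)
        name = getattr(cls, "_name", None)
        if name is not None:
            # Runs once, when the subclass is defined.
            JobBase._registry[name] = cls

    @classmethod
    def class_for(cls, type_name: str) -> type["JobBase"]:
        """Look up the class registered for a job type name."""
        return cls._registry[type_name]


class DemoAlarmCalc(JobBase):
    _name = "Alarm Calculation"


# A handler can now go from the type name stored in the database to a class.
job_class = JobBase.class_for("Alarm Calculation")
```

With this pattern, defining the subclass is enough for it to become discoverable; no separate registration call is needed.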

Implementing a new job

Subclasses must provide:

| Attribute / Method | Purpose |
|---|---|
| _name (class attribute) | Must match the name column in the job_types table |
| _create_steps() | Returns a list[dict], one dict per step to insert into job_steps |
| _run_step(step_id) | Performs the work for one step; raises JobStepError on failure |
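The three requirements can be seen together in the sketch below. To keep it runnable without a database, it uses a hypothetical in-memory stand-in (`FakeJobInstance`) instead of the real JobInstance, a locally defined `JobStepError`, and an invented job type (`JobSquareRoot`); the step dictionary keys are likewise assumptions.

```python
class JobStepError(Exception):
    """Stand-in for the real JobStepError so the sketch runs on its own."""


class FakeJobInstance:
    """Hypothetical in-memory replacement for JobInstance (no database access)."""

    def __init__(self, parameters: dict) -> None:
        self._parameters = parameters
        # The real base class inserts rows into job_steps; here IDs are 1, 2, ...
        self._steps = dict(enumerate(self._create_steps(), start=1))

    def _create_steps(self) -> list[dict]:
        raise NotImplementedError

    def _run_step(self, step_id: int) -> None:
        raise NotImplementedError


class JobSquareRoot(FakeJobInstance):
    _name = "Square Root Demo"  # would have to match a row in job_types

    def __init__(self, parameters: dict) -> None:
        self.results: dict[int, float] = {}
        super().__init__(parameters)

    def _create_steps(self) -> list[dict]:
        # One dict per unit of work.
        return [{"value": v} for v in self._parameters["values"]]

    def _run_step(self, step_id: int) -> None:
        value = self._steps[step_id]["value"]
        if value < 0:
            raise JobStepError(f"Cannot take the square root of {value}")
        self.results[step_id] = value ** 0.5


job = JobSquareRoot({"values": [4, -1]})
job._run_step(1)  # succeeds; step 2 would raise JobStepError
```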

Tip

Use delay_steps_creation=True in super().__init__(job_id, skip_test, delay_steps_creation=True) when you need to pre-process parameters (e.g., resolve object lists from filters) before creating steps. Call self._create_database_steps() manually when ready.
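A sketch of the delayed-creation flow follows. `FakeJobInstance` is again a hypothetical stand-in that only mimics the `delay_steps_creation` switch and `_create_database_steps()`; the filter logic and the `JobFiltered` class are invented for illustration.

```python
class FakeJobInstance:
    """Hypothetical stand-in that mimics only the step-creation switch."""

    def __init__(self, job_id: int, skip_test: bool = True, delay_steps_creation: bool = False) -> None:
        self.job_id = job_id
        self._job_step_ids = None
        if not delay_steps_creation:
            self._create_database_steps()

    def _create_database_steps(self) -> None:
        steps = self._create_steps()
        # The real method writes to job_steps; here we just number the steps.
        self._job_step_ids = list(range(1, len(steps) + 1))

    def _create_steps(self) -> list[dict]:
        raise NotImplementedError


class JobFiltered(FakeJobInstance):
    _name = "Filtered Demo"

    def __init__(self, job_id: int, skip_test: bool = True) -> None:
        # Steps depend on data that is resolved only after the base init.
        super().__init__(job_id, skip_test, delay_steps_creation=True)
        self._object_names = self._resolve_objects(prefix="LAN-")
        self._create_database_steps()

    def _resolve_objects(self, prefix: str) -> list[str]:
        known_objects = ["LAN-01", "LAN-02", "SOL-01"]  # illustrative data
        return [name for name in known_objects if name.startswith(prefix)]

    def _create_steps(self) -> list[dict]:
        return [{"object_name": name} for name in self._object_names]


job = JobFiltered(job_id=1)
```

Without the delay, `_create_steps()` would run inside the base `__init__`, before `_object_names` exists.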

Important

Always raise JobStepError (not a generic exception) in _run_step. Its message is stored in the step's comment field in the database for inspection. Any other exception also marks the step as Failed, but the stored message is "Unknown error, please check the logs: " followed by the exception text.
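The difference between the two cases can be reproduced with the same try/except structure that run() uses. In this sketch `JobStepError` is defined locally as a stand-in, and `step_comment` is a hypothetical helper that returns the text that would be stored for the step.

```python
from typing import Callable, Optional


class JobStepError(Exception):
    """Stand-in for the real JobStepError so the sketch runs on its own."""


def step_comment(run_step: Callable[[], None]) -> Optional[str]:
    """Return the comment that would be stored for a step, or None on success."""
    try:
        run_step()
    except JobStepError as e:
        return str(e)
    except Exception as e:
        return f"Unknown error, please check the logs: {e}"
    return None


def known_failure() -> None:
    raise JobStepError("No alarm data found for LAN-01 in the requested period")


def unknown_failure() -> None:
    raise RuntimeError("connection reset")


known = step_comment(known_failure)
unknown = step_comment(unknown_failure)
```

The first comment tells the operator what went wrong; the second only points to the logs, which is why JobStepError with a descriptive message is preferred.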


Available Job Types

| Class | DB name | Purpose |
|---|---|---|
| JobAlarmCalc | "Alarm Calculation" | Calculate alarm thresholds for a list of objects and alarms |
| JobBazeTagDelete | "Bazefield Tag Delete" | Delete tag data from Bazefield for a period |
| JobBazeInsertAlloc | "Bazefield Insert Allocation" | Insert allocation history into Bazefield for a list of objects |

Class Definitions

JobInstance(job_id, skip_test=True, delay_steps_creation=False)

Base class for job instances. All job instance classes must inherit from this class.

This class should not be instantiated directly. It should be inherited by child classes defining specific job types. Each child class must define the _name class attribute, which should match the name of the job type in the job_types table in the database.

Below is an example of how to create a child class. Comments are omitted for brevity.

Python
class JobAlarmCalc(JobInstance):
    _name = "Alarm Calculation"

    def __init__(
        self,
        job_id: int,
    ) -> None:
        super().__init__(job_id)

Parameters:

  • job_id

    (int) –

    Id of the job instance. It must exist in the job_instances table in the database, have the state "Created" and the type matching the name of the class.

  • skip_test

    (bool, default: True ) –

    If True, jobs whose description contains "test" (case-insensitive substring match) are skipped: no steps are created and run() returns an empty list. This is useful for allowing creation of test jobs in automatic tests. Default is True.

  • delay_steps_creation

    (bool, default: False ) –

    If True, step creation is delayed until the _create_database_steps() method is called explicitly. This is useful for subclasses that need to do some processing before creating the steps. Default is False.
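The skip_test check in `__init__` is a plain substring test on the lower-cased description. The helper below (`is_test_job`, a name chosen for this example) repeats that expression to show one consequence worth knowing: any description that merely contains the letters "test", such as "Latest", is also treated as a test job.

```python
def is_test_job(description: str) -> bool:
    """Same expression as in JobInstance.__init__: case-insensitive substring match."""
    return "test" in description.lower()


explicit = is_test_job("Integration TEST run")          # intended match
accidental = is_test_job("Latest alarm recalculation")  # "latest" contains "test"
regular = is_test_job("Monthly alarm recalculation")    # not skipped
```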

Source code in echo_energycalc/job_instance.py
Python
def __init__(
    self,
    job_id: int,
    skip_test: bool = True,
    delay_steps_creation: bool = False,
) -> None:
    """Base class job instance class. All job instance classes must inherit from this class.

    This class should not be instantiated directly. It should be inherited by child classes defining specific job types. Each child class must define the `_name` class attribute, which should match the name of the job type in the `job_types` table in the database.

    Below is an example of how to create a child class. Comments are omitted for brevity.

    ```python
    class JobAlarmCalc(JobInstance):
        _name = "Alarm Calculation"

        def __init__(
            self,
            job_id: int,
        ) -> None:
            super().__init__(job_id)
    ```

    Parameters
    ----------
    job_id : int
        Id of the job instance. It must exist in the `job_instances` table in the database, have the state "Created" and the type matching the name of the class.
    skip_test : bool, optional
        If True, jobs that have "test" in the description will be skipped. This is useful for allowing creation of test jobs in automatic tests. Default is True.
    delay_steps_creation : bool, optional
        If True, the creation of the steps will be delayed until the `_create_database_steps` method is called. This is useful for subclasses that need to do some processing before creating the steps. Default is False.

    """
    # checking arguments
    if not isinstance(job_id, int):
        raise TypeError(f"Argument 'job_id' must be an integer. Got {type(job_id)} instead.")
    # check if self._name is defined in child class
    if not hasattr(self, "_name"):
        raise ValueError(f"Job instance name (_name) is not defined in {self.__class__.__name__}.")

    self._job_id = job_id

    # connection to performance_db
    self._perfdb = PerfDB(application_name=f"{self}")

    # getting job information
    job_info = self._perfdb.jobs.instances.get(job_ids=[job_id])

    if job_id not in job_info:
        raise ValueError(f"Job with id {job_id} does not exist in performance_db.")

    self._parameters = job_info[job_id]["parameters"]
    self._description = job_info[job_id]["description"]
    self._skipped: bool = False
    self._job_step_ids: list[int] | None = None

    if skip_test and "test" in self._description.lower():
        logger.info(f"Job Instance {self.job_id}. Skipping job with description '{self._description}' because it contains 'test'.")
        self._skipped = True
        return

    # Canonical state validation: JobInstance owns the contract that a job must
    # be in "Created" state before it can be initialised.  JobHandler pre-filters
    # at the DB level as an optimisation, but this check is the authoritative one
    # and protects callers that instantiate JobInstance directly.
    if job_info[job_id]["job_state_name"] != "Created":
        raise ValueError(f"Job with id {job_id} is not in state 'Created'. Got {job_info[job_id]['job_state_name']} instead.")

    # checking if the job_type_name matches the name of the class
    if job_info[job_id]["job_type_name"] != self.name:
        raise ValueError(
            f"Job type name {job_info[job_id]['job_type_name']} for job ID {self.job_id} does not match the class name {self._name}.",
        )

    # creating steps
    if not delay_steps_creation:
        self._create_database_steps()

description property

Description of the job instance.

Returns:

  • str

    Description of the job instance.

job_id property

ID of the job instance.

Returns:

  • int

    ID of the job instance.

job_step_ids property

IDs of the job steps.

Returns:

  • list[int] | None

    IDs of the job steps.

name property

Name of the job instance class.

Returns:

  • str

    Name of the job instance class.

parameters property

Parameters of the job instance.

Returns:

  • dict[str, Any]

    Parameters of the job instance.

run()

Runs the job instance.

It will run all steps in the job instance. If a step fails, the state of the job instance will be set to "Failed".

Returns:

  • list[int]

    List of IDs of the steps that failed.

Source code in echo_energycalc/job_instance.py
Python
def run(self) -> list[int]:
    """Runs the job instance.

    It will run all steps in the job instance. If a step fails, the state of the job instance will be set to "Failed".

    Returns
    -------
    list[int]
        List of IDs of the steps that failed.

    """
    if self._skipped:
        logger.info(f"Job Instance {self.job_id}. Job was skipped during init — nothing to run.")
        return []
    if self._job_step_ids is None:
        raise ValueError(f"Job {self.job_id} has no steps. Ensure _create_database_steps() was called during init.")
    # making sure that the job is in the correct state
    job_info = self._perfdb.jobs.instances.get(job_ids=[self.job_id])
    if job_info[self.job_id]["job_state_name"] != "Queued":
        raise ValueError(f"Job with id {self.job_id} is not in state 'Queued'. Got {job_info[self.job_id]['job_state_name']} instead.")

    # changing the state of the job to "Running"
    self._perfdb.jobs.instances.update(jobs={self.job_id: {"job_state_name": "Running", "comment": None}})

    logger.info(f"Job Instance {self.job_id}. Running {len(self._job_step_ids)} steps in the order {self._job_step_ids}.")

    # iterating over steps
    failed_steps = []
    for step_id in self._job_step_ids:
        # change the state of the step to "Running"
        self._perfdb.jobs.steps.update(job_steps={step_id: {"job_state_name": "Running", "comment": None}})
        logger.info(f"Job Instance {self.job_id}. Running step {step_id}.")

        error_message = None
        try:
            self._run_step(step_id)
        except JobStepError as e:
            error_message = str(e)
            logger.exception(f"Job Instance {self.job_id}. Step {step_id} failed")
        except Exception as e:
            error_message = f"Unknown error, please check the logs: {e}"
            logger.exception(f"Job Instance {self.job_id}. Step {step_id} failed")

        # updating step state
        state = "Failed" if error_message is not None else "Finished"
        self._perfdb.jobs.steps.update(job_steps={step_id: {"job_state_name": state, "comment": error_message}})

        if error_message is not None:
            failed_steps.append(step_id)

    # updating the state of this job to finished or failed
    state = "Failed" if failed_steps else "Finished"
    comment = f"{len(failed_steps)} ({len(failed_steps) / len(self._job_step_ids):.2%}) steps failed" if failed_steps else None

    self._perfdb.jobs.instances.update(jobs={self.job_id: {"job_state_name": state, "comment": comment}})
    if failed_steps:
        logger.error(f"Job Instance {self.job_id}. {len(failed_steps)} steps failed. State is now '{state}'.")
    else:
        logger.info(f"Job Instance {self.job_id}. All steps finished successfully. State is now '{state}'.")

    return failed_steps
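When some steps fail, run() also writes a summary into the job's comment field. The sketch below isolates that formatting in a hypothetical helper, `failure_comment`, so the resulting text can be seen without a database.

```python
from typing import Optional


def failure_comment(failed_steps: list[int], total_steps: int) -> Optional[str]:
    """Build the job-level comment with the same f-string that run() uses."""
    if not failed_steps:
        return None
    return f"{len(failed_steps)} ({len(failed_steps) / total_steps:.2%}) steps failed"


one_of_four = failure_comment([7], total_steps=4)
none_failed = failure_comment([], total_steps=4)
```

Here `one_of_four` is "1 (25.00%) steps failed" and `none_failed` is None, matching the comment a Finished job receives.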

JobAlarmCalc(job_id, skip_test=True)

Class for handling alarm calculation jobs in the performance database.

The parameters required for this job are:

  • object_names: list of object names to calculate alarms for
  • alarm_ids: list of alarm manufacturer ids to calculate
  • start: start datetime of the calculation period in the format "%Y-%m-%d %H:%M:%S"
  • end: end datetime of the calculation period in the format "%Y-%m-%d %H:%M:%S"
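A parameters document for this job type might look like the JSON below. The values are illustrative, and `parse_period` is a hypothetical helper, not the class's own `_parse_period_parameters`; the check that end is later than start is an assumption added for the example.

```python
import json
from datetime import datetime

DATETIME_FORMAT = "%Y-%m-%d %H:%M:%S"

# Illustrative parameters, as they could be stored in job_instances.parameters.
raw_parameters = """
{
  "object_names": ["LAN-01", "LAN-02"],
  "alarm_ids": [111, 112],
  "start": "2024-01-01 00:00:00",
  "end": "2024-01-31 23:59:59"
}
"""


def parse_period(parameters: dict) -> tuple[datetime, datetime]:
    """Parse start/end using the documented format; raises ValueError if malformed."""
    start = datetime.strptime(parameters["start"], DATETIME_FORMAT)
    end = datetime.strptime(parameters["end"], DATETIME_FORMAT)
    if end <= start:  # assumed sanity check, not confirmed by the source
        raise ValueError("'end' must be later than 'start'")
    return start, end


parameters = json.loads(raw_parameters)
start, end = parse_period(parameters)
```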

Parameters:

  • job_id

    (int) –

    Id of the job instance. It must exist in the job_instances table in the database, have the state "Created" and the type matching the name of the class.

  • skip_test

    (bool, default: True ) –

    If True, jobs whose description contains "test" (case-insensitive substring match) are skipped: no steps are created and run() returns an empty list. This is useful for allowing creation of test jobs in automatic tests. Default is True.

Source code in echo_energycalc/job_instance_alarm_calc.py
Python
def __init__(
    self,
    job_id: int,
    skip_test: bool = True,
) -> None:
    """Class for handling alarm calculation jobs in the performance database.

    The parameters required for this job are:
    - object_names: list of object names to calculate alarms for
    - alarm_ids: list of alarm manufacturer ids to calculate
    - start: start datetime of the calculation period in the format "%Y-%m-%d %H:%M:%S"
    - end: end datetime of the calculation period in the format "%Y-%m-%d %H:%M:%S"

    Parameters
    ----------
    job_id : int
        Id of the job instance. It must exist in the `job_instances` table in the database, have the state "Created" and the type matching the name of the class.
    skip_test : bool, optional
        If True, jobs that have "test" in the description will be skipped. This is useful for allowing creation of test jobs in automatic tests. Default is True.
    """
    super().__init__(job_id, skip_test)
    if self._skipped:
        return

    self._parse_period_parameters()
    self._cached_data = {"features": None}

JobAlarmCalc inherits the description, job_id, job_step_ids, name, and parameters properties and the run() method from JobInstance; see their documentation under JobInstance above.