Job Instances¶
Jobs are periodic or on-demand tasks that run in the Performance Server. They are stored in performance_db as rows in the job_instances table, along with their parameters (JSON) and execution state. A pool of workers (JobHandler) polls the database for pending jobs and dispatches them.
Database Structure¶
| Table | Purpose |
|---|---|
| `job_types` | Defines available job types. Each type has a `name` (string key linking to a Python class) and a `parameters_schema` (JSON Schema for parameter validation). |
| `job_states` | Enum of possible states: Created, Queued, Running, Finished, Failed. |
| `job_instances` | Stores individual job runs with type, state, parameters (JSON), timestamps, and a comment/log field. |
| `job_steps` | Stores the individual units of work within a job. Each step has its own state and error message. |
Job Instance vs. Job Step: A job instance is a single request to do some work (e.g., "calculate alarms for LAN-01, LAN-02 with alarms 111, 112"). The job creates multiple steps (one per object-alarm pair). This provides fine-grained progress tracking and partial failure recovery.
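The fan-out from one job instance into steps can be sketched as follows. This is only an illustration of the "one step per object-alarm pair" idea; the `expand_steps` helper and the step dict keys (`object_name`, `alarm_id`) are assumptions made for the example, not the actual column names of `job_steps`.

```python
from itertools import product

# Hypothetical job parameters, mirroring the example in the text above.
parameters = {
    "object_names": ["LAN-01", "LAN-02"],
    "alarm_ids": [111, 112],
}


def expand_steps(params: dict) -> list[dict]:
    """Return one step dict per (object, alarm) pair."""
    return [
        {"object_name": obj, "alarm_id": alarm}
        for obj, alarm in product(params["object_names"], params["alarm_ids"])
    ]


steps = expand_steps(parameters)
print(len(steps))  # 4 steps: 2 objects x 2 alarms
```

Because each pair is its own step, a failure for one object-alarm combination can be recorded without losing the results of the others.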
State Machine¶
[Created] → [Queued] → [Running] → [Finished]
                           └──→ [Failed]
- Created → set by whoever creates the `job_instances` row.
- Queued → set by `JobInstance.__init__` after successfully creating all steps.
- Running → set by `JobInstance.run()` when execution starts.
- Finished / Failed → set by `JobInstance.run()` when all steps complete.
Steps independently transition through Created → Running → Finished / Failed. If any step fails, the parent job is marked Failed but remaining steps still run.
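The rule that a failed step does not stop the remaining steps can be illustrated with a small in-memory sketch. It assumes nothing about the database layer: `run_steps`, `ok` and `boom` are hypothetical names, and states are kept in a plain dict instead of `job_steps` rows.

```python
def run_steps(step_funcs: dict) -> tuple[str, dict, list]:
    """Run every step, recording each step's state and the final job state."""
    step_states = dict.fromkeys(step_funcs, "Created")
    failed = []
    for step_id, func in step_funcs.items():
        step_states[step_id] = "Running"
        try:
            func()
        except Exception:
            # A failure is recorded, but the loop keeps going.
            step_states[step_id] = "Failed"
            failed.append(step_id)
        else:
            step_states[step_id] = "Finished"
    # The parent job fails if at least one step failed.
    job_state = "Failed" if failed else "Finished"
    return job_state, step_states, failed


def ok() -> None:
    return None


def boom() -> None:
    raise RuntimeError("simulated failure")


job_state, step_states, failed = run_steps({1: ok, 2: boom, 3: ok})
print(job_state, step_states, failed)
```

Step 3 still reaches Finished even though step 2 failed, while the job as a whole ends up Failed.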
Job Instance Base Class¶
JobInstance provides:
- `__init__`: Validates the DB row exists and is "Created", checks the type matches `_name`, creates steps (unless `delay_steps_creation=True`).
- `run()`: Iterates through all step IDs, calls `_run_step(step_id)`, updates states.
- Auto-registration via `__init_subclass__` so the `JobHandler` can map type names to classes.
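Auto-registration through `__init_subclass__` generally looks like the sketch below. It is a generic illustration of the pattern, not the library's code: `JobBase`, `_registry`, `DemoAlarmCalc` and `class_for` are hypothetical stand-ins, and how `JobInstance` actually stores its registry is not shown in this page.

```python
class JobBase:
    """Stand-in for JobInstance; only the registration logic is shown."""

    # Maps a job type name (as stored in job_types) to the handling class.
    _registry: dict[str, type] = {}

    def __init_subclass__(cls, **kwargs) -> None:
        super().__init_subclass__(**kwargs)
        name = getattr(cls, "_name", None)
        if name is not None:
            JobBase._registry[name] = cls


class DemoAlarmCalc(JobBase):
    _name = "Alarm Calculation"


def class_for(job_type_name: str) -> type:
    """Look up the class registered for a job type name."""
    return JobBase._registry[job_type_name]


print(class_for("Alarm Calculation").__name__)  # DemoAlarmCalc
```

With this pattern, defining the subclass is enough to make it discoverable; no separate registration call is needed.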
Implementing a new job¶
Subclasses must provide:
| Attribute / Method | Purpose |
|---|---|
| `_name` class attribute | Must match the `name` column in the `job_types` table |
| `_create_steps()` | Returns a `list[dict]`, one dict per step to insert into `job_steps` |
| `_run_step(step_id)` | Performs the work for one step; raises `JobStepError` on failure |
Tip
Use delay_steps_creation=True in super().__init__(job_id, skip_test, delay_steps_creation=True) when you need to pre-process parameters (e.g., resolve object lists from filters) before creating steps. Call self._create_database_steps() manually when ready.
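A delayed-creation subclass might be structured roughly as below. To keep the snippet runnable without a database, it uses `StandInJobInstance`, a hypothetical in-memory replacement for `JobInstance`; the `object_filter` parameter, the object catalogue and the `JobDemo` class are likewise invented for illustration.

```python
class StandInJobInstance:
    """Minimal in-memory stand-in for JobInstance (no database access)."""

    def __init__(self, job_id: int, skip_test: bool = True, delay_steps_creation: bool = False) -> None:
        self._job_id = job_id
        self._parameters = {"object_filter": "LAN-"}  # hypothetical parameters
        self._job_step_ids = None
        if not delay_steps_creation:
            self._create_database_steps()

    def _create_database_steps(self) -> None:
        # The real method inserts rows into job_steps; here we only number them.
        self._steps = self._create_steps()
        self._job_step_ids = list(range(1, len(self._steps) + 1))


class JobDemo(StandInJobInstance):
    _name = "Demo Job"  # hypothetical job type name

    def __init__(self, job_id: int, skip_test: bool = True) -> None:
        super().__init__(job_id, skip_test, delay_steps_creation=True)
        # Pre-process parameters before any step exists.
        known_objects = ["LAN-01", "LAN-02", "XYZ-01"]  # hypothetical catalogue
        prefix = self._parameters["object_filter"]
        self._objects = [name for name in known_objects if name.startswith(prefix)]
        # Steps are created manually once the object list is resolved.
        self._create_database_steps()

    def _create_steps(self) -> list[dict]:
        return [{"object_name": name} for name in self._objects]


job = JobDemo(job_id=1)
print(job._job_step_ids)  # [1, 2]
```

The key point is the ordering: `_create_steps()` depends on `self._objects`, so it must not run until the subclass has resolved the filter.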
Important
Always raise JobStepError (not a generic exception) in _run_step. Its message is stored as the step's comment in the database for inspection. Any other exception still marks the step as Failed, but the stored message is the generic "Unknown error, please check the logs: " followed by the exception text.
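The difference between the two cases can be seen in isolation with the sketch below, which mirrors the `try`/`except` structure of `run()` shown later on this page. `JobStepError` is redefined locally as a stand-in so the snippet runs on its own, and `comment_for` and the two failing functions are hypothetical.

```python
from typing import Callable, Optional


class JobStepError(Exception):
    """Stand-in for the library's JobStepError."""


def comment_for(step: Callable[[], None]) -> Optional[str]:
    """Return the comment that would be stored for a step, or None on success."""
    try:
        step()
    except JobStepError as e:
        return str(e)
    except Exception as e:
        return f"Unknown error, please check the logs: {e}"
    return None


def explicit_failure() -> None:
    raise JobStepError("No data found for LAN-01 in the requested period")


def unexpected_failure() -> None:
    raise RuntimeError("connection reset")


explicit_comment = comment_for(explicit_failure)
unexpected_comment = comment_for(unexpected_failure)
print(explicit_comment)
print(unexpected_comment)
```

The first comment tells an operator what went wrong; the second only points them at the logs.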
Available Job Types¶
| Class | DB name | Purpose |
|---|---|---|
| `JobAlarmCalc` | "Alarm Calculation" | Calculate alarm thresholds for a list of objects and alarms |
| `JobBazeTagDelete` | "Bazefield Tag Delete" | Delete tag data from Bazefield for a period |
| `JobBazeInsertAlloc` | "Bazefield Insert Allocation" | Insert allocation history into Bazefield for a list of objects |
Class Definitions¶
JobInstance(job_id, skip_test=True, delay_steps_creation=False)¶
Base class for job instances. All job instance classes must inherit from this class.
This class should not be instantiated directly. It should be inherited by child classes defining specific job types. Each child class must define the `_name` class attribute, which should match the name of the job type in the `job_types` table in the database.
Below is an example of how to create a child class. Comments are omitted for brevity.

    class JobAlarmCalc(JobInstance):
        _name = "Alarm Calculation"

        def __init__(
            self,
            job_id: int,
        ) -> None:
            super().__init__(job_id)

Parameters:
- job_id (int) – ID of the job instance. It must exist in the `job_instances` table in the database, have the state "Created" and the type matching the name of the class.
- skip_test (bool, default: True) – If True, jobs that have "test" in the description will be skipped. This is useful for allowing creation of test jobs in automatic tests.
- delay_steps_creation (bool, default: False) – If True, the creation of the steps will be delayed until the `_create_database_steps()` method is called. This is useful for subclasses that need to do some processing before creating the steps.
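The `skip_test` check in the source below is a case-insensitive substring match on the description. The following sketch reproduces just that condition; `is_skipped` is a hypothetical helper name, and the descriptions are made up. Note that any description containing the letters "test", such as one with the word "latest", also matches.

```python
def is_skipped(description: str, skip_test: bool = True) -> bool:
    """Mirror the condition `skip_test and "test" in description.lower()`."""
    return skip_test and "test" in description.lower()


print(is_skipped("Test run for CI"))                   # True
print(is_skipped("Recalculate latest alarms"))         # True ("latest" contains "test")
print(is_skipped("Recalculate alarms for LAN-01"))     # False
print(is_skipped("Test run for CI", skip_test=False))  # False
```

When writing descriptions for production jobs, it is therefore safest to avoid words that contain "test".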
Source code in echo_energycalc/job_instance.py
def __init__(
    self,
    job_id: int,
    skip_test: bool = True,
    delay_steps_creation: bool = False,
) -> None:
    """Base class for job instances. All job instance classes must inherit from this class.

    This class should not be instantiated directly. It should be inherited by child classes defining specific job types. Each child class must define the `_name` class attribute, which should match the name of the job type in the `job_types` table in the database.

    Below is an example of how to create a child class. Comments are omitted for brevity.

    ```python
    class JobAlarmCalc(JobInstance):
        _name = "Alarm Calculation"

        def __init__(
            self,
            job_id: int,
        ) -> None:
            super().__init__(job_id)
    ```

    Parameters
    ----------
    job_id : int
        Id of the job instance. It must exist in the `job_instances` table in the database, have the state "Created" and the type matching the name of the class.
    skip_test : bool, optional
        If True, jobs that have "test" in the description will be skipped. This is useful for allowing creation of test jobs in automatic tests. Default is True.
    delay_steps_creation : bool, optional
        If True, the creation of the steps will be delayed until the `_create_database_steps` method is called. This is useful for subclasses that need to do some processing before creating the steps. Default is False.
    """
    # checking arguments
    if not isinstance(job_id, int):
        raise TypeError(f"Argument 'job_id' must be an integer. Got {type(job_id)} instead.")

    # check if self._name is defined in child class
    if not hasattr(self, "_name"):
        raise ValueError(f"Job instance name is not defined in {self.__class__.__name__}.")

    self._job_id = job_id

    # connection to performance_db
    self._perfdb = PerfDB(application_name=f"{self}")

    # getting job information
    job_info = self._perfdb.jobs.instances.get(job_ids=[job_id])
    if job_id not in job_info:
        raise ValueError(f"Job with id {job_id} does not exist in performance_db.")

    self._parameters = job_info[job_id]["parameters"]
    self._description = job_info[job_id]["description"]
    self._skipped: bool = False
    self._job_step_ids: list[int] | None = None

    if skip_test and "test" in self._description.lower():
        logger.info(f"Job Instance {self.job_id}. Skipping job with description '{self._description}' because it contains 'test'.")
        self._skipped = True
        return

    # Canonical state validation: JobInstance owns the contract that a job must
    # be in "Created" state before it can be initialised. JobHandler pre-filters
    # at the DB level as an optimisation, but this check is the authoritative one
    # and protects callers that instantiate JobInstance directly.
    if job_info[job_id]["job_state_name"] != "Created":
        raise ValueError(f"Job with id {job_id} is not in state 'Created'. Got {job_info[job_id]['job_state_name']} instead.")

    # checking if the job_type_name matches the name of the class
    if job_info[job_id]["job_type_name"] != self.name:
        raise ValueError(
            f"Job type name {job_info[job_id]['job_type_name']} for job ID {self.job_id} does not match the class name {self._name}.",
        )

    # creating steps
    if not delay_steps_creation:
        self._create_database_steps()
description property¶
Description of the job instance.
Returns:
- str – Description of the job instance.

job_id property¶
ID of the job instance.
Returns:
- int – ID of the job instance.

job_step_ids property¶
IDs of the job steps.
Returns:
- list[int] | None – IDs of the job steps.

name property¶
Name of the job instance class.
Returns:
- str – Name of the job instance class.

parameters property¶
Parameters of the job instance.
Returns:
- dict[str, Any] – Parameters of the job instance.

run()¶
Runs the job instance.
It will run all steps in the job instance. If a step fails, the state of the job instance will be set to "Failed".
Returns:
- list[int] – List of IDs of the steps that failed.

Source code in echo_energycalc/job_instance.py
def run(self) -> list[int]:
    """Runs the job instance.

    It will run all steps in the job instance. If a step fails, the state of the job instance will be set to "Failed".

    Returns
    -------
    list[int]
        List of IDs of the steps that failed.
    """
    if self._skipped:
        logger.info(f"Job Instance {self.job_id}. Job was skipped during init — nothing to run.")
        return []

    if self._job_step_ids is None:
        raise ValueError(f"Job {self.job_id} has no steps. Ensure _create_database_steps() was called during init.")

    # making sure that the job is in the correct state
    job_info = self._perfdb.jobs.instances.get(job_ids=[self.job_id])
    if job_info[self.job_id]["job_state_name"] != "Queued":
        raise ValueError(f"Job with id {self.job_id} is not in state 'Queued'. Got {job_info[self.job_id]['job_state_name']} instead.")

    # changing the state of the job to "Running"
    self._perfdb.jobs.instances.update(jobs={self.job_id: {"job_state_name": "Running", "comment": None}})
    logger.info(f"Job Instance {self.job_id}. Running {len(self._job_step_ids)} steps in the order {self._job_step_ids}.")

    # iterating over steps
    failed_steps = []
    for step_id in self._job_step_ids:
        # change the state of the step to "Running"
        self._perfdb.jobs.steps.update(job_steps={step_id: {"job_state_name": "Running", "comment": None}})
        logger.info(f"Job Instance {self.job_id}. Running step {step_id}.")

        error_message = None
        try:
            self._run_step(step_id)
        except JobStepError as e:
            error_message = str(e)
            logger.exception(f"Job Instance {self.job_id}. Step {step_id} failed")
        except Exception as e:
            error_message = f"Unknown error, please check the logs: {e}"
            logger.exception(f"Job Instance {self.job_id}. Step {step_id} failed")

        # updating step state
        state = "Failed" if error_message is not None else "Finished"
        self._perfdb.jobs.steps.update(job_steps={step_id: {"job_state_name": state, "comment": error_message}})
        if error_message is not None:
            failed_steps.append(step_id)

    # updating the state of this job to finished or failed
    state = "Failed" if failed_steps else "Finished"
    comment = f"{len(failed_steps)} ({len(failed_steps) / len(self._job_step_ids):.2%}) steps failed" if failed_steps else None
    self._perfdb.jobs.instances.update(jobs={self.job_id: {"job_state_name": state, "comment": comment}})

    if failed_steps:
        logger.error(f"Job Instance {self.job_id}. {len(failed_steps)} steps failed. State is now '{state}'.")
    else:
        logger.info(f"Job Instance {self.job_id}. All steps finished successfully. State is now '{state}'.")

    return failed_steps

JobAlarmCalc(job_id, skip_test=True)¶
Class for handling alarm calculation jobs in the performance database.
The parameters required for this job are:
- object_names: list of object names to calculate alarms for
- alarm_ids: list of alarm manufacturer ids to calculate
- start: start datetime of the calculation period in the format "%Y-%m-%d %H:%M:%S"
- end: end datetime of the calculation period in the format "%Y-%m-%d %H:%M:%S"
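A parameters payload in this shape, and one way to parse its period, might look like the sketch below. The values are invented, `parse_period` is a hypothetical helper, and the check that `end` is later than `start` is an assumption added for the example; the real `_parse_period_parameters` method is not shown on this page and may behave differently.

```python
from datetime import datetime

DATETIME_FORMAT = "%Y-%m-%d %H:%M:%S"

# Hypothetical parameters for an "Alarm Calculation" job.
parameters = {
    "object_names": ["LAN-01", "LAN-02"],
    "alarm_ids": [111, 112],
    "start": "2024-01-01 00:00:00",
    "end": "2024-01-31 23:59:59",
}


def parse_period(params: dict) -> tuple[datetime, datetime]:
    """Parse 'start' and 'end' using the documented datetime format."""
    start = datetime.strptime(params["start"], DATETIME_FORMAT)
    end = datetime.strptime(params["end"], DATETIME_FORMAT)
    if end <= start:  # assumed sanity check, not confirmed by the source
        raise ValueError("'end' must be later than 'start'")
    return start, end


start, end = parse_period(parameters)
print(start, end)
```

A string that does not match the format makes `datetime.strptime` raise `ValueError`, so malformed periods are caught before any step runs.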
Parameters:
- job_id (int) – ID of the job instance. It must exist in the `job_instances` table in the database, have the state "Created" and the type matching the name of the class.
- skip_test (bool, default: True) – If True, jobs that have "test" in the description will be skipped. This is useful for allowing creation of test jobs in automatic tests.
Source code in echo_energycalc/job_instance_alarm_calc.py
def __init__(
    self,
    job_id: int,
    skip_test: bool = True,
) -> None:
    """Class for handling alarm calculation jobs in the performance database.

    The parameters required for this job are:
    - object_names: list of object names to calculate alarms for
    - alarm_ids: list of alarm manufacturer ids to calculate
    - start: start datetime of the calculation period in the format "%Y-%m-%d %H:%M:%S"
    - end: end datetime of the calculation period in the format "%Y-%m-%d %H:%M:%S"

    Parameters
    ----------
    job_id : int
        Id of the job instance. It must exist in the `job_instances` table in the database, have the state "Created" and the type matching the name of the class.
    skip_test : bool, optional
        If True, jobs that have "test" in the description will be skipped. This is useful for allowing creation of test jobs in automatic tests. Default is True.
    """
    super().__init__(job_id, skip_test)

    # nothing else to set up when the job was skipped as a test job
    if self._skipped:
        return

    self._parse_period_parameters()
    self._cached_data = {"features": None}