Job Instances
Jobs are periodic tasks that run on the performance server. They can perform calculations, import data, and so on; what a job does depends entirely on its implementation. The design relies on a centralized queue of jobs in the performance database and a set of workers that periodically look for jobs to run. This makes jobs easy to manage: a frontend application can create, update, and delete jobs without interacting with the backend that runs them.
Database Structure
The database has a series of tables for storing the jobs, their states and their configurations. They are:
- job_types: Defines the type of the job. Each type is linked to a specific class that runs the job. This table also has a column called parameters_shema, which holds a JSON schema defining the parameters that the job receives. The schema is used to validate the parameters of a job when it is created.
- job_states: Defines the possible states of a job. Currently, the states are Created, Queued, Running, Finished and Failed.
- job_instances: Stores the instances of the jobs. Each instance has a type, a state, a configuration, the main timestamps (created, started, finished and last_state_change) and a free comment field for useful logs.
- job_steps: Stores the steps of a job. Each step is linked to a job instance and a specific object. It can also be linked to an alarm or a feature if needed.
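To make the role of the parameters schema concrete, here is a minimal sketch of validating job parameters before a job is created. It is a hand-rolled stand-in that only checks the `required` and `type` keywords, not a full JSON Schema validator. The schema contents and the names `ALARM_CALC_SCHEMA` and `validate_parameters` are assumptions made for illustration and do not come from the package.

```python
from typing import Any

# Hypothetical schema for an alarm calculation job type; the real schema
# stored in the job_types table may differ.
ALARM_CALC_SCHEMA: dict[str, Any] = {
    "type": "object",
    "required": ["object_names", "alarm_ids", "start", "end"],
    "properties": {
        "object_names": {"type": "array"},
        "alarm_ids": {"type": "array"},
        "start": {"type": "string"},
        "end": {"type": "string"},
    },
}

# Maps the JSON Schema type names used above to Python types.
_JSON_TYPES: dict[str, type] = {"object": dict, "array": list, "string": str}


def validate_parameters(parameters: dict[str, Any], schema: dict[str, Any]) -> list[str]:
    """Return a list of problems; an empty list means the parameters pass.

    Only the "required" and per-property "type" keywords are checked.
    """
    problems = [
        f"missing required parameter '{key}'"
        for key in schema.get("required", [])
        if key not in parameters
    ]
    for key, rule in schema.get("properties", {}).items():
        expected = _JSON_TYPES.get(rule.get("type", ""))
        if key in parameters and expected is not None and not isinstance(parameters[key], expected):
            problems.append(f"parameter '{key}' must be of type {rule['type']}")
    return problems


# A string instead of a list, and no "end" value: two problems are reported.
print(validate_parameters({"object_names": "LAN-01", "alarm_ids": [111], "start": "2024-01-01 00:00:00"}, ALARM_CALC_SCHEMA))
```

In practice a dedicated JSON Schema library would likely replace this helper; the sketch only shows where validation fits in the job creation flow.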
To better understand the difference between a job instance and a job step, consider the following example: a job instance could be an alarm calculation for objects LAN-01 and LAN-02 and alarms 111 and 112. In this case 4 job steps exist, one for each object-alarm pair. Each step has its status updated as the job runs, allowing a more detailed view of the job's progress.
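The expansion from one job instance into four steps can be sketched as a Cartesian product of objects and alarms. The dictionary keys below are illustrative assumptions and may not match the real columns of the job_steps table.

```python
from itertools import product

object_names = ["LAN-01", "LAN-02"]
alarm_ids = [111, 112]

# One step per (object, alarm) pair. The keys are illustrative only.
steps = [
    {"object_name": obj, "alarm_id": alarm}
    for obj, alarm in product(object_names, alarm_ids)
]

for step in steps:
    print(step)

print(len(steps))  # 4
```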
Job Instance Base Class
The base class for a job instance is JobInstance. It is defined in the job_instance module and must be used as a base class for all job instances. This base class already implements the following:
- A default __init__ method that receives the job instance id and does some database checks to make sure the job instance exists and is of the correct type. It also creates all the job steps for the job instance.
- A run method, the main method called by the worker to run the job. It consists mainly of a for loop that calls the _run_step method for each step of the job and updates the states depending on whether that call raises an exception.
- Multiple properties used to retrieve attributes of the job instance, like the job type, the job id, etc.
Subclasses of JobInstance must implement the following attributes and methods:
- _name class attribute: A string with the name of the job instance. It identifies the job instance type in the database. For example, in the case of alarm calculations, the class definition starts like this: class JobAlarmCalc(JobInstance): _name = "Alarm Calculation"
- _create_steps method: Creates all the job steps for this job instance based on its parameters. Ideally, it takes into account the order of the steps so that dependent calculations run in the correct order. The method builds a list of dictionaries whose keys are the necessary columns of the job_steps table.
- _run_step method: Runs one step of the job. It receives the id of the step and returns None. The method should perform all the logic of the step and raise a JobStepError in case of failure. Using JobStepError matters because the message of this exception is stored in the database and can be used to debug the job.
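The contract above can be illustrated with a self-contained sketch. The `JobInstance` and `JobStepError` classes below are simplified in-memory stand-ins, not the real ones: the stand-in constructor takes a parameters dictionary instead of a job id, and nothing touches the database. `JobEcho` and its step keys are hypothetical.

```python
from itertools import product
from typing import Any


class JobStepError(Exception):
    """Stand-in for the real JobStepError."""


class JobInstance:
    """In-memory stand-in for the real base class (no database access)."""

    def __init__(self, parameters: dict[str, Any]) -> None:
        self._parameters = parameters
        # Step ids are list positions here; the real class gets them from the database.
        self._steps = dict(enumerate(self._create_steps()))

    def run(self) -> list[int]:
        """Run every step and return the ids of the steps that failed."""
        failed_steps = []
        for step_id in self._steps:
            try:
                self._run_step(step_id)
            except JobStepError:
                failed_steps.append(step_id)
        return failed_steps

    def _create_steps(self) -> list[dict[str, Any]]:
        raise NotImplementedError

    def _run_step(self, step_id: int) -> None:
        raise NotImplementedError


class JobEcho(JobInstance):
    """Hypothetical job type with one step per object-alarm pair."""

    _name = "Echo"

    def _create_steps(self) -> list[dict[str, Any]]:
        pairs = product(self._parameters["object_names"], self._parameters["alarm_ids"])
        return [{"object_name": obj, "alarm_id": alarm} for obj, alarm in pairs]

    def _run_step(self, step_id: int) -> None:
        step = self._steps[step_id]
        if step["alarm_id"] < 0:
            # In the real base class, this message is stored as the step comment.
            raise JobStepError(f"Invalid alarm id {step['alarm_id']} for {step['object_name']}")


job = JobEcho({"object_names": ["LAN-01", "LAN-02"], "alarm_ids": [111, -1]})
failed = job.run()
print(failed)  # [1, 3]
```

Raising `JobStepError` with a descriptive message keeps the failure reason attached to the individual step, so one bad object-alarm pair does not hide the outcome of the others.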
Currently implemented job instances
Currently, the following job instance types are implemented:
- JobAlarmCalc: Used to create alarms based on object features. It uses the AlarmCalcThreshold class to perform the calculations.
  - Required parameters:
    - object_names: A list of object names to calculate the alarms for.
    - alarm_ids: A list of alarm ids to calculate.
    - start: The start date of the calculation.
    - end: The end date of the calculation.
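As an illustration, a parameters payload for an alarm calculation job might look like the sketch below. The values are made up, the datetime format is the one given in the JobAlarmCalc docstring, and the `start < end` check is an assumption added for the example rather than documented behavior.

```python
from datetime import datetime

# Illustrative values only.
parameters = {
    "object_names": ["LAN-01", "LAN-02"],
    "alarm_ids": [111, 112],
    "start": "2024-01-01 00:00:00",
    "end": "2024-01-31 23:59:59",
}

DATETIME_FORMAT = "%Y-%m-%d %H:%M:%S"

# Parsing up front surfaces malformed dates before a job is created;
# strptime raises ValueError when a string does not match the format.
start = datetime.strptime(parameters["start"], DATETIME_FORMAT)
end = datetime.strptime(parameters["end"], DATETIME_FORMAT)

# Assumed sanity check, not part of the documented job contract.
if start >= end:
    raise ValueError("'start' must be earlier than 'end'")

print(end - start)
```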
Class Definitions
JobInstance(job_id, skip_test=True, delay_steps_creation=False)
Base class for job instances. All job instance classes must inherit from this class.
This class should not be instantiated directly. It should be inherited by child classes defining specific job types. Each child class must define the _name class attribute, which should match the name of the job type in the job_types table in the database.
Below is an example of how to create a child class. Comments are omitted for brevity.
class JobAlarmCalc(JobInstance):
    _name = "Alarm Calculation"

    def __init__(
        self,
        job_id: int,
    ) -> None:
        super().__init__(job_id)
Parameters:
- job_id (int) – Id of the job instance. It must exist in the job_instances table in the database, have the state "Created" and the type matching the name of the class.
- skip_test (bool, default: True) – If True, jobs that have "test" in the description will be skipped. This is useful for allowing creation of test jobs in automatic tests.
- delay_steps_creation (bool, default: False) – If True, the creation of the steps will be delayed until the create_steps method is called. This is useful for subclasses that need to do some processing before creating the steps.
Source code in echo_energycalc/job_instance.py
def __init__(
    self,
    job_id: int,
    skip_test: bool = True,
    delay_steps_creation: bool = False,
) -> None:
    """Base class for job instances. All job instance classes must inherit from this class.

    This class should not be instantiated directly. It should be inherited by child classes defining specific job types. Each child class must define the `_name` class attribute, which should match the name of the job type in the `job_types` table in the database.

    Below is an example of how to create a child class. Comments are omitted for brevity.

    ```python
    class JobAlarmCalc(JobInstance):
        _name = "Alarm Calculation"

        def __init__(
            self,
            job_id: int,
        ) -> None:
            super().__init__(job_id)
    ```

    Parameters
    ----------
    job_id : int
        Id of the job instance. It must exist in the `job_instances` table in the database, have the state "Created" and the type matching the name of the class.
    skip_test : bool, optional
        If True, jobs that have "test" in the description will be skipped. This is useful for allowing creation of test jobs in automatic tests. Default is True.
    delay_steps_creation : bool, optional
        If True, the creation of the steps will be delayed until the `create_steps` method is called. This is useful for subclasses that need to do some processing before creating the steps. Default is False.
    """
    # checking arguments
    if not isinstance(job_id, int):
        raise TypeError(f"Argument 'job_id' must be an integer. Got {type(job_id)} instead.")
    # check if self._name is defined in child class
    if not hasattr(self, "_name"):
        raise ValueError(f"Job instance name is not defined in {self.__class__.__name__}.")
    self._job_id = job_id
    # connection to performance_db
    self._perfdb = PerfDB(application_name=f"{self}")
    # getting job information
    job_info = self._perfdb.jobs.instances.get(job_ids=[job_id])
    if job_id not in job_info:
        raise ValueError(f"Job with id {job_id} does not exist in performance_db.")
    self._parameters = job_info[job_id]["parameters"]
    self._description = job_info[job_id]["description"]
    if skip_test and "test" in self._description.lower():
        logger.info(f"Job Instance {self.job_id}. Skipping job with description '{self._description}' because it contains 'test'.")
        return
    # checking if state of the job is "Created"
    if job_info[job_id]["job_state_name"] != "Created":
        raise ValueError(f"Job with id {job_id} is not in state 'Created'. Got {job_info[job_id]['job_state_name']} instead.")
    # checking if the job_type_name matches the name of the class
    if job_info[job_id]["job_type_name"] != self.name:
        raise ValueError(
            f"Job type name {job_info[job_id]['job_type_name']} for job ID {self.job_id} does not match the class name {self._name}.",
        )
    # creating steps
    if not delay_steps_creation:
        self._create_database_steps()
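The skip_test check in the constructor is a case-insensitive substring match, which has one consequence worth knowing: any description that merely contains the letters "test" is skipped. The sketch below isolates that condition; `is_skipped` is a hypothetical helper written for illustration, not part of the package.

```python
def is_skipped(description: str, skip_test: bool = True) -> bool:
    """Mirror the constructor's check: case-insensitive substring match."""
    return skip_test and "test" in description.lower()


print(is_skipped("Integration TEST for alarms"))    # True
print(is_skipped("Recalculate latest alarms"))      # True, "latest" contains "test"
print(is_skipped("Monthly alarm calculation"))      # False
print(is_skipped("test job", skip_test=False))      # False
```

Descriptions for production jobs are therefore safest when they avoid words such as "latest" or "contest", or when the worker is known to pass skip_test=False.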
description (property)
Description of the job instance.
Returns:
- str – Description of the job instance.
job_id (property)
ID of the job instance.
Returns:
- int – ID of the job instance.
job_step_ids (property)
IDs of the job steps.
Returns:
- list[int] | None – IDs of the job steps.
name (property)
Name of the job instance class.
Returns:
- str – Name of the job instance class.
parameters (property)
Parameters of the job instance.
Returns:
- dict[str, Any] – Parameters of the job instance.
run()
Runs the job instance.
It will run all steps in the job instance. If a step fails, the state of the job instance will be set to "Failed".
Returns:
- list[int] – List of IDs of the steps that failed.
Source code in echo_energycalc/job_instance.py
def run(self) -> list[int]:
    """Runs the job instance.

    It will run all steps in the job instance. If a step fails, the state of the job instance will be set to "Failed".

    Returns
    -------
    list[int]
        List of IDs of the steps that failed.
    """
    # making sure that the job is in the correct state
    job_info = self._perfdb.jobs.instances.get(job_ids=[self.job_id])
    if job_info[self.job_id]["job_state_name"] != "Queued":
        raise ValueError(f"Job with id {self.job_id} is not in state 'Queued'. Got {job_info[self.job_id]['job_state_name']} instead.")
    # changing the state of the job to "Running"
    self._perfdb.jobs.instances.update(jobs={self.job_id: {"job_state_name": "Running", "comment": None}})
    logger.info(f"Job Instance {self.job_id}. Running {len(self._job_step_ids)} steps in the order {self._job_step_ids}.")
    # iterating over steps
    failed_steps = []
    for step_id in self._job_step_ids:
        # change the state of the step to "Running"
        self._perfdb.jobs.steps.update(job_steps={step_id: {"job_state_name": "Running", "comment": None}})
        logger.info(f"Job Instance {self.job_id}. Running step {step_id}.")
        error_message = None
        try:
            self._run_step(step_id)
        except JobStepError as e:
            error_message = str(e)
            logger.exception(f"Job Instance {self.job_id}. Step {step_id} failed")
        except Exception as e:
            error_message = f"Unknown error, please check the logs: {e}"
            logger.exception(f"Job Instance {self.job_id}. Step {step_id} failed")
        # updating step state
        state = "Failed" if error_message is not None else "Finished"
        self._perfdb.jobs.steps.update(job_steps={step_id: {"job_state_name": state, "comment": error_message}})
        if error_message is not None:
            failed_steps.append(step_id)
    # updating the state of this job to finished or failed
    state = "Failed" if failed_steps else "Finished"
    comment = f"{len(failed_steps)} ({len(failed_steps) / len(self._job_step_ids):.2%}) steps failed" if failed_steps else None
    self._perfdb.jobs.instances.update(jobs={self.job_id: {"job_state_name": state, "comment": comment}})
    if failed_steps:
        logger.error(f"Job Instance {self.job_id}. {len(failed_steps)} steps failed. State is now '{state}'.")
    else:
        logger.info(f"Job Instance {self.job_id}. All steps finished successfully. State is now '{state}'.")
    return failed_steps
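To see what ends up in the job's state and comment after a run, the final bookkeeping can be reproduced in isolation. The `summarize` helper below is hypothetical and only repeats the two expressions from the end of `run()`; it does not write anything to the database.

```python
from __future__ import annotations


def summarize(step_ids: list[int], failed_steps: list[int]) -> tuple[str, str | None]:
    """Return the final job state and comment, following the logic in run()."""
    state = "Failed" if failed_steps else "Finished"
    comment = (
        f"{len(failed_steps)} ({len(failed_steps) / len(step_ids):.2%}) steps failed"
        if failed_steps
        else None
    )
    return state, comment


print(summarize([1, 2, 3, 4], [3]))  # ('Failed', '1 (25.00%) steps failed')
print(summarize([1, 2, 3, 4], []))   # ('Finished', None)
```

Note that a single failed step is enough to mark the whole job instance as "Failed", even though the remaining steps still run and keep their own "Finished" state.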
JobAlarmCalc(job_id, skip_test=True)
Class for handling alarm calculation jobs in the performance database.
The parameters required for this job are:
- object_names: list of object names to calculate alarms for
- alarm_ids: list of alarm manufacturer ids to calculate
- start: start datetime of the calculation period in the format "%Y-%m-%d %H:%M:%S"
- end: end datetime of the calculation period in the format "%Y-%m-%d %H:%M:%S"
Parameters:
- job_id (int) – Id of the job instance. It must exist in the job_instances table in the database, have the state "Created" and the type matching the name of the class.
- skip_test (bool, default: True) – If True, jobs that have "test" in the description will be skipped. This is useful for allowing creation of test jobs in automatic tests.
Source code in echo_energycalc/job_instance_alarm_calc.py
def __init__(
    self,
    job_id: int,
    skip_test: bool = True,
) -> None:
    """Class for handling alarm calculation jobs in the performance database.

    The parameters required for this job are:
    - object_names: list of object names to calculate alarms for
    - alarm_ids: list of alarm manufacturer ids to calculate
    - start: start datetime of the calculation period in the format "%Y-%m-%d %H:%M:%S"
    - end: end datetime of the calculation period in the format "%Y-%m-%d %H:%M:%S"

    Parameters
    ----------
    job_id : int
        Id of the job instance. It must exist in the `job_instances` table in the database, have the state "Created" and the type matching the name of the class.
    skip_test : bool, optional
        If True, jobs that have "test" in the description will be skipped. This is useful for allowing creation of test jobs in automatic tests. Default is True.
    """
    super().__init__(job_id, skip_test)
    # converting start and end from parameters to datetime. They are in the format "%Y-%m-%d %H:%M:%S"
    self._parameters["start"] = datetime.strptime(self._parameters["start"], "%Y-%m-%d %H:%M:%S")
    self._parameters["end"] = datetime.strptime(self._parameters["end"], "%Y-%m-%d %H:%M:%S")
    # adding period to parameters
    self._parameters["period"] = DateTimeRange(self._parameters["start"], self._parameters["end"])
    # cached data
    self._cached_data = {"features": None}