Config
Configuration of the pipeline stages is specified in a YAML file. The 'pipeline' section of the YAML file defines the order in which to run the stages, and the 'parameters' section provides the particular parameters for each stage. Each stage has a name, and the names must match between the 'pipeline' and 'parameters' sections.
Standard field names for each stage are:
- name
- type (e.g. ingest, extract, insights)
- subtype (e.g. file ingest, promql ingest)
- input_data (list of inputs, each itself a list)
- output_data (list of outputs, each itself a list)
- config (configuration specific to this stage)
A sample config file might look like this:
pipeline:
  - name: ingest_stage
  - name: extract_stage
    follows: [ingest_stage]
  - name: insights_stage
    follows: [extract_stage]
parameters:
  - name: ingest_stage
    type: ingest
    subtype: file
    input_data: []
    output_data: [signals]
    config:
      file_name: <../some_file.json>
  - name: extract_stage
    type: extract
    subtype: <some extract subtype>
    input_data: [signals]
    output_data: [extracted_signals]
    config:
  - name: insights_stage
    type: insights
    subtype: <some insights subtype>
    input_data: [extracted_signals]
    output_data: [text_insights]
    config:
A stage may follow multiple other stages, and may receive input from multiple earlier stages.
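As an illustration only (the stage and data names below are hypothetical), a stage that consumes the outputs of two earlier stages lists both of them in 'follows' and in its input_data:

pipeline:
  - name: ingest_a
  - name: ingest_b
  - name: merge_stage
    follows: [ingest_a, ingest_b]

and in the 'parameters' section:

  - name: merge_stage
    type: extract
    subtype: <some extract subtype>
    input_data: [signals_a, signals_b]
    output_data: [merged_signals]
    config: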
Data Types
Time Series
Time series data is typically provided using the Prometheus data model, with the following fields:
{
    "status": "success",
    "data": {
        "resultType": "matrix",
        "result": [
            {
                "metric": {
                    "__name__": "commit_latency_ms",
                    "instance": "02:00:06:ff:fe:4b:b5:ac",
                    "plugin": "ceph",
                    "label": "ceph",
                    "snapshotId": "T9Arslj9-1Y3UYv6biDGDizVqzI",
                    "job": "prometheus"
                },
                "values": [
                    [1715252491.0, 0.0],
                    [1715252492.0, 0.0],
                    ....
                ]
            }
            .....
        ]
    }
}
The metric field contains various metadata, including the __name__ of the metric being reported; other fields are optional. The time series data are contained in the values field as a list of <timestamp, value> ordered pairs.
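As a minimal sketch, independent of the pipeline code and assuming the response has been saved to a file (the file name below is hypothetical), the metric name and the <timestamp, value> pairs can be read with standard Python:

import json

# hypothetical file holding a Prometheus response like the one above
with open("prometheus_response.json") as f:
    response = json.load(f)

for series in response["data"]["result"]:
    name = series["metric"]["__name__"]        # metric name from the metadata
    for timestamp, value in series["values"]:  # each entry is a [timestamp, value] pair
        print(name, timestamp, value)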
Signal
Signal is an internal data type used to store time-series data. It stores the metadata provided in the metric field and the time series provided in the values field.
Signals
Signals is an internal data type used to store multiple objects of type Signal. It has some internally defined metadata plus a list of Signal structures.
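The exact class definitions live in the code base; purely as a conceptual sketch (the field names here are assumptions, not the real API), the two types can be pictured as:

from dataclasses import dataclass, field
from typing import Any

@dataclass
class Signal:
    # metadata taken from the Prometheus "metric" field (e.g. __name__, instance)
    metadata: dict[str, Any]
    # the "values" field: a list of (timestamp, value) pairs
    time_series: list[tuple[float, float]]

@dataclass
class Signals:
    # internally defined metadata for the collection as a whole
    metadata: dict[str, Any] = field(default_factory=dict)
    # the individual Signal structures
    signals: list[Signal] = field(default_factory=list)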
Details about some types of stages
Ingest
An ingest-type stage typically reads data from some external source. The details of the external source (file location, URL, security parameters) are provided in the config section of the stage parameters. An ingest stage is expected to have no input_data (input_data: []). An ingest-type stage outputs a list containing a single element (of type Signals).
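For instance, an ingest stage that pulls data from a Prometheus endpoint might be configured roughly as follows (the subtype name and config keys here are assumptions; consult the code base for the exact fields):

- name: ingest_stage
  type: ingest
  subtype: promql
  input_data: []
  output_data: [signals]
  config:
    url: <prometheus endpoint url>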
Extract
An extract-type stage typically performs some kind of transformation or metadata generation on Signals. The input_data should contain a single Signals element, and the output_data should contain a single Signals element.
For example:
- name: feature_extraction_tsfel
  type: extract
  subtype: tsfel
  input_data: [classified_signals]
  output_data: [extracted_signals]
Map Reduce
A map_reduce stage takes some input, breaks it up into some number of pieces, and then runs some computation (possibly in parallel) on each of the pieces. The output of the map operation is a list of sublists. Each sublist is provided to a different instance of the compute operation. The compute part of a map_reduce stage takes a single sublist and outputs a single sublist. The outputs of each of the computations are then collected by the reduce operation into a combined output.
The config of a map_reduce stage looks like the following:
- name: parallel_extract_stage
  type: map_reduce
  input_data: [signals]
  output_data: [extracted_signals]
  config:
    map_function:
      name: map1
      type: map
      subtype: simple
      config:
        <any parameters needed to customize the map operation>
    compute_function:
      name: extract_in_parallel
      type: extract
      subtype: tsfel
      config:
        <any parameters needed to customize the compute operation>
    reduce_function:
      name: reduce1
      type: reduce
      subtype: simple
      config:
        <any parameters needed to customize the reduce operation>
All map_reduce compute operations have the same structure: a single list (of type Signals) as input and a single list (of type Signals) as output. These must be preregistered in the code base as valid map_reduce compute operations. The map and reduce operations must likewise be preregistered in the code base.
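As a sketch of that contract only (the registration mechanism is not shown, and plain Python lists stand in for Signals), the three operations compose as follows:

def map_op(signals):
    """Break the input list into sublists; each sublist goes to one compute instance."""
    midpoint = len(signals) // 2
    return [signals[:midpoint], signals[midpoint:]]

def compute_op(sublist):
    """Take a single sublist and return a single sublist (all computes share this shape)."""
    return [item for item in sublist]  # placeholder transformation

def reduce_op(sublists):
    """Collect the per-compute outputs into one combined output."""
    combined = []
    for sub in sublists:
        combined.extend(sub)
    return combined

# The map_reduce stage wires these together, possibly running compute_op in parallel.
pieces = map_op(list(range(10)))
results = [compute_op(piece) for piece in pieces]
output = reduce_op(results)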
The map_reduce operation may be performed in parallel across multiple processes. This is achieved by specifying an additional configuration parameter under global_settings.
global_settings:
  number_of_workers: 8
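A sketch of how such parallelism could be realized with a standard process pool (not the actual implementation) is shown below; the compute operation and input data are placeholders:

from multiprocessing import Pool

NUMBER_OF_WORKERS = 8  # corresponds to global_settings.number_of_workers

def compute_op(sublist):
    return [item for item in sublist]  # placeholder compute operation

if __name__ == "__main__":
    sublists = [[i] for i in range(16)]             # output of the map operation
    with Pool(processes=NUMBER_OF_WORKERS) as pool:
        results = pool.map(compute_op, sublists)    # one compute instance per sublist
    combined = [x for sub in results for x in sub]  # reduce step
    print(combined)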