transforms

class transforms.ColumnFunctionTransform(dataset_name: str, output_dataset_name: str, input_column: str, output_column: str, function: Callable[[Series], Series] | str | None = None, extra_code: str | None = None)[source]

Bases: Transform, ExtraCodeFunctionMixin, YAMLObject

A transform for applying a function to every value in a column of a DataFrame.

Parameters:
dataset_name: str

the dataset to operate on

output_dataset_name: str

the name given to the output dataset

input_column: str

the name of the column the function should be applied to

output_column: str

the name given to the output column containing the results of applying the function

function: Union[Callable[[pandas.Series], pandas.Series], str]

The unary function to apply to the values in the chosen column.

extra_code: Optional[str]

This can contain additional code for the transform function, such as the definition of a function over multiple lines or split into multiple functions for readability.

Attributes:
yaml_flow_style

Methods

evaluate_function(function, extra_code)

Compile and evaluate the given function and an additional, optional code fragment within a separate global environment and return the executable function object.

from_yaml(loader, node)

Convert a representation node to a Python object.

get_data(dataset_name)

Retrieve a dataset with the given name from the data repository associated with this transform.

set_data_repo(data_repo)

to_yaml(dumper, data)

Convert a Python object to a representation node.

prepare

process

set_name

yaml_dumper

prepare()[source]
process(data, attributes)[source]
yaml_tag = '!ColumnFunctionTransform'
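
A minimal usage sketch. All dataset and column names below are hypothetical placeholders, and passing function as a string assumes it is compiled by evaluate_function as described above:

    from transforms import ColumnFunctionTransform

    # Convert a raw reading into a new column; every name here is a
    # hypothetical placeholder.
    transform = ColumnFunctionTransform(
        dataset_name='measurements',
        output_dataset_name='measurements_dbm',
        input_column='rssi_raw',
        output_column='rssi_dbm',
        function='lambda v: v - 100',  # string form, assumed to be evaluated
    )
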
class transforms.ConcatTransform(dataset_names: List[str] | None, output_dataset_name: str)[source]

Bases: Transform, YAMLObject

A transform for concatenating all DataFrames from the given datasets.

Parameters:
dataset_names: Optional[List[str]]

the list of datasets to concatenate

output_dataset_name: str

the name given to the output dataset

Attributes:
yaml_flow_style

Methods

from_yaml(loader, node)

Convert a representation node to a Python object.

get_data(dataset_name)

Retrieve a dataset with the given name from the data repository associated with this transform.

set_data_repo(data_repo)

to_yaml(dumper, data)

Convert a Python object to a representation node.

concat

prepare

process

set_name

yaml_dumper

concat(dfs: List[DataFrame])[source]
prepare()[source]
yaml_tag = '!ConcatTransform'
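
A minimal usage sketch with hypothetical dataset names:

    from transforms import ConcatTransform

    # Stack the DataFrames of two input datasets into one output dataset.
    transform = ConcatTransform(
        dataset_names=['run_1', 'run_2'],
        output_dataset_name='all_runs',
    )
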
class transforms.FunctionTransform(dataset_name: str, output_dataset_name: str, function: Callable[[DataFrame], DataFrame] | str | None = None, extra_code: str | None = None)[source]

Bases: Transform, ExtraCodeFunctionMixin, YAMLObject

A transform for applying an arbitrary function to a whole DataFrame.

Parameters:
dataset_name: str

The dataset to operate on.

output_dataset_name: str

The name given to the output dataset.

function: Union[Callable[[pandas.DataFrame], pandas.DataFrame], str]

The unary function to apply to each DataFrame of the dataset. It takes the full DataFrame as its only argument and returns a DataFrame.

extra_code: Optional[str]

This can contain additional code for the transform function, such as the definition of a function over multiple lines or split into multiple functions for readability.

Attributes:
yaml_flow_style

Methods

evaluate_function(function, extra_code)

Compile and evaluate the given function and an additional, optional code fragment within a separate global environment and return the executable function object.

from_yaml(loader, node)

Convert a representation node to a Python object.

get_data(dataset_name)

Retrieve a dataset with the given name from the data repository associated with this transform.

set_data_repo(data_repo)

to_yaml(dumper, data)

Convert a Python object to a representation node.

prepare

process

set_name

yaml_dumper

prepare()[source]
process(data, attributes) → DataFrame[source]
yaml_tag = '!FunctionTransform'
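
A usage sketch of the string-plus-extra_code form, assuming that a function given as a string may refer to a callable defined in extra_code (in line with the evaluate_function description above); all names are hypothetical:

    from transforms import FunctionTransform

    # Drop a (hypothetical) warm-up period from every DataFrame of the dataset.
    transform = FunctionTransform(
        dataset_name='all_runs',
        output_dataset_name='steady_state',
        function='drop_warmup',
        extra_code=(
            'def drop_warmup(df):\n'
            '    return df[df["time"] > 10.0]\n'
        ),
    )
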
class transforms.GroupedAggregationTransform(dataset_name: str, output_dataset_name: str, input_column: str, output_column: str, grouping_columns: List, raw: bool = False, pre_concatenate: bool = False, aggregation_function: Callable[[Series], object] | str | None = None, extra_code: str | None = None, timestamp_selector: Callable = <function NDFrame.head>)[source]

Bases: Transform, ExtraCodeFunctionMixin, YAMLObject

A transform for dividing a dataset into distinct partitions with pandas.DataFrame.groupby, each sharing the same values in the specified list of grouping/partitioning column names, and then applying a function to the values in a given column of each partition, producing an aggregate scalar value.

Parameters:
dataset_name: str

the dataset to operate on

output_dataset_name: str

the name given to the output dataset

input_column: str

the name of the column the function should be applied to

output_column: str

the name given to the output column containing the results of applying the function

grouping_columns: List

the set of columns used for partitioning the dataset

raw: bool

whether to append the raw output of aggregation_function to the result list

pre_concatenate: bool

concatenate all input DataFrames before processing

aggregation_function: Union[Callable[[pandas.Series], object], str]

The unary function to apply to each partition. It should expect a pandas.Series as its argument and return a scalar value.

extra_code: Optional[str]

This can contain additional code for the transform function, such as the definition of a function over multiple lines or split into multiple functions for readability.

timestamp_selector: Callable

the function used to select the row of the partition data that serves as a template for the output row when aggregating

Attributes:
yaml_flow_style

Methods

evaluate_function(function, extra_code)

Compile and evaluate the given function and an additional, optional code fragment within a separate global environment and return the executable function object.

from_yaml(loader, node)

Convert a representation node to a Python object.

get_data(dataset_name)

Retrieve a dataset with the given name from the data repository associated with this transform.

set_data_repo(data_repo)

to_yaml(dumper, data)

Convert a Python object to a representation node.

aggregate_frame

prepare

process

set_name

yaml_dumper

aggregate_frame(data)[source]
prepare()[source]
yaml_tag = '!GroupedAggregationTransform'
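
A usage sketch with hypothetical names, reducing the delay column of each (node, run) partition to its mean:

    from transforms import GroupedAggregationTransform

    # Aggregate the 'delay' column of each partition to a single mean value.
    transform = GroupedAggregationTransform(
        dataset_name='steady_state',
        output_dataset_name='mean_delays',
        input_column='delay',
        output_column='delay_mean',
        grouping_columns=['node', 'run'],
        aggregation_function='lambda s: s.mean()',
    )
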
class transforms.GroupedFunctionTransform(dataset_name: str, output_dataset_name: str, input_column: str, output_column: str, grouping_columns: List, raw: bool = False, aggregate: bool = False, pre_concatenate: bool = False, transform_function: Callable[[DataFrame], DataFrame] | Callable[[DataFrame], object] | str | None = None, extra_code: str | None = None, timestamp_selector: Callable = <function NDFrame.head>)[source]

Bases: Transform, ExtraCodeFunctionMixin, YAMLObject

A transform for dividing a dataset into distinct partitions with pandas.DataFrame.groupby, each sharing the same values in the specified list of grouping/partitioning column names, and then applying a function to each partition.

Parameters:
dataset_name: str

the dataset to operate on

output_dataset_name: str

the name given to the output dataset

input_column: str

the name of the column the function should be applied to

output_column: str

the name given to the output column containing the results of applying the function

grouping_columns: List

the set of columns used for partitioning the dataset

raw: bool

whether to append the raw output of transform_function to the result list

aggregate: bool

whether the transform function returns a scalar aggregate value rather than an object (such as a pandas.DataFrame)

pre_concatenate: bool

concatenate all input DataFrames before processing

transform_function: Union[Callable[[pandas.DataFrame], pandas.DataFrame], Callable[[pandas.DataFrame], object], str]

The unary function to apply to each partition. It should expect a pandas.DataFrame as its argument and return a pandas.DataFrame (or an arbitrary object if raw is True).

extra_code: Optional[str]

This can contain additional code for the transform function, such as the definition of a function over multiple lines or split into multiple functions for readability.

timestamp_selector: Callable

the function used to select the row of the partition data that serves as a template for the output row when aggregating

Attributes:
yaml_flow_style

Methods

evaluate_function(function, extra_code)

Compile and evaluate the given function and an additional, optional code fragment within a separate global environment and return the executable function object.

from_yaml(loader, node)

Convert a representation node to a Python object.

get_data(dataset_name)

Retrieve a dataset with the given name from the data repository associated with this transform.

set_data_repo(data_repo)

to_yaml(dumper, data)

Convert a Python object to a representation node.

aggregate_frame

prepare

process

set_name

yaml_dumper

aggregate_frame(data)[source]
prepare()[source]
yaml_tag = '!GroupedFunctionTransform'
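
A usage sketch with hypothetical names; it assumes the transform function receives a partition DataFrame and returns the modified partition, per the parameter description above:

    from transforms import GroupedFunctionTransform

    # Normalize 'delay' within each node's partition.
    transform = GroupedFunctionTransform(
        dataset_name='steady_state',
        output_dataset_name='normalized_delays',
        input_column='delay',
        output_column='delay_norm',
        grouping_columns=['node'],
        transform_function='normalize',
        extra_code=(
            'def normalize(df):\n'
            '    df["delay_norm"] = df["delay"] / df["delay"].max()\n'
            '    return df\n'
        ),
    )
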
class transforms.MergeTransform(dataset_name_left: str, dataset_name_right: str, output_dataset_name: str, left_key_columns: List[str] | None = None, right_key_columns: List[str] | None = None, match_by_filename: bool = True, matching_attribute: str = 'source_files')[source]

Bases: Transform, YAMLObject

A transform for merging the columns from two DataFrames, from two distinct datasets, similarly to a SQL INNER JOIN.

Essentially a wrapper around pandas.merge.

Parameters:
dataset_name_left: str

the left dataset to operate on

dataset_name_right: str

the right dataset to operate on

output_dataset_name: str

the name given to the output dataset

left_key_columns: Optional[List[str]]

the names of the columns from the left dataset that are used as keys for joining

right_key_columns: Optional[List[str]]

the names of the columns from the right dataset that are used as keys for joining

match_by_filename: bool

whether to match the merge inputs by the filename the data has been extracted from

matching_attribute: str

the attribute to match the datasets on

Attributes:
yaml_flow_style

Methods

from_yaml(loader, node)

Convert a representation node to a Python object.

get_data(dataset_name)

Retrieve a dataset with the given name from the data repository associated with this transform.

set_data_repo(data_repo)

to_yaml(dumper, data)

Convert a Python object to a representation node.

merge

prepare

prepare_matched_by_attribute

prepare_simple_sequential

process

set_name

yaml_dumper

merge(data_l: DataFrame, data_r: DataFrame, left_key_columns: List[str] | None = None, right_key_columns: List[str] | None = None)[source]
prepare()[source]
prepare_matched_by_attribute()[source]
prepare_simple_sequential()[source]
yaml_tag = '!MergeTransform'
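
A usage sketch with hypothetical dataset names, joining on a shared node column:

    from transforms import MergeTransform

    # INNER JOIN two datasets on their 'node' columns.
    transform = MergeTransform(
        dataset_name_left='mean_delays',
        dataset_name_right='node_positions',
        output_dataset_name='delays_with_positions',
        left_key_columns=['node'],
        right_key_columns=['node'],
    )
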
class transforms.Transform[source]

Bases: YAMLObject

The base class for all transforms.

Attributes:
yaml_flow_style

Methods

from_yaml(loader, node)

Convert a representation node to a Python object.

get_data(dataset_name)

Retrieve a dataset with the given name from the data repository associated with this transform.

set_data_repo(data_repo)

to_yaml(dumper, data)

Convert a Python object to a representation node.

prepare

process

set_name

yaml_dumper

get_data(dataset_name: str)[source]

Retrieve a dataset with the given name from the data repository associated with this transform.

Parameters:
dataset_name: str

The name of the dataset to retrieve from the data repository

prepare()[source]
process(data: DataFrame)[source]
set_data_repo(data_repo: dict)[source]
Parameters:
data_repo: dict

The dictionary containing all loaded datasets necessary for this transform

set_name(name: str)[source]
yaml_tag = '!Transform'
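
A minimal sketch of a custom subclass. The exact calling contract between prepare and process is not documented here, so the hooks below are illustrative only; the class name and attributes are hypothetical:

    from pandas import DataFrame
    from transforms import Transform

    class IdentityTransform(Transform):
        # Hypothetical no-op transform illustrating the base-class hooks.
        yaml_tag = '!IdentityTransform'

        def __init__(self, dataset_name: str, output_dataset_name: str):
            self.dataset_name = dataset_name
            self.output_dataset_name = output_dataset_name

        def prepare(self):
            # Fetch the input dataset from the shared data repository.
            self.data = self.get_data(self.dataset_name)

        def process(self, data: DataFrame):
            # Return the data unchanged.
            return data
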
transforms.register_constructors()[source]

Register YAML constructors for all transforms.
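
A sketch of loading a transform from YAML once the constructors are registered. The mapping keys are assumed to mirror the constructor parameters, and the choice of loader is an assumption:

    import yaml

    import transforms

    # Make the !...Transform tags known to the YAML parser.
    transforms.register_constructors()

    # Hypothetical recipe: a list with a single transform.
    recipe = yaml.load('''
    - !ColumnFunctionTransform
      dataset_name: measurements
      output_dataset_name: measurements_dbm
      input_column: rssi_raw
      output_column: rssi_dbm
      function: "lambda v: v - 100"
    ''', Loader=yaml.FullLoader)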