mleko.pipeline.steps.convert_step#

Module for data conversion as part of the pipeline process.

This module contains the ConvertStep class, a specialized pipeline step that handles data format conversion. It uses the provided BaseConverter to convert the data into the desired format.

Module Contents#

Classes#

ConvertStepInputType

The input type of the ConvertStep.

ConvertStepOutputType

The output type of the ConvertStep.

ConvertStep

Pipeline step that manages data conversion from one format to another.

Attributes#

logger

The logger for the module.

TypedDictType

Type variable for TypedDict type annotations.

mleko.pipeline.steps.convert_step.logger#

The logger for the module.

mleko.pipeline.steps.convert_step.TypedDictType#

Type variable for TypedDict type annotations.

class mleko.pipeline.steps.convert_step.ConvertStepInputType#

Bases: typing_extensions.TypedDict

The input type of the ConvertStep.

file_paths: str | List[pathlib.Path] | List[str]#

List of file paths or the key identifying the list of file paths to be converted.
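The two accepted forms of file_paths can be illustrated with a small sketch. It does not import mleko: InputSketch is a local stand-in that only mirrors the documented field, and the key name "raw_files", the example paths, and the helper is_key_reference are all hypothetical.

```python
from pathlib import Path
from typing import List, TypedDict, Union


class InputSketch(TypedDict):
    """Local stand-in mirroring the documented field (not the mleko class)."""

    file_paths: Union[str, List[Path], List[str]]


# Form 1: a key naming an entry that an earlier step stored in the data container.
by_key: InputSketch = {"file_paths": "raw_files"}

# Form 2: the file paths themselves, as Path objects or plain strings.
by_value: InputSketch = {"file_paths": [Path("data/a.csv"), Path("data/b.csv")]}


def is_key_reference(inputs: InputSketch) -> bool:
    """Return True when file_paths is a lookup key rather than a list of paths."""
    return isinstance(inputs["file_paths"], str)
```

A lone string is therefore always read as a key, so a single file still has to be wrapped in a list.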

clear()#

D.clear() -> None. Remove all items from D.

copy()#

D.copy() -> a shallow copy of D

get()#

Return the value for key if key is in the dictionary, else default.

items()#

D.items() -> a set-like object providing a view on D’s items

keys()#

D.keys() -> a set-like object providing a view on D’s keys

pop()#

D.pop(k[,d]) -> v, remove specified key and return the corresponding value.

If the key is not found, return the default if given; otherwise, raise a KeyError.

popitem()#

Remove and return a (key, value) pair as a 2-tuple.

Pairs are returned in LIFO (last-in, first-out) order. Raises KeyError if the dict is empty.

setdefault()#

Insert key with a value of default if key is not in the dictionary.

Return the value for key if key is in the dictionary, else default.

update()#

D.update([E, ]**F) -> None. Update D from dict/iterable E and F.

If E is present and has a .keys() method, then does: for k in E: D[k] = E[k]

If E is present and lacks a .keys() method, then does: for k, v in E: D[k] = v

In either case, this is followed by: for k in F: D[k] = F[k]

values()#

D.values() -> an object providing a view on D’s values

class mleko.pipeline.steps.convert_step.ConvertStepOutputType#

Bases: typing_extensions.TypedDict

The output type of the ConvertStep.

data_schema: str#

The key for the DataSchema after conversion.

dataframe: str#

The key for the DataFrame after conversion.

clear()#

D.clear() -> None. Remove all items from D.

copy()#

D.copy() -> a shallow copy of D

get()#

Return the value for key if key is in the dictionary, else default.

items()#

D.items() -> a set-like object providing a view on D’s items

keys()#

D.keys() -> a set-like object providing a view on D’s keys

pop()#

D.pop(k[,d]) -> v, remove specified key and return the corresponding value.

If the key is not found, return the default if given; otherwise, raise a KeyError.

popitem()#

Remove and return a (key, value) pair as a 2-tuple.

Pairs are returned in LIFO (last-in, first-out) order. Raises KeyError if the dict is empty.

setdefault()#

Insert key with a value of default if key is not in the dictionary.

Return the value for key if key is in the dictionary, else default.

update()#

D.update([E, ]**F) -> None. Update D from dict/iterable E and F.

If E is present and has a .keys() method, then does: for k in E: D[k] = E[k]

If E is present and lacks a .keys() method, then does: for k, v in E: D[k] = v

In either case, this is followed by: for k in F: D[k] = F[k]

values()#

D.values() -> an object providing a view on D’s values

class mleko.pipeline.steps.convert_step.ConvertStep(converter: mleko.dataset.convert.BaseConverter, inputs: ConvertStepInputType, outputs: ConvertStepOutputType, cache_group: str | None = None)#

Bases: mleko.pipeline.pipeline_step.PipelineStep

Pipeline step that manages data conversion from one format to another.

Initialize the ConvertStep with the specified data converter.

Parameters:
  • converter (mleko.dataset.convert.BaseConverter) – The converter responsible for handling data format conversion.

  • inputs (ConvertStepInputType) – A dictionary of input keys following the ConvertStepInputType schema.

  • outputs (ConvertStepOutputType) – A dictionary of output keys following the ConvertStepOutputType schema.

  • cache_group (str | None) – The cache group to use.

_inputs: ConvertStepInputType#

_outputs: ConvertStepOutputType#

execute(data_container: mleko.pipeline.data_container.DataContainer, force_recompute: bool, disable_cache: bool) -> mleko.pipeline.data_container.DataContainer#

Perform data format conversion using the configured converter.

Parameters:
  • data_container (mleko.pipeline.data_container.DataContainer) – The container holding the list of file paths to be converted.

  • force_recompute (bool) – Whether to force the step to recompute its output, even if it already exists.

  • disable_cache (bool) – If set to True, disables the cache.

Raises:

ValueError – If the input data types are invalid.

Returns:

A DataContainer containing the result.

Return type:

mleko.pipeline.data_container.DataContainer
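The flow described for execute can be sketched without mleko, under stated assumptions. ContainerSketch, UpperCaseConverterSketch, and execute_sketch are hypothetical stand-ins, the converter's convert method and its return value are invented for illustration, and caching (force_recompute, disable_cache, cache_group) is left out entirely.

```python
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any, Dict, List, Tuple


@dataclass
class ContainerSketch:
    """Hypothetical stand-in for DataContainer: a plain key/value store."""

    data: Dict[str, Any] = field(default_factory=dict)


class UpperCaseConverterSketch:
    """Hypothetical converter that returns a fake schema and a fake 'dataframe'."""

    def convert(self, file_paths: List[Path]) -> Tuple[Dict[str, str], List[str]]:
        schema = {"source": "file name"}
        frame = [p.name.upper() for p in file_paths]
        return schema, frame


def execute_sketch(converter, inputs, outputs, container):
    raw = inputs["file_paths"]
    # A string is a key into the container; anything else is the value itself.
    file_paths = container.data[raw] if isinstance(raw, str) else raw
    # Reject anything that is not a list of paths or strings.
    if not isinstance(file_paths, list) or not all(
        isinstance(p, (str, Path)) for p in file_paths
    ):
        raise ValueError("file_paths must resolve to a list of paths or strings")
    schema, frame = converter.convert([Path(p) for p in file_paths])
    # Store both results under the keys named in the outputs dictionary.
    container.data[outputs["data_schema"]] = schema
    container.data[outputs["dataframe"]] = frame
    return container


container = ContainerSketch(data={"raw_files": ["a.csv", "b.csv"]})
result = execute_sketch(
    UpperCaseConverterSketch(),
    inputs={"file_paths": "raw_files"},
    outputs={"data_schema": "schema", "dataframe": "df"},
    container=container,
)
```

The point of the sketch is the shape of the contract: inputs name where the file paths come from, and outputs name the keys under which the schema and the dataframe are stored for later steps.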

_get_input_model() -> type[ConvertStepInputType]#

Get the input type for the ConvertStep.

Returns:

Input type for the ConvertStep.

Return type:

type[ConvertStepInputType]

_get_output_model() -> type[ConvertStepOutputType]#

Get the output type for the ConvertStep.

Returns:

Output type for the ConvertStep.

Return type:

type[ConvertStepOutputType]

_validate_and_get_input(input_object: str | T, expected_type: type[T], data_container: mleko.pipeline.data_container.DataContainer, is_optional: bool = False) -> T#

Validate and get the input from the data container or as a direct value.

If the input is a string, it is treated as a key to look up in the data container. If it is not a string, it is treated as the input value itself. The method validates the input type, and returns the input value if it is valid.

Warning

This method does not handle subscripted types, such as List[str]. It only handles simple types. If you need to validate a subscripted type, you should do so manually after retrieving the input value from this method.

Parameters:
  • input_object (str | T) – The input key or value to validate and retrieve.

  • expected_type (type[T]) – The expected type of the input value.

  • data_container (mleko.pipeline.data_container.DataContainer) – The data container containing the input data.

  • is_optional (bool) – Whether the input is optional.

Raises:

ValueError – If the input is invalid or not found in the data container.

Returns:

The input value if it is valid, or None if it is optional and not found in the data container.

Return type:

T
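The lookup rule and the warning about subscripted types can be made concrete with a rough sketch. It is not the library's implementation: validate_and_get_input_sketch is a hypothetical function, and a plain dict stands in for the data container.

```python
from typing import Any, Dict, Optional, Type, TypeVar, Union

T = TypeVar("T")


def validate_and_get_input_sketch(
    input_object: Union[str, T],
    expected_type: Type[T],
    data: Dict[str, Any],
    is_optional: bool = False,
) -> Optional[T]:
    if isinstance(input_object, str):
        # A string is treated as a key to look up in the container.
        if input_object not in data:
            if is_optional:
                return None
            raise ValueError(f"Key {input_object!r} not found in the data container.")
        value = data[input_object]
    else:
        # Anything else is treated as the input value itself.
        value = input_object
    # Only simple types work here; isinstance rejects List[str] and similar.
    if not isinstance(value, expected_type):
        raise ValueError(
            f"Expected {expected_type.__name__}, got {type(value).__name__}."
        )
    return value


data = {"raw_files": ["a.csv", "b.csv"]}
paths = validate_and_get_input_sketch("raw_files", list, data)

# Element types are checked by hand, since only the plain list type was validated.
if not all(isinstance(p, str) for p in paths):
    raise ValueError("Every entry in raw_files must be a string.")
```

Passing list rather than List[str] as the expected type is deliberate: the element check has to happen separately, which is the manual step the warning describes.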

_validate_inputs() -> None#

Validates the step’s inputs using TypedDict models.

Raises:

ValueError – If the inputs are not a dictionary.

Return type:

None

_validate_outputs() -> None#

Validates the step’s outputs using TypedDict models.

Raises:

ValueError – If the outputs are not a dictionary.

Return type:

None
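One way a dictionary could be checked against a TypedDict model is sketched below. This is an assumption about the general technique, not a description of how mleko performs the check: validate_against_model and OutputModelSketch are hypothetical, and the key comparison goes beyond the documented "not a dictionary" error.

```python
from typing import Any, TypedDict, get_type_hints


class OutputModelSketch(TypedDict):
    """Local stand-in mirroring the documented output fields."""

    data_schema: str
    dataframe: str


def validate_against_model(values: Any, model: type) -> None:
    # The documented failure mode: the inputs or outputs are not a dictionary.
    if not isinstance(values, dict):
        raise ValueError("Step inputs and outputs must be dictionaries.")
    # Extra check (an assumption): the keys must match the model's fields.
    expected = set(get_type_hints(model))
    missing = expected - set(values)
    unexpected = set(values) - expected
    if missing or unexpected:
        raise ValueError(
            f"Missing keys: {sorted(missing)}, unexpected keys: {sorted(unexpected)}"
        )


validate_against_model(
    {"data_schema": "schema", "dataframe": "df"}, OutputModelSketch
)
```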