mlm_insights.core.data_sources package

Submodules

mlm_insights.core.data_sources.data_source module

class mlm_insights.core.data_sources.data_source.DataSource(type: str, **kwargs: Any)

Bases: ABC

This interface encapsulates the file path and file type. It can be used to implement special functionality that takes parameters and forms a list of file paths to be read by the readers.

For example, if the current date is needed to read a folder named with today’s date, a data source can be used for this purpose.

It is an optional component to implement and use. It can be omitted if the customer explicitly passes the file paths/glob expressions to be read by the readers.
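The date-based use case described above can be sketched as follows. This is a minimal illustration, not the library's implementation: `DataSourceSketch` is a hypothetical stand-in for the abstract interface, and `TodayFolderDataSource`, its `today` argument, and the `<base>/<date>/*.<type>` layout are assumptions made for the example.

```python
from abc import ABC, abstractmethod
from datetime import date
from typing import Any, List, Optional


class DataSourceSketch(ABC):
    """Hypothetical stand-in for the DataSource interface (not the library class)."""

    def __init__(self, type: str, **kwargs: Any) -> None:
        self.type = type

    @abstractmethod
    def get_data_location(self, **kwargs: Any) -> List[str]:
        """Return the list of file paths or glob expressions to read."""


class TodayFolderDataSource(DataSourceSketch):
    """Hypothetical data source that points at the folder named after today's date."""

    def __init__(self, base_location: str, file_type: str,
                 today: Optional[date] = None) -> None:
        super().__init__(type="today_folder")
        self.base_location = base_location.rstrip("/")
        self.file_type = file_type
        self.today = today  # injectable so the result is reproducible

    def get_data_location(self, **kwargs: Any) -> List[str]:
        day = self.today or date.today()
        # Assumed folder layout: <base_location>/<YYYY-MM-DD>/*.<file_type>
        return [f"{self.base_location}/{day.isoformat()}/*.{self.file_type}"]
```

A reader would then receive the resulting glob expressions instead of a hard-coded path.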

fetch(filename: str, **kwargs: Any) Any

This method is responsible for fetching the contents of the file from the underlying data source.

Parameters

filename:

The canonical file path for which the client has to fetch the raw content

kwargs:

Extra keyword arguments.

Returns

Any:

The raw content of the file, in the format accepted by the underlying engine's read method. By default, this method returns the file path.
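The difference between the default behaviour (returning the path) and an override that returns raw content could look like the sketch below. Both classes are hypothetical stand-ins written for this illustration; only the `fetch(filename, **kwargs)` signature comes from the reference above.

```python
import tempfile
from pathlib import Path
from typing import Any, Tuple


class PathFetchSketch:
    """Hypothetical stand-in mirroring the documented default: return the path."""

    def fetch(self, filename: str, **kwargs: Any) -> Any:
        return filename


class BytesFetchSketch(PathFetchSketch):
    """Hypothetical override for an engine that expects raw bytes."""

    def fetch(self, filename: str, **kwargs: Any) -> Any:
        return Path(filename).read_bytes()


def demo_fetch() -> Tuple[bool, bytes]:
    """Write a small file and fetch it through both variants."""
    with tempfile.TemporaryDirectory() as tmp:
        path = Path(tmp) / "sample.csv"
        path.write_bytes(b"a,b\n1,2\n")
        default_returns_path = PathFetchSketch().fetch(str(path)) == str(path)
        raw = BytesFetchSketch().fetch(str(path))
    return default_returns_path, raw
```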

get_client(**kwargs: Any) Any

Parameters

kwargs:

Extra keyword arguments.

Returns

Any:

The underlying authenticated client, if any

abstract get_data_location(**kwargs: Any) List[str]

This method is responsible for returning the list of files for the underlying data source path.

Parameters

kwargs:

Extra keyword arguments.

Returns

List[str]:

List of file paths

mlm_insights.core.data_sources.local_date_prefix_data_source module

class mlm_insights.core.data_sources.local_date_prefix_data_source.LocalDatePrefixDataSource(base_location: str, file_type: str, offset: int = -1, date_range: Dict[Any, Any] = {}, **kwargs: Any)

Bases: DataSource

This class implements the out-of-the-box (OOB) data source for retrieving file locations based on a user-provided offset or date range. This set of locations is passed to the reader for reading.

The user needs to provide only one of the two, either offset or date range, for calculating the folder location. If both are provided, the date range takes precedence over the offset value.

Configuration

base_location: str

The prefix to the folder location

file_type: str

File format of the input data files, e.g. csv, jsonl.

date_range: Dict[str, str]
  • The date range used to build the folder locations, with ‘start’ and ‘end’ as keys, e.g. {'start': '2023-03-18', 'end': '2023-03-19'}

  • Either date_range or offset must be provided by the user

offset: int, default=-1

Number of days before the current date, used to calculate the date to pick up data from. For example, offset=1 for yesterday and offset=2 for two days ago.

Returns

List[str]:

List of file locations
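The precedence rule and the folder layout described above can be sketched in plain Python. This is an illustration under stated assumptions, not the library's code: the function names and the `today` argument are hypothetical, the date range is treated as inclusive, and a negative offset (such as the default -1) is treated as "not provided".

```python
from datetime import date, timedelta
from typing import Dict, List, Optional


def resolve_dates(offset: int = -1,
                  date_range: Optional[Dict[str, str]] = None,
                  today: Optional[date] = None) -> List[date]:
    """Resolve the dates to read; date_range wins when both are given."""
    if date_range:
        start = date.fromisoformat(date_range["start"])
        end = date.fromisoformat(date_range["end"])
        if end < start:
            raise ValueError("'end' must not be earlier than 'start'")
        # Assumption: both endpoints are included
        return [start + timedelta(days=i) for i in range((end - start).days + 1)]
    if offset >= 0:
        return [(today or date.today()) - timedelta(days=offset)]
    raise ValueError("provide either date_range or a non-negative offset")


def date_prefix_globs(base_location: str, file_type: str, offset: int = -1,
                      date_range: Optional[Dict[str, str]] = None,
                      today: Optional[date] = None) -> List[str]:
    """Build one glob expression per resolved date."""
    base = base_location.rstrip("/")
    return [f"{base}/{d.isoformat()}/*.{file_type}"
            for d in resolve_dates(offset, date_range, today)]
```

Passing `today` explicitly keeps the offset case reproducible in tests; a real data source would presumably use the current date.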

Example code

from mlm_insights.core.data_sources.local_date_prefix_data_source import LocalDatePrefixDataSource

# base_location (the folder prefix) and CSVDaskDataReader are assumed to be defined/imported already

# Using date_range
data = {
    "file_type": "csv",
    "date_range": {"start": "2023-03-18", "end": "2023-03-19"}
}
ds = LocalDatePrefixDataSource(base_location, **data)
csv_reader = CSVDaskDataReader(data_source=ds)
# The data source resolves to 2 data locations: ['<base_location>/2023-03-18/*.csv', '<base_location>/2023-03-19/*.csv']
actual_df = csv_reader.read(None)  # Reads from the data locations

# Using offset
data = {
    "file_type": "csv",
    "offset": 1
}
ds = LocalDatePrefixDataSource(base_location, **data)
csv_reader = CSVDaskDataReader(data_source=ds)
# The data source resolves to 1 data location, given today's date is 2023-03-19: ['<base_location>/2023-03-18/*.csv']
actual_df = csv_reader.read(None)  # Reads from the data locations

get_data_location(**kwargs: Any) List[str]

Returns

List[str]:

List of files present in the data location

Raises

DataSourceException:

Raised if the list of files returned is empty.

Notes

The data source returns a list of glob expressions.

mlm_insights.core.data_sources.local_date_prefix_data_source.validate(base_location: str, offset: int, date_range: Dict[Any, Any], file_type: str) None

mlm_insights.core.data_sources.local_file_data_source module

class mlm_insights.core.data_sources.local_file_data_source.LocalFileDataSource(file_path: List[str] | str = '', **kwargs: Any)

Bases: DataSource

This class implements the out-of-the-box (OOB) data source for retrieving file locations based on a simple file path string, a list of strings, or a glob string.

Configuration

file_path: Union[List[str], str]

A simple file path string, a list of file path strings, or a glob string

Returns

List[str]:

List of files present on the file path in the local system

Example code

from mlm_insights.core.data_sources.local_file_data_source import LocalFileDataSource

# CSVDaskDataReader is assumed to be imported already
ds = LocalFileDataSource(file_path='location/csv/*.csv')
csv_reader = CSVDaskDataReader(data_source=ds)
# The data source returns the list of csv files within the folder location/csv/

actual_df = csv_reader.read(None)  # Reads all the files returned by the LocalFileDataSource

get_data_location(**kwargs: Any) List[str]

Parameters

kwargs:

Extra keyword arguments

Returns

List of files present on the file path in the local system

Raises

DataSourceException:

Raised if the file list is empty
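The behaviour described for this data source (accept a string, a list of strings, or a glob, and fail when nothing is found) can be approximated with the standard library. This is a sketch under assumptions: `list_local_files` is a hypothetical helper, `EmptyLocationError` stands in for `DataSourceException`, and results are sorted per pattern only to make the output deterministic.

```python
import glob
from typing import List, Union


class EmptyLocationError(Exception):
    """Hypothetical stand-in for DataSourceException."""


def list_local_files(file_path: Union[List[str], str]) -> List[str]:
    """Expand a path, a list of paths, or glob strings into matching files."""
    patterns = [file_path] if isinstance(file_path, str) else list(file_path)
    files: List[str] = []
    for pattern in patterns:
        # glob.glob also returns a plain (non-glob) path if it exists
        files.extend(sorted(glob.glob(pattern)))
    if not files:
        raise EmptyLocationError(f"no files matched {patterns!r}")
    return files
```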

mlm_insights.core.data_sources.oci_date_prefix_data_source module

class mlm_insights.core.data_sources.oci_date_prefix_data_source.OCIDatePrefixDataSource(bucket_name: str, namespace: str, file_type: str, object_prefix: str, offset: int = -1, date_range: Dict[Any, Any] = {}, storage_options: Dict[str, Any] = {}, **kwargs: Any)

Bases: DataSource

This class implements the out-of-the-box (OOB) data source for retrieving file locations from OCI Object Storage based on a user-provided offset or date range. This set of locations is passed to the reader for reading.

The user needs to provide only one of the two, either offset or date range, for calculating the folder location. If both are provided, the date range takes precedence over the offset value.

Configuration

bucket_name: str

Name of the bucket

namespace: str

OCI cloud namespace of the bucket location

object_prefix: str

Folder path of the data relative to the bucket location; cannot be empty

file_type: str

File format of the input data files, e.g. csv, jsonl.

date_range: Dict[str, str]
  • The date range used to build the folder locations, with ‘start’ and ‘end’ as keys, e.g. {'start': '2023-03-18', 'end': '2023-03-19'}

  • Either date_range or offset must be provided by the user

offset: int, default=-1

Number of days before the current date, used to calculate the date to pick up data from. For example, offset=1 for yesterday and offset=2 for two days ago.

storage_options: Dict[str, Any]

Authentication options passed to the underlying ocifs client

Returns

List[str]:

List of OCI Object storage file locations
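The location format shown in the example code for this class, `oci://<bucket_name>@<namespace>/<object_prefix>/<date>/*.<file_type>`, can be reproduced with a small helper. This is only a sketch: `oci_date_globs` is a hypothetical function, and it takes already-resolved date strings rather than an offset or date range.

```python
from typing import List


def oci_date_globs(bucket_name: str, namespace: str, object_prefix: str,
                   file_type: str, dates: List[str]) -> List[str]:
    """Build one OCI Object Storage glob expression per date string."""
    if not object_prefix:
        # Mirrors the documented constraint that object_prefix cannot be empty
        raise ValueError("object_prefix cannot be empty")
    prefix = object_prefix.strip("/")
    return [f"oci://{bucket_name}@{namespace}/{prefix}/{d}/*.{file_type}"
            for d in dates]
```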

Example code

from mlm_insights.core.data_sources.oci_date_prefix_data_source import OCIDatePrefixDataSource

# CSVDaskDataReader is assumed to be imported already

# Using date_range
data = {
    "bucket_name": "mlm",
    "namespace": "mlm",
    "object_prefix": "mlm",
    "file_type": "csv",
    "date_range": {"start": "2023-03-18", "end": "2023-03-19"}
}
ds = OCIDatePrefixDataSource(**data)
csv_reader = CSVDaskDataReader(data_source=ds)
# The data source resolves to 2 data locations: ['oci://mlm@mlm/mlm/2023-03-18/*.csv', 'oci://mlm@mlm/mlm/2023-03-19/*.csv']
actual_df = csv_reader.read(None)  # Reads from the data locations

# Using offset
data = {
    "bucket_name": "mlm",
    "namespace": "mlm",
    "object_prefix": "mlm",
    "file_type": "csv",
    "offset": 1
}
ds = OCIDatePrefixDataSource(**data)
csv_reader = CSVDaskDataReader(data_source=ds)
# The data source resolves to 1 data location, given today's date is 2023-03-19: ['oci://mlm@mlm/mlm/2023-03-18/*.csv']
actual_df = csv_reader.read(None)  # Reads from the data locations

get_data_location(**kwargs: Any) List[str]

Parameters

kwargs:

Extra keyword arguments

Returns

List[str]:

List of files present in the Object Storage data location

Raises

DataSourceException

If the list of files returned is empty.

Notes

The data source returns a list of glob expressions.

mlm_insights.core.data_sources.oci_date_prefix_data_source.validate(bucket_name: str, namespace: str, object_prefix: str, offset: int, date_range: Dict[Any, Any], file_type: str) None

mlm_insights.core.data_sources.oci_object_storage_data_source module

class mlm_insights.core.data_sources.oci_object_storage_data_source.OCIObjectStorageDataSource(file_path: List[str] | str = '', storage_options: Dict[str, Any] = {}, **kwargs: Any)

Bases: DataSource

This class implements the out-of-the-box (OOB) data source for retrieving file locations based on an OCI file path string, a list of OCI file path strings, or a glob string.

Configuration

file_path: Union[List[str], str]

A simple file path string, a list of file path strings, or a glob string

Returns

List[str]:

List of files present on the file path in OCI Object Storage

Example code

from mlm_insights.core.data_sources.oci_object_storage_data_source import OCIObjectStorageDataSource

# CSVDaskDataReader is assumed to be imported already
ds = OCIObjectStorageDataSource(file_path='oci://location/csv/*.csv')
csv_reader = CSVDaskDataReader(data_source=ds)
# The data source returns the list of csv files within the OCI Object Storage location oci://location/csv/

actual_df = csv_reader.read(None)  # Reads all the files returned by the OCIObjectStorageDataSource

get_client(**kwargs: Any) OCIFileSystem

Parameters

kwargs:

Extra keyword arguments

Returns

object_storage_client: ocifs.OCIFileSystem

Object Storage client

get_data_location(**kwargs: Any) List[str]

Parameters

kwargs:

Extra keyword arguments

Returns

List[str]:

List of files present on the file path in OCI Object Storage

Raises

DataSourceException:

Raised if the file list is empty

mlm_insights.core.data_sources.oci_object_storage_data_source.validate(file_path: List[str] | str) None