from __future__ import annotations
import warnings
from collections.abc import Iterable
from copy import deepcopy
from typing import Any, Callable, Hashable, Literal, Sequence
import numpy as np
import pandas as pd # type: ignore
from numpy import ndarray
from ..utils import (
ID_SPLIT_SYMBOL,
BackendsEnum,
BackendTypeError,
ConcatBackendError,
ConcatDataError,
DataTypeError,
ExperimentDataEnum,
FromDictTypes,
MultiFieldKeyTypes,
NotFoundInExperimentDataError,
ScalarType,
)
from ..utils.adapter import Adapter
from ..utils.errors import InvalidArgumentError
from .abstract import DatasetBase
from .roles import (
ABCRole,
AdditionalRole,
DefaultRole,
FilterRole,
InfoRole,
StatisticRole,
)
# NOTE: a stray "[docs]" marker (Sphinx source-link residue, not Python code) was here.
class Dataset(DatasetBase):
    """Role-aware tabular dataset that delegates storage and math to a backend.

    Every column is mapped to an ``ABCRole`` in ``self.roles``. Most methods
    forward the call to ``self._backend`` and wrap the result in a new
    ``Dataset`` together with the roles that apply to the resulting columns.
    """

    class Locker:
        """Label-based accessor exposed as ``Dataset.loc``."""

        def __init__(self, backend, roles):
            self.backend = backend
            self.roles = roles

        def __getitem__(self, item) -> Dataset:
            """Return the label-based selection, keeping roles of surviving columns."""
            t_data = self.backend.loc(item)
            return Dataset(
                data=t_data,
                roles={k: v for k, v in self.roles.items() if k in t_data.columns},
            )

        def __setitem__(self, item, value):
            """Assign ``value`` at ``item`` (``(row_label, column_name)``).

            Raises:
                KeyError: If the column does not exist in the backend data.
                TypeError: If ``value`` does not match the column role's data type.
            """
            column_name = item[1]
            # Check existence first so a missing column yields the intended
            # message instead of a bare KeyError from the roles lookup.
            if column_name not in self.backend.data.columns:
                raise KeyError("Column must be added by using add_column method.")
            column_data_type = self.roles[column_name].data_type
            if not Dataset._fits_data_type(value, column_data_type):
                raise TypeError("Value type does not match the expected data type.")
            self.backend.data.loc[item] = value

    class ILocker:
        """Position-based accessor exposed as ``Dataset.iloc``."""

        def __init__(self, backend, roles):
            self.backend = backend
            self.roles = roles

        def __getitem__(self, item) -> Dataset:
            """Return the positional selection, keeping roles of surviving columns."""
            t_data = self.backend.iloc(item)
            return Dataset(
                data=t_data,
                roles={k: v for k, v in self.roles.items() if k in t_data.columns},
            )

        def __setitem__(self, item, value):
            """Assign ``value`` at ``item`` (``(row_position, column_position)``).

            Raises:
                IndexError: If the column position is past the last column.
                TypeError: If ``value`` does not match the column role's data type.
            """
            column_index = item[1]
            # The bounds check must precede the column lookup; otherwise the
            # lookup itself raises and this message is never produced.
            if column_index >= len(self.backend.data.columns):
                raise IndexError("Column must be added by using add_column method.")
            column_name = self.backend.data.columns[column_index]
            column_data_type = self.roles[column_name].data_type
            if not Dataset._fits_data_type(value, column_data_type):
                raise TypeError("Value type does not match the expected data type.")
            self.backend.data.iloc[item] = value

    def __init__(
        self,
        roles: dict[ABCRole, list[str] | str] | dict[str, ABCRole],
        data: pd.DataFrame | str | None = None,
        backend: BackendsEnum | None = None,
        default_role: ABCRole | None = None,
    ):
        """Initialise the base dataset and attach the ``loc`` / ``iloc`` accessors."""
        super().__init__(roles, data, backend, default_role)
        self.loc = self.Locker(self._backend, self.roles)
        self.iloc = self.ILocker(self._backend, self.roles)

    @staticmethod
    def _fits_data_type(value: Any, data_type: Any) -> bool:
        """Tell whether ``value`` is acceptable for a column typed ``data_type``.

        A value fits when the column has no declared type, when it is an
        iterable whose elements are all instances of the type, or when it is
        itself an instance of the type.
        """
        return (
            data_type is None
            or (
                isinstance(value, Iterable)
                and all(isinstance(v, data_type) for v in value)
            )  # check for backend specific list (?)
            or isinstance(value, data_type)
        )

    def __getitem__(self, item: Iterable | str | int) -> Dataset:
        """Select columns; unknown columns get ``InfoRole`` in the result."""
        if isinstance(item, Dataset):
            item = item.data
        items = (
            [item] if isinstance(item, str) or not isinstance(item, Iterable) else item
        )
        roles: dict = {
            column: (
                self.roles[column]
                if column in self.columns and self.roles.get(column, False)
                else InfoRole()
            )
            for column in items
        }
        result = Dataset(data=self._backend.__getitem__(item), roles=roles)
        # Carry over only the temporary roles of the selected columns.
        result.tmp_roles = {
            key: value for key, value in self.tmp_roles.items() if key in items
        }
        return result

    def __setitem__(self, key: str, value: Any):
        """Assign a column.

        A new string key is added with ``InfoRole`` (a ``SyntaxWarning`` is
        emitted to point at ``add_column``); an existing column is overwritten
        after the value is checked against the role's data type.

        Raises:
            TypeError: If the value does not match the existing column's type.
        """
        if isinstance(value, Dataset):
            value = value.data.iloc[:, 0]
        if key not in self.columns and isinstance(key, str):
            self.add_column(value, {key: InfoRole()})
            warnings.warn(
                "Column must be added by using add_column method.",
                category=SyntaxWarning,
            )
            self.data[key] = value
        else:
            column_data_type = self.roles[key].data_type
            if not self._fits_data_type(value, column_data_type):
                raise TypeError("Value type does not match the expected data type.")
            self.data[key] = value

    def __binary_magic_operator(self, other, func_name: str) -> Any:
        """Apply the backend's binary dunder ``func_name`` to ``other``.

        The result keeps this dataset's role objects (deep-copied) but with
        ``data_type`` reset to ``None``. A ``Dataset`` operand must share the
        backend type; its columns are renamed positionally to match this
        dataset's columns before the operation.

        Raises:
            DataTypeError: If ``other`` has an unsupported type.
            BackendTypeError: If ``other`` is a Dataset with another backend type.
        """
        if not any(
            isinstance(other, t) for t in [Dataset, str, int, float, bool, Sequence]
        ):
            raise DataTypeError(type(other))
        func = getattr(self._backend, func_name)
        t_roles = deepcopy(self.roles)
        for role in t_roles.values():
            role.data_type = None
        if isinstance(other, Dataset):
            if type(other._backend) is not type(self._backend):
                raise BackendTypeError(type(other._backend), type(self._backend))
            other = other.rename(
                {
                    other.columns[i]: self.data.columns[i]
                    for i in range(len(other.columns))
                }
            ).backend
        return Dataset(roles=t_roles, data=func(other))

    # comparison operators:
    def __eq__(self, other):
        return self.__binary_magic_operator(other=other, func_name="__eq__")

    def __ne__(self, other):
        return self.__binary_magic_operator(other=other, func_name="__ne__")

    def __le__(self, other):
        return self.__binary_magic_operator(other=other, func_name="__le__")

    def __lt__(self, other):
        return self.__binary_magic_operator(other=other, func_name="__lt__")

    def __ge__(self, other):
        return self.__binary_magic_operator(other=other, func_name="__ge__")

    def __gt__(self, other):
        return self.__binary_magic_operator(other=other, func_name="__gt__")

    # unary operators:
    def __pos__(self):
        return Dataset(roles=self.roles, data=(+self._backend))

    def __neg__(self):
        return Dataset(roles=self.roles, data=(-self._backend))

    def __abs__(self):
        return Dataset(roles=self.roles, data=abs(self._backend))

    def __invert__(self):
        return Dataset(roles=self.roles, data=(~self._backend))

    def __round__(self, ndigits: int = 0):
        return Dataset(roles=self.roles, data=round(self._backend, ndigits))

    def __bool__(self):
        """A dataset is truthy when its backend is not empty."""
        return not self._backend.is_empty()

    # Binary math operators:
    def __add__(self, other):
        return self.__binary_magic_operator(other=other, func_name="__add__")

    def __sub__(self, other):
        return self.__binary_magic_operator(other=other, func_name="__sub__")

    def __mul__(self, other):
        return self.__binary_magic_operator(other=other, func_name="__mul__")

    def __floordiv__(self, other):
        return self.__binary_magic_operator(other=other, func_name="__floordiv__")

    def __div__(self, other):
        return self.__binary_magic_operator(other=other, func_name="__div__")

    def __truediv__(self, other):
        return self.__binary_magic_operator(other=other, func_name="__truediv__")

    def __mod__(self, other):
        return self.__binary_magic_operator(other=other, func_name="__mod__")

    def __pow__(self, other):
        return self.__binary_magic_operator(other=other, func_name="__pow__")

    def __and__(self, other):
        return self.__binary_magic_operator(other=other, func_name="__and__")

    def __or__(self, other):
        return self.__binary_magic_operator(other=other, func_name="__or__")

    # Right math operators:
    def __radd__(self, other):
        return self.__binary_magic_operator(other=other, func_name="__radd__")

    def __rsub__(self, other):
        return self.__binary_magic_operator(other=other, func_name="__rsub__")

    def __rmul__(self, other):
        return self.__binary_magic_operator(other=other, func_name="__rmul__")

    def __rfloordiv__(self, other):
        return self.__binary_magic_operator(other=other, func_name="__rfloordiv__")

    def __rdiv__(self, other):
        return self.__binary_magic_operator(other=other, func_name="__rdiv__")

    def __rtruediv__(self, other):
        return self.__binary_magic_operator(other=other, func_name="__rtruediv__")

    def __rmod__(self, other):
        return self.__binary_magic_operator(other=other, func_name="__rmod__")

    def __rpow__(self, other) -> Any:
        return self.__binary_magic_operator(other=other, func_name="__rpow__")

    @property
    def index(self):
        """Index of the backend."""
        return self.backend.index

    @index.setter
    def index(self, value):
        self.backend.data.index = value

    @property
    def data(self):
        """Raw data object held by the backend."""
        return self._backend.data

    @data.setter
    def data(self, value):
        self.backend.data = value

    @property
    def columns(self):
        """Column labels of the backend."""
        return self.backend.columns

    @staticmethod
    def create_empty(roles=None, index=None, backend=BackendsEnum.pandas) -> Dataset:
        """Build a dataset without rows whose columns are the keys of ``roles``."""
        if roles is None:
            roles = {}
        index = [] if index is None else index
        columns = list(roles.keys())
        ds = Dataset(roles=roles, backend=backend)
        ds._backend = ds._backend.create_empty(index, columns)
        ds.data = ds.backend.data
        return ds

    def _convert_data_after_agg(self, result) -> Dataset | float:
        """Wrap an aggregation result; floats pass through, tables get ``StatisticRole``."""
        if isinstance(result, float):
            return result
        role: ABCRole = StatisticRole()
        return Dataset(data=result, roles={column: role for column in result.columns})

    def get(
        self,
        key,
        default=None,
    ) -> Dataset:
        """Forward to the backend's ``get`` and wrap the result with copied roles."""
        return Dataset(data=self._backend.get(key, default), roles=deepcopy(self.roles))

    def take(
        self,
        indices: int | list[int],
        axis: Literal["index", "columns", "rows"] | int = 0,
    ) -> Dataset:
        """Take elements by position along ``axis``.

        When columns are taken (``axis`` is ``1`` or ``"columns"``) only the
        roles of the surviving columns are kept.
        """
        new_data = self._backend.take(indices=indices, axis=axis)
        new_roles = (
            {k: deepcopy(v) for k, v in self.roles.items() if k in new_data.columns}
            if axis in (1, "columns")
            else deepcopy(self.roles)
        )
        return Dataset(data=new_data, roles=new_roles)

    def add_column(
        self,
        data,
        role: dict[str, ABCRole] | None = None,
        index: Iterable[Hashable] | None = None,
    ):
        """Add one or more columns in place and return ``self``.

        Args:
            data: Column data, or a ``Dataset`` whose columns are added.
            role: Mapping of new column name to role. May be omitted only
                when ``data`` is a ``Dataset`` (its roles are reused).
            index: Passed through to the backend's ``add_column``.

        Raises:
            ValueError: If ``role`` is ``None`` and ``data`` is not a Dataset,
                or a column with the same name already exists.
            TypeError: If a role value is not an ``ABCRole``.
        """
        if role is None:
            if not isinstance(data, Dataset):
                raise ValueError("If role is None, data must be a Dataset")
            if any(col in self.columns for col in data.columns):
                raise ValueError("Columns with the same name already exist")
            self.roles.update(data.roles)
            self._backend.add_column(
                data.data,
                data.columns,
                index,
            )
        else:
            if any(col in self.columns for col in role.keys()):
                raise ValueError("Columns with the same name already exist")
            if isinstance(role, dict) and any(
                not isinstance(r, ABCRole) for r in role.values()
            ):
                raise TypeError("Role values must be of type ABCRole")
            if isinstance(data, Dataset):
                data = data.data
            self.roles.update(role)
            self._backend.add_column(data, list(role.keys()), index)
        return self

    def _check_other_dataset(self, other):
        """Ensure ``other`` is a Dataset on the same backend type.

        Raises:
            ConcatDataError: If ``other`` is not a Dataset.
            ConcatBackendError: If the backend types differ.
        """
        if not isinstance(other, Dataset):
            raise ConcatDataError(type(other))
        if type(other._backend) is not type(self._backend):
            raise ConcatBackendError(type(other._backend), type(self._backend))

    def astype(
        self, dtype: dict[str, type], errors: Literal["raise", "ignore"] = "raise"
    ) -> Dataset:
        """Return a copy with columns cast according to ``dtype``.

        With ``errors="ignore"`` a role's ``data_type`` is updated only when
        the backend reports that the column actually has the target type.

        Raises:
            KeyError: If ``errors="raise"`` and a column is missing.
        """
        if errors == "raise":
            for col in dtype:
                if col not in self.columns:
                    raise KeyError(f"Column '{col}' does not exist in the Dataset.")
        new_backend = deepcopy(self._backend)
        new_backend.data = new_backend.astype(dtype, errors)
        new_roles = deepcopy(self.roles)
        if errors == "ignore":
            for col, target_type in dtype.items():
                if new_backend.get_column_type(col) == target_type:
                    new_roles[col].data_type = target_type
        elif errors == "raise":
            for col, target_type in dtype.items():
                new_roles[col].data_type = target_type
        return Dataset(roles=new_roles, data=new_backend.data)

    def append(self, other, reset_index=False, axis=0) -> Dataset:
        """Append one or several datasets; their roles are merged into the result."""
        other = Adapter.to_list(other)
        new_roles = deepcopy(self.roles)
        for o in other:
            self._check_other_dataset(o)
            new_roles.update(o.roles)
        return Dataset(
            roles=new_roles, data=self.backend.append(other, reset_index, axis)
        )

    # TODO: set backend by backend object
    @staticmethod
    def from_dict(
        data: FromDictTypes,
        roles: dict[ABCRole, list[str] | str] | dict[str, ABCRole],
        backend: BackendsEnum = BackendsEnum.pandas,
        index=None,
    ) -> Dataset:
        """Build a dataset from dict-like ``data`` via the backend's ``from_dict``."""
        ds = Dataset(roles=roles, backend=backend)
        ds._backend = ds._backend.from_dict(data, index)
        ds.data = ds._backend.data
        return ds

    # What is going to happen when a matrix is returned?
    def apply(
        self,
        func: Callable,
        role: dict[str, ABCRole],
        axis: int = 0,
        **kwargs,
    ) -> Dataset:
        """Apply ``func`` through the backend; an empty dataset is returned as a copy.

        The first key of ``role`` is passed to the backend as ``column_name``.
        """
        if self.is_empty():
            return deepcopy(self)
        tmp_data = self._backend.apply(
            func=func, axis=axis, column_name=next(iter(role.keys())), **kwargs
        )
        tmp_roles = (
            {next(iter(role.keys())): next(iter(role.values()))}
            if ((not tmp_data.any().any()) and len(role) > 1)
            else role
        )
        return Dataset(
            data=tmp_data,
            roles=tmp_roles,
        )

    def map(self, func, na_action=None, **kwargs) -> Dataset:
        """Forward to the backend's ``map`` and keep the current roles."""
        return Dataset(
            roles=self.roles,
            data=self._backend.map(func=func, na_action=na_action, **kwargs),
        )

    def is_empty(self) -> bool:
        """Tell whether the backend reports no data."""
        return self._backend.is_empty()

    def unique(self) -> dict[str, list[Any]]:
        """Forward to the backend's ``unique``."""
        return self._backend.unique()

    def nunique(self, dropna: bool = False) -> dict[str, int]:
        """Forward to the backend's ``nunique``."""
        return self._backend.nunique(dropna)

    def isin(self, values: Iterable) -> Dataset:
        """Forward to the backend's ``isin``; every column gets ``FilterRole``."""
        role: ABCRole = FilterRole()
        return Dataset(
            roles={column: role for column in self.roles.keys()},
            data=self._backend.isin(values),
        )

    def groupby(
        self,
        by: Any,
        func: str | list | None = None,
        fields_list: str | list | None = None,
        reset_index: bool = True,
        **kwargs,
    ) -> list[tuple[str, Dataset]]:
        """Split the dataset into ``(group, Dataset)`` pairs.

        Args:
            by: Grouping key(s), or a single-column ``Dataset`` whose groups
                select rows of this dataset by index.
            func: Optional aggregation applied to every group via ``agg``.
            fields_list: Optional column subset kept in every group.
            reset_index: Currently unused.
            **kwargs: Passed to the backend's ``groupby``.
        """
        if isinstance(by, Dataset) and len(by.columns) == 1:
            datasets = [
                (group, Dataset(roles=self.roles, data=self.data.loc[group_data.index]))
                for group, group_data in by._backend.groupby(by=by.columns[0], **kwargs)
            ]
        else:
            datasets = [
                (group, Dataset(roles=self.roles, data=data))
                for group, data in self._backend.groupby(by=by, **kwargs)
            ]
        if fields_list:
            fields_list = Adapter.to_list(fields_list)
            datasets = [(i, data[fields_list]) for i, data in datasets]
        if func:
            datasets = [(i, data.agg(func)) for i, data in datasets]
        # Each entry is a (group, data) tuple, so the payload is what must be
        # inspected; aggregation may have turned it into a plain float.
        for _, group_dataset in datasets:
            if isinstance(group_dataset, Dataset):
                group_dataset.tmp_roles = {
                    key: value
                    for key, value in self.tmp_roles.items()
                    if key in group_dataset.columns
                }
        return datasets

    def sort(
        self,
        by: MultiFieldKeyTypes | None = None,
        ascending: bool = True,
        **kwargs,
    ) -> Dataset:
        """Sort by index when ``by`` is ``None``, otherwise by the given values."""
        if by is None:
            return Dataset(
                roles=self.roles,
                data=self.backend.sort_index(ascending=ascending, **kwargs),
            )
        return Dataset(
            roles=self.roles,
            data=self.backend.sort_values(by=by, ascending=ascending, **kwargs),
        )

    def fillna(
        self,
        values: ScalarType | dict[str, ScalarType] | None = None,
        method: Literal["bfill", "ffill"] | None = None,
        **kwargs,
    ) -> Dataset:
        """Fill missing values through the backend.

        Raises:
            ValueError: If neither ``values`` nor ``method`` is given.
        """
        if values is None and method is None:
            raise ValueError("Value or filling method must be provided")
        return Dataset(
            roles=self.roles,
            data=self.backend.fillna(values=values, method=method, **kwargs),
        )

    def mean(self):
        return self._convert_data_after_agg(self._backend.mean())

    def max(self):
        return self._convert_data_after_agg(self._backend.max())

    def reindex(self, labels, fill_value: Any | None = None) -> Dataset:
        """Forward to the backend's ``reindex`` and keep the current roles."""
        return Dataset(
            self.roles, data=self.backend.reindex(labels, fill_value=fill_value)
        )

    def idxmax(self):
        return self._convert_data_after_agg(self._backend.idxmax())

    def min(self):
        return self._convert_data_after_agg(self._backend.min())

    def count(self):
        """Count via the backend; an empty dataset yields an empty ``InfoRole`` dataset."""
        if self.is_empty():
            return Dataset.create_empty({role: InfoRole() for role in self.roles})
        return self._convert_data_after_agg(self._backend.count())

    def sum(self):
        return self._convert_data_after_agg(self._backend.sum())

    def log(self):
        return self._convert_data_after_agg(self._backend.log())

    def mode(self, numeric_only: bool = False, dropna: bool = True):
        """Forward to the backend's ``mode``; result columns get ``InfoRole``."""
        t_data = self._backend.mode(numeric_only=numeric_only, dropna=dropna)
        return Dataset(data=t_data, roles={role: InfoRole() for role in t_data.columns})

    def var(self, skipna: bool = True, ddof: int = 1, numeric_only: bool = False):
        return self._convert_data_after_agg(
            self._backend.var(skipna=skipna, ddof=ddof, numeric_only=numeric_only)
        )

    def agg(self, func: str | list):
        return self._convert_data_after_agg(self._backend.agg(func))

    def std(self, skipna: bool = True, ddof: int = 1):
        return self._convert_data_after_agg(self._backend.std(skipna=skipna, ddof=ddof))

    def quantile(self, q: float = 0.5):
        return self._convert_data_after_agg(self._backend.quantile(q=q))

    def coefficient_of_variation(self):
        return self._convert_data_after_agg(self._backend.coefficient_of_variation())

    def corr(self, method="pearson", numeric_only=False):
        """Forward to the backend's ``corr``; result columns keep their original roles."""
        t_data = self._backend.corr(method=method, numeric_only=numeric_only)
        t_roles = {column: self.roles[column] for column in t_data.columns}
        return Dataset(roles=t_roles, data=t_data)

    def value_counts(
        self,
        normalize: bool = False,
        sort: bool = True,
        ascending: bool = False,
        dropna: bool = True,
    ):
        """Forward to the backend's ``value_counts``.

        The counter column is named ``"proportion"`` when ``normalize`` is
        true and ``"count"`` otherwise, and receives ``StatisticRole``.
        """
        t_data = self._backend.value_counts(
            normalize=normalize, sort=sort, ascending=ascending, dropna=dropna
        )
        t_roles = deepcopy(self.roles)
        column_name = "proportion" if normalize else "count"
        if column_name not in t_data:
            # The backend left the counter column unnamed (label 0).
            t_data = t_data.rename(columns={0: column_name})
        t_roles[column_name] = StatisticRole()
        return Dataset(roles=t_roles, data=t_data)

    def na_counts(self):
        """Count NA values"""
        return self._convert_data_after_agg(self._backend.na_counts())

    def dropna(
        self,
        how: Literal["any", "all"] = "any",
        subset: str | Iterable[str] | None = None,
        axis: Literal["index", "rows", "columns"] | int = 0,
    ):
        """Drop NA values through the backend and keep roles of surviving columns."""
        new_data = self._backend.dropna(how=how, subset=subset, axis=axis)
        # Row drops (axis == 0) keep every role; otherwise filter by column.
        new_roles = (
            self.roles
            if axis == 0
            else {column: self.roles[column] for column in new_data.columns}
        )
        return Dataset(
            roles=new_roles,
            data=new_data,
        )

    def isna(self):
        return self._convert_data_after_agg(self._backend.isna())

    def select_dtypes(self, include: Any = None, exclude: Any = None):
        """Filter columns by dtype through the backend, keeping matching roles."""
        t_data = self._backend.select_dtypes(include=include, exclude=exclude)
        t_roles = {k: v for k, v in self.roles.items() if k in t_data.columns}
        return Dataset(roles=t_roles, data=t_data)

    def merge(
        self,
        right,
        on: str | None = None,
        left_on: str | None = None,
        right_on: str | None = None,
        left_index: bool = False,
        right_index: bool = False,
        suffixes: tuple[str, str] = ("_x", "_y"),
        how: Literal["left", "right", "outer", "inner", "cross"] = "inner",
    ):
        """Merge with another dataset through the backend.

        When no key is given the merge is done on both indexes. Columns that
        received a suffix inherit the role of the column they come from.

        Raises:
            DataTypeError: If ``right`` is not a Dataset.
            BackendTypeError: If the backend types differ.
        """
        # Default to index merge if no columns specified
        if not any([on, left_on, right_on, left_index, right_index]):
            left_index = True
            right_index = True
        if not isinstance(right, Dataset):
            raise DataTypeError(type(right))
        if type(right._backend) is not type(self._backend):
            raise BackendTypeError(type(right._backend), type(self._backend))
        t_data = self._backend.merge(
            right=right._backend,
            on=on,
            left_on=left_on,
            right_on=right_on,
            left_index=left_index,
            right_index=right_index,
            suffixes=suffixes,
            how=how,
        )
        t_roles = deepcopy(self.roles)
        t_roles.update(right.roles)
        # Map suffixed columns back to the role of their source column. The
        # label is handled as text so non-string labels cannot break slicing,
        # and an empty suffix (which matches everything) is skipped.
        for c in t_data.columns:
            name = f"{c}"
            for suffix, source in ((suffixes[0], self), (suffixes[1], right)):
                if not suffix or not name.endswith(suffix):
                    continue
                base = name[: -len(suffix)]
                if base in source.columns:
                    t_roles[c] = source.roles[base]
        new_roles = {c: t_roles[c] for c in t_data.columns}
        return Dataset(roles=new_roles, data=t_data)

    def drop(
        self,
        labels: str | None = None,
        axis: int | None = None,
        columns: str | Iterable[str] | None = None,
    ):
        """Drop labels through the backend; a Dataset as ``labels`` means its index."""
        if isinstance(labels, Dataset):
            labels = list(labels.index)
        t_data = self._backend.drop(labels=labels, axis=axis, columns=columns)
        t_roles = (
            deepcopy(self.roles)
            if axis == 0
            else {c: self.roles[c] for c in t_data.columns}
        )
        return Dataset(roles=t_roles, data=t_data)

    def filter(
        self,
        items: list | None = None,
        like: str | None = None,
        regex: str | None = None,
        axis: int | None = None,
    ) -> Dataset:
        """Forward to the backend's ``filter`` and keep roles of surviving columns."""
        t_data = self._backend.filter(items=items, like=like, regex=regex, axis=axis)
        t_roles = {c: self.roles[c] for c in t_data.columns if c in self.roles.keys()}
        return Dataset(roles=t_roles, data=t_data)

    def dot(self, other: Dataset | ndarray) -> Dataset:
        """Forward to the backend's ``dot``; roles come from ``other`` when it is a Dataset."""
        return Dataset(
            roles=deepcopy(other.roles) if isinstance(other, Dataset) else {},
            data=self.backend.dot(
                other.backend if isinstance(other, Dataset) else other
            ),
        )

    def transpose(
        self,
        roles: dict[str, ABCRole] | list[str] | None = None,
    ) -> Dataset:
        """Transpose through the backend.

        ``roles`` may map new column names to roles, or list new column
        names; missing roles default to ``DefaultRole``.
        """
        roles_names: list[str | None] = (
            list(roles.keys()) or [] if isinstance(roles, dict) else roles
        )
        result_data = self.backend.transpose(roles_names)
        if roles is None or isinstance(roles, list):
            names = result_data.columns if roles is None else roles
            roles = {column: DefaultRole() for column in names}
        return Dataset(roles=roles, data=result_data)

    def sample(
        self,
        frac: float | None = None,
        n: int | None = None,
        random_state: int | None = None,
    ) -> Dataset:
        """Forward to the backend's ``sample`` and keep the current roles."""
        return Dataset(
            self.roles,
            data=self.backend.sample(frac=frac, n=n, random_state=random_state),
        )

    def cov(self):
        """Forward to the backend's ``cov``; result columns get ``DefaultRole``."""
        t_data = self.backend.cov()
        return Dataset(
            {column: DefaultRole() for column in t_data.columns}, data=t_data
        )

    def rename(self, names: dict[str, str]):
        """Rename columns; roles follow their columns to the new names."""
        roles = {names.get(column, column): role for column, role in self.roles.items()}
        return Dataset(roles, data=self.backend.rename(names))

    def replace(
        self,
        to_replace: Any = None,
        value: Any = None,
        regex: bool = False,
    ) -> Dataset:
        """Forward to the backend's ``replace`` and keep the current roles."""
        return Dataset(
            self.roles,
            data=self._backend.replace(to_replace=to_replace, value=value, regex=regex),
        )

    def list_to_columns(self, column: str) -> Dataset:
        """Expand a list-like column into separate columns.

        Returns ``self`` unchanged when the value at label ``0`` of the
        column is not list-like; otherwise the expanded columns (sharing a
        copy of the original column's role) replace the original column.
        """
        if not pd.api.types.is_list_like(self.backend[column][0]):
            return self
        extended_data = self.backend.list_to_columns(column)
        extended_roles = {
            c: deepcopy(self.roles[column]) for c in extended_data.columns
        }
        extended_ds = Dataset(roles=extended_roles, data=extended_data)
        return self.append(extended_ds, axis=1).drop(column, axis=1)
# NOTE: a stray "[docs]" marker (Sphinx source-link residue, not Python code) was here.
class ExperimentData:
    """Container bundling a base ``Dataset`` with per-id result storages.

    Besides the base dataset it keeps ``additional_fields`` (a Dataset that
    shares the base index), ``variables``, ``groups`` and ``analysis_tables``
    dictionaries, all keyed by an executor id.
    """

    def __init__(self, data: Dataset):
        self._data = data
        self.additional_fields = Dataset.create_empty(index=data.index)
        self.variables: dict[str, dict[str, int | float]] = {}
        self.groups: dict[str, dict[str, Dataset]] = {}
        self.analysis_tables: dict[str, Dataset] = {}
        self.id_name_mapping: dict[str, str] = {}

    @property
    def ds(self):
        """
        Get the base dataset.
        """
        return self._data

    @staticmethod
    def create_empty(
        roles=None, backend=BackendsEnum.pandas, index=None
    ) -> ExperimentData:
        """Create an ``ExperimentData`` around an empty ``Dataset``."""
        # Keywords are required: Dataset.create_empty orders its parameters
        # (roles, index, backend), which differs from this signature.
        ds = Dataset.create_empty(roles=roles, index=index, backend=backend)
        return ExperimentData(ds)

    def check_hash(self, executor_id: str | int, space: ExperimentDataEnum) -> bool:
        """Tell whether ``executor_id`` is stored in ``space``.

        ``additional_fields``, ``variables`` and ``analysis_tables`` are
        checked individually; any other ``space`` value searches every
        storage (including ``groups``).
        """
        if space == ExperimentDataEnum.additional_fields:
            return executor_id in self.additional_fields.columns
        if space == ExperimentDataEnum.variables:
            return executor_id in self.variables
        if space == ExperimentDataEnum.analysis_tables:
            return executor_id in self.analysis_tables
        # Search every storage directly. Recursing over the enum members
        # would loop forever on members that have no dedicated branch above.
        return any(
            executor_id in container
            for container in (
                self.additional_fields.columns,
                self.variables,
                self.analysis_tables,
                self.groups,
            )
        )

    def set_value(
        self,
        space: ExperimentDataEnum,
        executor_id: str | dict[str, str],
        value: Any,
        key: str | None = None,
        role=None,
    ) -> ExperimentData:
        """Store ``value`` in the given ``space`` under ``executor_id``.

        Args:
            space: Target storage.
            executor_id: Id to store under. For multi-column Datasets in
                ``additional_fields`` it may be a rename mapping.
            value: Value to store.
            key: Inner key, used for ``variables`` and ``groups``.
            role: Role of the new column, used for ``additional_fields``.

        Returns:
            ``self``, to allow chaining.
        """
        if space == ExperimentDataEnum.additional_fields:
            if not isinstance(value, Dataset):
                self.additional_fields = self.additional_fields.add_column(
                    data=value, role={executor_id: role}
                )
            elif len(value.columns) == 1:
                # Unwrap list / dict forms down to a single role and id.
                role = role[0] if isinstance(role, list) else role
                role = next(iter(role.values())) if isinstance(role, dict) else role
                executor_id = (
                    executor_id[0] if isinstance(executor_id, list) else executor_id
                )
                executor_id = (
                    next(iter(executor_id.keys()))
                    if isinstance(executor_id, dict)
                    else executor_id
                )
                self.additional_fields = self.additional_fields.add_column(
                    data=value, role={executor_id: role}
                )
            else:
                rename_dict = (
                    {value.columns[0]: executor_id}
                    if isinstance(executor_id, str)
                    else executor_id
                )
                value = value.rename(names=rename_dict)
                self.additional_fields = self.additional_fields.merge(
                    right=value, left_index=True, right_index=True
                )
        elif space == ExperimentDataEnum.analysis_tables:
            self.analysis_tables[executor_id] = value
        elif space == ExperimentDataEnum.variables:
            if executor_id in self.variables:
                self.variables[executor_id][key] = value
            elif isinstance(value, dict):
                self.variables[executor_id] = value
            else:
                self.variables[executor_id] = {key: value}
        elif space == ExperimentDataEnum.groups:
            if executor_id not in self.groups:
                self.groups[executor_id] = {key: value}
            else:
                self.groups[executor_id][key] = value
        return self

    def get_ids(
        self,
        classes: type | Iterable[type] | str | Iterable[str],
        searched_space: ExperimentDataEnum | Iterable[ExperimentDataEnum] | None = None,
        key: str | None = None,
    ) -> dict[str, dict[str, list[str]]]:
        """Collect stored ids that belong to the given classes.

        An id belongs to a class when the text before its first
        ``ID_SPLIT_SYMBOL`` equals the class name; when ``key`` is given the
        text after the last ``ID_SPLIT_SYMBOL`` must also equal ``key``.

        Returns:
            ``{class_name: {space.value: [ids...]}}`` for every requested
            class and searched space.
        """

        def check_id(id_: Any, class_: str) -> bool:
            text = str(id_)
            # partition keeps the whole id when the symbol is absent, rather
            # than slicing with the -1 that str.find returns in that case.
            if text.partition(ID_SPLIT_SYMBOL)[0] != class_:
                return False
            if key is not None:
                return text.rpartition(ID_SPLIT_SYMBOL)[2] == key
            return True

        spaces = {
            ExperimentDataEnum.additional_fields: self.additional_fields.columns,
            ExperimentDataEnum.analysis_tables: self.analysis_tables.keys(),
            ExperimentDataEnum.groups: self.groups.keys(),
            ExperimentDataEnum.variables: self.variables.keys(),
        }
        classes = [
            c.__name__ if isinstance(c, type) else c for c in Adapter.to_list(classes)
        ]
        searched_space = (
            Adapter.to_list(searched_space) if searched_space else list(spaces.keys())
        )
        return {
            class_: {
                space.value: [
                    str(id_) for id_ in spaces[space] if check_id(id_, class_)
                ]
                for space in searched_space
            }
            for class_ in classes
        }

    def get_one_id(
        self,
        class_: type | str,
        space: ExperimentDataEnum,
        key: str | None = None,
    ) -> str:
        """Return the first id of ``class_`` found in ``space``.

        Raises:
            NotFoundInExperimentDataError: If no matching id exists.
        """
        class_ = class_ if isinstance(class_, str) else class_.__name__
        result = self.get_ids(class_, space, key)
        if (class_ not in result) or (not len(result[class_][space.value])):
            raise NotFoundInExperimentDataError(class_)
        return result[class_][space.value][0]

    def copy(self, data: Dataset | None = None) -> ExperimentData:
        """Deep-copy the container, optionally swapping in ``data`` as base dataset."""
        result = deepcopy(self)
        if data is not None:
            result._data = data
        return result

    def field_search(
        self,
        roles: ABCRole | Iterable[ABCRole],
        tmp_role: bool = False,
        search_types=None,
    ) -> list[str]:
        """Find column names by role.

        ``AdditionalRole`` instances are looked up in ``additional_fields``;
        all other roles are looked up in the base dataset.
        """
        searched_field = []
        roles = Adapter.to_list(roles)
        field_in_additional = [
            role for role in roles if isinstance(role, AdditionalRole)
        ]
        field_in_data = [role for role in roles if role not in field_in_additional]
        if field_in_data:
            searched_field += self.ds.search_columns(
                field_in_data, tmp_role=tmp_role, search_types=search_types
            )
        if field_in_additional and isinstance(self, ExperimentData):
            searched_field += self.additional_fields.search_columns(
                field_in_additional, tmp_role=tmp_role, search_types=search_types
            )
        return searched_field

    def field_data_search(
        self,
        roles: ABCRole | Iterable[ABCRole],
        tmp_role: bool = False,
        search_types=None,
    ) -> Dataset:
        """Build a dataset from the columns matching ``roles``.

        Columns are taken from ``additional_fields`` for ``AdditionalRole``
        and from the base dataset otherwise; a non-empty result gets the
        base dataset's index.
        """
        searched_data: Dataset = Dataset.create_empty()
        roles = Adapter.to_list(roles)
        roles_columns_map = {
            role: self.field_search(role, tmp_role, search_types) for role in roles
        }
        for role, columns in roles_columns_map.items():
            for column in columns:
                t_data = (
                    self.additional_fields[column]
                    if isinstance(role, AdditionalRole)
                    else self.ds[column]
                )
                searched_data = searched_data.add_column(
                    data=t_data, role={column: role}
                )
        if not searched_data.is_empty():
            searched_data.index = self.ds.index
        return searched_data
class DatasetAdapter(Adapter):
    """Converters from plain Python / pandas / NumPy values to ``Dataset``."""

    @staticmethod
    def to_dataset(
        data: dict | Dataset | pd.DataFrame | list | str | int | float | bool,
        roles: ABCRole | dict[str, ABCRole],
    ) -> Dataset:
        """Convert ``data`` to a ``Dataset`` based on its type.

        A ``Dataset`` is returned as is. DataFrames, lists and ndarrays
        require ``roles`` to be a mapping rather than a single role.

        Raises:
            InvalidArgumentError: If ``data`` has an unsupported type, or
                ``roles`` is a single role where a mapping is required.
        """
        if isinstance(data, dict):
            return DatasetAdapter.dict_to_dataset(data, roles)
        if isinstance(data, pd.DataFrame):
            if isinstance(roles, ABCRole):
                raise InvalidArgumentError("roles", "dict[str, ABCRole]")
            return DatasetAdapter.frame_to_dataset(data, roles)
        if isinstance(data, list):
            if isinstance(roles, ABCRole):
                raise InvalidArgumentError("roles", "dict[str, ABCRole]")
            return DatasetAdapter.list_to_dataset(data, roles)
        if isinstance(data, np.ndarray):
            # ndarray_to_dataset needs a mapping (it calls len() / keys()).
            if isinstance(roles, ABCRole):
                raise InvalidArgumentError("roles", "dict[str, ABCRole]")
            return DatasetAdapter.ndarray_to_dataset(data, roles)
        if isinstance(data, (str, int, float, bool)):
            return DatasetAdapter.value_to_dataset(data, roles)
        if isinstance(data, Dataset):
            return data
        raise InvalidArgumentError(
            "data",
            "dict, pd.DataFrame, list, np.ndarray, str, int, float, bool, Dataset",
        )

    @staticmethod
    def value_to_dataset(
        data: ScalarType, roles: ABCRole | dict[str, ABCRole]
    ) -> Dataset:
        """Wrap a scalar in a one-cell dataset.

        A single role is stored under the column name ``"value"``; with a
        mapping, its first key is used as the column name.
        """
        if isinstance(roles, ABCRole):
            roles = {"value": roles}
        return Dataset(
            roles=roles, data=pd.DataFrame({next(iter(roles.keys())): [data]})
        )

    @staticmethod
    def dict_to_dataset(data: dict, roles: ABCRole | dict[str, ABCRole]) -> Dataset:
        """Convert a dict to a dataset via ``Dataset.from_dict``.

        A dict holding at least one scalar value is treated as a single
        record. A single role is applied to every key of ``data``.

        Raises:
            InvalidArgumentError: If ``roles`` is neither a mapping nor a role.
        """
        roles_names = list(data.keys())
        if any(isinstance(v, (int, str, float, bool)) for v in data.values()):
            data = [data]
        if isinstance(roles, dict):
            return Dataset.from_dict(data=data, roles=roles)
        if isinstance(roles, ABCRole):
            return Dataset.from_dict(
                data=data, roles={name: roles for name in roles_names}
            )
        # Previously this fell through and returned None implicitly.
        raise InvalidArgumentError("roles", "ABCRole | dict[str, ABCRole]")

    @staticmethod
    def list_to_dataset(data: list, roles: dict[str, ABCRole]) -> Dataset:
        """Convert a list to a single-column dataset.

        The column is named after the first key of ``roles``; with empty
        ``roles`` it is named ``0`` and gets ``DefaultRole``.
        """
        return Dataset(
            roles=roles if len(roles) > 0 else {0: DefaultRole()},
            data=pd.DataFrame(
                data=data, columns=[next(iter(roles.keys()))] if len(roles) > 0 else [0]
            ),
        )

    @staticmethod
    def frame_to_dataset(data: pd.DataFrame, roles: dict[str, ABCRole]) -> Dataset:
        """Wrap a DataFrame in a dataset with the given roles."""
        return Dataset(
            roles=roles,
            data=data,
        )

    @staticmethod
    def ndarray_to_dataset(data: np.ndarray, roles: dict[str, ABCRole]) -> Dataset:
        """Convert an ndarray to a dataset.

        Columns are named after the keys of ``roles``; with empty ``roles``
        they are numbered from zero.
        """
        if len(roles) == 0:
            # A 1-D array has no second axis; it becomes a single column.
            n_columns = data.shape[1] if data.ndim > 1 else 1
            columns = range(n_columns)
        else:
            columns = list(roles.keys())
        data = pd.DataFrame(data=data, columns=columns)
        return Dataset(
            roles=roles,
            data=data,
        )