Source code for hypex.transformers.filters

from __future__ import annotations

from typing import Any, Sequence

from ..dataset.dataset import Dataset, ExperimentData
from ..dataset.roles import FeatureRole, InfoRole, PreTargetRole, TargetRole
from ..utils.adapter import Adapter
from .abstract import Transformer


class CVFilter(Transformer):
    """Demote columns whose coefficient of variation is outside given bounds.

    A column that fails the check is not removed from the dataset; its role
    is reassigned to ``InfoRole``.
    """

    def __init__(
        self,
        target_roles: str | Sequence[str] | None = None,
        lower_bound: float | None = None,
        upper_bound: float | None = None,
        key: Any = "",
    ):
        """Initialize the coefficient-of-variation filter.

        Args:
            target_roles: Roles used to select the columns to check.
                Falls back to ``FeatureRole()`` when falsy.
            lower_bound: The minimum acceptable coefficient of variation,
                below which the column is considered constant. ``None``
                disables the lower check.
            upper_bound: The maximum acceptable coefficient of variation,
                above which the column is considered incorrect. ``None``
                disables the upper check.
            key: Forwarded unchanged to ``Transformer.__init__``.
        """
        super().__init__(key=key)
        self.target_roles = target_roles or FeatureRole()
        self.lower_bound = lower_bound
        self.upper_bound = upper_bound
        # When True, ``execute`` additionally restricts the column search to
        # ``search_types``.
        self.type_filter: bool = True

    @property
    def search_types(self):
        """Python types passed as ``search_types`` when selecting columns."""
        return [float, int, bool]

    @staticmethod
    def _inner_function(
        data: Dataset,
        target_cols: str | Sequence[str] | None = None,
        lower_bound: float | None = None,
        upper_bound: float | None = None,
    ) -> Dataset:
        """Reassign out-of-bounds columns of ``data`` to ``InfoRole``.

        Args:
            data: Dataset whose ``roles`` mapping is updated in place.
            target_cols: Column name(s) to check; normalized with
                ``Adapter.to_list``.
            lower_bound: Lower limit for the coefficient of variation, or
                ``None`` to skip the lower check.
            upper_bound: Upper limit for the coefficient of variation, or
                ``None`` to skip the upper check.

        Returns:
            The same ``data`` object that was passed in.
        """
        target_cols = Adapter.to_list(target_cols)
        for column in target_cols:
            cv = data[column].coefficient_of_variation()
            # Compare against None explicitly: a bound of 0 / 0.0 is a valid
            # limit and must not be skipped by a truthiness test.
            above_upper = upper_bound is not None and cv > upper_bound
            below_lower = lower_bound is not None and cv < lower_bound
            if above_upper or below_lower:
                data.roles[column] = InfoRole()
        return data

    def execute(self, data: ExperimentData) -> ExperimentData:
        """Apply the filter to ``data.ds`` and return a copy of ``data``.

        Args:
            data: Experiment data whose ``ds`` dataset is filtered.

        Returns:
            The result of ``data.copy(data=...)`` with the processed dataset.
        """
        if self.type_filter:
            target_cols = data.ds.search_columns(
                roles=self.target_roles, search_types=self.search_types
            )
        else:
            # Honour the configured roles here as well; only the type
            # restriction is dropped when ``type_filter`` is disabled.
            target_cols = data.ds.search_columns(roles=self.target_roles)
        result = data.copy(
            data=self.calc(
                data=data.ds,
                target_cols=target_cols,
                lower_bound=self.lower_bound,
                upper_bound=self.upper_bound,
            )
        )
        return result
class ConstFilter(Transformer):
    """Demote near-constant columns to ``InfoRole``.

    A column is treated as constant when the proportion read from the first
    row of its normalized, sorted value counts exceeds ``threshold``.
    """

    def __init__(
        self,
        target_roles: str | Sequence[str] | None = None,
        threshold: float = 0.95,
        key: Any = "",
    ):
        """Initialize the constant-column filter.

        Args:
            target_roles: Roles used to select the columns to check.
                Falls back to ``FeatureRole()`` when falsy.
            threshold: The maximum acceptable frequency above which the
                column is considered constant.
            key: Forwarded unchanged to ``Transformer.__init__``.
        """
        super().__init__(key=key)
        self.target_roles = target_roles or FeatureRole()
        self.threshold = threshold

    @staticmethod
    def _inner_function(
        data: Dataset,
        target_cols: str | None = None,
        threshold: float = 0.95,
    ) -> Dataset:
        """Reassign columns dominated by one value to ``InfoRole``.

        Args:
            data: Dataset whose ``roles`` mapping is updated in place.
            target_cols: Column name(s) to check; normalized with
                ``Adapter.to_list``.
            threshold: Proportion above which a column is demoted.

        Returns:
            The same ``data`` object that was passed in.
        """
        for name in Adapter.to_list(target_cols):
            shares = data[name].value_counts(normalize=True, sort=True)
            # Row 0 of the sorted counts holds the proportion that is compared
            # with the threshold.
            top_share = shares.get_values(0, "proportion")
            if top_share > threshold:
                data.roles[name] = InfoRole()
        return data

    def execute(self, data: ExperimentData) -> ExperimentData:
        """Apply the filter to ``data.ds`` and return a copy of ``data``.

        Args:
            data: Experiment data whose ``ds`` dataset is filtered.

        Returns:
            The result of ``data.copy(data=...)`` with the processed dataset.
        """
        columns = data.ds.search_columns(roles=self.target_roles)
        return data.copy(
            data=self.calc(
                data=data.ds,
                target_cols=columns,
                threshold=self.threshold,
            )
        )
class NanFilter(Transformer):
    """Demote columns with too many missing values to ``InfoRole``."""

    def __init__(
        self,
        target_roles: str | Sequence[str] | None = None,
        threshold: float = 0.8,
        key: Any = "",
    ):
        """Initialize the missing-value filter.

        Args:
            target_roles: Roles used to select the columns to check.
                Falls back to ``FeatureRole()`` when falsy.
            threshold: The maximum acceptable frequency of NaN values in a
                column.
            key: Forwarded unchanged to ``Transformer.__init__``.
        """
        super().__init__(key=key)
        self.target_roles = target_roles or FeatureRole()
        self.threshold = threshold

    @staticmethod
    def _inner_function(
        data: Dataset,
        target_cols: str | None = None,
        threshold: float = 0.8,
    ) -> Dataset:
        """Reassign columns whose NaN share exceeds ``threshold``.

        Args:
            data: Dataset whose ``roles`` mapping is updated in place.
            target_cols: Column name(s) to check; normalized with
                ``Adapter.to_list``.
            threshold: NaN share above which a column is demoted.

        Returns:
            The same ``data`` object that was passed in.
        """
        for name in Adapter.to_list(target_cols):
            missing = data[name].isna().sum()
            # Share of missing entries relative to the dataset length.
            if missing / len(data) > threshold:
                data.roles[name] = InfoRole()
        return data

    def execute(self, data: ExperimentData) -> ExperimentData:
        """Apply the filter to ``data.ds`` and return a copy of ``data``.

        Args:
            data: Experiment data whose ``ds`` dataset is filtered.

        Returns:
            The result of ``data.copy(data=...)`` with the processed dataset.
        """
        columns = data.ds.search_columns(roles=self.target_roles)
        return data.copy(
            data=self.calc(
                data=data.ds,
                target_cols=columns,
                threshold=self.threshold,
            )
        )
class CorrFilter(Transformer):
    """Demote one column of every highly correlated pair to ``InfoRole``.

    Correlations are computed over the "correlation space" columns; only
    columns selected by ``target_roles`` are examined as drop candidates.
    """

    def __init__(
        self,
        target_roles: str | Sequence[str] | None = None,
        corr_space_roles: str | Sequence[str] | None = None,
        threshold: float = 0.8,
        method: str = "pearson",
        numeric_only: bool = True,
        key: Any = "",
        drop_policy: str = "cv",
    ):
        """Initialize the correlation filter.

        Args:
            target_roles: Roles used to select the columns that may be
                dropped. Falls back to ``FeatureRole()`` when falsy.
            corr_space_roles: Roles used to select the columns over which the
                correlation matrix is computed. Falls back to
                ``[FeatureRole(), TargetRole()]`` when falsy.
            threshold: Absolute correlation above which a pair is considered
                correlated.
            method: Forwarded to the dataset's ``corr`` call.
            numeric_only: Forwarded to the dataset's ``corr`` call.
            key: Forwarded unchanged to ``Transformer.__init__``.
            drop_policy: How to choose which column of a correlated pair of
                target columns is dropped: ``"cv"`` or ``"corr"`` (see
                ``_inner_function``). Placed last so existing positional
                calls keep working.
        """
        super().__init__(key=key)
        self.target_roles = target_roles or FeatureRole()
        self.corr_space_roles = corr_space_roles or [FeatureRole(), TargetRole()]
        self.threshold = threshold
        self.method = method
        self.numeric_only = numeric_only
        self.drop_policy = drop_policy

    @staticmethod
    def _inner_function(
        data: Dataset,
        target_cols: str | Sequence[str] | None = None,
        corr_space_cols: str | Sequence[str] | None = None,
        threshold: float = 0.8,
        method: str = "pearson",
        numeric_only: bool = True,
        drop_policy: str = "cv",
    ) -> Dataset:
        """Reassign one column of each correlated pair to ``InfoRole``.

        For every target column present in the correlation matrix, each other
        matrix column whose absolute correlation with it exceeds
        ``threshold`` triggers a drop. If the other column is not itself a
        target column, the target column is dropped. Otherwise the choice
        follows ``drop_policy``:

        * ``"cv"``: the target column is dropped when its coefficient of
          variation is strictly smaller, else the other column is dropped.
        * ``"corr"``: the target column is dropped when its absolute
          correlation with the single PreTarget column is strictly larger,
          else the other column is dropped.
        * any other value: the target column is dropped.

        Args:
            data: Dataset whose ``roles`` mapping is updated in place.
            target_cols: Column name(s) that may be dropped.
            corr_space_cols: Column name(s) used to build the matrix.
            threshold: Absolute correlation limit.
            method: Forwarded to ``corr``.
            numeric_only: Forwarded to ``corr``.
            drop_policy: Pair resolution policy, see above.

        Returns:
            The same ``data`` object that was passed in.

        Raises:
            ValueError: If ``drop_policy`` is ``"corr"`` and there is not
                exactly one PreTarget column, or that column is not among
                ``corr_space_cols``.
        """
        target_cols = Adapter.to_list(target_cols)
        corr_space_cols = Adapter.to_list(corr_space_cols)
        corr_matrix = data[corr_space_cols].corr(
            method=method, numeric_only=numeric_only
        )

        pre_target_column = None
        if drop_policy == "corr":
            pre_target_columns = data.search_columns([PreTargetRole()])
            # Check the count first: indexing an empty list would raise
            # IndexError, and the two conditions must be combined with a
            # boolean ``or`` (``|`` binds tighter than ``!=``).
            if len(pre_target_columns) != 1:
                raise ValueError(
                    "Correlation-based filtering can be applied only if "
                    "there is exactly one PreTarget column"
                )
            if pre_target_columns[0] not in corr_space_cols:
                raise ValueError(
                    "Correlation-based filtering cannot be applied if the "
                    "PreTarget column is not in the correlation space"
                )
            pre_target_column = pre_target_columns[0]

        corr_target_cols = [
            column for column in target_cols if column in corr_matrix.columns
        ]
        for target in corr_target_cols:
            for column in corr_matrix.columns:
                if (target != column) and (
                    abs(corr_matrix.get_values(row=target, column=column))
                    > threshold
                ):
                    drop = target
                    # Apply the policy only when both columns are drop
                    # candidates; test the column name, not its role object.
                    if column in corr_target_cols:
                        if drop_policy == "corr":
                            # NOTE(review): this drops the column that is
                            # MORE correlated with the PreTarget column;
                            # confirm that direction is intended.
                            if abs(
                                corr_matrix.get_values(target, pre_target_column)
                            ) > abs(
                                corr_matrix.get_values(column, pre_target_column)
                            ):
                                drop = target
                            else:
                                drop = column
                        elif drop_policy == "cv":
                            drop = (
                                target
                                if data[target].coefficient_of_variation()
                                < data[column].coefficient_of_variation()
                                else column
                            )
                    # NOTE(review): each pair is visited in both orders, so
                    # an exact tie under the policy drops both columns.
                    data.roles[drop] = InfoRole()
        return data

    def execute(self, data: ExperimentData) -> ExperimentData:
        """Apply the filter to ``data.ds`` and return a copy of ``data``.

        Args:
            data: Experiment data whose ``ds`` dataset is filtered.

        Returns:
            The result of ``data.copy(data=...)`` with the processed dataset.
        """
        target_cols = data.ds.search_columns(roles=self.target_roles)
        corr_space_cols = data.ds.search_columns(roles=self.corr_space_roles)
        result = data.copy(
            data=self.calc(
                data=data.ds,
                target_cols=target_cols,
                corr_space_cols=corr_space_cols,
                threshold=self.threshold,
                method=self.method,
                numeric_only=self.numeric_only,
                drop_policy=self.drop_policy,
            )
        )
        return result
class OutliersFilter(Transformer):
    """Drop rows that hold an out-of-range value in any target column."""

    def __init__(
        self,
        target_roles: str | Sequence[str] | None = None,
        lower_percentile: float = 0,
        upper_percentile: float = 1,
        key: Any = "",
    ):
        """Initialize the outliers filter.

        Args:
            target_roles: Roles used to select the columns to check.
                Falls back to ``FeatureRole()`` when falsy.
            lower_percentile: Passed to ``quantile`` to obtain the lower
                cut-off of each column (presumably a fraction in [0, 1],
                given the defaults; verify against ``Dataset.quantile``).
            upper_percentile: Passed to ``quantile`` to obtain the upper
                cut-off of each column.
            key: Forwarded unchanged to ``Transformer.__init__``.
        """
        super().__init__(key=key)
        self.target_roles = target_roles or FeatureRole()
        self.lower_percentile = lower_percentile
        self.upper_percentile = upper_percentile

    @property
    def search_types(self):
        """Python types passed as ``search_types`` when selecting columns."""
        return [float, int, bool]

    @staticmethod
    def _inner_function(
        data: Dataset,
        target_cols: str | Sequence[str] | None = None,
        lower_percentile: float = 0,
        upper_percentile: float = 1,
    ) -> Dataset:
        """Return ``data`` without the rows flagged as outliers.

        A row is flagged when, in any target column, its value is strictly
        below the column's ``lower_percentile`` quantile or strictly above
        its ``upper_percentile`` quantile.

        Args:
            data: Dataset to filter.
            target_cols: Column name(s) to check; normalized with
                ``Adapter.to_list`` like the other filters in this module.
            lower_percentile: Lower quantile level.
            upper_percentile: Upper quantile level.

        Returns:
            The dataset produced by ``data.drop`` for the flagged rows, or
            ``data`` itself when there are no columns to check.
        """
        # Normalize first: a bare string would otherwise be iterated
        # character by character when the role mapping is built below.
        target_cols = Adapter.to_list(target_cols)
        if not target_cols:
            # Nothing to inspect, so no row can be an outlier.
            return data

        mask = data[target_cols].apply(
            func=lambda x: (x < x.quantile(lower_percentile))
            | (x > x.quantile(upper_percentile)),
            role={column: InfoRole() for column in target_cols},
            axis=0,
        )
        # Collapse the per-column flags into one per-row "filter" flag.
        mask = mask.apply(func=lambda x: x.any(), role={"filter": InfoRole()}, axis=1)
        drop_indexes = mask[mask["filter"]].dropna().index
        data = data.drop(drop_indexes, axis=0)
        return data

    def execute(self, data: ExperimentData) -> ExperimentData:
        """Filter ``data.ds`` and realign ``additional_fields`` with it.

        Args:
            data: Experiment data whose ``ds`` dataset is filtered.

        Returns:
            A copy of ``data`` holding the filtered dataset, with
            ``additional_fields`` filtered to the surviving index.
        """
        target_cols = data.ds.search_columns(
            roles=self.target_roles,
            search_types=self.search_types,
        )
        t_ds = self.calc(
            data=data.ds,
            target_cols=target_cols,
            lower_percentile=self.lower_percentile,
            upper_percentile=self.upper_percentile,
        )
        result = data.copy(data=t_ds)
        # Keep the auxiliary table aligned with the rows that remain.
        result.additional_fields = result.additional_fields.filter(t_ds.index, axis=0)
        return result