Source code for hypex.operators.operators

from __future__ import annotations

from copy import deepcopy
from typing import Any, Literal

import numpy as np

from ..dataset import (
    ABCRole,
    AdditionalMatchingRole,
    AdditionalTargetRole,
    Dataset,
    ExperimentData,
    FeatureRole,
    InfoRole,
    TargetRole,
)
from ..extensions.scipy_stats import NormCDF
from ..utils.enums import ExperimentDataEnum
from ..utils.errors import NoneArgumentError
from .abstract import GroupOperator


class SMD(GroupOperator):
    """Standardized mean difference (SMD) between a control and a test group."""

    def execute(self, data: ExperimentData) -> ExperimentData:
        # Placeholder kept from the original: it performs no work and
        # implicitly returns None despite the annotation.
        pass

    @classmethod
    def _inner_function(
        cls, data: Dataset, test_data: Dataset | None = None, **kwargs
    ) -> Any:
        """Compute the standardized mean difference of ``test_data`` vs ``data``.

        Args:
            data: Control group data; its standard deviation is used as the
                scale of the difference.
            test_data: Test group data. Validated through
                ``cls._check_test_data`` before use.
            **kwargs: Accepted for interface compatibility; unused here.

        Returns:
            ``(mean(test_data) - mean(data)) / std(data)``, in whatever type
            the ``Dataset`` arithmetic produces.
        """
        test_data = cls._check_test_data(test_data=test_data)
        # An SMD is a *difference* of means scaled by a standard deviation;
        # the previous implementation summed the two means instead.
        return (test_data.mean() - data.mean()) / data.std()
class MatchingMetrics(GroupOperator):
    """Estimate ATT / ATC / ATE on matched data, with standard errors.

    Each estimate is reported as a five-element list:
    ``[estimate, standard_error, p_value, ci_lower, ci_upper]``.
    """

    # z-score used for the confidence bounds reported with each estimate.
    _CI_Z_SCORE = 1.96

    def __init__(
        self,
        grouping_role: ABCRole | None = None,
        target_roles: ABCRole | list[ABCRole] | None = None,
        metric: Literal["auto", "atc", "att", "ate"] | None = None,
        n_neighbors: int = 1,
        key: Any = "",
    ):
        """Configure the operator.

        Args:
            grouping_role: Role of the grouping column, forwarded to the base
                class.
            target_roles: One role or a list of roles; a single role is
                wrapped into a list. Defaults to ``TargetRole()``.
            metric: Which estimate to report; ``None`` becomes ``"auto"``.
            n_neighbors: Divisor applied to the raw match counts in
                ``_calc_scaled_counts``.
            key: Forwarded to the base class.
        """
        self.metric = metric or "auto"
        self.n_neighbors = n_neighbors
        # Per-group match counts divided by n_neighbors, filled in by
        # _calc_scaled_counts and consumed by _calc_se.
        self.__scaled_counts = {}
        target_roles = target_roles or TargetRole()
        super().__init__(
            grouping_role=grouping_role,
            target_roles=(
                target_roles if isinstance(target_roles, list) else [target_roles]
            ),
            key=key,
        )

    def _calc_scaled_counts(self, matches, indexes, group):
        """Count how often each row of ``indexes`` appears in ``matches``.

        For every column of ``matches`` the value counts are left-merged onto
        the index of ``indexes``; the per-column counts are summed into a
        ``"count"`` column, divided by ``self.n_neighbors`` and stored under
        ``group``.

        Args:
            matches: Dataset whose columns hold matched indexes.
            indexes: Dataset whose index enumerates the rows being counted.
            group: Key under which the scaled counts are stored.
        """
        matches_counts = Dataset({})
        matches_counts = matches_counts.add_column(
            indexes.index, {"indexes": InfoRole()}
        )
        matches_counts = matches_counts.add_column([0], {"count": InfoRole(float)})
        for col in matches.columns:
            v_counts = matches[col].value_counts()
            matches_counts = matches_counts.merge(
                v_counts,
                how="left",
                left_on="indexes",
                right_on=col,
                suffixes=("", col),
            ).drop(columns=col)
        matches_counts.index = indexes.index
        # Rows never used as a match have no count after the left merge.
        matches_counts = matches_counts.drop(columns="indexes").fillna(0)
        for col in matches_counts.columns:
            if col != "count":
                matches_counts["count"] += matches_counts[col]
        self.__scaled_counts[group] = matches_counts["count"] / self.n_neighbors

    @staticmethod
    def _calc_vars(value):
        """Return a dataset shaped like ``value`` filled with its variance.

        If the first column of ``value`` contains any missing values, zero is
        used in place of the variance.
        """
        var = 0 if value[value.columns[0]].isna().sum() > 0 else value.var()
        # ``value * 0 + var`` broadcasts the scalar to the shape of ``value``.
        return value * 0 + var

    @staticmethod
    def _calc_se(var_c, var_t, scaled_counts, group=None):
        """Compute a standard error from per-row variances and match counts.

        Args:
            var_c: Per-row variances for the first side.
            var_t: Per-row variances for the second side.
            scaled_counts: Mapping of group name to scaled match counts.
            group: When given, ``var_t`` is weighted by that group's counts
                (scaled by ``n_t / n_c``) and ``var_c`` gets unit weights
                shaped like the other group's counts. When ``None``, both
                sides are weighted using the ``"test"`` and ``"control"``
                counts.

        Returns:
            Square root of the weighted variance sum.
        """
        n_c, n_t = len(var_c), len(var_t)
        if group is not None:
            groups = list(scaled_counts.keys())
            groups.remove(group)
            group_other = groups[0]
            weights_c = scaled_counts[group_other] * 0 + 1
            weights_t = scaled_counts[group] * n_t / n_c
        else:
            n = n_c + n_t
            weights_c = (n_c / n) * (scaled_counts["test"] + 1)
            weights_t = (n_t / n) * (scaled_counts["control"] + 1)
        return np.sqrt(
            (weights_t**2 * var_t).sum() / n_t**2
            + (weights_c**2 * var_c).sum() / n_c**2
        )

    @staticmethod
    def _normal_p_value(statistic):
        """Pass ``statistic`` through ``NormCDF`` and return the scalar result."""
        return (
            NormCDF()
            .calc(
                Dataset.from_dict(
                    {"value": [statistic]}, roles={"value": InfoRole()}
                )
            )
            .get_values()[0][0]
        )

    @classmethod
    def _estimate_row(cls, effect, se):
        """Build ``[effect, se, p_value, ci_lower, ci_upper]`` for one estimate."""
        margin = cls._CI_Z_SCORE * se
        return [
            effect,
            se,
            cls._normal_p_value(effect / se),
            effect - margin,
            effect + margin,
        ]

    @classmethod
    def _inner_function(
        cls,
        data: Dataset,
        test_data: Dataset | None = None,
        target_fields: list[str] | None = None,
        **kwargs,
    ) -> Any:
        """Compute the requested treatment-effect estimates.

        Args:
            data: Control group rows.
            test_data: Test group rows.
            target_fields: Two column names: the observed target followed by
                the matched target.
            **kwargs: ``metric`` (default ``"ate"``), ``scaled_counts`` and
                an optional ``bias`` mapping with ``"control"`` / ``"test"``
                entries.

        Returns:
            ``{"ATC": row}`` or ``{"ATT": row}`` for those metrics; otherwise
            a dict with ``"ATT"``, ``"ATC"`` and ``"ATE"`` rows.

        Raises:
            NoneArgumentError: If ``target_fields`` or ``test_data`` is None.
        """
        if target_fields is None or test_data is None:
            raise NoneArgumentError(
                ["target_fields", "test_data"], "att, atc, ate estimation"
            )
        metric = kwargs.get("metric", "ate")
        scaled_counts = kwargs.get("scaled_counts")
        # Individual effects: observed minus matched on the test side,
        # matched minus observed on the control side.
        itt = test_data[target_fields[0]] - test_data[target_fields[1]]
        itc = data[target_fields[1]] - data[target_fields[0]]
        bias = kwargs.get("bias", {})
        # NOTE(review): for any metric other than "atc"/"att"/"ate"
        # (e.g. "auto") no bias correction is applied, although all three
        # estimates are still returned below - confirm this is intended.
        if bias and len(bias) > 0:
            if metric in ["atc", "ate"]:
                itc -= Dataset.from_dict(
                    {"test": bias["control"]}, roles={}, index=itc.index
                )
            if metric in ["att", "ate"]:
                itt += Dataset.from_dict(
                    {"control": bias["test"]}, roles={}, index=itt.index
                )
        # NOTE(review): var_t is derived from the control-side effects (itc)
        # and var_c from the test-side effects (itt); the naming is kept from
        # the original - confirm the pairing is intended.
        var_t = cls._calc_vars(itc)
        var_c = cls._calc_vars(itt)
        itt_se = cls._calc_se(var_c, var_t, scaled_counts, "control")
        itc_se = cls._calc_se(var_t, var_c, scaled_counts, "test")
        itt = itt.mean()
        itc = itc.mean()
        # Both rows are always computed, exactly as before, regardless of
        # which metric is finally returned.
        att_row = cls._estimate_row(itt, itt_se)
        atc_row = cls._estimate_row(itc, itc_se)
        if metric == "atc":
            return {"ATC": atc_row}
        if metric == "att":
            return {"ATT": att_row}
        len_control, len_test = len(data), len(test_data)
        # ATE is the group-size weighted average of ATT and ATC.
        ate = (itt * len_test + itc * len_control) / (len_test + len_control)
        ate_se = cls._calc_se(var_c, var_t, scaled_counts)
        return {
            "ATT": att_row,
            "ATC": atc_row,
            "ATE": cls._estimate_row(ate, ate_se),
        }

    @classmethod
    def _execute_inner_function(
        cls,
        grouping_data,
        target_fields: list[str] | None = None,
        **kwargs,
    ) -> dict:
        """Validate the targets and dispatch to ``_inner_function``.

        The first group of ``grouping_data`` is passed as the control data
        and the second as the test data.

        Raises:
            ValueError: If ``target_fields`` does not hold exactly two names.
        """
        metric = kwargs.get("metric", "ate")
        if target_fields is None or len(target_fields) != 2:
            raise ValueError(
                f"This operator works with 2 targets, but got {len(target_fields) if target_fields else None}"
            )
        return cls._inner_function(
            data=grouping_data[0][1],
            test_data=grouping_data[1][1],
            target_fields=target_fields,
            metric=metric,
            bias=kwargs.get("bias_estimation", None),
            scaled_counts=kwargs.get("scaled_counts"),
        )

    def _prepare_new_target(
        self,
        data: ExperimentData,
        t_data: Dataset,
        group_field: str,
    ) -> Dataset:
        """Build the matched-target column and refresh the scaled counts.

        Returns:
            The ``"<target>_matched"`` column produced by
            ``Bias.prepare_data``.
        """
        new_target = data.ds.search_columns(TargetRole())[0]
        indexes, matched_data = Bias.prepare_data(data, t_data)
        matched_data = matched_data[new_target + "_matched"]
        grouped_data = data.ds.groupby(group_field)
        control_indexes = indexes.loc[grouped_data[0][1].index, :]
        test_indexes = indexes.loc[grouped_data[1][1].index, :]
        # Counts for one group are taken over the matches recorded for the
        # other group.
        self._calc_scaled_counts(control_indexes, test_indexes, "test")
        self._calc_scaled_counts(test_indexes, control_indexes, "control")
        return matched_data

    def execute(self, data: ExperimentData) -> ExperimentData:
        """Run the estimation and store the result in ``data``.

        A previously stored ``Bias`` result, if any, is passed on as the bias
        estimation. When two targets are not already present, the matched
        target is built, saved as an additional field and appended to the
        working copy of the dataset.
        """
        group_field, target_fields = self._get_fields(data=data)
        bias = (
            data.variables[data.get_one_id(Bias, ExperimentDataEnum.variables)]
            if len(
                data.get_ids(Bias, ExperimentDataEnum.variables)["Bias"]["variables"]
            )
            > 0
            else None
        )
        t_data = deepcopy(data.ds)
        if len(target_fields) != 2:
            matched_data = self._prepare_new_target(data, t_data, group_field)
            target_fields += [matched_data.search_columns(TargetRole())[0]]
            data.set_value(
                ExperimentDataEnum.additional_fields,
                self.id,
                matched_data,
                role=AdditionalTargetRole(),
            )
            t_data = t_data.add_column(
                matched_data.reindex(t_data.index),
                role={target_fields[1]: TargetRole()},
            )
        self.key = str(
            target_fields[0] if len(target_fields) == 1 else (target_fields or "")
        )
        # An empty target list combined with temporary roles on the dataset
        # is treated as normal: there is nothing to compute, so return early.
        if not target_fields and data.ds.tmp_roles:
            return data
        compare_result = self.calc(
            data=t_data,
            group_field=group_field,
            target_fields=target_fields,
            metric=self.metric,
            bias_estimation=bias,
            scaled_counts=self.__scaled_counts,
        )
        return self._set_value(data, compare_result)


class Bias(GroupOperator):
    """Regression-based bias estimate between rows and their matches."""

    def __init__(
        self,
        grouping_role: ABCRole | None = None,
        target_roles: list[ABCRole] | None = None,
        key: Any = "",
    ):
        """Forward all arguments unchanged to the base class."""
        super().__init__(
            grouping_role=grouping_role, target_roles=target_roles, key=key
        )

    @staticmethod
    def calc_coefficients(X: Dataset, Y: Dataset) -> list[float]:
        """Least-squares fit of ``Y`` on ``X`` with an intercept column.

        Returns:
            The fitted coefficients with the intercept (first entry) dropped.
        """
        # Prepend a column of ones so the fit includes an intercept.
        X_l = Dataset.create_empty(roles={"temp": InfoRole()}, index=X.index).fillna(1)
        X = X_l.append(X, axis=1).data.values
        return np.linalg.lstsq(X, Y.data.values, rcond=-1)[0][1:]

    @staticmethod
    def calc_bias(
        X: Dataset, X_matched: Dataset, coefficients: list[float]
    ) -> list[float]:
        """Return ``(matched_row - row) . coefficients`` for every row pair."""
        return [
            (j - i).dot(coefficients)[0]
            for i, j in zip(X.data.values, X_matched.data.values)
        ]

    @classmethod
    def _bias_for_group(
        cls,
        group_data: Dataset,
        target_fields: list[str],
        features_fields: list[str],
    ) -> list[float]:
        """Compute the bias for one group.

        The first half of ``features_fields`` is used as the original
        features and the second half as the matched ones; coefficients are
        fitted on the matched features against ``target_fields[1]``.
        """
        half = len(features_fields) // 2
        original_features = features_fields[:half]
        matched_features = features_fields[half:]
        return cls.calc_bias(
            group_data[original_features],
            group_data[matched_features],
            cls.calc_coefficients(
                group_data[matched_features],
                group_data[target_fields[1]],
            ),
        )

    @classmethod
    def _inner_function(
        cls,
        data: Dataset,
        test_data: Dataset | None = None,
        target_fields: list[str] | None = None,
        features_fields: list[str] | None = None,
        **kwargs,
    ) -> dict:
        """Estimate the bias per group.

        Returns:
            ``{"test": ...}`` only when the control group's second target has
            missing values, ``{"control": ...}`` only when the test group's
            has, and both keys otherwise.

        Raises:
            NoneArgumentError: If ``target_fields``, ``features_fields`` or
                ``test_data`` is None.
        """
        if target_fields is None or features_fields is None or test_data is None:
            raise NoneArgumentError(
                ["target_fields", "features_fields", "test_data"], "bias_estimation"
            )
        if data[target_fields[1]].isna().sum() > 0:
            return {
                "test": cls._bias_for_group(test_data, target_fields, features_fields)
            }
        if test_data[target_fields[1]].isna().sum() > 0:
            return {
                "control": cls._bias_for_group(data, target_fields, features_fields)
            }
        return {
            "test": cls._bias_for_group(test_data, target_fields, features_fields),
            "control": cls._bias_for_group(data, target_fields, features_fields),
        }

    @classmethod
    def _execute_inner_function(
        cls,
        grouping_data,
        target_fields: list[str] | None = None,
        features_fields: list[str] | None = None,
        **kwargs,
    ) -> dict:
        """Pass the first group as control and the second as test data."""
        return cls._inner_function(
            grouping_data[0][1],
            test_data=grouping_data[1][1],
            target_fields=target_fields,
            features_fields=features_fields,
            **kwargs,
        )

    @staticmethod
    def prepare_data(
        data: ExperimentData, t_data: Dataset
    ) -> tuple[Dataset, Dataset]:
        """Build averaged ``"<column>_matched"`` columns from match indexes.

        For every numeric feature/target column of ``t_data``, the values of
        the matched rows (one per index column) are looked up in ``data.ds``,
        summed and divided by the number of index columns.

        Returns:
            A tuple ``(indexes, matched_data)``: the match-index dataset
            re-indexed like ``t_data``, and the dataset of matched columns.

        Raises:
            ValueError: If no field with ``AdditionalMatchingRole`` exists.
        """
        indexes = data.field_search(AdditionalMatchingRole())
        if len(indexes) == 0:
            raise ValueError("No indexes were found")
        indexes = data.additional_fields[indexes]
        indexes.index = t_data.index
        # Drop the rows selected by a -1 marker in the first index column.
        filtered_field = indexes.drop(
            indexes[indexes[indexes.columns[0]] == -1], axis=0
        )
        matched_data = Dataset({})
        matched_data.index = filtered_field.index
        numeric_cols = t_data.search_columns(
            [FeatureRole(), TargetRole()], search_types=[int, float]
        )
        for d_col in numeric_cols:
            matched_name = d_col + "_matched"
            matched_data_col = Dataset({})
            matched_data_col.index = filtered_field.index
            for i, i_col in enumerate(indexes.columns):
                index_matched_data = data.ds.loc[
                    list(filtered_field[i_col].get_values(column=i_col))
                ][d_col].rename({d_col: d_col + f"_matched_{i}"})
                matched_data_col = matched_data_col.add_column(index_matched_data)
            # Zero of the column's own data type, used as the accumulator seed.
            default_value = [t_data.roles[d_col].data_type(0)]
            matched_data_col = matched_data_col.add_column(
                default_value * matched_data_col.shape[0],
                {matched_name: t_data.roles[d_col]},
            )
            for col in matched_data_col.columns:
                if col != matched_name:
                    matched_data_col[matched_name] += matched_data_col[col]
            matched_data = matched_data.add_column(
                default_value * matched_data_col.shape[0],
                {matched_name: t_data.roles[d_col]},
            )
            # shape[1] - 1 excludes the accumulator column itself.
            matched_data[matched_name] = matched_data_col[matched_name] / (
                matched_data_col.shape[1] - 1
            )
        return indexes, matched_data

    def execute(self, data: ExperimentData) -> ExperimentData:
        """Run the bias estimation and store the result in ``data``.

        When fewer than two targets are present, the matched columns are
        built with ``prepare_data`` and appended to the working copy of the
        dataset before calculation.
        """
        group_field, target_fields = self._get_fields(data)
        t_data = deepcopy(data.ds)
        if len(target_fields) < 2:
            _, matched_data = self.prepare_data(data, t_data)
            target_fields += [matched_data.search_columns(TargetRole())[0]]
            t_data = t_data.append(matched_data.reindex(t_data.index), axis=1)
        self.key = str(
            target_fields[0] if len(target_fields) == 1 else (target_fields or "")
        )
        # An empty target list combined with temporary roles on the dataset
        # is treated as normal: there is nothing to compute, so return early.
        if not target_fields and data.ds.tmp_roles:
            return data
        compare_result = self.calc(
            data=t_data,
            group_field=group_field,
            target_fields=target_fields,
            features_fields=t_data.search_columns(
                FeatureRole(), search_types=[int, float]
            ),
        )
        return self._set_value(data, compare_result)