import copy
from typing import Optional
import numpy as np
import pandas as pd
from sklearn.neighbors import NearestNeighbors
[docs]
class Matching:
"""
Matching with propensity score.
Attributes
----------
p_score : numpy.ndarray
Propensity Score.
"""
def __init__(self, learner, min_match_dist=1e-2) -> None:
"""
Parameters
----------
learner : object
Learner to estimate propensity score. Must have fit and predict_proba methods.
min_match_dist : float, default=1e-2
Minimum distance for matching.
"""
self.p_score: np.ndarray
self.eps: float = 1e-8
self.learner = learner
self.min_match_dist = min_match_dist
[docs]
def fit(
self, X: pd.DataFrame, treatment: np.ndarray, y: Optional[np.ndarray] = None
) -> None:
"""
Fit learner and Estimate Propensity Score.
Parameters
----------
X : pd.DataFrame
Covariates for propensity score.
treatment : numpy.ndarray[bool]
Flags with or without intervention.
y : numpy.ndarray
Outcome variables.
"""
self.learner.fit(X, treatment)
self.p_score = np.clip(
self.learner.predict_proba(X)[:, 1], self.eps, 1 - self.eps
)
[docs]
def get_score(self) -> np.ndarray:
"""
Return propensity score.
Returns
-------
numpy.ndarray
Propensity score.
"""
return self.p_score
[docs]
def get_weight(self, treatment: np.ndarray, mode: str = "ate") -> np.ndarray:
"""
Return sample weight representing matching.
Parameters
----------
treatment : numpy.ndarray[bool]
Flags with or without intervention.
mode : str
Adjustment method. raw or ate.
Returns
-------
numpy.ndarray
Sample weight.
"""
self._check_mode(mode)
if mode == "raw":
return np.ones(treatment.shape[0])
elif mode == "ate":
return self._get_matching_weight(treatment)
else:
raise ValueError("mode must be raw or ate.")
def _check_mode(self, mode: str) -> None:
"""
Check if it is a supported mode.
Parameters
----------
mode : str
Adjustment method. raw or ate.
"""
mode_list = ["raw", "ate"]
assert mode in mode_list, "mode must be string and it is raw or ate."
def _get_matching_weight(self, treatment: np.ndarray) -> np.ndarray:
"""
Match using propensity score and return sample_weight.
Parameters
----------
treatment : numpy.ndarray[bool]
Flags with or without intervention.
Returns
-------
numpy.ndarray
Sample weight.
"""
score = self.p_score
treat_idx, control_idx = self._get_nearest_idx(treatment, score)
matching_idx = np.concatenate((treat_idx, control_idx), axis=0)
idx, counts = np.unique(matching_idx, return_counts=True)
weight = np.zeros(treatment.shape[0])
weight[idx] = counts
return weight
def _get_nearest_idx(
self, treatment: np.ndarray, score: np.ndarray
) -> tuple[np.ndarray, np.ndarray]:
"""
Match the closest data between groups with and without intervention.
Parameters
----------
treatment : numpy.ndarray[bool]
Flags with or without intervention.
score : numpy.ndarray
Propensity Score.
Returns
-------
treat_idx : numpy.ndarray
Sample index of treatment group.
control_idx : numpy.ndarray
Sample index of control group.
"""
score = score.reshape(-1, 1)
control_size, treat_size = (~treatment).sum(), treatment.sum()
major_sample_group = np.argmax([control_size, treat_size])
neigh = NearestNeighbors(n_neighbors=5, metric="manhattan")
neigh.fit(score[treatment == major_sample_group])
distance, match_idx = neigh.kneighbors(
score[treatment != major_sample_group], 1, return_distance=True
)
# Flatten distance array before comparison
distance = distance.flatten()
match_idx = match_idx[distance < self.min_match_dist].flatten()
if major_sample_group == 1:
treat_idx = np.where(treatment)[0][match_idx]
control_idx = np.where(~treatment)[0]
else:
treat_idx = np.where(treatment)[0]
control_idx = np.where(~treatment)[0][match_idx]
return treat_idx, control_idx
[docs]
def estimate_effect(
self, treatment: np.ndarray, y: np.ndarray, mode: str = "ate"
) -> tuple[float, float, float]:
"""
Match using propensity score and return sample_weight.
Parameters
----------
treatment : numpy.ndarray[bool]
Flags with or without intervention.
y : numpy.ndarray
Outcome variables.
mode : str
Adjustment method. raw or ate.
Returns
-------
tuple
A tuple containing (avg_y_control, avg_y_treat, effect_size)
"""
self._check_mode(mode)
weight = self.get_weight(treatment, mode=mode)
return self._estimate_outcomes(treatment, y, weight)
def _estimate_outcomes(
self, treatment: np.ndarray, y: np.ndarray, weight: np.ndarray
) -> tuple[float, float, float]:
"""
Match using propensity score and return sample_weight.
Parameters
----------
treatment : numpy.ndarray[bool]
Flags with or without intervention.
y : numpy.ndarray
Outcome variables.
weight : numpy.ndarray
sample weight with matching.
Returns
-------
tuple
A tuple containing (avg_y_control, avg_y_treat, effect_size)
"""
avg_y_control = np.average(y[~treatment], axis=0, weights=weight[~treatment])
avg_y_treat = np.average(y[treatment], axis=0, weights=weight[treatment])
effect_size = avg_y_treat - avg_y_control
return (avg_y_control, avg_y_treat, effect_size)
[docs]
class IPW:
"""Inverse Probability Weighting Method."""
def __init__(self, learner) -> None:
"""
Parameters
----------
learner : object
Learner to estimate propensity score. Must have fit and predict_proba methods.
"""
self.p_score: np.ndarray = None # type: ignore
self.learner = learner
[docs]
def fit(
self,
X: pd.DataFrame,
treatment: np.ndarray,
y: Optional[np.ndarray] = None,
eps: float = 1e-8,
) -> None:
"""
Fit learner and Estimate Propensity Score.
Parameters
----------
X : pd.DataFrame
Covariates for propensity score.
treatment : numpy.ndarray[bool]
Flags with or without intervention.
y : numpy.ndarray
Outcome variables.
eps : float, default=1e-8
Extreme Value Trend Score Rounding Value.
Raises
------
ValueError
If eps is not in range [0, 1).
"""
if not 0 <= eps < 1:
raise ValueError("eps must be in range [0, 1).")
self.learner.fit(X, treatment)
pred = self.learner.predict_proba(X)[:, 1]
self.p_score = np.clip(pred, eps, 1 - eps)
[docs]
def get_score(self) -> np.ndarray:
"""
Return propensity score.
Returns
-------
p_score : numpy.ndarray
Propensity score for each sample.
Raises
------
ValueError
If model is not fitted.
"""
if not hasattr(self, "p_score") or self.p_score is None:
raise ValueError("Model not fitted. Call fit() first.")
return self.p_score
[docs]
def get_weight(self, treatment: np.ndarray, mode: str = "ate") -> np.ndarray:
"""
Return sample weight representing matching.
Parameters
----------
treatment : numpy.ndarray[bool]
Flags with or without intervention.
mode : str, default="ate"
Adjustment method. Must be 'raw', 'ate', 'att' or 'atu'.
Returns
-------
sample_weight : numpy.ndarray
Sample weights.
Raises
------
ValueError
If mode is not 'raw', 'ate', 'att' or 'atu'.
"""
self._check_mode(mode)
if not hasattr(self, "p_score") or self.p_score is None:
raise ValueError("Model not fitted. Call fit() first.")
if mode == "raw":
return np.ones(treatment.shape[0])
elif mode == "ate":
return np.where(treatment == 1, 1 / self.p_score, 1 / (1 - self.p_score))
elif mode == "att":
return np.where(treatment == 1, 1, self.p_score / (1 - self.p_score))
elif mode == "atu":
return np.where(treatment == 1, (1 - self.p_score) / self.p_score, 1)
else:
raise ValueError("mode must be raw, ate, att or atu.")
[docs]
def estimate_effect(
self, treatment: np.ndarray, y: np.ndarray, mode: str = "ate"
) -> tuple[float, float, float]:
"""
Calculate treatment effect using inverse probability weighting.
Parameters
----------
treatment : numpy.ndarray[bool]
Flags with or without intervention.
y : numpy.ndarray
Outcome variables.
mode : str
Adjustment method. must be raw, ate, att or atu.
Returns
-------
tuple
A tuple containing (avg_y_control, avg_y_treat, effect_size)
"""
self._check_mode(mode)
weight = self.get_weight(treatment, mode=mode)
return self._estimate_outcomes(treatment, y, weight)
def _check_mode(self, mode: str) -> None:
"""
Check if it is a supported mode.
Parameters
----------
mode : str
Adjustment method. must be raw, ate, att or atu.
"""
mode_list = ["raw", "ate", "att", "atu"]
assert mode in mode_list, (
"mode must be string and it is must be raw, ate, att or atu."
)
def _estimate_outcomes(
self, treatment: np.ndarray, y: np.ndarray, weight: np.ndarray
) -> tuple[float, float, float]:
"""
Calculate treatment effect using provided weights.
Parameters
----------
treatment : numpy.ndarray[bool]
Flags with or without intervention.
y : numpy.ndarray
Outcome variables.
weight : numpy.ndarray
sample weight with ipw.
Returns
-------
tuple
A tuple containing (avg_y_control, avg_y_treat, effect_size)
"""
avg_y_control = np.average(y[~treatment], axis=0, weights=weight[~treatment])
avg_y_treat = np.average(y[treatment], axis=0, weights=weight[treatment])
effect_size = avg_y_treat - avg_y_control
return (avg_y_control, avg_y_treat, effect_size)
[docs]
class DoubleRobust(IPW):
def __init__(self, learner, second_learner) -> None:
"""
Parameters
----------
learner : object
Learner to estimate propensity score. Must have fit and predict_proba methods.
second_learner : object
Learner to estimate anti-real virtual intervention effect. Must have fit and predict methods.
"""
super(DoubleRobust, self).__init__(learner)
self.treat_learner = copy.deepcopy(second_learner)
self.control_learner = copy.deepcopy(second_learner)
[docs]
def fit(
self,
X: pd.DataFrame,
treatment: np.ndarray,
y: np.ndarray,
eps: float = 1e-8,
) -> None:
"""
Fit learner and Estimate Propensity Score.
Parameters
----------
X : pd.DataFrame
Covariates for propensity score.
treatment : numpy.ndarray[bool]
Flags with or without intervention.
y : numpy.ndarray
Outcome variables. Can be 1D or 2D array.
eps : float, default=1e-8
Extreme Value Trend Score Rounding Value.
Raises
------
ValueError
If eps is not in range [0, 1).
"""
# 型を明示的にブール値に変換
treatment = treatment.astype(bool)
# DataFrameのコピーを作成し操作する(参照渡しによる問題を回避)
X_df = X.copy()
self.learner.fit(X_df, treatment)
if not 0 <= eps < 1:
raise ValueError("eps must be in range [0, 1).")
self.p_score = np.clip(self.learner.predict_proba(X_df)[:, 1], eps, 1 - eps)
# 入力が1次元配列の場合は2次元に変換する
if len(y.shape) == 1:
y = y.reshape(-1, 1)
self.y_control = np.zeros(y.shape)
self.y_treat = np.zeros(y.shape)
# treatment=TrueとFalseの両方のサンプルが存在することを確認
if np.sum(treatment) == 0 or np.sum(~treatment) == 0:
raise ValueError("Both treatment and control groups must have samples.")
# pandasのDataFrameからtreatment=TrueとFalse用のサブセットを抽出
X_treat = X_df.iloc[np.where(treatment)[0]]
X_control = X_df.iloc[np.where(~treatment)[0]]
# Fit second models
for i, _y in enumerate(y.T):
y_treat = _y[treatment]
y_control = _y[~treatment]
self.treat_learner.fit(X_treat, y_treat)
self.control_learner.fit(X_control, y_control)
# 予測時はindexではなくマスクを使用
self.y_control[:, i] = np.where(
~treatment, _y, self.control_learner.predict(X_df)
)
self.y_treat[:, i] = np.where(
treatment, _y, self.treat_learner.predict(X_df)
)
[docs]
def estimate_effect(
self, treatment: np.ndarray, mode: str = "ate"
) -> tuple[float, float, float]:
"""
Calculate the treatment effect using double robust method.
Parameters
----------
treatment : numpy.ndarray[bool]
Flags with or without intervention.
mode : str, default="ate"
Adjustment method. Must be 'raw', 'ate', 'att' or 'atu'.
Returns
-------
tuple
A tuple containing (avg_y_control, avg_y_treat, effect_size)
Raises
------
ValueError
If model is not fitted or mode is invalid.
"""
if not hasattr(self, "y_control") or not hasattr(self, "y_treat"):
raise ValueError("Model not fitted. Call fit() first.")
self._check_mode(mode)
weight = self.get_weight(treatment, mode=mode)
return self._estimate_outcomes(weight)
def _estimate_outcomes(self, weight: np.ndarray) -> tuple[float, float, float]:
"""
Calculate outcome estimates using double robust method.
Parameters
----------
weight : numpy.ndarray
Sample weight of IPW.
Returns
-------
tuple
A tuple containing (avg_y_control, avg_y_treat, effect_size)
"""
# Use scalar averaging if possible to avoid array comparison issues
avg_y_control = np.average(self.y_control, axis=0, weights=weight)
avg_y_treat = np.average(self.y_treat, axis=0, weights=weight)
effect_size = avg_y_treat - avg_y_control
# Convert to scalar if single dimension to avoid array comparison issues
if hasattr(effect_size, "__len__") and len(effect_size) == 1:
avg_y_control = float(avg_y_control)
avg_y_treat = float(avg_y_treat)
effect_size = float(effect_size)
return (avg_y_control, avg_y_treat, effect_size)