Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 27 additions & 5 deletions dte_adj/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from abc import ABC
from tqdm.auto import tqdm
import dte_adj
from dte_adj.util import _infer_default_locations


class DistributionEstimatorBase(ABC):
Expand All @@ -19,12 +20,13 @@ def __init__(self):
self.covariates = None
self.outcomes = None
self.treatment_arms = None
self.last_locations = None

def predict_dte(
self,
target_treatment_arm: int,
control_treatment_arm: int,
locations: np.ndarray,
locations: Optional[np.ndarray] = None,
alpha: float = 0.05,
variance_type="moment",
n_bootstrap=500,
Expand All @@ -40,7 +42,11 @@ def predict_dte(
Args:
target_treatment_arm (int): The index of the treatment arm of the treatment group.
control_treatment_arm (int): The index of the treatment arm of the control group.
locations (np.ndarray): Scalar values to be used for computing the cumulative distribution.
locations (np.ndarray, optional): Scalar values to be used for computing the cumulative
distribution. If None, evenly-spaced locations spanning the observed outcome range
are generated automatically. The number of points is determined from data size and
distribution via ``np.histogram_bin_edges(outcomes, bins='auto')``. The actual array
used is stored on ``self.last_locations``.
alpha (float, optional): Significance level of the confidence bound. Defaults to 0.05.
variance_type (str, optional): Variance type to be used to compute confidence intervals.
Available values are "moment", "simple", and "uniform". Defaults to "moment".
Expand Down Expand Up @@ -80,6 +86,11 @@ def predict_dte(
print(f"DTE shape: {dte.shape}") # Should match locations.shape
print(f"Average DTE: {dte.mean():.3f}")
"""
if locations is None:
locations = _infer_default_locations(
self.outcomes, for_intervals=False
)
self.last_locations = locations
return self._compute_dtes(
target_treatment_arm,
control_treatment_arm,
Expand All @@ -94,7 +105,7 @@ def predict_pte(
self,
target_treatment_arm: int,
control_treatment_arm: int,
locations: np.ndarray,
locations: Optional[np.ndarray] = None,
alpha: float = 0.05,
variance_type="moment",
n_bootstrap=500,
Expand All @@ -110,8 +121,14 @@ def predict_pte(
Args:
target_treatment_arm (int): The index of the treatment arm of the treatment group.
control_treatment_arm (int): The index of the treatment arm of the control group.
locations (np.ndarray): Scalar values defining interval boundaries for probability computation.
For each interval (locations[i], locations[i+1]], the PTE is computed.
locations (np.ndarray, optional): Scalar values defining interval boundaries for
probability computation. For each interval (locations[i], locations[i+1]], the PTE
is computed. If None, boundaries spanning the observed outcome range are generated
automatically with the left endpoint placed just below ``outcomes.min()`` so that
minimum-valued samples fall inside the first interval. The number of boundaries is
determined from data size and distribution via
``np.histogram_bin_edges(outcomes, bins='auto')``. The actual array used is stored
on ``self.last_locations``.
alpha (float, optional): Significance level of the confidence bound. Defaults to 0.05.
variance_type (str, optional): Variance type to be used to compute confidence intervals.
Available values are "moment", "simple", and "uniform". Defaults to "moment".
Expand Down Expand Up @@ -154,6 +171,11 @@ def predict_pte(
print(f"PTE shape: {pte.shape}") # Should be (4,) for 4 intervals
print(f"Interval effects: {pte}")
"""
if locations is None:
locations = _infer_default_locations(
self.outcomes, for_intervals=True
)
self.last_locations = locations
return self._compute_ptes(
target_treatment_arm,
control_treatment_arm,
Expand Down
68 changes: 56 additions & 12 deletions dte_adj/local.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,18 @@
from __future__ import annotations

import numpy as np
from typing import Tuple
from typing import Optional, Tuple
from dte_adj.stratified import (
SimpleStratifiedDistributionEstimator,
AdjustedStratifiedDistributionEstimator,
)
from dte_adj.util import ArrayLike, compute_ldte, compute_lpte, _convert_to_ndarray
from dte_adj.util import (
ArrayLike,
compute_ldte,
compute_lpte,
_convert_to_ndarray,
_infer_default_locations,
)


class SimpleLocalDistributionEstimator(SimpleStratifiedDistributionEstimator):
Expand Down Expand Up @@ -59,7 +65,7 @@ def predict_ldte(
self,
target_treatment_arm: int,
control_treatment_arm: int,
locations: np.ndarray,
locations: Optional[np.ndarray] = None,
alpha: float = 0.05,
display_progress: bool = True,
) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
Expand All @@ -73,7 +79,11 @@ def predict_ldte(
Args:
target_treatment_arm (int): The index of the treatment arm of the treatment group.
control_treatment_arm (int): The index of the treatment arm of the control group.
locations (np.ndarray): Scalar values to be used for computing the cumulative distribution.
locations (np.ndarray, optional): Scalar values to be used for computing the cumulative
distribution. If None, evenly-spaced locations spanning the observed outcome range
are generated automatically. The number of points is determined from data size and
distribution via ``np.histogram_bin_edges(outcomes, bins='auto')``. The actual
array used is stored on ``self.last_locations``.
alpha (float, optional): Significance level of the confidence bound. Defaults to 0.05.
display_progress (bool, optional): Whether to display a progress bar. Defaults to True.

Expand Down Expand Up @@ -113,6 +123,11 @@ def predict_ldte(
print(f"LDTE shape: {ldte.shape}") # Should match locations.shape
print(f"Average LDTE: {ldte.mean():.3f}")
"""
if locations is None:
locations = _infer_default_locations(
self.outcomes, for_intervals=False
)
self.last_locations = locations
return compute_ldte(
self,
target_treatment_arm,
Expand All @@ -126,7 +141,7 @@ def predict_lpte(
self,
target_treatment_arm: int,
control_treatment_arm: int,
locations: np.ndarray,
locations: Optional[np.ndarray] = None,
alpha: float = 0.05,
display_progress: bool = True,
) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
Expand All @@ -140,8 +155,13 @@ def predict_lpte(
Args:
target_treatment_arm (int): The index of the treatment arm of the treatment group.
control_treatment_arm (int): The index of the treatment arm of the control group.
locations (np.ndarray): Scalar values defining interval boundaries for probability computation.
For each interval (locations[i], locations[i+1]], the LPTE is computed.
locations (np.ndarray, optional): Scalar values defining interval boundaries for
probability computation. For each interval (locations[i], locations[i+1]], the LPTE
is computed. If None, boundaries spanning the observed outcome range are generated
automatically with the left endpoint placed just below ``outcomes.min()``. The
number of boundaries is determined from data size and distribution via
``np.histogram_bin_edges(outcomes, bins='auto')``. The actual array used is stored
on ``self.last_locations``.
alpha (float, optional): Significance level of the confidence bound. Defaults to 0.05.
display_progress (bool, optional): Whether to display a progress bar. Defaults to True.

Expand Down Expand Up @@ -183,6 +203,11 @@ def predict_lpte(
print(f"LPTE shape: {lpte.shape}") # Should be (4,) for 4 intervals
print(f"Interval effects: {lpte}")
"""
if locations is None:
locations = _infer_default_locations(
self.outcomes, for_intervals=True
)
self.last_locations = locations
return compute_lpte(
self,
target_treatment_arm,
Expand Down Expand Up @@ -234,7 +259,7 @@ def predict_ldte(
self,
target_treatment_arm: int,
control_treatment_arm: int,
locations: np.ndarray,
locations: Optional[np.ndarray] = None,
alpha: float = 0.05,
display_progress: bool = True,
) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
Expand All @@ -247,7 +272,11 @@ def predict_ldte(
Args:
target_treatment_arm (int): The index of the treatment arm of the treatment group.
control_treatment_arm (int): The index of the treatment arm of the control group.
locations (np.ndarray): Scalar values to be used for computing the cumulative distribution.
locations (np.ndarray, optional): Scalar values to be used for computing the cumulative
distribution. If None, evenly-spaced locations spanning the observed outcome range
are generated automatically. The number of points is determined from data size and
distribution via ``np.histogram_bin_edges(outcomes, bins='auto')``. The actual
array used is stored on ``self.last_locations``.
alpha (float, optional): Significance level of the confidence bound. Defaults to 0.05.
display_progress (bool, optional): Whether to display a progress bar. Defaults to True.

Expand Down Expand Up @@ -289,6 +318,11 @@ def predict_ldte(

print(f"Adjusted LDTE: {ldte.mean():.3f}")
"""
if locations is None:
locations = _infer_default_locations(
self.outcomes, for_intervals=False
)
self.last_locations = locations
return compute_ldte(
self,
target_treatment_arm,
Expand All @@ -302,7 +336,7 @@ def predict_lpte(
self,
target_treatment_arm: int,
control_treatment_arm: int,
locations: np.ndarray,
locations: Optional[np.ndarray] = None,
alpha: float = 0.05,
display_progress: bool = True,
) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
Expand All @@ -315,8 +349,13 @@ def predict_lpte(
Args:
target_treatment_arm (int): The index of the treatment arm of the treatment group.
control_treatment_arm (int): The index of the treatment arm of the control group.
locations (np.ndarray): Scalar values defining interval boundaries for probability computation.
For each interval (locations[i], locations[i+1]], the LPTE is computed.
locations (np.ndarray, optional): Scalar values defining interval boundaries for
probability computation. For each interval (locations[i], locations[i+1]], the LPTE
is computed. If None, boundaries spanning the observed outcome range are generated
automatically with the left endpoint placed just below ``outcomes.min()``. The
number of boundaries is determined from data size and distribution via
``np.histogram_bin_edges(outcomes, bins='auto')``. The actual array used is stored
on ``self.last_locations``.
alpha (float, optional): Significance level of the confidence bound. Defaults to 0.05.
display_progress (bool, optional): Whether to display a progress bar. Defaults to True.

Expand Down Expand Up @@ -361,6 +400,11 @@ def predict_lpte(

print(f"Adjusted LPTE: {lpte}")
"""
if locations is None:
locations = _infer_default_locations(
self.outcomes, for_intervals=True
)
self.last_locations = locations
return compute_lpte(
self,
target_treatment_arm,
Expand Down
36 changes: 36 additions & 0 deletions dte_adj/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,42 @@ def _convert_to_ndarray(data: ArrayLike) -> np.ndarray:
return np.asarray(data)


def _infer_default_locations(
    outcomes: np.ndarray,
    for_intervals: bool = False,
) -> np.ndarray:
    """Generate evenly-spaced default locations from observed outcomes.

    The number of points equals the number of bin edges returned by
    ``np.histogram_bin_edges(outcomes, bins='auto')``, so it adapts to the
    sample size and spread of the data. The returned grid always ends at the
    largest observed outcome.

    Args:
        outcomes (np.ndarray): Observed outcomes used to determine the range.
            Any array-like accepted by ``np.asarray`` (e.g. a list) works.
        for_intervals (bool, optional): If True, the left endpoint is placed
            slightly below the smallest outcome so that observations equal to
            the minimum fall inside the first interval ``(loc[0], loc[1]]``.
            Set this for PTE/LPTE estimation. Defaults to False.

    Returns:
        np.ndarray: Evenly-spaced locations array.

    Raises:
        ValueError: If ``outcomes`` is None or empty, or if its range is not
            finite (e.g. the data contain NaN or infinite values).
    """
    if outcomes is None:
        raise ValueError(
            "Cannot infer default locations because no outcomes are available; "
            "fit the estimator first or pass `locations` explicitly."
        )

    values = np.asarray(outcomes)
    if values.size == 0:
        raise ValueError(
            "Cannot infer default locations from empty outcomes; "
            "pass `locations` explicitly."
        )

    y_min = float(values.min())
    y_max = float(values.max())
    # Checking the extremes is enough: NaN propagates through min()/max() and
    # an infinite value necessarily ends up as one of them.
    if not (np.isfinite(y_min) and np.isfinite(y_max)):
        raise ValueError(
            "Cannot infer default locations because the outcome range is not "
            "finite; remove NaN/inf values or pass `locations` explicitly."
        )

    n_locations = len(np.histogram_bin_edges(values, bins="auto"))

    start = y_min
    if for_intervals:
        # Place the left endpoint strictly below y_min so that the smallest
        # observation falls inside the first interval (loc[0], loc[1]]. The
        # offset scales with the magnitude of the data so that the shifted
        # endpoint stays distinct from y_min even when the range is zero.
        scale = max(y_max - y_min, abs(y_min), abs(y_max), 1.0)
        start = y_min - scale * 1e-9
    return np.linspace(start, y_max, n_locations)


def compute_confidence_intervals(
vec_y: np.ndarray,
vec_d: np.ndarray,
Expand Down
55 changes: 55 additions & 0 deletions tests/test_local_estimators.py
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,61 @@ def test_simple_local_estimator_predict_lpte(self):
self.assertTrue(np.all(lower_bound <= beta))
self.assertTrue(np.all(beta <= upper_bound))

def test_simple_local_estimator_predict_ldte_without_locations(self):
    """Calling predict_ldte without ``locations`` derives them from the outcomes."""
    local_estimator = SimpleLocalDistributionEstimator()
    local_estimator.fit(
        self.covariates,
        self.treatment_arms,
        self.treatment_indicator,
        self.outcomes,
        self.strata,
    )

    # No ``locations`` argument: the estimator has to build the grid itself.
    ldte, ci_lower, ci_upper = local_estimator.predict_ldte(
        target_treatment_arm=1,
        control_treatment_arm=0,
        alpha=0.05,
    )

    inferred = local_estimator.last_locations
    n_points = inferred.shape[0]
    self.assertGreater(n_points, 1)

    # One estimate and one pair of bounds per inferred location.
    for values in (ldte, ci_lower, ci_upper):
        self.assertEqual(values.shape, (n_points,))

    # The inferred grid spans exactly the observed outcome range.
    self.assertAlmostEqual(inferred[0], float(self.outcomes.min()))
    self.assertAlmostEqual(inferred[-1], float(self.outcomes.max()))

def test_simple_local_estimator_predict_lpte_without_locations(self):
    """LPTE auto-infers interval boundaries, with left endpoint below min."""
    estimator = SimpleLocalDistributionEstimator()
    estimator.fit(
        self.covariates,
        self.treatment_arms,
        self.treatment_indicator,
        self.outcomes,
        self.strata,
    )

    beta, lower, upper = estimator.predict_lpte(
        target_treatment_arm=1,
        control_treatment_arm=0,
        alpha=0.05,
    )

    n = estimator.last_locations.shape[0]
    # At least two boundaries are needed to form one interval; without this
    # guard a degenerate grid would satisfy the shape checks with empty arrays.
    self.assertGreater(n, 1)
    # LPTE output length is len(locations) - 1
    self.assertEqual(beta.shape, (n - 1,))
    self.assertEqual(lower.shape, (n - 1,))
    self.assertEqual(upper.shape, (n - 1,))
    # The left boundary sits strictly below the smallest outcome so that
    # minimum-valued samples fall inside the first half-open interval.
    self.assertLess(
        estimator.last_locations[0], float(self.outcomes.min())
    )
    # The right boundary still coincides with the largest outcome.
    self.assertAlmostEqual(
        estimator.last_locations[-1], float(self.outcomes.max())
    )

def test_adjusted_local_estimator_predict_lpte(self):
"""Test that AdjustedLocalDistributionEstimator can predict LPTE."""
base_model = LogisticRegression(random_state=42)
Expand Down
Loading
Loading