from __future__ import annotations
import warnings
from typing import Any, Literal
import numpy as np
import pandas as pd
from numpy.typing import NDArray
from sklearn.base import BaseEstimator, RegressorMixin
from sklearn.linear_model import LinearRegression, LogisticRegression
from sklearn.metrics import root_mean_squared_error
from sklearn.neighbors import KDTree
from sklearn.utils import check_array
from sklearn.utils.validation import check_is_fitted, validate_data
from .utils import default_none_kwargs
def select_analogs(analogs: NDArray[Any], inds: NDArray[np.integer[Any]]) -> NDArray[Any]:
# todo: this is possible with fancy indexing
out = np.empty(len(analogs))
for i, ind in enumerate(inds):
out[i] = analogs[i, ind]
return out
class NamedColumnBaseEstimator(BaseEstimator):
def _validate_data(
self,
X='no_validation',
y='no_validation',
**check_params,
):
if isinstance(X, pd.DataFrame):
feature_names = X.columns
else:
feature_names = None
# Check if we should validate y
should_validate_y = not (isinstance(y, str) and y == 'no_validation')
if should_validate_y:
result = validate_data(self, X=X, y=y, **check_params)
X, y = result
else:
result = validate_data(self, X=X, **check_params)
X = result
if feature_names is not None:
X = pd.DataFrame(X, columns=feature_names)
return (X, y) if should_validate_y else X
class AnalogBase(RegressorMixin, NamedColumnBaseEstimator):
_fit_attributes = ['kdtree_', 'X_', 'y_', 'k_']
def fit(self, X, y):
"""Fit Analog model using a KDTree
Parameters
----------
X : pd.Series or pd.DataFrame, shape (n_samples, n_features)
Training data
y : pd.Series or pd.DataFrame, shape (n_samples, 1)
Target values.
Returns
-------
self : returns an instance of self.
"""
X, y = self._validate_data(X, y=y, y_numeric=True)
if len(X) >= self.n_analogs:
self.k_ = self.n_analogs
else:
warnings.warn('length of X is less than n_analogs, setting n_analogs = len(X)')
self.k_ = len(X)
kdtree_kwargs = default_none_kwargs(self.kdtree_kwargs)
self.kdtree_ = KDTree(X, **kdtree_kwargs)
self.X_ = X
self.y_ = y
return self
def __sklearn_tags__(self):
from dataclasses import replace
tags = super().__sklearn_tags__()
# Skip tests - GARD models output 3 columns pandas dataframe
tags = replace(
tags,
_skip_test='GARD models output 3 columns pandas dataframe instead of one during predict',
)
return tags
[docs]
class AnalogRegression(AnalogBase):
"""AnalogRegression
Parameters
----------
n_analogs: int
Number of analogs to use when building linear regression
thresh: float or int
Threshold value. If provided, the model will predict:
1) the probability of this threshold being exceeded, and
2) the value given the threshold is exceeded
kdtree_kwargs : dict
Keyword arguments to pass to the sklearn.neighbors.KDTree constructor
query_kwargs : dict
Keyword arguments to pass to the sklearn.neighbors.KDTree.query method
lr_kwargs : dict
Keyword arguments to pass to the sklear.linear_model.LinearRegression
constructor
Attributes
----------
kdtree_ : sklearn.neighbors.KDTree
KDTree object
Notes
-----
GARD models generates three columns in the predict function, the columns include `pred`, the mean prediction value;
`exceedance_prob`, the probability of exceeding self.thresh value; and `prediction_error`, the RMSE associated
with the mean prediction.
"""
# the number of columns outputed in the predict method
n_outputs = 3
output_names = ['pred', 'exceedance_prob', 'prediction_error']
[docs]
def __init__(
self,
n_analogs: int = 200,
thresh: float | None = None,
kdtree_kwargs: dict[str, Any] | None = None,
query_kwargs: dict[str, Any] | None = None,
logistic_kwargs: dict[str, Any] | None = None,
lr_kwargs: dict[str, Any] | None = None,
) -> None:
self.n_analogs = n_analogs
self.thresh = thresh
self.kdtree_kwargs = kdtree_kwargs
self.query_kwargs = query_kwargs
self.logistic_kwargs = logistic_kwargs
self.lr_kwargs = lr_kwargs
[docs]
def predict(self, X):
"""Predict using the AnalogRegression model
Parameters
----------
X : DataFrame, shape (n_samples, 1)
Samples.
Returns
-------
C : pd.DataFrame, shape (n_samples, self.n_outputs)
Returns predicted values, including the mean prediction, exceedance probability, and prediction error
"""
# validate input data
return_df = isinstance(X, pd.DataFrame)
check_is_fitted(self)
X = check_array(X)
# not used if self.thresh = None, instantiating to keep the code clean
logistic_kwargs = default_none_kwargs(self.logistic_kwargs)
logistic_model = LogisticRegression(**logistic_kwargs) if self.thresh is not None else None
lr_kwargs = default_none_kwargs(self.lr_kwargs)
lr_model = LinearRegression(**lr_kwargs)
out = np.empty((len(X), self.n_outputs), dtype=np.float64)
for i in range(len(X)):
# predict for this time step
result = self._predict_one_step(
logistic_model,
lr_model,
X[None, i],
)
out[i, :] = result
# if the input is a dataframe, return dataframe, otherwise return a numpy array
# the output_names can be used to determine the order of columns
return pd.DataFrame(out, columns=self.output_names) if return_df else out
def _predict_one_step(self, logistic_model, lr_model, X):
# get analogs
query_kwargs = default_none_kwargs(self.query_kwargs)
inds = self.kdtree_.query(X, k=self.k_, return_distance=False, **query_kwargs).squeeze()
# extract data to train linear regression model
x = np.asarray(self.kdtree_.data)[inds]
y = self.y_[inds]
# figure out if there's a threshold of interest
if self.thresh is not None:
exceed_ind = y > self.thresh
else:
exceed_ind = np.ones(len(y), dtype=bool)
# train logistic regression model
binary_y = exceed_ind.astype(np.int8)
if not np.all(binary_y == 1):
logistic_model.fit(x, binary_y)
exceedance_prob = logistic_model.predict_proba(X)[0, 0]
else:
exceedance_prob = 1.0
# train linear regression model on data above threshold of interest
lr_model.fit(x[exceed_ind], y[exceed_ind])
# calculate the rmse of prediction
y_hat = lr_model.predict(x[exceed_ind])
error = root_mean_squared_error(y[exceed_ind], y_hat)
predicted = lr_model.predict(X).item()
# this order needs to be the same as output_names
return [predicted, exceedance_prob, error]
[docs]
class PureAnalog(AnalogBase):
"""PureAnalog
Parameters
----------
n_analogs : int
Number of analogs to use
thresh : float
Subset analogs based on threshold
stats : bool
Calculate fit statistics during predict step
kdtree_kwargs : dict
Dictionary of keyword arguments to pass to cKDTree constructor
query_kwargs : dict
Dictionary of keyword arguments to pass to `cKDTree.query`
Attributes
----------
kdtree_ : sklearn.neighbors.KDTree
KDTree object
Notes
-----
GARD models generates three columns in the predict function, the columns include `pred`, the mean prediction value;
`exceedance_prob`, the probability of exceeding self.thresh value; and `prediction_error`, the RMSE associated
with the mean prediction.
"""
n_outputs = 3
output_names = ['pred', 'exceedance_prob', 'prediction_error']
[docs]
def __init__(
self,
n_analogs: int = 200,
kind: Literal[
'best_analog', 'sample_analogs', 'weight_analogs', 'mean_analogs'
] = 'best_analog',
thresh: float | None = None,
kdtree_kwargs: dict[str, Any] | None = None,
query_kwargs: dict[str, Any] | None = None,
) -> None:
self.n_analogs = n_analogs
self.kind = kind
self.thresh = thresh
self.kdtree_kwargs = kdtree_kwargs
self.query_kwargs = query_kwargs
[docs]
def predict(self, X):
"""Predict using the PureAnalog model
Parameters
----------
X : pd.Series or pd.DataFrame, shape (n_samples, 1)
Samples.
Returns
-------
C : pd.DataFrame, shape (n_samples, self.n_outputs)
Returns predicted values, including the mean prediction, exceedance probability, and prediction error
"""
# validate input data
return_df = isinstance(X, pd.DataFrame)
check_is_fitted(self)
X = check_array(X)
if self.kind == 'best_analog' or self.n_analogs == 1:
k = 1
kind = 'best_analog'
else:
k = self.k_
kind = self.kind
query_kwargs = default_none_kwargs(self.query_kwargs)
dist, inds = self.kdtree_.query(X, k=k, **query_kwargs)
analogs = np.take(self.y_, inds, axis=0)
if self.thresh is not None:
# TODO: rethink how the analog threshold is applied.
# There are certainly edge cases not dealt with properly here
# particularly in the weight analogs case
analog_mask = analogs > self.thresh
masked_analogs = np.where(analog_mask, analogs, np.nan)
if kind == 'best_analog':
predicted = analogs[:, 0]
elif kind == 'sample_analogs':
# get 1 random index to sample from the analogs
rand_inds = np.random.randint(low=0, high=k, size=len(X))
# select the analog now
predicted = select_analogs(analogs, rand_inds)
elif kind == 'weight_analogs':
# take weighted average
# work around for zero distances (perfect matches)
tiny = 1e-20
weights = 1.0 / np.where(dist == 0, tiny, dist)
if self.thresh is not None:
predicted = np.average(masked_analogs, weights=weights, axis=1)
else:
predicted = np.average(analogs.squeeze(), weights=weights, axis=1)
elif kind == 'mean_analogs':
if self.thresh is not None:
predicted = masked_analogs.mean(axis=1)
else:
predicted = analogs.mean(axis=1)
else:
raise ValueError(f'got unexpected kind {kind}')
if self.thresh is not None:
# for mean/weight cases, this fills nans when all analogs
# were below thresh
predicted = np.nan_to_num(predicted, nan=0.0)
prediction_error = masked_analogs.std(axis=1)
exceedance_prob = np.where(analog_mask, 1, 0).mean(axis=1)
else:
prediction_error = analogs.std(axis=1)
exceedance_prob = np.ones(len(X), dtype=np.float64)
# if the input is a dataframe, return dataframe, otherwise return a numpy array
# the output_names can be used to determine the order of columns
if return_df:
out = pd.DataFrame(
{
'pred': predicted,
'exceedance_prob': exceedance_prob,
'prediction_error': prediction_error,
}
)
return out[self.output_names]
else:
predicted = predicted.reshape(-1, 1)
exceedance_prob = exceedance_prob.reshape(-1, 1)
prediction_error = prediction_error.reshape(-1, 1)
# this order has to be the same as output_names
return np.hstack((predicted, exceedance_prob, prediction_error))
class PureRegression(RegressorMixin, NamedColumnBaseEstimator):
"""PureRegression
Parameters
----------
thresh : float
Subset analogs based on threshold
logistic_kwargs : dict
Dictionary of keyword arguments to pass to logistic regression model
linear_kwargs : dict
Dictionary of keyword arguments to pass to linear regression model
Attributes
----------
kdtree_ : sklearn.neighbors.KDTree
KDTree object
Notes
-----
GARD models generates three columns in the predict function, the columns include `pred`, the mean prediction value;
`exceedance_prob`, the probability of exceeding self.thresh value; and `prediction_error`, the RMSE associated
with the mean prediction.
"""
_fit_attributes = [
'logistic_model_',
'linear_model_',
'fit_error_',
]
n_outputs = 3
output_names = ['pred', 'exceedance_prob', 'prediction_error']
def __init__(
self,
thresh: float | None = None,
logistic_kwargs: dict[str, Any] | None = None,
linear_kwargs: dict[str, Any] | None = None,
) -> None:
self.thresh = thresh
self.logistic_kwargs = logistic_kwargs
self.linear_kwargs = linear_kwargs
def fit(self, X, y):
X, y = self._validate_data(X, y=y, y_numeric=True)
if self.thresh is not None:
exceed_ind = y > self.thresh
binary_y = exceed_ind.astype(np.int8)
logistic_kwargs = default_none_kwargs(self.logistic_kwargs)
try:
self.logistic_model_ = LogisticRegression(**logistic_kwargs).fit(X, binary_y)
except ValueError:
# if the value error is raised enter here - this is
# more efficient for this very rare corner case
# confirm that this error is due to the there being no dry days
if len(np.unique(exceed_ind)) > 1:
raise ValueError(
f'Logistic regression requires at least two classes. Found {np.unique(exceed_ind)}'
)
else:
warnings.warn(
'Found only one class while attempting logistic regression. Mutating attribute thresh'
)
# if you get a value error, just set the threshold to None
# so that when you get to the predict you'll just act as if it's a normal linear
# don't need to adjust the exceed_ind because (a) if they're all exceeding then
# exceed_ind is already all 1s (as would be done below in the case of self.thresh=None)
# and (b) if they're all not exceeding, then they'll just get the 1's applied to the error term
# which, in the case of all zeros, will be 0, and in the case of non-zero but below the threshold,
# we want to retain.
self.thresh = None
else:
exceed_ind = np.ones(len(y), dtype=bool)
linear_kwargs = default_none_kwargs(self.linear_kwargs)
self.linear_model_ = LinearRegression(**linear_kwargs).fit(X[exceed_ind], y[exceed_ind])
y_hat = self.linear_model_.predict(X[exceed_ind])
error = root_mean_squared_error(y[exceed_ind], y_hat)
self.fit_error_ = error
return self
def predict(self, X):
"""Predict using the PureRegression model
Parameters
----------
X : pd.Series or pd.DataFrame, shape (n_samples, 1)
Samples.
Returns
-------
C : pd.DataFrame, shape (n_samples, self.n_outputs)
Returns predicted values, including the mean prediction, exceedance probability, and prediction error
"""
return_df = isinstance(X, pd.DataFrame)
check_is_fitted(self)
if self.thresh is not None:
# 0 accesses the probability of no rain, 1 accesses probability of rainy day
exceedance_prob = self.logistic_model_.predict_proba(X)[:, 1]
else:
exceedance_prob = np.ones(len(np.asarray(X)), dtype=np.float64)
prediction_error = np.full(
shape=len(np.asarray(X)), dtype=np.float64, fill_value=self.fit_error_
)
predicted = self.linear_model_.predict(X)
# if the input is a dataframe, return dataframe, otherwise return a numpy array
# the output_names can be used to determine the order of columns
if return_df:
out = pd.DataFrame(
{
'pred': predicted,
'exceedance_prob': exceedance_prob,
'prediction_error': prediction_error,
}
)
return out[self.output_names]
else:
predicted = predicted.reshape(-1, 1)
exceedance_prob = exceedance_prob.reshape(-1, 1)
prediction_error = prediction_error.reshape(-1, 1)
# this order has to be the same as output_names
return np.hstack((predicted, exceedance_prob, prediction_error))
def __sklearn_tags__(self):
from dataclasses import replace
tags = super().__sklearn_tags__()
# Skip tests - GARD models output 3 columns pandas dataframe
tags = replace(
tags,
_skip_test='GARD models output 3 columns pandas dataframe instead of one during predict',
)
return tags