Source code for skdownscale.pointwise_models.bcsd

from __future__ import annotations

from typing import Any

import pandas as pd
from sklearn.utils.validation import check_is_fitted

from .base import TimeSynchronousDownscaler
from .groupers import DAY_GROUPER, MONTH_GROUPER, PaddedDOYGrouper
from .quantile import QuantileMapper
from .utils import default_none_kwargs


class BcsdBase(TimeSynchronousDownscaler):
    """Base class for BCSD model."""

    _fit_attributes = ['y_climo_', 'quantile_mappers_']
    _timestep = 'M'

    def __init__(
        self,
        time_grouper: str | pd.Grouper | type[PaddedDOYGrouper] = MONTH_GROUPER,
        climate_trend_grouper: str | pd.Grouper = DAY_GROUPER,
        climate_trend: str | pd.Grouper = MONTH_GROUPER,
        return_anoms: bool = True,
        qm_kwargs: dict[str, Any] | None = None,
    ) -> None:
        self.time_grouper = time_grouper
        self.climate_trend_grouper = climate_trend_grouper
        self.climate_trend = climate_trend
        self.return_anoms = return_anoms
        self.qm_kwargs = qm_kwargs

    def _pre_fit(self):
        if isinstance(self.time_grouper, str):
            if self.time_grouper == 'daily_nasa-nex':
                self.time_grouper = PaddedDOYGrouper
                self.timestep = 'daily'
            else:
                self.time_grouper_ = pd.Grouper(freq=self.time_grouper)
                self.timestep = 'monthly'
        else:
            self.time_grouper_ = self.time_grouper
            self.timestep = 'monthly'

    def _create_groups(self, df, climate_trend=False):
        """helper function to create groups by either daily or month"""
        if self.timestep == 'monthly':
            return df.groupby(self.time_grouper)
        elif self.timestep == 'daily':
            if climate_trend:
                # group by day only rather than also +/- offset days
                return df.groupby(self.climate_trend_grouper)
            else:
                return self.time_grouper(df)
        else:
            raise TypeError(f'unexpected time grouper type {self.time_grouper}')

    def _qm_fit_by_group(self, groups):
        """helper function to fit quantile mappers by group

        Note that we store these mappers for later
        """
        self.quantile_mappers_ = {}
        qm_kwargs = default_none_kwargs(self.qm_kwargs)
        for key, group in groups:
            self.quantile_mappers_[key] = QuantileMapper(**qm_kwargs).fit(group)

    def _qm_transform_by_group(self, groups):
        """helper function to apply quantile mapping by group

        Note that we recombine the dataframes using pd.concat, there may be a better way to do this
        """

        dfs = []
        for key, group in groups:
            qmapped = self.quantile_mappers_[key].transform(group)
            dfs.append(pd.DataFrame(qmapped, index=group.index, columns=group.columns))
        return pd.concat(dfs).sort_index()

    def _remove_climatology(self, obj, climatology, climate_trend=False):
        """helper function to remove climatologies"""
        dfs = []
        for key, group in self._create_groups(obj, climate_trend):
            if self.timestep == 'monthly':
                dfs.append(group - climatology.loc[key].values)
            elif self.timestep == 'daily':
                dfs.append(group - climatology.loc[key])

        result = pd.concat(dfs).sort_index()
        if obj.shape != result.shape:
            raise ValueError(f'Result shape {result.shape} does not match input shape {obj.shape}')
        return result


[docs] class BcsdPrecipitation(BcsdBase): """Classic BCSD model for Precipitation Parameters ---------- time_grouper : str or pd.Grouper, optional Pandas time frequency str or Grouper object. Specifies how to group time periods. Default is 'M' (e.g. Monthly). qm_kwargs : dict Keyword arguments to pass to QuantileMapper. Attributes ---------- time_grouper : pd.Grouper Linear Regression object. quantile_mappers_ : dict QuantileMapper objects (one for each time group). """
[docs] def fit(self, X, y): """Fit BcsdPrecipitation model Parameters ---------- X : pd.Series or pd.DataFrame, shape (n_samples, 1) Training data y : pd.Series or pd.DataFrame, shape (n_samples, 1) Target values. Returns ------- self : returns an instance of self. """ self._pre_fit() X, y = self._validate_data(X, y, y_numeric=True) # TO-DO: set n_features_n attribute if self.n_features_in_ != 1: raise ValueError(f'BCSD only supports 1 feature, found {self.n_features_in_}') y_groups = self._create_groups(y) # calculate the climatologies self.y_climo_ = y_groups.mean() if self.return_anoms and self.y_climo_.values.min() <= 0: raise ValueError('Invalid value in target climatology') # fit the quantile mappers # TO-DO: do we need to detrend the data before fitting the quantile mappers?? self._qm_fit_by_group(y_groups) return self
[docs] def predict(self, X): """Predict using the BcsdPrecipitation model Parameters ---------- X : pd.Series or pd.DataFrame, shape (n_samples, 1) Samples. Returns ------- C : pd.DataFrame, shape (n_samples, 1) Returns predicted values. """ check_is_fitted(self) X = self._validate_data(X) # Bias correction # apply quantile mapping by month or day Xqm = self._qm_transform_by_group(self._create_groups(X, climate_trend=True)) # calculate the anomalies as a ratio of the training data return self._calc_ratio_anoms(Xqm, self.y_climo_) if self.return_anoms else Xqm
def _calc_ratio_anoms(self, obj, climatology, climate_trend=False): """helper function for dividing day groups by climatology""" dfs = [] for key, group in self._create_groups(obj, climate_trend): if self.timestep == 'monthly': dfs.append(group / climatology.loc[key].values) else: dfs.append(group / climatology.loc[key]) result = pd.concat(dfs).sort_index() if obj.shape != result.shape: raise ValueError(f'Result shape {result.shape} does not match input shape {obj.shape}') return result def __sklearn_tags__(self): from dataclasses import replace tags = super().__sklearn_tags__() # Skip tests - only supports 1 feature, temporal order matters tags = replace(tags, _skip_test='BCSD only supports 1 feature and temporal order matters') return tags
[docs] class BcsdTemperature(BcsdBase):
[docs] def fit(self, X, y): """Fit BcsdTemperature model Parameters ---------- X : pd.Series or pd.DataFrame, shape (n_samples, 1) Training data y : pd.Series or pd.DataFrame, shape (n_samples, 1) Target values. Returns ------- self : returns an instance of self. """ self._pre_fit() X, y = self._validate_data(X, y, y_numeric=True) # TO-DO: set n_features_in attribute if self.n_features_in_ != 1: raise ValueError(f'BCSD only supports up to 4 features, found {self.n_features_in_}') # make groups for day or month y_groups = self._create_groups(y) # calculate the climatologies self._x_climo = self._create_groups(X).mean() self.y_climo_ = y_groups.mean() # fit the quantile mappers self._qm_fit_by_group(y_groups) return self
[docs] def predict(self, X): """Predict using the BcsdTemperature model Parameters ---------- X : DataFrame, shape (n_samples, 1) Samples. Returns ------- C : pd.DataFrame, shape (n_samples, 1) Returns predicted values. """ check_is_fitted(self) X = self._check_array(X) # Calculate the 9-year running mean for each month def rolling_func(x): return x.rolling(9, center=True, min_periods=1).mean() X_rolling_mean = X.groupby(self.climate_trend, group_keys=False).apply(rolling_func) # remove climatology from 9-year monthly mean climate trend X_shift = self._remove_climatology(X_rolling_mean, self._x_climo, climate_trend=True) # remove shift from model data X_no_shift = X - X_shift # Bias correction # apply quantile mapping by month or day Xqm = self._qm_transform_by_group(self._create_groups(X_no_shift, climate_trend=True)) # restore the climate trend X_qm_with_shift = X_shift + Xqm # return bias corrected absolute values or calculate the anomalies if self.return_anoms: return self._remove_climatology(X_qm_with_shift, self.y_climo_) else: return X_qm_with_shift
def _remove_climatology(self, obj, climatology, climate_trend=False): """helper function to remove climatologies""" dfs = [ group - climatology.loc[key].values for key, group in self._create_groups(obj, climate_trend) if self.timestep in ['monthly', 'daily'] ] result = pd.concat(dfs).sort_index() if obj.shape != result.shape: raise ValueError('shape of climo is not equal to input array') return result def __sklearn_tags__(self): from dataclasses import replace tags = super().__sklearn_tags__() # Skip tests - only supports 1 feature, temporal order matters tags = replace(tags, _skip_test='BCSD only supports 1 feature and temporal order matters') return tags