diff --git a/ads/model/model_metadata.py b/ads/model/model_metadata.py index f0428ec9c..90a558ace 100644 --- a/ads/model/model_metadata.py +++ b/ads/model/model_metadata.py @@ -165,6 +165,8 @@ class Framework(ExtendedEnum): PYOD = "pyod" SPACY = "spacy" PROPHET = "prophet" + THETA = "theta" + ETSForecaster = "ets" SKTIME = "sktime" STATSMODELS = "statsmodels" CUML = "cuml" diff --git a/ads/opctl/operator/lowcode/common/utils.py b/ads/opctl/operator/lowcode/common/utils.py index 0c024cb05..15405c59f 100644 --- a/ads/opctl/operator/lowcode/common/utils.py +++ b/ads/opctl/operator/lowcode/common/utils.py @@ -23,6 +23,7 @@ InvalidParameterError, ) from ads.secrets import ADBSecretKeeper +from sktime.param_est.seasonality import SeasonalityACF def call_pandas_fsspec(pd_fn, filename, storage_options, **kwargs): @@ -385,3 +386,63 @@ def enable_print(): except Exception: pass sys.stdout = sys.__stdout__ + + +def find_seasonal_period_from_dataset(data: pd.DataFrame) -> tuple[int, list]: + try: + sp_est = SeasonalityACF() + sp_est.fit(data) + sp = sp_est.get_fitted_params()["sp"] + probable_sps = sp_est.get_fitted_params()["sp_significant"] + return sp, probable_sps + except Exception as e: + logger.warning(f"Unable to find seasonal period: {e}") + return None, None + + +def normalize_frequency(freq: str) -> str: + """ + Normalize pandas frequency strings to sktime/period-compatible formats. + + Args: + freq: Pandas frequency string + + Returns: + Normalized frequency string compatible with PeriodIndex + """ + if freq is None: + return None + + freq = freq.upper() + + # Handle weekly frequencies with day anchors (W-SUN, W-MON, etc.) + if freq.startswith("W-"): + return "W" + + # Handle month start/end frequencies + freq_mapping = { + "MS": "M", # Month Start -> Month End + "ME": "M", # Month End -> Month + "BMS": "M", # Business Month Start -> Month + "BME": "M", # Business Month End -> Month + "QS": "Q", # Quarter Start -> Quarter + "QE": "Q", # Quarter End -> Quarter + "BQS": "Q", # Business Quarter Start -> Quarter + "BQE": "Q", # Business Quarter End -> Quarter + "YS": "Y", # Year Start -> Year (Alias: A) + "AS": "Y", # Year Start -> Year (Alias: A) + "YE": "Y", # Year End -> Year + "AE": "Y", # Year End -> Year + "BYS": "Y", # Business Year Start -> Year + "BAS": "Y", # Business Year Start -> Year + "BYE": "Y", # Business Year End -> Year + "BAE": "Y", # Business Year End -> Year + } + + # Handle frequencies with prefixes (e.g., "2W", "3M") + for old_freq, new_freq in freq_mapping.items(): + if freq.endswith(old_freq): + prefix = freq[:-len(old_freq)] + return f"{prefix}{new_freq}" if prefix else new_freq + + return freq \ No newline at end of file diff --git a/ads/opctl/operator/lowcode/forecast/const.py b/ads/opctl/operator/lowcode/forecast/const.py index f2265418a..077cb49ef 100644 --- a/ads/opctl/operator/lowcode/forecast/const.py +++ b/ads/opctl/operator/lowcode/forecast/const.py @@ -15,6 +15,8 @@ class SupportedModels(ExtendedEnum): NeuralProphet = "neuralprophet" LGBForecast = "lgbforecast" AutoMLX = "automlx" + Theta = "theta" + ETSForecaster = "ets" AutoTS = "autots" # Auto = "auto" diff --git a/ads/opctl/operator/lowcode/forecast/model/base_model.py b/ads/opctl/operator/lowcode/forecast/model/base_model.py index db2c73507..88d29a539 100644 --- a/ads/opctl/operator/lowcode/forecast/model/base_model.py +++ b/ads/opctl/operator/lowcode/forecast/model/base_model.py @@ -962,3 +962,38 @@ def _get_unique_filename(self, base_path: str, storage_options: dict = None) -> f"Error checking OCI path existence: {e}. Using original path." ) return base_path + + def generate_explanation_report_from_data(self) -> tuple[rc.Block, rc.Block]: + if not self.target_cat_col: + self.formatted_global_explanation = ( + self.formatted_global_explanation.rename( + {"Series 1": self.original_target_column}, + axis=1, + ) + ) + self.formatted_local_explanation.drop( + "Series", axis=1, inplace=True + ) + + # Create a markdown section for the global explainability + global_explanation_section = rc.Block( + rc.Heading("Global Explainability", level=2), + rc.Text( + "The following tables provide the feature attribution for the global explainability." + ), + rc.DataTable(self.formatted_global_explanation, index=True), + ) + + blocks = [ + rc.DataTable( + local_ex_df.drop("Series", axis=1), + label=s_id if self.target_cat_col else None, + index=True, + ) + for s_id, local_ex_df in self.local_explanation.items() + ] + local_explanation_section = rc.Block( + rc.Heading("Local Explanation of Models", level=2), + rc.Select(blocks=blocks) if len(blocks) > 1 else blocks[0], + ) + return global_explanation_section, local_explanation_section \ No newline at end of file diff --git a/ads/opctl/operator/lowcode/forecast/model/ets.py b/ads/opctl/operator/lowcode/forecast/model/ets.py new file mode 100644 index 000000000..5c67ead4b --- /dev/null +++ b/ads/opctl/operator/lowcode/forecast/model/ets.py @@ -0,0 +1,324 @@ +#!/usr/bin/env python + +import logging +import traceback +from typing import Dict, Any + +import numpy as np +import optuna +import pandas as pd +from joblib import Parallel, delayed +from optuna.trial import TrialState +from sktime.split import ExpandingWindowSplitter +from statsmodels.tsa.exponential_smoothing.ets import ETSModel + +from ads.opctl import logger +from ads.opctl.operator.lowcode.common.utils import find_seasonal_period_from_dataset +from ads.opctl.operator.lowcode.forecast.operator_config import ForecastOperatorConfig +from ads.opctl.operator.lowcode.forecast.utils import (_label_encode_dataframe, _build_metrics_df) +from .forecast_datasets import ForecastDatasets, ForecastOutput +from .univariate_model import UnivariateForecasterOperatorModel +from ..const import ( + SupportedModels, DEFAULT_TRIALS, +) + +logging.getLogger("report_creator").setLevel(logging.WARNING) + + +class ETSOperatorModel(UnivariateForecasterOperatorModel): + """ETS operator model""" + + def __init__(self, config: ForecastOperatorConfig, datasets: ForecastDatasets): + super().__init__(config=config, datasets=datasets) + self.global_explanation = {} + self.local_explanation = {} + + def set_kwargs(self): + """Prepare kwargs for ETS model from spec. + The operator's 'model_kwargs' is respected. + """ + model_kwargs = self.spec.model_kwargs + model_kwargs["alpha"] = self.spec.model_kwargs.get("alpha", None) + model_kwargs["error"] = self.spec.model_kwargs.get("error", "add") + model_kwargs["trend"] = self.spec.model_kwargs.get("trend", None) + model_kwargs["damped_trend"] = self.spec.model_kwargs.get("damped_trend", False) + model_kwargs["seasonal"] = self.spec.model_kwargs.get("seasonal", None) + model_kwargs["seasonal_periods"] = self.spec.model_kwargs.get("seasonal_periods", None) + model_kwargs["initialization_method"] = self.spec.model_kwargs.get("initialization_method", "estimated") + + if self.spec.confidence_interval_width is None: + self.spec.confidence_interval_width = 1 - 0.90 if model_kwargs["alpha"] is None else 1 - model_kwargs[ + "alpha"] + + model_kwargs["interval_width"] = self.spec.confidence_interval_width + return model_kwargs + + def preprocess(self, data, series_id): + self.le[series_id], df_encoded = _label_encode_dataframe( + data, + no_encode={self.spec.datetime_column.name, self.original_target_column}, + ) + return df_encoded.set_index(self.spec.datetime_column.name) + + def _train_model(self, i, series_id, df: pd.DataFrame, model_kwargs: Dict[str, Any]): + try: + self.forecast_output.init_series_output(series_id=series_id, data_at_series=df) + data = self.preprocess(df, series_id) + data_i = self.drop_horizon(data) + freq = self.datasets.get_datetime_frequency() + + Y = data_i[self.spec.target_column] + dates = data_i.index.values + + if model_kwargs["seasonal"] is None: + model_kwargs["seasonal"] = "add" + + if self.loaded_models is not None and series_id in self.loaded_models: + previous_res = self.loaded_models[series_id].get("model") + model_kwargs["error"] = previous_res.model.error + model_kwargs["trend"] = previous_res.model.trend + model_kwargs["damped_trend"] = previous_res.damped_trend + model_kwargs["seasonal"] = previous_res.model.seasonal + model_kwargs["seasonal_periods"] = previous_res.model.seasonal_periods + model_kwargs["initialization_method"] = previous_res.model.initialization_method + else: + if self.perform_tuning: + model_kwargs = self.run_tuning(Y, model_kwargs) + + use_seasonal = (model_kwargs["seasonal"] is not None and + model_kwargs["seasonal_periods"] is not None and + len(Y) >= 2 * model_kwargs["seasonal_periods"] + ) + if not use_seasonal: + model_kwargs["seasonal"] = None + model_kwargs["seasonal_periods"] = None + + model = ETSModel(Y, error=model_kwargs["error"], trend=model_kwargs["trend"], + damped_trend=model_kwargs["damped_trend"], seasonal=model_kwargs["seasonal"], + seasonal_periods=model_kwargs["seasonal_periods"], + dates=dates, + freq=freq, + initialization_method=model_kwargs["initialization_method"], + initial_level=model_kwargs.get("initial_level", None), + initial_trend=model_kwargs.get("initial_trend", None), + initial_seasonal=model_kwargs.get("initial_seasonal", None), ) + fit = model.fit() + fitted_values = fit.fittedvalues + forecast_values = fit.forecast(self.spec.horizon) + f1 = fit.get_prediction(start=len(Y), end=len(Y) + self.spec.horizon - 1) + forecast_bounds = f1.summary_frame(alpha=1 - self.spec.confidence_interval_width) + + forecast = pd.DataFrame( + pd.concat( + [forecast_values, forecast_bounds["pi_lower"], forecast_bounds["pi_upper"]], + axis=1, + ), + index=forecast_bounds.index, + ) + + forecast.columns = ["yhat", "yhat_lower", "yhat_upper"] + + logger.debug(f"-----------------Model {i}----------------------") + logger.debug(forecast[["yhat", "yhat_lower", "yhat_upper"]].tail()) + + self.forecast_output.populate_series_output( + series_id=series_id, + fit_val=fitted_values.values, + forecast_val=forecast["yhat"].values, + upper_bound=forecast["yhat_upper"].values, + lower_bound=forecast["yhat_lower"].values, + ) + self.outputs[series_id] = forecast + self.models[series_id] = {} + self.models[series_id]["model"] = fit + self.models[series_id]["le"] = self.le[series_id] + + params = vars(model).copy() + for param in ["arima_res_", "endog_index_"]: + if param in params: + params.pop(param) + self.model_parameters[series_id] = { + "framework": SupportedModels.Arima, + **params, + } + + logger.debug("===========Done===========") + + except Exception as e: + self.errors_dict[series_id] = { + "model_name": self.spec.model, + "error": str(e), + "error_trace": traceback.format_exc(), + } + logger.error(f"Encountered Error: {e}. Skipping.") + logger.error(traceback.format_exc()) + + def _build_model(self) -> pd.DataFrame: + """Build models for all series in parallel and return forecast long format.""" + full_data_dict = self.datasets.get_data_by_series() + self.models = {} + self.outputs = {} + self.explanations_info = {} + model_kwargs = self.set_kwargs() + self.forecast_output = ForecastOutput( + confidence_interval_width=self.spec.confidence_interval_width, + horizon=self.spec.horizon, + target_column=self.original_target_column, + dt_column=self.spec.datetime_column.name, + ) + + Parallel(n_jobs=-1, require="sharedmem")( + delayed(ETSOperatorModel._train_model)( + self, i, series_id, df, model_kwargs.copy() + ) + for self, (i, (series_id, df)) in zip( + [self] * len(full_data_dict), enumerate(full_data_dict.items()) + ) + ) + + return self.forecast_output.get_forecast_long() + + def run_tuning(self, y: pd.Series, model_kwargs_i: Dict[str, Any]): + + tsp, probable_sps = find_seasonal_period_from_dataset(y) + + def objective(trial): + + error = trial.suggest_categorical("error", ["add", "mul"]) + trend = trial.suggest_categorical("trend", ["add", "mul", None]) + damped_trend = trial.suggest_categorical("damped_trend", [True, False]) + sp = trial.suggest_categorical("seasonal_periods", probable_sps) + initialization_method = trial.suggest_categorical( + "initialization_method", ["estimated", "heuristic"] + ) + seasonal = trial.suggest_categorical("seasonal", ["add", "mul", None]) + + if (error == "mul" or trend == "mul" or seasonal == "mul") and (y <= 0).any(): + raise optuna.exceptions.TrialPruned() + + # Invalid combination + if trend is None and damped_trend: + raise optuna.exceptions.TrialPruned() + + cv = ExpandingWindowSplitter( + initial_window=max(50, self.spec.horizon * 3), + step_length=self.spec.horizon, + fh=np.arange(1, self.spec.horizon + 1), + ) + + scores = [] + dates = y.index.values + + for train_idx, test_idx in cv.split(y): + + y_train = y.iloc[train_idx] + y_test = y.iloc[test_idx] + + if ( + seasonal is not None and sp is not None + and len(y_train) < 2 * sp + ): + raise optuna.exceptions.TrialPruned() + + try: + model = ETSModel( + y_train, + error=error, + trend=trend, + damped_trend=damped_trend, + seasonal=seasonal, + seasonal_periods=sp, + dates=dates, + freq=self.datasets.get_datetime_frequency(), + initialization_method=initialization_method, + initial_level=model_kwargs_i.get("initial_level"), + initial_trend=model_kwargs_i.get("initial_trend"), + initial_seasonal=model_kwargs_i.get("initial_seasonal"), + ) + + fit = model.fit() + y_pred = fit.forecast(len(y_test)) + + metrics_df = _build_metrics_df(y_test, y_pred, 0) + metrics_dict = { + k.lower(): v + for k, v in metrics_df[0].to_dict().items() + } + if self.spec.metric.lower() not in metrics_dict: + scores.append(metrics_dict["mape"]) + else: + scores.append(metrics_dict[self.spec.metric.lower()]) + + except Exception: + continue + return np.mean(scores) + + study = optuna.create_study(direction="minimize") + trials = DEFAULT_TRIALS if self.spec.tuning.n_trials is None else self.spec.tuning.n_trials + study.optimize(objective, n_trials=trials) + completed_trials = [ + t for t in study.trials + if t.state == TrialState.COMPLETE + ] + + if not completed_trials: + logger.debug( + "Theta tuning produced no completed trials. " + "Falling back to default parameters." + ) + return model_kwargs_i + + model_kwargs_i.update({ + "error": study.best_params["error"], + "trend": study.best_params["trend"], + "damped_trend": study.best_params["damped_trend"], + "seasonal": study.best_params["seasonal"], + "seasonal_periods": study.best_params["seasonal_periods"], + "initialization_method": study.best_params["initialization_method"], + }) + + return model_kwargs_i + + def _generate_report(self): + import report_creator as rc + """The method that needs to be implemented on the particular model level.""" + all_sections = [] + + if len(self.models) > 0: + sec5_text = rc.Heading("ETS Model Parameters", level=2) + blocks = [ + rc.Html( + m["model"].summary().as_html(), + label=s_id if self.target_cat_col else None, + ) + for i, (s_id, m) in enumerate(self.models.items()) + ] + sec5 = rc.Select(blocks=blocks) if len(blocks) > 1 else blocks[0] + all_sections = [sec5_text, sec5] + + if self.spec.generate_explanations: + try: + # If the key is present, call the "explain_model" method + self.explain_model() + + global_explanation_section, local_explanation_section = self.generate_explanation_report_from_data() + + # Append the global explanation text and section to the "all_sections" list + all_sections = all_sections + [ + global_explanation_section, + local_explanation_section, + ] + except Exception as e: + logger.warning(f"Failed to generate Explanations with error: {e}.") + logger.debug(f"Full Traceback: {traceback.format_exc()}") + + model_description = rc.Text( + "ETS stands for Error, Trend, Seasonal. An ETS forecaster is a classical time-series forecasting model " + "that explains a series using these three components and extrapolates them into the future." + ) + other_sections = all_sections + + return ( + model_description, + other_sections, + ) diff --git a/ads/opctl/operator/lowcode/forecast/model/factory.py b/ads/opctl/operator/lowcode/forecast/model/factory.py index 262fe5bbc..fc834e501 100644 --- a/ads/opctl/operator/lowcode/forecast/model/factory.py +++ b/ads/opctl/operator/lowcode/forecast/model/factory.py @@ -23,6 +23,8 @@ from .ml_forecast import MLForecastOperatorModel from .neuralprophet import NeuralProphetOperatorModel from .prophet import ProphetOperatorModel +from .theta import ThetaOperatorModel +from .ets import ETSOperatorModel class UnSupportedModelError(Exception): @@ -46,6 +48,8 @@ class ForecastOperatorModelFactory: SupportedModels.LGBForecast: MLForecastOperatorModel, SupportedModels.AutoMLX: AutoMLXOperatorModel, SupportedModels.AutoTS: AutoTSOperatorModel, + SupportedModels.Theta: ThetaOperatorModel, + SupportedModels.ETSForecaster: ETSOperatorModel, } @classmethod diff --git a/ads/opctl/operator/lowcode/forecast/model/forecast_datasets.py b/ads/opctl/operator/lowcode/forecast/model/forecast_datasets.py index 3019b6839..0c234b906 100644 --- a/ads/opctl/operator/lowcode/forecast/model/forecast_datasets.py +++ b/ads/opctl/operator/lowcode/forecast/model/forecast_datasets.py @@ -345,19 +345,18 @@ def populate_series_output( f"\nPlease refer to the troubleshooting guide at {TROUBLESHOOTING_GUIDE} for resolution steps." ) from e + start_idx = output_i.shape[0] - self.horizon - len(fit_val) if (output_i.shape[0] - self.horizon) == len(fit_val): - output_i["fitted_value"].iloc[: -self.horizon] = ( - fit_val # Note: may need to do len(output_i) - (len(fit_val) + horizon) : -horizon - ) + output_i.loc[output_i.index[ + : -self.horizon], "fitted_value"] = fit_val # Note: may need to do len(output_i) - (len(fit_val) + horizon) : -horizon elif (output_i.shape[0] - self.horizon) > len(fit_val): logger.debug( f"Fitted Values were only generated on a subset ({len(fit_val)}/{(output_i.shape[0] - self.horizon)}) of the data for Series: {series_id}." ) - start_idx = output_i.shape[0] - self.horizon - len(fit_val) - output_i["fitted_value"].iloc[start_idx : -self.horizon] = fit_val + output_i.loc[output_i.index[start_idx: -self.horizon], "fitted_value"] = fit_val else: - output_i["fitted_value"].iloc[start_idx : -self.horizon] = fit_val[ - -(output_i.shape[0] - self.horizon) : + output_i.loc[output_i.index[start_idx: -self.horizon], "fitted_value"] = fit_val[ + -(output_i.shape[0] - self.horizon): ] if len(forecast_val) != self.horizon: @@ -365,21 +364,21 @@ def populate_series_output( f"Attempting to set forecast along horizon ({self.horizon}) for series: {series_id}, however forecast is only length {len(forecast_val)}" f"\nPlease refer to the troubleshooting guide at {TROUBLESHOOTING_GUIDE} for resolution steps." ) - output_i["forecast_value"].iloc[-self.horizon :] = forecast_val + output_i.loc[output_i.index[-self.horizon:], "forecast_value"] = forecast_val if len(upper_bound) != self.horizon: raise ValueError( f"Attempting to set upper_bound along horizon ({self.horizon}) for series: {series_id}, however upper_bound is only length {len(upper_bound)}" f"\nPlease refer to the troubleshooting guide at {TROUBLESHOOTING_GUIDE} for resolution steps." ) - output_i[self.upper_bound_name].iloc[-self.horizon :] = upper_bound + output_i.loc[output_i.index[-self.horizon:], self.upper_bound_name] = upper_bound if len(lower_bound) != self.horizon: raise ValueError( f"Attempting to set lower_bound along horizon ({self.horizon}) for series: {series_id}, however lower_bound is only length {len(lower_bound)}" f"\nPlease refer to the troubleshooting guide at {TROUBLESHOOTING_GUIDE} for resolution steps." ) - output_i[self.lower_bound_name].iloc[-self.horizon :] = lower_bound + output_i.loc[output_i.index[-self.horizon:], self.lower_bound_name] = lower_bound self.series_id_map[series_id] = output_i self.verify_series_output(series_id) diff --git a/ads/opctl/operator/lowcode/forecast/model/prophet.py b/ads/opctl/operator/lowcode/forecast/model/prophet.py index 306768ded..78feba86f 100644 --- a/ads/opctl/operator/lowcode/forecast/model/prophet.py +++ b/ads/opctl/operator/lowcode/forecast/model/prophet.py @@ -412,38 +412,7 @@ def _generate_report(self): # If the key is present, call the "explain_model" method self.explain_model() - if not self.target_cat_col: - self.formatted_global_explanation = ( - self.formatted_global_explanation.rename( - {"Series 1": self.original_target_column}, - axis=1, - ) - ) - self.formatted_local_explanation.drop( - "Series", axis=1, inplace=True - ) - - # Create a markdown section for the global explainability - global_explanation_section = rc.Block( - rc.Heading("Global Explainability", level=2), - rc.Text( - "The following tables provide the feature attribution for the global explainability." - ), - rc.DataTable(self.formatted_global_explanation, index=True), - ) - - blocks = [ - rc.DataTable( - local_ex_df.drop("Series", axis=1), - label=s_id if self.target_cat_col else None, - index=True, - ) - for s_id, local_ex_df in self.local_explanation.items() - ] - local_explanation_section = rc.Block( - rc.Heading("Local Explanation of Models", level=2), - rc.Select(blocks=blocks) if len(blocks) > 1 else blocks[0], - ) + global_explanation_section, local_explanation_section = self.generate_explanation_report_from_data() # Append the global explanation text and section to the "all_sections" list all_sections = all_sections + [ diff --git a/ads/opctl/operator/lowcode/forecast/model/theta.py b/ads/opctl/operator/lowcode/forecast/model/theta.py new file mode 100644 index 000000000..a54f073fb --- /dev/null +++ b/ads/opctl/operator/lowcode/forecast/model/theta.py @@ -0,0 +1,400 @@ +#!/usr/bin/env python + +import logging +import traceback +from typing import Dict, Any + +import numpy as np +import optuna +import pandas as pd +from joblib import Parallel, delayed +from optuna.trial import TrialState +from sktime.forecasting.base import ForecastingHorizon +from sktime.forecasting.theta import ThetaForecaster +from sktime.split import ExpandingWindowSplitter +from sktime.transformations.series.detrend import Deseasonalizer + +from ads.opctl import logger +from ads.opctl.operator.lowcode.common.utils import find_seasonal_period_from_dataset, normalize_frequency +from ads.opctl.operator.lowcode.forecast.operator_config import ForecastOperatorConfig +from ads.opctl.operator.lowcode.forecast.utils import (_label_encode_dataframe, _build_metrics_df) +from .base_model import ForecastOperatorBaseModel +from .forecast_datasets import ForecastDatasets, ForecastOutput +from .univariate_model import UnivariateForecasterOperatorModel +from ..const import ( + SupportedModels, DEFAULT_TRIALS, +) + +logging.getLogger("report_creator").setLevel(logging.WARNING) + + +class ThetaOperatorModel(UnivariateForecasterOperatorModel): + """Theta operator model""" + + def __init__(self, config: ForecastOperatorConfig, datasets: ForecastDatasets): + super().__init__(config=config, datasets=datasets) + self.global_explanation = {} + self.local_explanation = {} + + def set_kwargs(self): + """Prepare kwargs for Theta model from spec. + The operator's 'model_kwargs' is respected. + """ + model_kwargs = self.spec.model_kwargs + model_kwargs["alpha"] = self.spec.model_kwargs.get("alpha", None) + model_kwargs["initial_level"] = self.spec.model_kwargs.get("initial_level", None) + model_kwargs["deseasonalize"] = self.spec.model_kwargs.get("deseasonalize", True) + model_kwargs["deseasonalize_model"] = "mul" + model_kwargs["sp"] = self.spec.model_kwargs.get("sp", None) + + if self.spec.confidence_interval_width is None: + self.spec.confidence_interval_width = 1 - 0.90 if model_kwargs["alpha"] is None else 1 - model_kwargs[ + "alpha"] + + model_kwargs["interval_width"] = self.spec.confidence_interval_width + return model_kwargs + + def preprocess(self, data, series_id): + self.le[series_id], df_encoded = _label_encode_dataframe( + data, + no_encode={self.spec.datetime_column.name, self.original_target_column}, + ) + return df_encoded.set_index(self.spec.datetime_column.name) + + def _train_model(self, i, series_id, df: pd.DataFrame, model_kwargs: Dict[str, Any]): + try: + self.forecast_output.init_series_output(series_id=series_id, data_at_series=df) + data = self.preprocess(df, series_id) + + data_i = self.drop_horizon(data) + target = self.spec.target_column + + freq = self.datasets.get_datetime_frequency() + if freq is not None: + normalized_freq = normalize_frequency(freq) + data_i.index = data_i.index.to_period(normalized_freq) + + y = data_i[target] + X_in = data_i.drop(target, axis=1) + + if model_kwargs["deseasonalize"] and model_kwargs["sp"] is None: + sp, probable_sps = find_seasonal_period_from_dataset(y) + else: + sp, probable_sps = 1, [1] + + model_kwargs["sp"] = model_kwargs.get("sp") or sp + + if not sp or len(y) < 2 * model_kwargs["sp"]: + model_kwargs["deseasonalize"] = False + + # If model already loaded, extract parameters (best-effort) + if self.loaded_models is not None and series_id in self.loaded_models: + previous_res = self.loaded_models[series_id].get("model") + fitted_params = previous_res.get_fitted_params() + model_kwargs["initial_level"] = fitted_params.get("initial_level", None) + elif self.perform_tuning: + model_kwargs = self.run_tuning(y, X_in, model_kwargs, probable_sps) + + # Fit ThetaModel using params + using_additive_deseasonalization = False + additive_deseasonalizer = None + if model_kwargs["deseasonalize"]: + if (y <= 0).any(): + logger.warning( + "Processing data with additive deseasonalization model as data contains negative or zero values which can't be deseasonalized using multiplicative deseasonalization. And ThetaForecaster by default only supports multiplicative deseasonalization.") + model_kwargs["deseasonalize_model"] = "add" + using_additive_deseasonalization = True + additive_deseasonalizer = Deseasonalizer( + sp=model_kwargs["sp"], + model="additive", + ) + y_adj = additive_deseasonalizer.fit_transform(y) + y = y_adj + model_kwargs["deseasonalize"] = False + else: + model_kwargs["deseasonalize_model"] = "" + + model = ThetaForecaster(initial_level=model_kwargs["initial_level"], + deseasonalize=model_kwargs["deseasonalize"], + sp=1 if model_kwargs["deseasonalize_model"] == "add" else model_kwargs.get("sp", + 1), ) + model.fit(y, X=X_in) + + fh = ForecastingHorizon(range(1, self.spec.horizon + 1), is_relative=True) + fh_in_sample = ForecastingHorizon(range(-len(data_i) + 1, 1)) + fitted_vals = model.predict(fh_in_sample) + forecast_values = model.predict(fh) + forecast_range = model.predict_interval(fh=fh, coverage=self.spec.confidence_interval_width) + + if using_additive_deseasonalization and additive_deseasonalizer is not None: + fitted_vals = additive_deseasonalizer.inverse_transform(fitted_vals) + forecast_values = additive_deseasonalizer.inverse_transform(forecast_values) + forecast_range_inv = forecast_range.copy() + for col in forecast_range.columns: + forecast_range_inv[col] = additive_deseasonalizer.inverse_transform( + forecast_range[[col]] + )[col] + forecast_range = forecast_range_inv + + lower = forecast_range[(self.original_target_column, self.spec.confidence_interval_width, "lower")].rename( + "yhat_lower") + upper = forecast_range[(self.original_target_column, self.spec.confidence_interval_width, "upper")].rename( + "yhat_upper") + point = forecast_values.rename("yhat") + forecast = pd.DataFrame( + pd.concat([point, lower, upper], axis=1) + ) + logger.debug(f"-----------------Model {i}----------------------") + logger.debug(forecast[["yhat", "yhat_lower", "yhat_upper"]].tail()) + + self.forecast_output.populate_series_output( + series_id=series_id, + fit_val=fitted_vals.values, + forecast_val=forecast["yhat"].values, + upper_bound=forecast["yhat_upper"].values, + lower_bound=forecast["yhat_lower"].values, + ) + self.outputs[series_id] = forecast + self.models[series_id] = {} + self.models[series_id]["model"] = model + self.models[series_id]["model_params"] = model_kwargs + self.models[series_id]["le"] = self.le[series_id] + + params = vars(model).copy() + self.model_parameters[series_id] = { + "framework": SupportedModels.Theta, + **params, + } + + logger.debug("===========Done===========") + + except Exception as e: + self.errors_dict[series_id] = { + "model_name": self.spec.model, + "error": str(e), + "error_trace": traceback.format_exc(), + } + logger.warning(f"Encountered Error: {e}. Skipping.") + logger.warning(traceback.format_exc()) + + def _build_model(self) -> pd.DataFrame: + """Build models for all series in parallel and return forecast long format.""" + full_data_dict = self.datasets.get_data_by_series() + self.models = {} + self.outputs = {} + self.explanations_info = {} + model_kwargs = self.set_kwargs() + self.forecast_output = ForecastOutput( + confidence_interval_width=self.spec.confidence_interval_width, + horizon=self.spec.horizon, + target_column=self.original_target_column, + dt_column=self.spec.datetime_column.name, + ) + + Parallel(n_jobs=-1, require="sharedmem")( + delayed(ThetaOperatorModel._train_model)( + self, i, series_id, df, model_kwargs.copy() + ) + for self, (i, (series_id, df)) in zip( + [self] * len(full_data_dict), enumerate(full_data_dict.items()) + ) + ) + + return self.forecast_output.get_forecast_long() + + def run_tuning(self, y: pd.DataFrame, X: pd.DataFrame | None, model_kwargs_i: Dict[str, Any], + probable_sps: list[int]): + + def objective(trial): + y_used = y + X_used = X + + initial_level = model_kwargs_i["initial_level"] + sp = trial.suggest_categorical("sp", probable_sps) + deseasonalize = trial.suggest_categorical("deseasonalize", [True, False]) + deseasonalize_model = trial.suggest_categorical( + "deseasonalize_model", ["add", "mul"] + ) + + if deseasonalize and deseasonalize_model == "mul" and (y_used <= 0).any(): + raise optuna.exceptions.TrialPruned() + d_sp, d_deseasonalize = sp, deseasonalize + if deseasonalize and deseasonalize_model == "add": + additive_deseasonalizer = Deseasonalizer( + sp=sp, + model="additive", + ) + y_used = additive_deseasonalizer.fit_transform(y_used) + d_sp = 1 + d_deseasonalize = False + + model = ThetaForecaster( + initial_level=initial_level, + sp=d_sp, + deseasonalize=d_deseasonalize, + ) + + cv = ExpandingWindowSplitter( + initial_window=50, + step_length=100 + ) + + scores = [] + + for train, test in cv.split(y_used): + y_train = y_used.iloc[train] + y_test = y.iloc[test] + if y_train.isna().any(): + continue + if len(y_train) < 2 * sp: + continue + + X_train = None + X_test = None + + if X_used is not None: + X_train = X_used.iloc[train] + X_test = X_used.iloc[test] + + model.fit(y_train, X=X_train) + fh = ForecastingHorizon(y.index[test], is_relative=False) + y_pred = model.predict(fh, X=X_test) + if y_test.isna().any(): + continue + metrics_df = _build_metrics_df(y_test, y_pred, 0) + metrics_dict = { + k.lower(): v + for k, v in metrics_df[0].to_dict().items() + } + if self.spec.metric.lower() not in metrics_dict: + scores.append(metrics_dict["mape"]) + else: + scores.append(metrics_dict[self.spec.metric.lower()]) + + return np.mean(scores) + + study = optuna.create_study(direction="minimize") + trials = DEFAULT_TRIALS if self.spec.tuning.n_trials is None else self.spec.tuning.n_trials + study.optimize(objective, n_trials=trials) + completed_trials = [ + t for t in study.trials + if t.state == TrialState.COMPLETE + ] + + if not completed_trials: + logger.debug( + "Theta tuning produced no completed trials. " + "Falling back to default parameters." + ) + return model_kwargs_i + + model_kwargs_i["deseasonalize_model"] = study.best_params["deseasonalize_model"] + model_kwargs_i["deseasonalize"] = study.best_params["deseasonalize"] + model_kwargs_i["sp"] = study.best_params["sp"] + return model_kwargs_i + + def _generate_report(self): + import report_creator as rc + """The method that needs to be implemented on the particular model level.""" + all_sections = [] + theta_blocks = [] + + for series_id, sm in self.models.items(): + model = sm["model"] + model_kwargs = sm["model_params"] + + fitted_params = model.get_fitted_params() + initial_level = fitted_params.get("initial_level", None) + smoothing_level = fitted_params.get("smoothing_level", None) + sp = model_kwargs.get("sp", 1) + deseasonalize_model = model_kwargs.get("deseasonalize_model", "mul") + desasonalized = model.deseasonalize + n_obs = len(model._y) if hasattr(model, "_y") else "N/A" + + # Date range + if hasattr(model, "_y"): + start_date = model._y.index[0] + end_date = model._y.index[-1] + else: + start_date = "" + end_date = "" + + # ---- Build the DF ---- + meta_df = pd.DataFrame({ + "Metric": [ + "Initial Level", + "Smoothing Level", + "No. Observations", + "Deseasonalized", + "Deseasonalization Method", + "Period (sp)", + "Sample Start", + "Sample End", + ], + "Value": [ + initial_level, + smoothing_level, + n_obs, + desasonalized, + deseasonalize_model, + sp, + start_date, + end_date, + ], + }) + + # ---- Create a block (NOT a section directly) ---- + theta_block = rc.Block( + rc.Heading(f"Theta Model Summary", level=3), + rc.DataTable(meta_df), + label=series_id + ) + + # Add with optional label support + theta_blocks.append( + theta_block + ) + + # ---- Combine into final section like ARIMA example ---- + theta_title = rc.Heading("Theta Model Parameters", level=2) + theta_section = [] + if len(theta_blocks) > 1: + theta_section = rc.Select(blocks=theta_blocks) + elif len(theta_blocks) == 1: + theta_section = theta_blocks[0] + else: + theta_section = rc.Text("No Theta models were successfully trained.") + + all_sections.extend([theta_title, theta_section]) + + if self.spec.generate_explanations: + try: + # If the key is present, call the "explain_model" method + self.explain_model() + + global_explanation_section, local_explanation_section = self.generate_explanation_report_from_data() + + # Append the global explanation text and section to the "all_sections" list + all_sections = all_sections + [ + global_explanation_section, + local_explanation_section, + ] + + except Exception as e: + logger.warning(f"Failed to generate Explanations with error: {e}.") + logger.warning(f"Full Traceback: {traceback.format_exc()}") + + model_description = rc.Text( + "A Theta forecaster is a popular and surprisingly effective time series forecasting" + "method that works by decomposing data into long-term trend and short-term components, forecasting them separately," + "and then combining the results, often outperforming complex models by adjusting the original series' local" + "curvature using a parameter called theta (θ). It's known for its simplicity, speed, and strong performance, " + "especially in forecasting competitions like the M3, where it served as a strong benchmark, often by using" + "Simple Exponential Smoothing (SES) with drift on a modified series" + ) + other_sections = all_sections + + return ( + model_description, + other_sections, + ) diff --git a/ads/opctl/operator/lowcode/forecast/model/univariate_model.py b/ads/opctl/operator/lowcode/forecast/model/univariate_model.py new file mode 100644 index 000000000..30a8ab486 --- /dev/null +++ b/ads/opctl/operator/lowcode/forecast/model/univariate_model.py @@ -0,0 +1,64 @@ +from abc import ABC + +from ads.opctl.operator.lowcode.forecast.model.base_model import ForecastOperatorBaseModel +from ads.opctl.operator.lowcode.forecast.operator_config import ForecastOperatorConfig +from .forecast_datasets import ForecastDatasets +import pandas as pd +import numpy as np + + +class UnivariateForecasterOperatorModel(ForecastOperatorBaseModel, ABC): + + def __init__(self, config: ForecastOperatorConfig, datasets: ForecastDatasets): + super().__init__(config=config, datasets=datasets) + + def explain_model(self): + """ + Explanation logic for univariate model which do not depend on exogenous variables.: + - Target gets full weight + - All exogenous features get zero weight + """ + + self.local_explanation = {} + global_expl = [] + self.explanations_info = {} + + for series_id, sm in self.models.items(): + model = sm["model"] + if hasattr(model, "_y"): + idx = model._y.index + else: + idx = self.full_data_dict[series_id].index + + df_orig = self.full_data_dict[series_id] + dt_col = self.spec.datetime_column.name + target_col = self.original_target_column + + exog_cols = [c for c in df_orig.columns if c not in {dt_col, target_col}] + + expl_df = pd.DataFrame(index=idx) + expl_df[target_col] = 1.0 + for col in exog_cols: + expl_df[col] = 0.0 + + self.explanations_info[series_id] = expl_df + + local_df = self.get_horizon(expl_df) + local_df["Series"] = series_id + local_df.index.rename(self.dt_column_name, inplace=True) + self.local_explanation[series_id] = local_df + + g_expl = self.drop_horizon(expl_df).mean() + g_expl.name = series_id + global_expl.append(np.abs(g_expl)) + + self.global_explanation = pd.concat(global_expl, axis=1) + self.formatted_global_explanation = ( + self.global_explanation + / self.global_explanation.sum(axis=0) + * 100 + ) + + self.formatted_local_explanation = pd.concat( + self.local_explanation.values() + ) diff --git a/ads/opctl/operator/lowcode/forecast/schema.yaml b/ads/opctl/operator/lowcode/forecast/schema.yaml index 45690aa57..7e90caca7 100644 --- a/ads/opctl/operator/lowcode/forecast/schema.yaml +++ b/ads/opctl/operator/lowcode/forecast/schema.yaml @@ -460,6 +460,8 @@ spec: - autots - auto-select - auto-select-series + - theta + - ets model_kwargs: type: dict diff --git a/docs/source/user_guide/operators/forecast_operator/yaml_schema.rst b/docs/source/user_guide/operators/forecast_operator/yaml_schema.rst index dc0ee92de..4f8d3847e 100644 --- a/docs/source/user_guide/operators/forecast_operator/yaml_schema.rst +++ b/docs/source/user_guide/operators/forecast_operator/yaml_schema.rst @@ -137,7 +137,7 @@ Below is an example of a ``forecast.yaml`` file with every parameter specified: - string - No - prophet - - Model to use. Options: prophet, arima, neuralprophet, automlx, autots, auto-select. + - Model to use. Options: prophet, arima, neuralprophet, theta, ets, automlx, autots, auto-select. * - model_kwargs - dict diff --git a/tests/operators/forecast/test_datasets.py b/tests/operators/forecast/test_datasets.py index aeb5daa66..59d93d89d 100644 --- a/tests/operators/forecast/test_datasets.py +++ b/tests/operators/forecast/test_datasets.py @@ -33,6 +33,8 @@ "neuralprophet", "autots", "lgbforecast", + "theta", + "ets", "auto-select", "auto-select-series", ] diff --git a/tests/operators/forecast/test_errors.py b/tests/operators/forecast/test_errors.py index 2d69dce9e..268057ae7 100644 --- a/tests/operators/forecast/test_errors.py +++ b/tests/operators/forecast/test_errors.py @@ -144,6 +144,8 @@ "neuralprophet", "autots", "lgbforecast", + "theta", + "ets", ] TEMPLATE_YAML = { @@ -816,7 +818,7 @@ def test_date_format(operator_setup, model): @pytest.mark.parametrize("model", MODELS) def test_what_if_analysis(operator_setup, model): os.environ["TEST_MODE"] = "True" - if model in ["auto-select", "lgbforecast"]: + if model in ["auto-select", "lgbforecast", "theta", "ets"]: pytest.skip("Skipping what-if scenario for auto-select") tmpdirname = operator_setup historical_data_path, additional_data_path = setup_small_rossman() diff --git a/tests/operators/forecast/test_explainers.py b/tests/operators/forecast/test_explainers.py index 753e324f4..819288d96 100644 --- a/tests/operators/forecast/test_explainers.py +++ b/tests/operators/forecast/test_explainers.py @@ -20,6 +20,8 @@ # "automlx", # FIXME: automlx is failing, no errors "prophet", "neuralprophet", + "theta", + "ets", "auto-select-series", ]