Source code for pymc_marketing.mmm.budget_optimizer

#   Copyright 2024 The PyMC Labs Developers
#
#   Licensed under the Apache License, Version 2.0 (the "License");
#   you may not use this file except in compliance with the License.
#   You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
#   Unless required by applicable law or agreed to in writing, software
#   distributed under the License is distributed on an "AS IS" BASIS,
#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#   See the License for the specific language governing permissions and
#   limitations under the License.
"""Budget optimization module."""

import warnings
from typing import Any

import numpy as np
from pydantic import BaseModel, ConfigDict, Field
from scipy.optimize import minimize

from pymc_marketing.mmm.components.adstock import AdstockTransformation
from pymc_marketing.mmm.components.saturation import SaturationTransformation


class MinimizeException(Exception):
    """Custom exception for optimization failure."""

    def __init__(self, message: str):
        super().__init__(message)


[docs] class BudgetOptimizer(BaseModel): """A class for optimizing budget allocation in a marketing mix model. The goal of this optimization is to maximize the total expected response by allocating the given budget across different marketing channels. The optimization is performed using the Sequential Least Squares Quadratic Programming (SLSQP) method, which is a gradient-based optimization algorithm suitable for solving constrained optimization problems. For more information on the SLSQP algorithm, refer to the documentation: https://docs.scipy.org/doc/scipy/reference/generated/scipy.optimize.minimize.html Parameters ---------- adstock : AdstockTransformation The adstock class. saturation : SaturationTransformation The saturation class. num_periods : int The number of time units. parameters : dict A dictionary of parameters for each channel. adstock_first : bool, optional Whether to apply adstock transformation first or saturation transformation first. Default is True. """ adstock: AdstockTransformation = Field( ..., description="The adstock transformation class." ) saturation: SaturationTransformation = Field( ..., description="The saturation transformation class." ) num_periods: int = Field( ..., gt=0, description="The number of time units at time granularity which the budget is to be allocated.", ) parameters: dict[str, dict[str, dict[str, float]]] = Field( ..., description="A dictionary of parameters for each channel." ) scales: np.ndarray = Field( ..., description="The scale parameter for each channel variable" ) adstock_first: bool = Field( True, description="Whether to apply adstock transformation first or saturation transformation first.", ) model_config = ConfigDict(arbitrary_types_allowed=True)
[docs] def objective(self, budgets: list[float]) -> float: """Calculate the total response during a period of time given the budgets. It considers the saturation and adstock transformations. Parameters ---------- budgets : list[float] The budgets for each channel. Returns ------- float The negative total response value. """ total_response = 0 first_transform, second_transform = ( (self.adstock, self.saturation) if self.adstock_first else (self.saturation, self.adstock) ) for idx, (_channel, params) in enumerate(self.parameters.items()): budget = budgets[idx] / self.scales[idx] first_params = ( params["adstock_params"] if self.adstock_first else params["saturation_params"] ) second_params = ( params["saturation_params"] if self.adstock_first else params["adstock_params"] ) spend = np.full(self.num_periods, budget) spend_extended = np.concatenate([spend, np.zeros(self.adstock.l_max)]) transformed_spend = second_transform.function( x=first_transform.function(x=spend_extended, **first_params), **second_params, ).eval() total_response += np.sum(transformed_spend) return -total_response
[docs] def allocate_budget( self, total_budget: float, budget_bounds: dict[str, tuple[float, float]] | None = None, custom_constraints: dict[Any, Any] | None = None, minimize_kwargs: dict[str, Any] | None = None, ) -> tuple[dict[str, float], float]: """Allocate the budget based on the total budget, budget bounds, and custom constraints. The default budget bounds are (0, total_budget) for each channel. The default constraint is the sum of all budgets should be equal to the total budget. The optimization is done using the Sequential Least Squares Quadratic Programming (SLSQP) method and it's constrained such that: 1. The sum of budgets across all channels equals the total available budget. 2. The budget allocated to each individual channel lies within its specified range. The purpose is to maximize the total expected objective based on the inequality and equality constraints. Parameters ---------- total_budget : float The total budget. budget_bounds : dict[str, tuple[float, float]], optional The budget bounds for each channel. Default is None. custom_constraints : dict, optional Custom constraints for the optimization. Default is None. minimize_kwargs : dict, optional Additional keyword arguments for the `scipy.optimize.minimize` function. If None, default values are used. Method is set to "SLSQP", ftol is set to 1e-9, and maxiter is set to 1_000. Returns ------- tuple[dict[str, float], float] The optimal budgets for each channel and the negative total response value. Raises ------ Exception If the optimization fails, an exception is raised with the reason for the failure. """ if budget_bounds is None: budget_bounds = {channel: (0, total_budget) for channel in self.parameters} warnings.warn( "No budget bounds provided. Using default bounds (0, total_budget) for each channel.", stacklevel=2, ) elif not isinstance(budget_bounds, dict): raise TypeError("`budget_bounds` should be a dictionary.") if custom_constraints is None: constraints = {"type": "eq", "fun": lambda x: np.sum(x) - total_budget} warnings.warn( "Using default equality constraint: The sum of all budgets should be equal to the total budget.", stacklevel=2, ) elif not isinstance(custom_constraints, dict): raise TypeError("`custom_constraints` should be a dictionary.") else: constraints = custom_constraints num_channels = len(self.parameters.keys()) initial_guess = np.ones(num_channels) * total_budget / num_channels bounds = [ ( (budget_bounds[channel][0], budget_bounds[channel][1]) if channel in budget_bounds else (0, total_budget) ) for channel in self.parameters ] if minimize_kwargs is None: minimize_kwargs = { "method": "SLSQP", "options": {"ftol": 1e-9, "maxiter": 1_000}, } result = minimize( fun=self.objective, x0=initial_guess, bounds=bounds, constraints=constraints, **minimize_kwargs, ) if result.success: optimal_budgets = { name: budget for name, budget in zip(self.parameters.keys(), result.x, strict=False) } return optimal_budgets, -result.fun else: raise MinimizeException(f"Optimization failed: {result.message}")