Source code for pymc_marketing.clv.models.beta_geo

#   Copyright 2024 The PyMC Labs Developers
#
#   Licensed under the Apache License, Version 2.0 (the "License");
#   you may not use this file except in compliance with the License.
#   You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
#   Unless required by applicable law or agreed to in writing, software
#   distributed under the License is distributed on an "AS IS" BASIS,
#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#   See the License for the specific language governing permissions and
#   limitations under the License.
"""Beta-Geometric Negative Binomial Distribution (BG/NBD) model for a non-contractual customer population across continuous time."""  # noqa: E501

import warnings
from collections.abc import Sequence

import numpy as np
import pandas as pd
import pymc as pm
import pytensor.tensor as pt
import xarray
from pymc.distributions.dist_math import check_parameters
from pymc.util import RandomState
from pytensor.tensor import TensorVariable
from scipy.special import expit, hyp2f1

from pymc_marketing.clv.models.basic import CLVModel
from pymc_marketing.clv.utils import to_xarray
from pymc_marketing.model_config import ModelConfig
from pymc_marketing.prior import Prior


[docs] class BetaGeoModel(CLVModel): r"""Beta-Geometric Negative Binomial Distribution (BG/NBD) model for a non-contractual customer population across continuous time. First introduced by Fader, Hardie & Lee [1]_, with additional predictive methods and enhancements in [2]_ and [3]_. The BG/NBD model assumes dropout probabilities for the customer population are Beta distributed, and time between transactions follows a Gamma distribution while the customer is still active. This model requires data to be summarized by *recency*, *frequency*, and *T* for each customer, using `clv.utils.rfm_summary()` or equivalent. Modeling assumptions require *T >= recency*. Predictive methods have been adapted from the *BetaGeoFitter* class in the legacy *lifetimes* library (see https://github.com/CamDavidsonPilon/lifetimes/). Parameters ---------- data : ~pandas.DataFrame DataFrame containing the following columns: * `customer_id`: Unique customer identifier * `frequency`: Number of repeat purchases * `recency`: Time between the first and the last purchase * `T`: Time between the first purchase and the end of the observation period model_config : dict, optional Dictionary of model prior parameters: * `a_prior`: Shape parameter for time until dropout; defaults to `pymc.HalfFlat()` * `b_prior`: Shape parameter for time until dropout; defaults to `pymc.HalfFlat()` * `alpha_prior`: Scale parameter for time between purchases; defaults to `pymc.HalfFlat()` * `r_prior`: Scale parameter for time between purchases; defaults to `pymc.HalfFlat()` sampler_config : dict, optional Dictionary of sampler parameters. Defaults to *None*. Examples -------- .. code-block:: python from pymc_marketing.prior import Prior from pymc_marketing.clv import BetaGeoModel, rfm_summary # customer identifiers and purchase datetimes # are all that's needed to start modeling data = [ [1, "2024-01-01"], [1, "2024-02-06"], [2, "2024-01-01"], [3, "2024-01-02"], [3, "2024-01-05"], [4, "2024-01-16"], [4, "2024-02-05"], [5, "2024-01-17"], [5, "2024-01-18"], [5, "2024-01-19"], ] raw_data = pd.DataFrame(data, columns=["id", "date"] # preprocess data rfm_df = rfm_summary(raw_data,'id','date') # model_config and sampler_configs are optional model = BetaGeoModel( data=data, model_config={ "r_prior": Prior("Gamma", alpha=0.1, beta=1), "alpha_prior": Prior("Gamma", alpha=0.1, beta=1), "a_prior": Prior("Gamma", alpha=0.1, beta=1), "b_prior": Prior("Gamma", alpha=0.1, beta=1), }, sampler_config={ "draws": 1000, "tune": 1000, "chains": 2, "cores": 2, }, ) # The default 'mcmc' fit_method provides informative predictions # and reliable performance on small datasets model.fit() print(model.fit_summary()) # Maximum a Posteriori can quickly fit a model to large datasets, # but will give limited insights into predictive uncertainty. model.fit(fit_method='map') print(model.fit_summary()) # Predict number of purchases for current customers # over the next 10 time periods expected_purchases = model.expected_purchases(future_t=10) # Predict probability customers are still active probability_alive = model.expected_probability_alive() # Predict number of purchases for a new customer over 't' time periods expected_purchases_new_customer = model.expected_purchases_new_customer(t=10) References ---------- .. [1] Fader, P. S., Hardie, B. G., & Lee, K. L. (2005). “Counting your customers the easy way: An alternative to the Pareto/NBD model." Marketing science, 24(2), 275-284. http://brucehardie.com/papers/018/fader_et_al_mksc_05.pdf .. [2] Fader, P. S., Hardie, B. G., & Lee, K. L. (2008). "Computing P (alive) using the BG/NBD model." http://www.brucehardie.com/notes/021/palive_for_BGNBD.pdf. .. [3] Fader, P. S. & Hardie, B. G. (2013) "Overcoming the BG/NBD Model's #NUM! Error Problem." http://brucehardie.com/notes/027/bgnbd_num_error.pdf. """ # noqa: E501 _model_type = "BG/NBD" # Beta-Geometric Negative Binomial Distribution
[docs] def __init__( self, data: pd.DataFrame, model_config: dict | None = None, sampler_config: dict | None = None, ): self._validate_cols( data, required_cols=["customer_id", "frequency", "recency", "T"], must_be_unique=["customer_id"], ) super().__init__( data=data, model_config=model_config, sampler_config=sampler_config, )
@property def default_model_config(self) -> ModelConfig: """Default model configuration.""" return { "a_prior": Prior("HalfFlat"), "b_prior": Prior("HalfFlat"), "alpha_prior": Prior("HalfFlat"), "r_prior": Prior("HalfFlat"), }
[docs] def build_model(self) -> None: # type: ignore[override] """Build the model.""" coords = {"customer_id": self.data["customer_id"]} with pm.Model(coords=coords) as self.model: a = self.model_config["a_prior"].create_variable("a") b = self.model_config["b_prior"].create_variable("b") alpha = self.model_config["alpha_prior"].create_variable("alpha") r = self.model_config["r_prior"].create_variable("r") def logp(t_x, x, a, b, r, alpha, T): """ Compute the log-likelihood of the BG/NBD model. The log-likelihood expression here aligns with expression (4) from [3] due to the possible numerical instability of expression (3). """ x_non_zero = x > 0 # Refactored for numerical error d1 = ( pt.gammaln(r + x) - pt.gammaln(r) + pt.gammaln(a + b) + pt.gammaln(b + x) - pt.gammaln(b) - pt.gammaln(a + b + x) ) d2 = r * pt.log(alpha) - (r + x) * pt.log(alpha + t_x) c3 = ((alpha + t_x) / (alpha + T)) ** (r + x) c4 = a / (b + x - 1) logp = d1 + d2 + pt.log(c3 + pt.switch(x_non_zero, c4, 0)) return check_parameters( logp, a > 0, b > 0, alpha > 0, r > 0, msg="a, b, alpha, r > 0", ) pm.Potential( "likelihood", logp( x=self.data["frequency"], t_x=self.data["recency"], a=a, b=b, alpha=alpha, r=r, T=self.data["T"], ), )
# TODO: delete this utility after API standardization is completed def _unload_params(self): trace = self.idata.posterior a = trace["a"] b = trace["b"] alpha = trace["alpha"] r = trace["r"] return a, b, alpha, r def _extract_predictive_variables( self, data: pd.DataFrame, customer_varnames: Sequence[str] = (), ) -> xarray.Dataset: """ Extract predictive variables from the data. Utility function assigning default customer arguments for predictive methods and converting to xarrays. """ self._validate_cols( data, required_cols=[ "customer_id", *customer_varnames, ], must_be_unique=["customer_id"], ) a = self.fit_result["a"] b = self.fit_result["b"] alpha = self.fit_result["alpha"] r = self.fit_result["r"] customer_vars = to_xarray( data["customer_id"], *[data[customer_varname] for customer_varname in customer_varnames], ) if len(customer_varnames) == 1: customer_vars = [customer_vars] return xarray.combine_by_coords( ( a, b, alpha, r, *customer_vars, ) )
[docs] def expected_num_purchases( self, customer_id: np.ndarray | pd.Series, t: np.ndarray | pd.Series | TensorVariable, frequency: np.ndarray | pd.Series | TensorVariable, recency: np.ndarray | pd.Series | TensorVariable, T: np.ndarray | pd.Series | TensorVariable, ) -> xarray.DataArray: r"""Compute the expected number of purchases for a customer. This is a deprecated method and will be removed in a future release. Please use `BetaGeoModel.expected_purchases` instead. """ warnings.warn( "Deprecated method. Use 'expected_purchases' instead.", FutureWarning, stacklevel=1, ) t = np.asarray(t) if t.size != 1: t = to_xarray(customer_id, t) T = np.asarray(T) if T.size != 1: T = to_xarray(customer_id, T) # print(customer_id) frequency, recency = to_xarray(customer_id, frequency, recency) a, b, alpha, r = self._unload_params() numerator = 1 - ((alpha + T) / (alpha + T + t)) ** (r + frequency) * hyp2f1( r + frequency, b + frequency, a + b + frequency - 1, t / (alpha + T + t), ) numerator *= (a + b + frequency - 1) / (a - 1) denominator = 1 + (frequency > 0) * (a / (b + frequency - 1)) * ( (alpha + T) / (alpha + recency) ) ** (r + frequency) return (numerator / denominator).transpose( "chain", "draw", "customer_id", missing_dims="ignore" )
[docs] def expected_purchases( self, data: pd.DataFrame | None = None, *, future_t: int | np.ndarray | pd.Series | None = None, ) -> xarray.DataArray: r"""Compute the expected number of future purchases across *future_t* time periods given *recency*, *frequency*, and *T* for each customer. The *data* parameter is only required for out-of-sample customers. Adapted from equation (10) in [1]_, and *lifetimes* package: https://github.com/CamDavidsonPilon/lifetimes/blob/41e394923ad72b17b5da93e88cfabab43f51abe2/lifetimes/fitters/beta_geo_fitter.py#L201 Parameters ---------- future_t : int, array_like Number of time periods to predict expected purchases. data : ~pandas.DataFrame Optional dataframe containing the following columns: * `customer_id`: Unique customer identifier * `frequency`: Number of repeat purchases * `recency`: Time between the first and the last purchase * `T`: Time between first purchase and end of observation period; model assumptions require T >= recency References ---------- .. [1] Fader, Peter S., Bruce G.S. Hardie, and Ka Lok Lee (2005a), "Counting Your Customers the Easy Way: An Alternative to the Pareto/NBD Model," Marketing Science, 24 (2), 275-84. https://www.brucehardie.com/papers/bgnbd_2004-04-20.pdf """ # noqa: E501 if data is None: data = self.data if future_t is not None: data = data.assign(future_t=future_t) dataset = self._extract_predictive_variables( data, customer_varnames=["frequency", "recency", "T", "future_t"] ) a = dataset["a"] b = dataset["b"] alpha = dataset["alpha"] r = dataset["r"] x = dataset["frequency"] t_x = dataset["recency"] T = dataset["T"] t = dataset["future_t"] numerator = 1 - ((alpha + T) / (alpha + T + t)) ** (r + x) * hyp2f1( r + x, b + x, a + b + x - 1, t / (alpha + T + t), ) numerator *= (a + b + x - 1) / (a - 1) denominator = 1 + (x > 0) * (a / (b + x - 1)) * ( (alpha + T) / (alpha + t_x) ) ** (r + x) return (numerator / denominator).transpose( "chain", "draw", "customer_id", missing_dims="ignore" )
[docs] def expected_probability_alive( self, data: pd.DataFrame | None = None, ) -> xarray.DataArray: r"""Compute the probability a customer with history *frequency*, *recency*, and *T* is currently active. The *data* parameter is only required for out-of-sample customers. Adapted from page (2) in Bruce Hardie's notes [1]_, and *lifetimes* package: https://github.com/CamDavidsonPilon/lifetimes/blob/41e394923ad72b17b5da93e88cfabab43f51abe2/lifetimes/fitters/beta_geo_fitter.py#L260 Parameters ---------- data : *pandas.DataFrame Optional dataframe containing the following columns: * `customer_id`: Unique customer identifier * `frequency`: Number of repeat purchases * `recency`: Time between the first and the last purchase * `T`: Time between first purchase and end of observation period, model assumptions require T >= recency References ---------- .. [1] Fader, P. S., Hardie, B. G., & Lee, K. L. (2008). Computing P (alive) using the BG/NBD model. http://www.brucehardie.com/notes/021/palive_for_BGNBD.pdf. """ if data is None: data = self.data dataset = self._extract_predictive_variables( data, customer_varnames=["frequency", "recency", "T"] ) a = dataset["a"] b = dataset["b"] alpha = dataset["alpha"] r = dataset["r"] x = dataset["frequency"] t_x = dataset["recency"] T = dataset["T"] log_div = (r + x) * np.log((alpha + T) / (alpha + t_x)) + np.log( a / (b + np.maximum(x, 1) - 1) ) return xarray.where(x == 0, 1.0, expit(-log_div)).transpose( "chain", "draw", "customer_id", missing_dims="ignore" )
[docs] def expected_num_purchases_new_customer(self, *args, **kwargs) -> xarray.DataArray: """Compute the expected number of purchases for a new customer. This is a deprecated method and will be removed in a future release. Please use `BetaGeoModel.expected_purchases_new_customer` instead. """ warnings.warn( "Deprecated method. Use 'expected_purchases_new_customer' instead.", FutureWarning, stacklevel=1, ) self.expected_purchases_new_customer(*args, **kwargs)
[docs] def expected_purchases_new_customer( self, data: pd.DataFrame | None = None, *, t: np.ndarray | pd.Series, ) -> xarray.DataArray: r"""Compute the expected number of purchases for a new customer across *t* time periods. Adapted from equation (9) in [1]_, and `lifetimes` library: https://github.com/CamDavidsonPilon/lifetimes/blob/41e394923ad72b17b5da93e88cfabab43f51abe2/lifetimes/fitters/beta_geo_fitter.py#L328 Parameters ---------- t : array_like Number of time periods over which to estimate purchases. References ---------- .. [1] Fader, Peter S., Bruce G.S. Hardie, and Ka Lok Lee (2005a), "Counting Your Customers the Easy Way: An Alternative to the Pareto/NBD Model," Marketing Science, 24 (2), 275-84. http://www.brucehardie.com/notes/021/palive_for_BGNBD.pdf """ # TODO: This is extraneous now, but needed for future covariate support. if data is None: data = self.data if t is not None: data = data.assign(t=t) dataset = self._extract_predictive_variables(data, customer_varnames=["t"]) a = dataset["a"] b = dataset["b"] alpha = dataset["alpha"] r = dataset["r"] t = dataset["t"] first_term = (a + b - 1) / (a - 1) second_term = 1 - (alpha / (alpha + t)) ** r * hyp2f1( r, b, a + b - 1, t / (alpha + t) ) return (first_term * second_term).transpose( "chain", "draw", "customer_id", missing_dims="ignore" )
def _distribution_new_customers( self, random_seed: RandomState | None = None, var_names: Sequence[str] = ("population_dropout", "population_purchase_rate"), ) -> xarray.Dataset: with pm.Model(): a = pm.HalfFlat("a") b = pm.HalfFlat("b") alpha = pm.HalfFlat("alpha") r = pm.HalfFlat("r") fit_result = self.fit_result if fit_result.sizes["chain"] == 1 and fit_result.sizes["draw"] == 1: # For map fit add a dummy draw dimension fit_result = self.fit_result.squeeze("draw").expand_dims( draw=range(1000) ) pm.Beta("population_dropout", alpha=a, beta=b) pm.Gamma("population_purchase_rate", alpha=r, beta=alpha) return pm.sample_posterior_predictive( fit_result, var_names=var_names, random_seed=random_seed, ).posterior_predictive
[docs] def distribution_new_customer_dropout( self, random_seed: RandomState | None = None, ) -> xarray.Dataset: """Sample the Beta distribution for the population-level dropout rate. This is the probability that a new customer will "drop out" and make no further purchases. Parameters ---------- random_seed : RandomState, optional Random state to use for sampling. Returns ------- xarray.Dataset Dataset containing the posterior samples for the population-level dropout rate. """ return self._distribution_new_customers( random_seed=random_seed, var_names=["population_dropout"], )["population_dropout"]
[docs] def distribution_new_customer_purchase_rate( self, random_seed: RandomState | None = None, ) -> xarray.Dataset: """Sample the Gamma distribution for the population-level purchase rate. This is the purchase rate for a new customer and determines the time between purchases for any new customer. Parameters ---------- random_seed : RandomState, optional Random state to use for sampling. Returns ------- xarray.Dataset Dataset containing the posterior samples for the population-level purchase rate. """ return self._distribution_new_customers( random_seed=random_seed, var_names=["population_purchase_rate"], )["population_purchase_rate"]