Source code for covid19_inference.models

import datetime
import platform

import theano
import theano.tensor as tt
import numpy as np
import pymc3 as pm

from . import model_helper as mh


if platform.system() == 'Darwin':
    theano.config.gcc.cxxflags = "-Wno-c++11-narrowing"  # workaround for Paul's macOS


def SIR_with_change_points(
    new_cases_obs,
    change_points_list,
    date_begin_simulation,
    num_days_sim,
    diff_data_sim,
    N,
    priors_dict=None,
    weekends_modulated=False,
    weekend_modulation_type='step',
):
    """
    Parameters
    ----------
    new_cases_obs : list or array
        Timeseries (day over day) of newly reported cases (not the total number)

    change_points_list : list of dicts
        List of dictionaries, each corresponding to one change point.

        Each dict can have the following key-value pairs. If a pair is not
        provided, the respective default is used.

        * pr_mean_date_begin_transient : datetime.datetime, NO default
        * pr_median_lambda : number, same as default priors, below
        * pr_sigma_lambda : number, same as default priors, below
        * pr_sigma_date_begin_transient : number, 3
        * pr_median_transient_len : number, 3
        * pr_sigma_transient_len : number, 0.3

    date_begin_simulation : datetime.datetime
        The first day of the simulation data

    num_days_sim : integer
        Number of days to forecast into the future

    diff_data_sim : integer
        Number of days that the simulation-begin predates the first data point in
        `new_cases_obs`. This is necessary so the model can fit the reporting delay.
        Set this parameter to a value larger than what you expect to find
        for the reporting delay.

    N : number
        The population size. For Germany, we used 83e6

    priors_dict : dict
        Dictionary of the prior assumptions

        Possible key-value pairs (and default values) are:

        * pr_beta_I_begin :         number, default = 100
        * pr_median_lambda_0 :      number, default = 0.4
        * pr_sigma_lambda_0 :       number, default = 0.5
        * pr_median_mu :            number, default = 1/8
        * pr_sigma_mu :             number, default = 0.2
        * pr_median_delay :         number, default = 8
        * pr_sigma_delay :          number, default = 0.2
        * pr_beta_sigma_obs :       number, default = 10
        * week_end_days :           tuple,  default = (6, 7)
        * pr_mean_weekend_factor :  number, default = 0.7
        * pr_sigma_weekend_factor : number, default = 0.17

    weekends_modulated : bool
        Whether to add the prior that cases are reported less on weekends.
        Multiplies the new case numbers on weekends by a number between 0 and 1,
        given by a prior beta distribution. The beta distribution is parametrised
        by pr_mean_weekend_factor and pr_sigma_weekend_factor.

    weekend_modulation_type : 'step' or 'abs_sine'
        Whether the weekends are modulated by a step function, which only
        multiplies the days given by week_end_days by the week_end_factor, or
        whether the whole week is modulated by an abs(sin(x)) function, with an
        offset with flat prior.

    Returns
    -------
    : pymc3.Model
        Returns an instance of pymc3 model with the change points

    """
    if priors_dict is None:
        priors_dict = dict()

    default_priors = dict(
        pr_beta_I_begin=100,
        pr_median_lambda_0=0.4,
        pr_sigma_lambda_0=0.5,
        pr_median_mu=1 / 8,
        pr_sigma_mu=0.2,
        pr_median_delay=8,
        pr_sigma_delay=0.2,
        pr_beta_sigma_obs=10,
        week_end_days=(6, 7),
        pr_mean_weekend_factor=0.7,
        pr_sigma_weekend_factor=0.17,
    )
    default_priors_change_points = dict(
        pr_median_lambda=default_priors["pr_median_lambda_0"],
        pr_sigma_lambda=default_priors["pr_sigma_lambda_0"],
        pr_sigma_date_begin_transient=3,
        pr_median_transient_len=3,
        pr_sigma_transient_len=0.3,
        pr_mean_date_begin_transient=None,
    )

    if not weekends_modulated:
        del default_priors['week_end_days']
        del default_priors['pr_mean_weekend_factor']
        del default_priors['pr_sigma_weekend_factor']

    for prior_name in priors_dict.keys():
        if prior_name not in default_priors:
            raise RuntimeError(f"Prior with name {prior_name} not known")
    for change_point in change_points_list:
        for prior_name in change_point.keys():
            if prior_name not in default_priors_change_points:
                raise RuntimeError(f"Prior with name {prior_name} not known")

    for prior_name, value in default_priors.items():
        if prior_name not in priors_dict:
            priors_dict[prior_name] = value
            print(f"{prior_name} was set to default value {value}")
    for prior_name, value in default_priors_change_points.items():
        for i_cp, change_point in enumerate(change_points_list):
            if prior_name not in change_point:
                change_point[prior_name] = value
                print(
                    f"{prior_name} of change point {i_cp} was set to default value {value}"
                )

    if (
        diff_data_sim
        < priors_dict["pr_median_delay"]
        + 3 * priors_dict["pr_median_delay"] * priors_dict["pr_sigma_delay"]
    ):
        print("WARNING: diff_data_sim could be too small compared to the prior delay")
    if num_days_sim < len(new_cases_obs) + diff_data_sim:
        raise RuntimeError(
            "Simulation ends before the end of the data. Increase num_days_sim."
        )

    # ------------------------------------------------------------------------------ #
    # Model and prior implementation
    # ------------------------------------------------------------------------------ #

    with pm.Model() as model:
        # all pm functions now apply on the model instance
        # true cases at begin of loaded data but we do not know the real number
        I_begin = pm.HalfCauchy(name="I_begin", beta=priors_dict["pr_beta_I_begin"])

        # fraction of people that are newly infected each day
        lambda_list = []
        lambda_list.append(
            pm.Lognormal(
                name="lambda_0",
                mu=np.log(priors_dict["pr_median_lambda_0"]),
                sigma=priors_dict["pr_sigma_lambda_0"],
            )
        )
        for i, cp in enumerate(change_points_list):
            lambda_list.append(
                pm.Lognormal(
                    name=f"lambda_{i + 1}",
                    mu=np.log(cp["pr_median_lambda"]),
                    sigma=cp["pr_sigma_lambda"],
                )
            )

        # list of start dates of the transient periods of the change points
        tr_begin_list = []
        dt_before = date_begin_simulation
        for i, cp in enumerate(change_points_list):
            dt_begin_transient = cp["pr_mean_date_begin_transient"]
            if dt_before is not None and dt_before > dt_begin_transient:
                raise RuntimeError("Dates of change points are not temporally ordered")

            prior_mean = (
                dt_begin_transient - date_begin_simulation
            ).days - 1  # convert the provided date format (argument) into days (a number)

            tr_begin = pm.Normal(
                name=f"transient_begin_{i}",
                mu=prior_mean,
                sigma=cp["pr_sigma_date_begin_transient"],
            )
            tr_begin_list.append(tr_begin)
            dt_before = dt_begin_transient

        # same for transient times
        tr_len_list = []
        for i, cp in enumerate(change_points_list):
            tr_len = pm.Lognormal(
                name=f"transient_len_{i}",
                mu=np.log(cp["pr_median_transient_len"]),
                sigma=cp["pr_sigma_transient_len"],
            )
            tr_len_list.append(tr_len)

        # build the time-dependent spreading rate
        lambda_t_list = [lambda_list[0] * tt.ones(num_days_sim)]
        lambda_before = lambda_list[0]

        for tr_begin, tr_len, lambda_after in zip(
            tr_begin_list, tr_len_list, lambda_list[1:]
        ):
            lambda_t = mh.smooth_step_function(
                start_val=0,
                end_val=1,
                t_begin=tr_begin,
                t_end=tr_begin + tr_len,
                t_total=num_days_sim,
            ) * (lambda_after - lambda_before)
            lambda_before = lambda_after
            lambda_t_list.append(lambda_t)
        lambda_t = sum(lambda_t_list)

        # fraction of people that recover each day, recovery rate mu
        mu = pm.Lognormal(
            name="mu",
            mu=np.log(priors_dict["pr_median_mu"]),
            sigma=priors_dict["pr_sigma_mu"],
        )

        # delay in days between contracting the disease and being recorded
        delay = pm.Lognormal(
            name="delay",
            mu=np.log(priors_dict["pr_median_delay"]),
            sigma=priors_dict["pr_sigma_delay"],
        )

        # prior of the error of observed cases
        sigma_obs = pm.HalfCauchy("sigma_obs", beta=priors_dict["pr_beta_sigma_obs"])

        # -------------------------------------------------------------------------- #
        # training the model with loaded data provided as argument
        # -------------------------------------------------------------------------- #

        S_begin = N - I_begin
        S, I, new_I = _SIR_model(
            lambda_t=lambda_t, mu=mu, S_begin=S_begin, I_begin=I_begin, N=N
        )

        new_cases_inferred = mh.delay_cases(
            new_I_t=new_I,
            len_new_I_t=num_days_sim,
            len_out=num_days_sim - diff_data_sim,
            delay=delay,
            delay_diff=diff_data_sim,
        )

        if weekends_modulated:
            week_end_factor = pm.Beta(
                'weekend_factor',
                mu=priors_dict['pr_mean_weekend_factor'],
                sigma=priors_dict['pr_sigma_weekend_factor'],
            )
            if weekend_modulation_type == 'step':
                modulation = np.zeros(num_days_sim - diff_data_sim)
                for i in range(num_days_sim - diff_data_sim):
                    date_curr = date_begin_simulation + datetime.timedelta(
                        days=i + diff_data_sim + 1
                    )
                    if date_curr.isoweekday() in priors_dict['week_end_days']:
                        modulation[i] = 1
            elif weekend_modulation_type == 'abs_sine':
                offset_rad = pm.VonMises('offset_modulation_rad', mu=0, kappa=0.01)
                offset = pm.Deterministic(
                    'offset_modulation', offset_rad / (2 * np.pi) * 7
                )
                t = np.arange(num_days_sim - diff_data_sim)
                date_begin = date_begin_simulation + datetime.timedelta(
                    days=diff_data_sim + 1
                )
                weekday_begin = date_begin.weekday()
                t -= weekday_begin  # Sunday is zero
                modulation = 1 - tt.abs_(tt.sin(t / 7 * np.pi + offset_rad / 2))

            multiplication_vec = np.ones(num_days_sim - diff_data_sim) - (
                1 - week_end_factor
            ) * modulation
            new_cases_inferred_eff = new_cases_inferred * multiplication_vec
        else:
            new_cases_inferred_eff = new_cases_inferred

        # likelihood of the model:
        # observed cases are distributed following studentT around the model.
        # we want to approximate a Poisson distribution of new cases.
        # we choose nu=4 to get heavy tails and robustness to outliers.
        # https://www.jstor.org/stable/2290063
        num_days_data = new_cases_obs.shape[-1]
        pm.StudentT(
            name="_new_cases_studentT",
            nu=4,
            mu=new_cases_inferred_eff[:num_days_data],
            sigma=tt.abs_(new_cases_inferred[:num_days_data] + 1) ** 0.5
            * sigma_obs,  # +1 and tt.abs to avoid nans
            observed=new_cases_obs,
        )

        # add these observables to the model so we can extract a time series of them
        # later via e.g. `model.trace['lambda_t']`
        pm.Deterministic("lambda_t", lambda_t)
        pm.Deterministic("new_cases", new_cases_inferred_eff)
        pm.Deterministic("new_cases_raw", new_cases_inferred)
    return model
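# ------------------------------------------------------------------------------ #
# Illustrative usage sketch (not part of the original module): a minimal call of
# SIR_with_change_points with one change point, followed by sampling. The case
# counts, dates, population size and sampler settings below are made-up
# placeholders; only the function defined above and pm.sample are relied upon.
# ------------------------------------------------------------------------------ #


def _example_SIR_usage():
    # hypothetical daily case counts; in practice these come from reported data
    new_cases_obs = np.array(
        [35, 42, 51, 64, 80, 97, 121, 150, 182, 220, 268, 320, 380, 450, 530, 610]
    )
    diff_data_sim = 16  # simulation starts 16 days before the first data point
    num_days_sim = diff_data_sim + len(new_cases_obs) + 14  # 14 forecast days
    date_begin_sim = datetime.datetime(2020, 3, 1)

    change_points = [
        dict(
            pr_mean_date_begin_transient=datetime.datetime(2020, 3, 23),
            pr_median_lambda=0.15,  # prior belief: spreading rate drops here
        )
    ]

    model = SIR_with_change_points(
        new_cases_obs=new_cases_obs,
        change_points_list=change_points,
        date_begin_simulation=date_begin_sim,
        num_days_sim=num_days_sim,
        diff_data_sim=diff_data_sim,
        N=83e6,
        weekends_modulated=True,
    )
    with model:
        # small draw/tune numbers chosen only to keep the sketch cheap
        trace = pm.sample(draws=500, tune=500, cores=2)
    return trace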
def _SIR_model(lambda_t, mu, S_begin, I_begin, N):
    """
    Implements the susceptible-infected-recovered model

    Parameters
    ----------
    lambda_t : ~numpy.ndarray
        time series of the spreading rate; the length of the array sets the
        number of steps to run the model for

    mu : number
        recovery rate

    S_begin : number
        initial number of susceptibles at the first time step

    I_begin : number
        initial number of infected

    N : number
        population size

    Returns
    -------
    S : array
        time series of the susceptible

    I : array
        time series of the infected

    new_I : array
        time series of the newly infected
    """
    new_I_0 = tt.zeros_like(I_begin)

    def next_day(lambda_t, S_t, I_t, _, mu, N):
        new_I_t = lambda_t / N * I_t * S_t
        S_t = S_t - new_I_t
        I_t = I_t + new_I_t - mu * I_t
        I_t = tt.clip(I_t, 0, N)  # for stability
        return S_t, I_t, new_I_t

    # theano scan returns two tuples, the first one containing a time series of
    # what we give in outputs_info: S, I, new_I
    outputs, _ = theano.scan(
        fn=next_day,
        sequences=[lambda_t],
        outputs_info=[S_begin, I_begin, new_I_0],
        non_sequences=[mu, N],
    )
    return outputs
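# ------------------------------------------------------------------------------ #
# Plain-numpy sketch (not part of the original module) of the update rule that
# theano.scan iterates inside _SIR_model, for intuition only. The rates and
# initial conditions are made up; the model itself needs the theano version so
# that gradients are available to the sampler.
# ------------------------------------------------------------------------------ #


def _example_sir_update_numpy():
    N = 83e6
    lam, mu = 0.4, 1 / 8        # spreading and recovery rate per day
    S, I = N - 100.0, 100.0     # hypothetical initial conditions
    S_list, I_list, new_I_list = [], [], []
    for _ in range(60):         # simulate 60 days
        new_I = lam / N * I * S             # newly infected today
        S = S - new_I                       # they leave the susceptible pool
        I = np.clip(I + new_I - mu * I, 0, N)  # infected gain new_I, lose recoveries
        S_list.append(S)
        I_list.append(I)
        new_I_list.append(new_I)
    return np.array(S_list), np.array(I_list), np.array(new_I_list)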
# ------------------------------------------------------------------------------ #
# the more advanced model
# ------------------------------------------------------------------------------ #


def SEIR_with_extensions(
    new_cases_obs,
    change_points_list,
    date_begin_simulation,
    num_days_sim,
    diff_data_sim,
    N,
    priors_dict=None,
    with_random_walk=True,
    weekends_modulated=False,
    weekend_modulation_type='step',
):
    """
    This model includes 3 extensions to the `SIR_model_with_change_points`:

    1.  The SIR model now includes an incubation period during which infected
        people are not infectious, in the spirit of an SEIR model. In contrast
        to the SEIR model, the length of the incubation period is not
        exponentially distributed but follows a lognormal distribution.
    2.  People that are infectious are observed with a delay that is now
        lognormally distributed. In the `SIR_model_with_change_points` we
        assume a fixed delay between infection and observation.
    3.  `lambda_t` has an additive term given by a Gaussian random walk.
        Thereby, we want to fit any deviation in `lambda_t` that is not
        captured by the change points. If the change points are wisely chosen,
        and the rest of the model captures the dynamics well, one would expect
        that the amplitude of the random walk is small. In this case, the
        posterior distribution of `sigma_random_walk` will be small.

    Parameters
    ----------
    new_cases_obs : list or array
        Timeseries (day over day) of newly reported cases (not the total number)

    change_points_list : list of dicts
        List of dictionaries, each corresponding to one change point.

        Each dict can have the following key-value pairs. If a pair is not
        provided, the respective default is used.

        * pr_mean_date_begin_transient : datetime.datetime, NO default
        * pr_median_lambda : float, default: 0.4
        * pr_sigma_lambda : float, default: 0.5
        * pr_sigma_begin_transient : float, default: 3
        * pr_median_transient_len : float, default: 3
        * pr_sigma_transient_len : float, default: 0.3

    date_begin_simulation : datetime.datetime
        The first day of the simulation data

    num_days_sim : integer
        Number of days to forecast into the future

    diff_data_sim : integer
        Number of days that the simulation-begin predates the first data point in
        `new_cases_obs`. This is necessary so the model can fit the reporting
        delay. Set this parameter to a value larger than what you expect to find
        for the reporting delay.

    N : number
        The population size. For Germany, we used 83e6

    priors_dict : dict
        Dictionary of the prior assumptions

        Possible key-value pairs (and default values) are:

        * pr_beta_I_begin :            number, default: 100
        * pr_beta_E_begin_scale :      number, default: 10
        * pr_median_lambda_0 :         number, default: 2
        * pr_sigma_lambda_0 :          number, default: 0.7
        * pr_median_mu :               number, default: 1/3
        * pr_sigma_mu :                number, default: 0.3
        * pr_median_delay :            number, default: 5
        * pr_sigma_delay :             number, default: 0.2
        * scale_delay :                number, default: 0.3
        * pr_beta_sigma_obs :          number, default: 10
        * pr_sigma_random_walk :       number, default: 0.05
        * pr_mean_median_incubation :  number, default: 5
          https://www.ncbi.nlm.nih.gov/pubmed/32150748
          https://www.ncbi.nlm.nih.gov/pmc/articles/PMC7014672/
          about -1 day compared to the sources' value because persons likely
          become infectious before.
        * pr_sigma_median_incubation : number, default: 1
          The error from the sources above is smaller, but as the -1 day is a
          very rough estimate, we take a larger error here.
        * sigma_incubation :           number, default: 0.418
          https://www.ncbi.nlm.nih.gov/pubmed/32150748

    with_random_walk : boolean
        Whether to add a Gaussian random walk to `lambda_t`.
        Computationally expensive.

    Returns
    -------
    : pymc3.Model
        Returns an instance of pymc3 model with the change points

    """
    if priors_dict is None:
        priors_dict = dict()

    default_priors = dict(
        pr_beta_I_begin=100,
        pr_beta_E_begin_scale=10,
        pr_median_lambda_0=2,
        pr_sigma_lambda_0=0.7,
        pr_median_mu=1 / 3,
        pr_sigma_mu=0.3,
        pr_median_delay=5,
        pr_sigma_delay=0.2,
        scale_delay=0.3,
        pr_beta_sigma_obs=10,
        pr_sigma_random_walk=0.05,
        pr_mean_median_incubation=5,
        # https://www.ncbi.nlm.nih.gov/pubmed/32150748
        # https://www.ncbi.nlm.nih.gov/pmc/articles/PMC7014672/
        # about -1 day because persons likely become infectious before
        pr_sigma_median_incubation=1,
        sigma_incubation=0.418,  # https://www.ncbi.nlm.nih.gov/pubmed/32150748
        week_end_days=(6, 7),
        pr_mean_weekend_factor=0.7,
        pr_sigma_weekend_factor=0.17,
    )

    if not with_random_walk:
        del default_priors["pr_sigma_random_walk"]

    default_priors_change_points = dict(
        pr_median_lambda=default_priors["pr_median_lambda_0"],
        pr_sigma_lambda=default_priors["pr_sigma_lambda_0"],
        pr_sigma_date_begin_transient=3,
        pr_median_transient_len=3,
        pr_sigma_transient_len=0.3,
        pr_mean_date_begin_transient=None,
    )

    for prior_name in priors_dict.keys():
        if prior_name not in default_priors:
            raise RuntimeError(f"Prior with name {prior_name} not known")
    for change_point in change_points_list:
        for prior_name in change_point.keys():
            if prior_name not in default_priors_change_points:
                raise RuntimeError(f"Prior with name {prior_name} not known")

    for prior_name, value in default_priors.items():
        if prior_name not in priors_dict:
            priors_dict[prior_name] = value
            print(f"{prior_name} was set to default value {value}")
    for prior_name, value in default_priors_change_points.items():
        for i_cp, change_point in enumerate(change_points_list):
            if prior_name not in change_point:
                change_point[prior_name] = value
                print(
                    f"{prior_name} of change point {i_cp} was set to default value {value}"
                )

    if (
        diff_data_sim
        < priors_dict["pr_median_delay"]
        + 3 * priors_dict["pr_median_delay"] * priors_dict["pr_sigma_delay"]
    ):
        print("WARNING: diff_data_sim could be too small compared to the prior delay")
    if num_days_sim < len(new_cases_obs) + diff_data_sim:
        raise RuntimeError(
            "Simulation ends before the end of the data. Increase num_days_sim."
        )

    with pm.Model() as model:
        # all pm functions now apply on the model instance
        # true cases at begin of loaded data but we do not know the real number
        I_begin = pm.HalfCauchy(name="I_begin", beta=priors_dict["pr_beta_I_begin"])
        E_begin_scale = pm.HalfCauchy(
            name="E_begin_scale", beta=priors_dict["pr_beta_E_begin_scale"]
        )
        new_E_begin = pm.HalfCauchy("E_begin", beta=E_begin_scale, shape=9)

        # fraction of people that are newly infected each day
        lambda_list = []
        lambda_list.append(
            pm.Lognormal(
                name="lambda_0",
                mu=np.log(priors_dict["pr_median_lambda_0"]),
                sigma=priors_dict["pr_sigma_lambda_0"],
            )
        )
        for i, cp in enumerate(change_points_list):
            lambda_list.append(
                pm.Lognormal(
                    name="lambda_{}".format(i + 1),
                    mu=np.log(cp["pr_median_lambda"]),
                    sigma=cp["pr_sigma_lambda"],
                )
            )

        # set the start dates of the transient periods
        tr_begin_list = []
        dt_before = None
        for i, cp in enumerate(change_points_list):
            date_begin_transient = cp["pr_mean_date_begin_transient"]
            if dt_before is not None and dt_before > date_begin_transient:
                raise RuntimeError("Dates of change points are not temporally ordered")
            prior = (date_begin_transient - date_begin_simulation).days - 1
            tr_begin = pm.Normal(
                name="transient_begin_{}".format(i),
                mu=prior,
                sigma=cp["pr_sigma_date_begin_transient"],
            )
            tr_begin_list.append(tr_begin)
            dt_before = date_begin_transient

        # transient time
        tr_len_list = []
        for i, cp in enumerate(change_points_list):
            transient_len = pm.Lognormal(
                name="transient_len_{}".format(i),
                mu=np.log(cp["pr_median_transient_len"]),
                sigma=cp["pr_sigma_transient_len"],
            )
            tr_len_list.append(transient_len)

        # build the time-dependent spreading rate
        if with_random_walk:
            sigma_random_walk = pm.HalfNormal(
                name="sigma_random_walk", sigma=priors_dict["pr_sigma_random_walk"]
            )
            lambda_t_random_walk = pm.distributions.timeseries.GaussianRandomWalk(
                name="lambda_t_random_walk",
                mu=0,
                sigma=sigma_random_walk,
                shape=num_days_sim,
                init=pm.Normal.dist(sigma=priors_dict["pr_sigma_random_walk"]),
            )
            lambda_base = lambda_t_random_walk + lambda_list[0]
        else:
            lambda_base = lambda_list[0] * tt.ones(num_days_sim)

        lambda_t_list = [lambda_base]
        lambda_step_before = lambda_list[0]
        for tr_begin, transient_len, lambda_step in zip(
            tr_begin_list, tr_len_list, lambda_list[1:]
        ):
            lambda_t = mh.smooth_step_function(
                start_val=0,
                end_val=1,
                t_begin=tr_begin,
                t_end=tr_begin + transient_len,
                t_total=num_days_sim,
            ) * (lambda_step - lambda_step_before)
            lambda_step_before = lambda_step
            lambda_t_list.append(lambda_t)
        lambda_t = sum(lambda_t_list)

        # fraction of people that recover each day, recovery rate mu
        mu = pm.Lognormal(
            name="mu",
            mu=np.log(priors_dict["pr_median_mu"]),
            sigma=priors_dict["pr_sigma_mu"],
        )

        # delay in days between contracting the disease and being recorded
        delay = pm.Lognormal(
            name="delay",
            mu=np.log(priors_dict["pr_median_delay"]),
            sigma=priors_dict["pr_sigma_delay"],
        )

        # prior of the error of observed cases
        sigma_obs = pm.HalfCauchy(
            name="sigma_obs", beta=priors_dict["pr_beta_sigma_obs"]
        )

        # -------------------------------------------------------------------------- #
        # training the model with loaded data provided as argument
        # -------------------------------------------------------------------------- #

        median_incubation = pm.Normal(
            name="median_incubation",
            mu=priors_dict["pr_mean_median_incubation"],
            sigma=priors_dict["pr_sigma_median_incubation"],
        )
        # sources: https://www.ncbi.nlm.nih.gov/pmc/articles/PMC7014672/

        S_begin = N - I_begin
        S_t, new_E_t, I_t, new_I_t = _SEIR_model_with_delay(
            lambda_t=lambda_t,
            mu=mu,
            S_begin=S_begin,
            new_E_begin=new_E_begin,
            I_begin=I_begin,
            N=N,
            median_incubation=median_incubation,
            sigma_incubation=0.418,  # https://www.ncbi.nlm.nih.gov/pubmed/32150748
        )

        new_cases_inferred = mh.delay_cases_lognormal(
            input_arr=new_I_t,
            len_input_arr=num_days_sim,
            len_output_arr=num_days_sim - diff_data_sim,
            median_delay=delay,
            scale_delay=priors_dict["scale_delay"],
            delay_betw_input_output=diff_data_sim,
        )

        if weekends_modulated:
            week_end_factor = pm.Beta(
                'weekend_factor',
                mu=priors_dict['pr_mean_weekend_factor'],
                sigma=priors_dict['pr_sigma_weekend_factor'],
            )
            if weekend_modulation_type == 'step':
                modulation = np.zeros(num_days_sim - diff_data_sim)
                for i in range(num_days_sim - diff_data_sim):
                    date_curr = date_begin_simulation + datetime.timedelta(
                        days=i + diff_data_sim + 1
                    )
                    if date_curr.isoweekday() in priors_dict['week_end_days']:
                        modulation[i] = 1
            elif weekend_modulation_type == 'abs_sine':
                offset_rad = pm.VonMises('offset_modulation_rad', mu=0, kappa=0.01)
                offset = pm.Deterministic(
                    'offset_modulation', offset_rad / (2 * np.pi) * 7
                )
                t = np.arange(num_days_sim - diff_data_sim)
                date_begin = date_begin_simulation + datetime.timedelta(
                    days=diff_data_sim + 1
                )
                weekday_begin = date_begin.weekday()
                t -= weekday_begin  # Sunday is zero
                modulation = 1 - tt.abs_(tt.sin(t / 7 * np.pi + offset_rad / 2))

            multiplication_vec = np.ones(num_days_sim - diff_data_sim) - (
                1 - week_end_factor
            ) * modulation
            new_cases_inferred_eff = new_cases_inferred * multiplication_vec
        else:
            new_cases_inferred_eff = new_cases_inferred

        # likelihood of the model:
        # observed cases are distributed following studentT around the model.
        # we want to approximate a Poisson distribution of new cases.
        # we choose nu=4 to get heavy tails and robustness to outliers.
        # https://www.jstor.org/stable/2290063
        num_days_data = new_cases_obs.shape[-1]
        pm.StudentT(
            name="_new_cases_studentT",
            nu=4,
            mu=new_cases_inferred_eff[:num_days_data],
            sigma=tt.abs_(new_cases_inferred[:num_days_data] + 1) ** 0.5
            * sigma_obs,  # +1 and tt.abs to avoid nans
            observed=new_cases_obs,
        )

        # add these observables to the model so we can extract a time series of them
        # later via e.g. `model.trace['lambda_t']`
        pm.Deterministic("lambda_t", lambda_t)
        pm.Deterministic("new_cases", new_cases_inferred_eff)
        pm.Deterministic("new_cases_raw", new_cases_inferred)
    return model
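# ------------------------------------------------------------------------------ #
# Illustrative usage sketch (not part of the original module): calling
# SEIR_with_extensions with the same made-up inputs as the SIR example above,
# with the random walk switched off to keep sampling cheap. All numbers and
# dates are placeholders; only the function defined above and pm.sample are
# relied upon.
# ------------------------------------------------------------------------------ #


def _example_SEIR_usage():
    new_cases_obs = np.array(
        [35, 42, 51, 64, 80, 97, 121, 150, 182, 220, 268, 320, 380, 450, 530, 610]
    )
    diff_data_sim = 16
    num_days_sim = diff_data_sim + len(new_cases_obs) + 14
    change_points = [
        dict(pr_mean_date_begin_transient=datetime.datetime(2020, 3, 23))
    ]

    model = SEIR_with_extensions(
        new_cases_obs=new_cases_obs,
        change_points_list=change_points,
        date_begin_simulation=datetime.datetime(2020, 3, 1),
        num_days_sim=num_days_sim,
        diff_data_sim=diff_data_sim,
        N=83e6,
        with_random_walk=False,  # set True to also fit the Gaussian random walk
    )
    with model:
        trace = pm.sample(draws=500, tune=500, cores=2)
    return trace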
def _SEIR_model_with_delay(
    lambda_t,
    mu,
    S_begin,
    new_E_begin,
    I_begin,
    N,
    median_incubation=5,
    sigma_incubation=0.418,
):
    """
    Implements the SEIR-like model with a lognormally distributed incubation
    period, used by `SEIR_with_extensions`.
    """
    x = np.arange(1, 9)
    beta = mh.tt_lognormal(x, tt.log(median_incubation), sigma_incubation)

    new_I_0 = tt.zeros_like(I_begin)

    def next_day(
        lambda_t, S_t, nE1, nE2, nE3, nE4, nE5, nE6, nE7, nE8, I_t, _, mu, beta, N
    ):
        new_E_t = lambda_t / N * I_t * S_t
        S_t = S_t - new_E_t
        new_I_t = (
            beta[0] * nE1
            + beta[1] * nE2
            + beta[2] * nE3
            + beta[3] * nE4
            + beta[4] * nE5
            + beta[5] * nE6
            + beta[6] * nE7
            + beta[7] * nE8
        )
        I_t = I_t + new_I_t - mu * I_t
        I_t = tt.clip(I_t, 0.001, N)  # for stability
        # I_t = tt.nnet.sigmoid(I_t/N)*N
        # S_t = tt.clip(S_t, -1, N+1)
        return S_t, new_E_t, I_t, new_I_t

    outputs, _ = theano.scan(
        fn=next_day,
        sequences=[lambda_t],
        outputs_info=[
            S_begin,
            dict(initial=new_E_begin, taps=[-1, -2, -3, -4, -5, -6, -7, -8]),
            I_begin,
            new_I_0,
        ],
        non_sequences=[mu, beta, N],
    )
    return outputs
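# ------------------------------------------------------------------------------ #
# Numpy sketch (not part of the original module) of the 8-day incubation kernel
# `beta` used in _SEIR_model_with_delay. It assumes mh.tt_lognormal evaluates a
# lognormal density that is normalized over its support; see model_helper for
# the exact theano definition. The values returned here are for intuition only.
# ------------------------------------------------------------------------------ #


def _example_incubation_kernel_numpy():
    x = np.arange(1, 9)  # days 1..8 since exposure
    median_incubation, sigma_incubation = 5.0, 0.418
    # unnormalized lognormal density: exp(-(ln x - ln median)^2 / (2 sigma^2)) / x
    pdf = (
        np.exp(
            -0.5 * ((np.log(x) - np.log(median_incubation)) / sigma_incubation) ** 2
        )
        / x
    )
    beta = pdf / pdf.sum()  # weights applied to nE1..nE8 in next_day
    return beta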