Academic Integrity: tutoring, explanations, and feedback — we don’t complete graded work or submit on a student’s behalf.

Python 3.4 code Infectious Disease Simulation The transmission probability is it

ID: 3761007 • Letter: P

Question

Python 3.4 code

Infectious Disease Simulation

The transmission probability is itself calculated as a product of probabilities, where these probabilities reflect many different factors. More specifically:

(1) Each disease has an inherent transmissitivity parameter t that reflects how intrinsically easy the disease is to transmit.

(2) Each agent has an inherent susceptibility parameter s that reflects the resistance of the agent.

(3) Each agent has an inherent vaccination state v that reflects how protected, if at all, the agent is by vaccination.

Whether or not transmission occurs depends on these factors as well as the duration and proximity of the contact between agents as well as the current agent state (e.g., first day of exposed state, fourth day of infected state, etc.) as this affects the amount of infectious material shed by an agent. Disease transmission is inherently complex! For example, it’s actually much more difficult to catch the flu than to catch the mumps; the flu virus is transmitted by someone coughing or sneezing, and these kinds of contact only occur if the agents are within 1 or 2 meters of each other. On the other hand, the mumps virus or the 1

Your initial implementation consists of three classes and their associated methods

class Disease():

Note: the signature of the constructor should read __init__(self, name=’influenza’, t=0.95, E=2, I=7, r=0.0).

Objects in this class describe a particular disease, like influenza, the measles, or rubella. The initialization parameters correspond to the disease name, its transmissivity t, the number of days in the exposed state E, the number of days in the infected state I , and the probability that recovery confers lifetime immunity r, respectively (these default values are reasonable values for modeling the flu).

class Agent():

Note: the signature of the constructor should read __init__(self, s=0.99).

Objects in the class model the individual agents in the simulation. The only initialization parameter is the intrinsic susceptibility of the agent to disease; a higher number may indicate a frail elderly person or someone with a compromised immune system (both highly susceptible) while a lower number may indicate a healthy young adult in peak physical condition. You’ll also need a few additional variables which are not set at initialization, like the agent’s vaccination state, v, and some sort of internal state indicating if the agent is sick or not. This last requirement is easily met by keeping an internal state counter initially set to -1, indicating that the agent is initially in the susceptible state. When the agent is infected, the counter is set to I+E (parameters of the disease), and then decremented on each successive day of the simulation until it reaches 0, the recovered state. At this point, depending on the probability that lifetime immunity to the disease is achieved upon recovery, the counter may be reset to -1, the susceptible state. Your agent class will also require a few methods. The first method, vaccinate(self, v) sets the agent’s internal v variable to indicate that some immunity to the disease has been instantly conferred by immunization. We’ll use this variable to model how quickly we can dampen how fast the disease spreads by starting to innoculate the herd: v close to 0 indicates the vaccine is very effective, while v close to 1 indicates very little protection. By default, v should be 1.0, indicating that no vaccination has taken place.

The second method, infect(self, other, disease) gives agent other the chance to infect agent self with disease. Of course, first you’ll need to check if other is infective, if self is susceptible, vaccinated, etc.

How can we model people who die from the disease? Even influenza can be fatal; indeed, there about 35,000 to 40,000 influenza deaths in the USA each year! Interestingly enough, in our simulation, a dead agent is no different from an agent with lifetime immunity; unlike a dead agent, a recovered but immune agent may hang around, but like a dead agent, they will never again make anyone else sick. This means we can factor the death rate into the per- manent immunity/recovery probability, r.

before you ‘‘roll the die’’ to determine if the infection is transmitted. If successful, update self’s internal state counter as described above. The general idea here is that, if self is susceptible and other is either exposed or infected, we want to infect self with probability self . s * self . v * disease. t, and adjust self’s internal state accordingly. 2 Also, be careful not to interpret this method the other way around — since only one agent’s internal state is changed, it makes more self to represent that agent, and therefore the agent that is being infected. This better fits with the notion of encapsulation that lies at the heart of the object-oriented worldview.

The third method, state(self), returns True if self internal state corresponds to exposed or infected, and False otherwise. The fourth method, update(self), is inv oked on each agent every day, and is used to update self’s internal state indicating another day has elapsed, putting self one step closer to recovery if they are exposed or

infected.

class Simulation():

Note: the signature of the constructor should read __init__(self, D=500).

The final class represents a simple simulation that runs at most D days of simulation (the simulation may terminate early if there are no longer any infected agents remaining). The class must obviously have some internal variables that contain, e.g., a list of instances of Agent that are in the simulation and the instance of Disease that is being simulated. Other variables may also be necessary, such as, for example, a mixing coefficient m that indicates the probability any one agent will encounter any other agent on any particular day. Interactions with instances of this class occur via the following methods.

The first method, join(self, agent) adds agent to the simulation.

The second method, introduce(self, disease) adds disease to the simulation.

The third method, run(self) starts the simulation. The simulation runs for at most self.D days, but may terminate early if there are no longer any infected agents. The value returned should be a list of tuples (N e, N i ) corresponding to daily snapshots of the simulation, where N e is the number of exposed agents and N i is the number of infected agents on the corresponding day (the list returned will contain at most D tuples).

Explanation / Answer

import networkx
import random
import math
from networkx import *
from epidemic_code import *
N=100000 #set network size
G=fast_gnp_random_graph(N, 4./(N-1.0)) #create network expected degree 4
EPN = create_EPN_fixed_transmissiblity(G,0.8) #create (directed) EPN with T=0.8
[P,A] = get_prob_and_size(EPN) #find P and A. Since T
#is constant, they will
#be almost identical.
[infection_curve,times]= create_epidemic_curve(EPN,23) #find the epidemic curve
for an epidemic starting at node 23. Since no recovery times are specified, it
assumes recovery happens at time after one unit of time. The default EPN
created has weight 1 for each edge, so it also assumes that infection happens
after one unit of time.
changes:
v0.1 -> v0.2
corrected bug referencing edge[2] in output_EPN and output_dendrogram
corrected but in fixed_rec_exp_inf_infection which did not give correct
infection duration and also another that had a time2infect1, rather than
time2infect.
modified EPN creation routine to allow directed networks as input.
removed I and S (and similar) and replaced them with node attributes:
node attributes:
infection_duration
type
edge attributes:
time_to_infection
type
to do this, eliminated PIS which created dicts mapping node to I and S
and replaced with type_assignment which create a dict for each node giving I and
S or other appropriate vars
have changed 'parameters' from a list to a dict.
"""
###### EPN CREATION CODE
#We start with code for creating EPNs
def create_EPN(G,type_assignment,attempt_infection,parameters):
"""
Creates an EPN from the graph or DiGraph G, using various rules we
might want to apply to the infection process. It returns just the EPN.
G: the underlying network on which the epidemic spreads
type_assignment: A generic function which takes EPN, a node name, and any
parameters and then adds the node to the EPN with appropriate attributes:
e.g., duration of infection, infectiousness, susceptibility, type, etc.
attempt_infection: A function of the form
attempt_infection(u,v,I,S,parameters) where I and S are
dictionaries giving I[u] and S[v], the infectiousness and
susceptibility of u and v respectively. It then determines
whether u will infect v, returning [True,time] if so and [False]
otherwise (the only important part of the second result is that
the first entry evaluates to False). Here time is the time it
takes for infection to happen. If this is unimportant, it can be
set to 1.
parameters: Any parameters that type_assignment and attempt_infection might
need. This is a dict
return_weights: optional argument. If True then returns [EPN,I,S]. If False or
unspecified, just returns EPN
A number of routines have been developed that use this to create EPNs:
create_EPN_fixed_transmissibility(G,T)
create an EPN with constant transmissibility on the graph G.
create_EPN_exponential_rec_and_inf(G,gamma,beta)
create an EPN with constant recovery rate gamma and constant
infection rate beta.
create_EPN_fixed_recovery_exponential_inf(G,tau,beta)
create an EPN where everyone recovers after tau units of time
and infectiousness is constant at rate beta.
"""
if type(G).__name__ not in ['Graph' , 'DiGraph', 'MultiGraph']:
#not sure if algorithm works if other graph type used.
raise networkx.NetworkXError("Bad type %s for input
network"%type(G).__name__)
# if type(G).__name__ == 'MultiGraph':
# print 'warning, received %s, proceeding as normal'%type(G).__name__
EPN=networkx.DiGraph(weighted=True) #will give error if using
#networkx prior to 0.99
node_assignment(G,EPN,type_assignment,parameters)
edge_assignment(G,EPN,parameters,attempt_infection) #need to send G so that
we can grab any edge attributes.
return EPN
def create_EPN_weighted_edges(G,type_assignment,attempt_infection,parameters):
"""
The new structure of create_EPN allows weighted edges. So I'm just keeping
this code for compatibility reasons.
"""
print "create_EPN_weighted_edges is obsolete - use create_EPN"
EPN = create_EPN(G,type_assignment,attempt_infection,parameters)
return EPN
def create_EPN_preassigned_weights(G,I,S,attempt_infection,parameters):
print "create_EPN_preassigned_weights is obsolete - use create_EPN"
EPN = create_EPN(G,type_assignment,attempt_infection,parameters)
return EPN
def node_assignment(G,EPN,type_assignment,parameters):
nodes = G.nodes_iter()
for node in nodes:
type_assignment(EPN,node,G.node[node],parameters)
def edge_assignment(G,EPN,parameters,attempt_infection):
edges = G.edges_iter()
if type(G).__name__ == 'DiGraph':
for edge in edges:
attempt_infection(EPN,edge[0],edge[1],G.get_edge_data(edge[0],edge[1]),parameter
s)
elif type(G).__name__ in ['Graph', 'MultiGraph']:
for edge in edges:
attempt_infection(EPN,edge[0],edge[1],G.get_edge_data(edge[0],edge[1]),parameter
s)
attempt_infection(EPN,edge[1],edge[0],G.get_edge_data(edge[1],edge[0]),parameter
s)
else:
raise networkx.NetworkXError("Bad type %s for input
network"%type(G).__name__)

def exp_rec_and_inf_type_assignment(EPN,node,node_data,parameters):
gamma = parameters['gamma']
tau = random.expovariate(gamma)
EPN.add_node(node,infection_duration=tau)
EPN.node[node].update(node_data)
def exp_rec_and_inf_attempt_inf(EPN,node0,node1,edge_data,parameters):
beta = parameters['beta']
time2infect = random.expovariate(beta)
if time2infect<EPN.node[node0]['infection_duration']:
EPN.add_edge(node0,node1,time_to_infection=time2infect)
EPN[node0][node1].update(edge_data)
def create_EPN_exponential_rec_and_inf(G,gamma,beta):

parameters = {'gamma':gamma, 'beta':beta}
return
create_EPN(G,exp_rec_and_inf_type_assignment,exp_rec_and_inf_attempt_inf,paramet
ers)
def fixed_trans_type_assignment(EPN,node,node_data,parameters):
EPN.add_node(node,infection_duration=1)
EPN.node[node].update(node_data)
def fixed_trans_infection(EPN, node0, node1,edge_data,parameters):
T=parameters['transmissibility']
if random.random()<T:
EPN.add_edge(node0,node1,time_to_infection=1)
EPN[node0][node1].update(edge_data)
def create_EPN_fixed_transmissibility(G,T):

parameters = {'transmissibility':T}
return
create_EPN(G,fixed_trans_type_assignment,fixed_trans_infection,parameters)
#epidemics with fixed recovery time and constant infection rate --- this is a
special case of fixed transmissibility, but allows us to assign an infection
time
def fixed_rec_exp_inf_type_assignment(EPN,node,node_data,parameters):
duration = parameters['infection_duration']
EPN.add_node(node,infection_duration=duration)
EPN.node[node].update(node_data)
def fixed_rec_exp_inf_infection(EPN, node0, node1,edge_data,parameters):
duration = parameters['infection_duration']
beta = parameters['beta']
time2infect = random.expovariate(beta)
if time2infect<duration:
EPN.add_edge(node0,node1,time_to_infection=time2infect)
EPN[node0][node1].update(edge_data)
def create_EPN_fixed_recovery_exponential_inf(G,tau,beta):

#return_value is either just EPN or it is [EPN, I, S]
parameters = {'infection_duration':tau,'beta':beta}
return
create_EPN(G,fixed_rec_exp_inf_type_assignment,fixed_rec_exp_inf_infection,param
eters)
#shown by trapman to give lower bound for probability in case where
susceptibility is homogeneous. I suspect it's also the lower bound if
susceptibility allowed to vary.
def extreme_het_type_assignment(EPN,node,node_data,parameters):
EPN.add_node(node,rel_infectiousness=random.random(),rel_susceptibility=random.r
andom())
EPN.node[node].update(node_data)
def extreme_het_inf_infection(EPN, node0, node1,edge_data,parameters):
T = parameters['transmissibility']
if EPN.node[node0]['rel_infectiousness']<T:
EPN.add_edge(node0,node1,time_to_infection=1)
EPN[node0][node1].update(edge_data)
def extreme_het_sus_infection(EPN,node0,node1,edge_data,parameters):
T = parameters['transmissibility']
if EPN.node[node1]['rel_susceptibility']<T:
EPN.add_edge(node0,node1,time_to_infection=1)
EPN[node0][node1].update(edge_data)
def create_EPN_extreme_het_inf(G,T):
parameters={'transmissibility':T}
return create_EPN(G, extreme_het_type_assignment, extreme_het_inf_infection,
parameters)
def create_EPN_extreme_het_sus(G,T):
parameters={'transmissibility':T}
return create_EPN(G, extreme_het_type_assignment, extreme_het_sus_infection,
parameters)
def get_prob_and_size(EPN):
"""
Calculates the probability and attack rate of epidemics by finding
the relative size of the in-component of the largest strongly
connected component (this is the probability of an epidemic P) and
finding the relative size of the out-component of the largest
strongly connected component (this is the attack rate A).
Note that these in- and out-components include the strongly
connected component.
Warning - this returns the sizes for the largest
strongly-connected-component, whether or not it is a giant
component.
Returns [P,A]
"""
N=EPN.order()
scc_list = networkx.strongly_connected_components(EPN)
start_node = scc_list[0][0]
out_component = networkx.dfs_preorder(EPN,start_node)
in_component = networkx.dfs_preorder(EPN,start_node,reverse_graph = True)
A = len(out_component)*1.0/N
P = len(in_component)*1.0/N
return [P,A]
def create_epidemic_curve(EPN, source=None, cum_inc = False, stratify = False):
#this is effectively Dijstra's algorithm, with additional info on the recovery
times. It returns a dictionary with the times that recovery/infection occurs,
and the current number infected at that time. If source is None, it finds a
random source from which the largest scc in EPN can be reached. If cum_inc is
True, then rather than just giving number infected, it also returns cumulative
incidence. If you want to know number still susceptible, just subtract
cumulative infections from population.
events = {}
if source == None:
scc_list = networkx.strongly_connected_components(EPN)
start_node = scc_list[0][0]
in_component = networkx.dfs_preorder(EPN,start_node,reverse_graph =
True)
source = random.choice(in_component)
"""WARNING WARNING: need to edit single_source_dijkstra to accept
weight='weight' as an optional argument. Then change all occurrences of
'weight' to weight"""
[distances,paths] = networkx.single_source_dijkstra(EPN, source, weight =
'time_to_infection')
for node in distances.keys(): #key is node, distance[key] is distance
following node.
events[distances[node]] = events.get(distances[node],0)+1
if cum_inc: #if also returning cumulative incidence, not just infection
curve.
infections = {}
for node in distances.keys():
infections[distances[node]] = infections.get(distances[node],0)+1
cum_times = infections.keys()
cum_times.sort()
cumulative_curve = [0]
for time in cum_times:
newvalue = cumulative_curve[-1]+infections[time]
cumulative_curve.append(newvalue)
cumulative_curve.pop(0)
for node in distances.keys():
events[distances[node]+EPN.node[node].get('infection_duration',1)] =
events.get(distances[node]+EPN.node[node].get('infection_duration',1),0)-1
tmp = events.items()
tmp.sort()
infection_curve = []
times=[]
current_count = 0
for event in tmp:
current_count += event[1]
infection_curve.append(current_count)
times.append(event[0])
if cum_inc:
return [infection_curve,times,cumulative_curve,cum_times]
else:
return [infection_curve,times]
def create_epidemic_curve_stratified(EPN, source=None, cum_inc = False): #this
is effectively Dijstra's algorithm, with additional info on the recovery times.
It returns a dictionary with the times that recovery/infection occurs, and the
current number infected at that time. If source is None, it finds a random
source from which the largest scc in EPN can be reached. If cum_inc is True,
then rather than just giving number infected, it also returns cumulative
incidence. If stratify is true, it also returns stratifications by 'type'. If
you want to know number still susceptible, just subtract cumulative infections
from population.
events = {}
if source == None:
scc_list = networkx.strongly_connected_components(EPN)
start_node = scc_list[0][0]
in_component = networkx.dfs_preorder(EPN,start_node,reverse_graph =
True)
source = random.choice(in_component)
"""WARNING WARNING: need to edit single_source_dijkstra to accept
weight='weight' as an optional argument. Then change all occurrences of
'weight' to weight"""
[distances,paths] = networkx.single_source_dijkstra(EPN, source, weight =
'time_to_infection')
for node in distances.keys(): #key is node, distance[key] is distance
following node.
type = EPN.node[node]['type']
if not events.has_key(type):
events[type]={}
events[type][distances[node]] = events[type].get(distances[node],0)+1
if cum_inc: #if also returning cumulative incidence, not just infection
curve.
infections = {}
for node in distances.keys():
type = EPN.node[node]['type']
if not infections.has_key(type):
infections[type][distances[node]] =
infections[type].get(distances[node],0)+1
cum_times={}
cumulative_curve={}
for type in infections.keys():
cum_times[type] = infections.keys()
cum_times[type].sort()
cumulative_curve[type] = [0]
for time in cum_times[type]:
newvalue = cumulative_curve[-1]+infections[time]
cumulative_curve[type].append(newvalue)
cumulative_curve[type].pop(0)
for node in distances.keys():
type = G[node]['type']
events[type][distances[node]+EPN.node[node].get('infection_duration',1)]
=
events[type].get(distances[node]+EPN.node[node].get('infection_duration',1),0)-1
tmp = {}
for type in events.keys():
tmp[type] = events[type].items()
tmp[type].sort()
infection_curve = {}
times={}
for type in tmp.keys():
current_count = 0
infection_curve[type]=[]
times[type]=[]
for event in tmp[type]:
current_count += event[1]
infection_curve[type].append(current_count)
times[type].append(event[0])
if cum_inc:
return [infection_curve,times,cumulative_curve,cum_times]
else:
return [infection_curve,times]
##### MAKE PREDICTIONS - ANALYTIC #####
#predictions assume configuration model type networks.
#
# Method basically follows J C Miller: Epidemic size and probability
# in populations with heterogeneous infectiousness and susceptibility.
def get_Pk(G):
P = {}
order = G.order()
inv_order = 1./order
for node in G.nodes_iter():
k = G.degree(node)
P[k] = P.get(k,0) + inv_order
return P
def simple_fixed_trans_prob_size_prediction(G,T):
Pk = get_Pk(G)
[P,A]= fixed_trans_pgf_probsize_prediction(T,Pk)
return [P,A]
def fixed_trans_pgf_probsize_prediction(T,Pk,iterations=1000):
x=0
PTo={}
PTo[T]=1
for counter in range(iterations):
x=h(PTo,Pk,x)
P = 1- f(PTo,Pk,x)
return [P,P]
def theta(T,x):
return 1 - T + T*x
def f(PT,Pk,x):
fx = 0
for T in PT.keys():
tmp = 0
for k in Pk.keys():
# tmp = tmp + (1+T*(x-1))**k*Pk[k]
tmp = tmp + theta(T,x)**k*Pk[k]
fx += PT[T]*tmp
return fx
def h(PT,Pk,x):
avek=0
for k in Pk.keys():
avek+= k*Pk[k]
hx = 0
for T in PT.keys():
tmp = 0
for k in Pk.keys():
if k>0:
# tmp = tmp + (1+T*(x-1))**(k-1)*k*Pk[k]
tmp = tmp + theta(T,x)**(k-1)*k*Pk[k]
hx += PT[T]*tmp/avek
return hx