Source code for cayenne.simulation

"""
    The main class for running stochastic simulation
"""

import multiprocessing as mp
from functools import partial
from typing import List, Optional
from warnings import warn

import matplotlib.lines as mlines
import matplotlib.pyplot as plt
import numpy as np

from .algorithms.direct import direct
from .algorithms.tau_adaptive import tau_adaptive
from .algorithms.tau_leaping import tau_leaping
from .model_io import ModelIO
from .results import Results


[docs]def wrapper(x, func): return func(*x)
[docs]class Simulation: """ A main class for running simulations. Parameters ---------- species_names : List[str] List of species names rxn_names : List[str] List of reaction names react_stoic: (ns, nr) ndarray A 2D array of the stoichiometric coefficients of the reactants. Reactions are columns and species are rows. prod_stoic: (ns, nr) ndarray A 2D array of the stoichiometric coefficients of the products. Reactions are columns and species are rows. init_state: (ns,) ndarray A 1D array representing the initial state of the system. k_det: (nr,) ndarray A 1D array representing the deterministic rate constants of the system. volume: float, optional The volume of the reactor vessel which is important for second and higher order reactions. Defaults to 1 arbitrary units. chem_flag: bool, optional If True, divide by Na (Avogadro's constant) while calculating stochastic rate constants. Defaults to ``False``. Attributes ---------- results: Results The results instance Raises ------ ValueError If supplied with order > 3. Examples -------- >>> V_r = np.array([[1,0],[0,1],[0,0]]) >>> V_p = np.array([[0,0],[1,0],[0,1]]) >>> X0 = np.array([10,0,0]) >>> k = np.array([1,1]) >>> sim = Simulation(V_r, V_p, X0, k) Notes ----- Stochastic reaction rates depend on the size of the system for second and third order reactions. By this, we mean the volume of the system in which the reactants are contained. Intuitively, this makes sense considering that collisions between two or more molecules becomes rarer as the size of the system increases. A detailed mathematical treatment of this idea can be found in [3]_ . In practice, this means that ``volume`` and ``chem_flag`` need to be supplied for second and third order reactions. ``volume`` represents the size of the system containing the reactants. In chemical systems ``chem_flag`` should generally be set to ``True`` as ``k_det`` is specified in units of molarity or M or mol/L. For example, a second order rate constant could be = 0.15 mol / (L s). Then Avogadro's constant (:math:`N_a`) is used for normalization while computing ``k_stoc`` (:math:`c_\mu` in [3]_ ) from ``k_det``. In biological systems, ``chem_flag`` should be generally be set to ``False`` as ``k_det`` is specified in units of copies/L or CFU/L. For example, a second order rate constant could be = 0.15 CFU / (L s). References ---------- .. [3] Gillespie, D.T., 1976. A general method for numerically simulating the stochastic time evolution of coupled chemical reactions. J. Comput. Phys. 22, 403–434. doi:10.1016/0021-9991(76)90041-3. """ _results: Optional[Results] = None def __init__( self, species_names: List[str], rxn_names: List[str], react_stoic: np.ndarray, prod_stoic: np.ndarray, init_state: np.ndarray, k_det: np.ndarray, chem_flag: bool = False, volume: float = 1.0, ) -> None: self.species_names = species_names self.rxn_names = rxn_names self._react_stoic = react_stoic self._prod_stoic = prod_stoic self._init_state = init_state self._k_det = k_det self._chem_flag = chem_flag self._ns = self._react_stoic.shape[0] self._nr = self._react_stoic.shape[1] self._volume = volume self._orders = np.sum( self._react_stoic, axis=0 ) # Order of rxn = number of reactants self._check_consistency() def _check_consistency(self): if len(self.species_names) != self._ns: raise ValueError( "Species names must match the stoichiometric matrix shapes." ) if len(self.rxn_names) != self._nr: raise ValueError( "Reaction names must match the stoichiometric matrix shapes." ) if (self._ns != self._prod_stoic.shape[0]) or ( self._nr != self._prod_stoic.shape[1] ): raise ValueError("react_stoic and prod_stoic should be the same shape.") if np.any(self._react_stoic < 0): raise ValueError("react_stoic cannot have negative elements.") if np.any(self._prod_stoic < 0): raise ValueError("V_p cannot have negative elements.") if np.any(self._init_state < 0): raise ValueError("Initial numbers in X0 can't be negative.") if np.any(self._k_det < 0): raise ValueError("Rate constant(s) can't be negative.") if self._k_det.shape[0] != self._nr: raise ValueError("Number of rate constants must equal number of reactions") if len(self._k_det.shape) != 1: raise ValueError("k_det must be a 1-D array") if self._chem_flag not in (True, False): raise ValueError("chem_flag must be a boolean True or False.") if np.max(self._orders) > 3: raise ValueError("Order greater than 3 not suppported.") if self._ns != self._init_state.shape[0]: raise ValueError( "X0 must have be of length = num. of species (or rows of V_r)" ) if len(self._init_state.shape) != 1: raise ValueError("X0 must be a 1-D array") @property def results(self) -> Optional[Results]: """ The ``Results`` instance of the simulation Returns ------- Optional[Results] """ if self._results is None: warn( "Run `Simulation.simulate` before requesting the results object", Warning, ) return self._results else: return self._results
[docs] @classmethod def load_model(cls, contents: str, contents_type: str) -> "Simulation": """ Load model contents into a Simulation object Parameters ---------- model_contents : str Either the model string or the file path content_type : str, {"ModelString", "ModelFile"} The type of the model Returns ------- sim : Simulation An instance of the Simulation class. """ modelio = ModelIO(contents, contents_type) ( species_names, rxn_names, react_stoic, prod_stoic, init_state, k_det, chem_flag, volume, ) = modelio.args return cls( species_names, rxn_names, react_stoic, prod_stoic, init_state, k_det, chem_flag, volume, )
[docs] def simulate( self, max_t: float = 10.0, max_iter: int = 1000, seed: int = 0, n_rep: int = 1, n_procs: Optional[int] = 1, algorithm: str = "direct", debug: bool = False, **kwargs, ) -> None: """ Run the simulation Parameters ---------- max_t: float, optional The end time of the simulation. The default is 10.0. max_iter: int, optional The maximum number of iterations of the simulation loop. The default is 1000 iterations. seed: int, optional The seed used to generate simulation seeds. The default value is 0. n_rep: int, optional The number of repetitions of the simulation required. The default value is 1. n_procs: int, optional The number of cpu cores to use for the simulation. Use ``None`` to automatically detect number of cpu cores. The default value is 1. algorithm: str, optional The algorithm to be used to run the simulation. The default value is ``"direct"``. Notes ----- The status indicates the status of the simulation at exit. Each repetition will have a status associated with it, and these are accessible through the ``Simulation.results.status_list``. status: int Indicates the status of the simulation at exit. 1: Succesful completion, terminated when ``max_iter`` iterations reached. 2: Succesful completion, terminated when ``max_t`` crossed. 3: Succesful completion, terminated when all species went extinct. -1: Failure, order greater than 3 detected. -2: Failure, propensity zero without extinction. """ tlist = [] xlist = [] status_list = [] if not isinstance(seed, int): raise TypeError("Seed should be of type int") np.random.seed(seed) sim_seeds = np.random.randint(low=0, high=1e7, size=n_rep) if not isinstance(max_iter, int): raise TypeError("max_iter should be of type int") algo_args = [] if algorithm == "direct": for index in range(n_rep): algo_args.append( ( self._react_stoic, self._prod_stoic, self._init_state, self._k_det, max_t, max_iter, self._volume, sim_seeds[index], self._chem_flag, ) ) algo = direct elif algorithm == "tau_leaping": if "tau" in kwargs.keys(): tau = kwargs["tau"] else: tau = 0.1 for index in range(n_rep): algo_args.append( ( self._react_stoic, self._prod_stoic, self._init_state, self._k_det, tau, max_t, self._volume, sim_seeds[index], self._chem_flag, ) ) algo = tau_leaping elif algorithm == "tau_adaptive": if "epsilon" in kwargs.keys(): epsilon = kwargs["epsilon"] else: epsilon = 0.03 if "nc" in kwargs.keys(): nc = kwargs["nc"] else: nc = 10 hor = self.HOR for index in range(n_rep): algo_args.append( ( self._react_stoic, self._prod_stoic, self._init_state, self._k_det, hor.astype(np.int), nc, epsilon, max_t, max_iter, self._volume, sim_seeds[index], self._chem_flag, ) ) algo = tau_adaptive else: raise ValueError("Requested algorithm not supported") algo_func = partial(wrapper, func=algo) if debug: results = map(algo_func, algo_args) else: with mp.Pool(processes=n_procs) as pool: results = pool.map(algo_func, algo_args) for t, X, status in results: tlist.append(t) xlist.append(X) status_list.append(status) self._results = Results( self.species_names, self.rxn_names, tlist, xlist, status_list, algorithm, sim_seeds, )
@property def HOR(self) -> np.ndarray: """ Determine the HOR vector. HOR(i) is the highest order of reaction in which species S_i appears as a reactant. Returns ------- HOR: np.ndarray Highest order of the reaction for the reactive species as defined under Eqn. (27) of [1]_. HOR can be 1, 2 or 3 if the species appears only once in the reactants. If HOR is -2, it appears twice in a second order reaction. If HOR is -3, it appears thrice in a third order reaction. If HOR is -32, it appears twice in a third order reaction. The corresponding value of ``g_i`` in Eqn. (27) is handled by ``tau_adaptive``. References ---------- .. [1] Cao, Y., Gillespie, D.T., Petzold, L.R., 2006. Efficient step size selection for the tau-leaping simulation method. J. Chem. Phys. 124, 044109. doi:10.1063/1.2159468 """ ns = self._react_stoic.shape[0] HOR = np.zeros(ns, dtype=np.int32) orders = np.sum(self._react_stoic, axis=0) for ind in range(ns): this_orders = orders[np.where(self._react_stoic[ind, :] > 0)[0]] if len(this_orders) == 0: HOR[ind] = 0 continue HOR[ind] = np.max(this_orders) if HOR[ind] == 1: continue order_2_indices = np.where(orders == 2) this_react_stoic = self._react_stoic[ind, :] if order_2_indices[0].size > 0: if np.max(this_react_stoic[order_2_indices[0]]) == 2 and HOR[ind] == 2: HOR[ind] = -2 # g_i should be (2 + 1/(x_i-1)) if np.where(orders == 3): if ( HOR[ind] == 3 and np.max(this_react_stoic[np.where(this_orders == 3)[0]]) == 2 ): HOR[ind] = -32 # g_i should be (3/2 * (2 + 1/(x_i-1))) elif ( HOR[ind] == 3 and np.max(this_react_stoic[np.where(this_orders == 3)[0]]) == 3 ): HOR[ind] = -3 # g_i should be(3 + 1/(x_i-1) + 2/(x_i-2)) return HOR
[docs] def plot( self, species_names: list = None, new_names: list = None, thinning: int = 1 ): """ Plot the simulation Parameters ---------- species_names: list, optional The names of the species to be plotted (``list`` of ``str``). The default is ``None`` and plots all species. new_names: list, optional The names of the species to be plotted. The default is ``"xi"`` for species ``i``. thinning: int The parameter that controls the sampling Eg. a value of 100 means that 1 point will be sampled every 100 steps The default is 1 (every time-point is sampled) Returns ------- fig: class 'matplotlib.figure.Figure' Figure object of the generated plot. ax: class 'matplotlib.axes._subplots.AxesSubplot Axis objected of the generated plot. """ if self._results is None: raise ValueError("Simulate not run.") else: if species_names is None: species_names = self.species_names n_species = len(species_names) prop_cycle = plt.rcParams["axes.prop_cycle"] colors = prop_cycle.by_key()["color"] fig, ax = plt.subplots() res = self._results if not isinstance(thinning, int): raise ValueError("The thinning parameter must be integer") legend_handlers = [0] * n_species generic_names = [""] * n_species for index1 in range(n_species): legend_handlers[index1] = mlines.Line2D([], [], color=colors[index1]) this_species = species_names[index1] generic_names[index1] = this_species n_reps = len(res) for index2 in range(n_reps): if thinning == 1: ax.step( res.t_list[index2][::thinning], res.get_species([this_species])[index2][::thinning, :], color=colors[index1], where="post", ) else: ax.scatter( res.t_list[index2][::thinning], res.get_species([this_species])[index2][::thinning, :], color=colors[index1], ) if new_names is None: new_names = self.species_names fig.legend(legend_handlers, new_names) ax.set_xlabel("Time") ax.set_ylabel("Species amounts") return fig, ax