Source code for stp.stochastic

#!/usr/bin/env python3
"""Handles generating random objects to use in various calculations.

**Author: Jonathan Delgado**

"""

######################## Imports ########################


import sys
import numpy as np
import scipy.linalg
import bisect # for binary search

# For ProgressBar
import stp.tools.gui as gui


# The random number generator to be used
# _RNG = np.random.default_rng(seed=0) # Seeded one for testing
_RNG = np.random.default_rng()
# We may want to replace zeros by machine epsilon to avoid division by zero 
# errors when calculating quantities of interest
MACHINE_EPSILON = np.finfo(float).eps


######################## Constructors ########################


[docs]def rand_p(n=2, zeros=0): """ Creates a random probability distribution, currently implemented only with uniform sampling. Kwargs: n (int): the dimension of the desired distribution zeros (int): the number of desired zeros to be injected into the distribution. Allows for ease of testing edge cases. Returns: (np.ndarray): the random distribution """ # Error checking for input amounts if zeros > n - 1: print('Too many zeros, will not allow for normalization.') sys.exit() elif zeros < 0: print('Invalid amount of zeros. Should be nonnegative.') sys.exit() # Creates a random distribution of length n p = _RNG.random(n) # Set the first m entries equal to 0, where m is the amount of zeros # requested if zeros != 0: p[:zeros] = 0 # Shuffle the contents to avoid just having the same first entries as 0 np.random.shuffle(p) # Return the distribution normalized by its 1-norm return p / np.sum(p)
[docs]def rand_rate_matrix(n=2, seed=None): """ Generates a random, time-independent, n x n rate matrix consisting of probabilities per unit time. Column normalized. Kwargs: n (int): the number of states the matrix will correspond to. Relates to the dimensions of the matrix. seed (None/int): the seed for sampling. Unseeded (uses module-level rng) if None. Returns: (np.ndarray): the rate matrix. """ # Use module-level rng if no seed is provided, else, seed one rng = _RNG if seed is None else np.random.default_rng(seed=seed) # Generates a random n x n matrix W = rng.random( (n, n) ) np.fill_diagonal(W, 0) # Normalize the columns to sum to zero. for y in range(n): W[y, y] = -np.sum(W[:,y]) return W
[docs]def rate_to_transition_matrix(W, time_step): """ Converts a rate matrix to a transition matrix assuming a constant control parameter during the duration of time_step. Args: W (np.ndarray): the rate matrix time_step (float): the time step Returns: (np.ndarray): the transition matrix """ return scipy.linalg.expm(W * time_step)
[docs]def rand_transition_matrix(n=2, time_step=1.0): """ Generates a random, time-independent, discrete time, transition matrix by first generating a random rate matrix and then matrix exponentiating it to incorporate the time step as an additional parameter. Kwargs: n (int): the number of states the matrix will correspond to. Relates to the dimensions of the matrix. time_step (float): the time step: the interval of time between observations. Returns: (np.ndarray): the n x n transition matrix """ return rate_to_transition_matrix(rand_rate_matrix(n), time_step)
[docs]def self_assembly_rate_matrix(alpha=1, c=1, M=1): """ Generates a self-assembly rate matrix for a 3-state system following: https://aip.scitation.org/doi/10.1063/1.3662140. Kwargs: alpha (float/function): the energy/temperature coupled parameter as a float or a function which returns alpha as a function of time. In the latter case the returned rate matrix will be a function which provides a np.ndarray as a function of time. The energy is the negative of the "optimally bound level of energy". c (float): "concentration-like variable". M (int): the degeneracy of the misbound level. Returns: (np.ndarray/function): the time-independent rate matrix as a numpy array in the case where the temperature is constant. Otherwise returns a function corresponding to the time-dependent rate matrix. """ if not callable(alpha) and alpha <= 0: raise ValueError('Must provide a nonnegative alpha parameter.') if callable(alpha): # T is a time-dependent temperature # Provide the time-dependent rate matrix return lambda t : self_assembly_rate_matrix(alpha=alpha(t), c=c, M=M) # if not isinstance(M, int): # raise ValueError('The degeneracy of the misbound level, M, must be an integer.') ### Main body ### W = np.array([ [ -c * (M + 1), alpha, alpha**2 ], [ c * M, -alpha, 0 ], [ c, 0, -alpha**2 ] ]) return W
[docs]def self_assembly_transition_matrix(alpha=1, c=1, M=1, time_step=1): """ Generates a self-assembly, discrete time, transition matrix for a 3-state system following: https://aip.scitation.org/doi/10.1063/1.3662140. Done assuming any external control parameter is fixed for the duration of the time_step. This matrix is step dependent, so conversions will be done to time to calculate the current temperature. Kwargs: alpha (float/function): the energy/temperature coupled parameter as a float or a function which returns alpha as a function of time. In the latter case the returned transition matrix will be a function which provides a np.ndarray as a function of time. The energy is the negative of the "optimally bound level of energy". This external control parameter will be fixed for the duration of the time_step. c (float): "concentration-like variable" M (int): the degeneracy of the misbound level time_step (float): the time step Returns: (np.ndarray/function): the transition matrix """ if callable(alpha): # Convert n to time. # This also serves to fix the temperature between observations to change # the control parameter discretely. R = lambda n : self_assembly_transition_matrix( alpha=alpha(n * time_step), c=c, M=M, time_step=time_step ) return R return rate_to_transition_matrix( self_assembly_rate_matrix(alpha=alpha, c=c, M=M), time_step )
######################## Probability Operations ########################
[docs]def step(matrix, p): """ Evolves a probability distribution one step forward by computing the matrix multiplication between matrix and p. In the case of the matrix being a rate matrix the output is the time-derivative of p. Args: matrix (np.ndarray): the transition or rate matrix p (np.ndarray): the marginal distribution Returns: (np.ndarray): the evolved marginal """ return np.matmul(matrix, p)
[docs]def get_stationary_distribution(matrix, discrete=True): """ Calculates the stationary distribution of a transition or rate matrix. Credit: https://stackoverflow.com/questions/31791728/python-code-explanation-for-stationary-distribution-of-a-markov-chain. Args: matrix (np.ndarray/function): the transition or rate matrix Kwargs: discrete (bool): True if the provided matrix is a discrete time transition matrix, False if the provided matrix is a continuous time rate matrix Returns: (np.ndarray/function): the limiting distribution (as a function of time in the case where the matrix is too) """ if callable(matrix): # Return the limiting distribution as a function of time return lambda t : get_stationary_distribution(matrix(t), discrete=discrete) if not discrete: # Use the simplified method for the rate matrix, not as robust # Make sure to check output _, eigenvec = np.linalg.eig(matrix) print('Returning experimental right eigenvector to the rate matrix. Always check the vector is correct!') return np.abs(eigenvec[:, 0]) S, U = np.linalg.eig(matrix) # Added the need to take the real part to avoid +0j added to each term # and raising a complex error # Added absolute value since I was occasionally getting the negative p stationary = np.absolute( np.array(U[:, np.where(np.abs(S - 1.) < 1e-8)[0][0]].flat).real ) return stationary / np.sum(stationary)
[docs]def get_path_probability(R, p, path): """ Calculates the probability of observing a provided path using the a (potentially time-inhomogeneous) transition matrix, the initial marginal distribution, and the path itself. Args: R (function/np.ndarray): the transition matrix. A function of the observation step if the matrix is time-inhomogeneous. That is, R = R(n). In which case Pr( x <- y ) = R(1)[x,y] * p[y]. Provide a numpy matrix in the case of a time-homogeneous transition matrix. p (np.ndarray): the marginal distribution. path (list/np.ndarray): the path. Returns: (float): the probability of observing this path. """ # Check if R is time-independent, if so, convert it to a function # of the observation step that simply returns the constant matrix. if not callable(R): matrix = R R = lambda n: matrix # Comprehension to collect transition probabilities for vectorized product total_jump_prob = np.array([ R(i + 1)[path[i+1], path[i]] for i in range( len(path) - 1 ) ]).prod() return total_jump_prob * p[path[0]]
######################## Path space sampling ########################
[docs]def complete_path_space(n, path_length): """ Generates the entire path space as a matrix with each row corresponding to a path and each column corresponding to an observation step. Args: n (int): the number of states in the state space path_length (int): the length of each path Returns: (np.ndarray): the entire path space. Will be a matrix of size: n^path_length x path_length. """ paths = [] num_paths = n**path_length bar = gui.ProgressBar(num_paths, width=300, title='Generating path space...') for i in range(num_paths): path = np.base_repr(i, base=n) path = ( path_length - len(path) )*'0' + path paths.append( # Convert the number from decimal to base n. # This will be in a string format, turn each digit into a separate # int in a list, this will be a path. [ int(state) for state in path ] ) bar.update() bar.finish() return np.array(paths)
[docs]def KMC(W, p, num_paths, path_length, time_step=1, seed=None, _degenerate_threshold=0.985): """ A Rejection-free Kinetic Monte Carlo (KMC) algorithm for simulating the discrete time evolution of a system, where some processes can occur with known (continuous time) rates W = W(t). The discrete time dynamics will be computed using the continuous time rate matrix, W, by freezing the control parameter for moments of time, time_step. From: https://en.wikipedia.org/wiki/Kinetic_Monte_Carlo. Args: W (np.ndarray/function): rate matrix for np.ndarray. If function is provided then W is the function of time that provides a rate matrix (W = W(t)) for each moment in time. Will be used to implement driven systems. If W is just a rate matrix then W(t) is just the time-homogeneous rate matrix. p (np.ndarray): the initial marginal distribution. num_paths (int): the number of paths to sample. path_length (int): the length of each path. Kwargs: time_step (float): the interval between changing of rate matrix (changing of control parameter), and the delay between observations. seed (None/int): the seed for sampling. Unseeded (uses module-level rng) if None. _degenerate_threshold (float): the fraction of paths that we require to be unique. If not satisfied KMC will be run again with an increased threshold. Returns: (np.ndarray): matrix of paths sampled via KMC. """ #------------- Helper functions -------------# def _KMC_jump(cumulatives, state): """ Helper: takes cumulatives, a state, and generates the next likely jump via the KMC algorithm and the time it takes to observe the jump. Useful for discretizing the KMC. Args: cumulatives (np.ndarray): the KMC's cumulatives. state (int): the current state. Returns: (2-tuple): (next_state, delta time) """ # https://en.wikipedia.org/wiki/Kinetic_Monte_Carlo. # The cumulative relevant to the current state relevant_cumulative = cumulatives[:, state] u = rng.uniform(0, 1) scaled_total_rate = u * cumulatives[-1, state] next_state = bisect.bisect_left(relevant_cumulative, scaled_total_rate) u = rng.uniform(0, 1) jump_time = -np.log(u) / relevant_cumulative[-1] return next_state, jump_time #------------- Initialization -------------# # Use module-level rng if no seed is provided, else, seed one rng = _RNG if seed is None else np.random.default_rng(seed=seed) if not callable(W): # Hold the rate matrix in memory. constW = W # Let the time-dependent rate matrix just be the constant matrix. W = lambda t: constW # Read off the size of the state space. n = W(0).shape[0] state_options = np.arange(n) # Read off the final time for the full time series observations given # the path length and the delay between observations. # For final_time, time_step, and path_length, the relationship is: # time_step * (number of jumps) = final_time final_time = time_step * (path_length - 1) # Initialize the matrix storing the paths paths = np.full( (num_paths, path_length), -1, dtype='int64' ) # Set the first column of the matrix to be initial state weighted by p. paths[:, 0] = rng.choice(state_options, p=p, size=num_paths) # Fill each column for k in range(1, path_length): # This will find the current time of the control parameter to fix # the rate matrix since this is discrete time. current_time = k * time_step # Fix the rate matrix current_W = W(current_time) #------------- Main algorithm -------------# # We need to remove the normalization for the algorithm and just # look at the positive rates. posW = current_W.copy() np.fill_diagonal(posW, 0) # Matrix of partial sums, last one being the total rate # Each row corresponds to the same final index for the partial sum # Each column corresponds to which rates were summed together cumulatives = np.array( [ [ np.sum(posW[:i+1, x]) for i in range(n) ] for x in range(n) ] ).T # Run through the path matrix. for path in paths: current_state = path[k - 1] # The state we are currently in is the last nonnegative # element of our path list (since the path matrix is # initialized with -1) # current_state = path[path >= 0][-1] total_jump_times = 0 # Prepare for the while loop next_state = current_state # Keep evolving the state until it fits what would be observed # in a discrete time process. while total_jump_times < time_step: potential_jump, jump_time = _KMC_jump(cumulatives, next_state) # Note that if the next jump takes longer than our # time_step we won't observe it. Since we will # make the measurement before this happens. if total_jump_times + jump_time >= time_step: break total_jump_times += jump_time next_state = potential_jump path[k] = next_state ### Degenerate paths check ### # Check for degenerate paths and sample again if needed. paths = np.unique(paths, axis=0) if paths.shape[0] / num_paths < _degenerate_threshold: # Generate a rate matrix of perturbations deltaW = rand_rate_matrix(n, seed=seed) / 100 Wprime = lambda t : W(t) + deltaW # Run KMC again with an increased degeneracy threshold and # a perturbed rate matrix. new_paths = KMC(Wprime, p, num_paths, path_length, time_step=time_step, seed=seed, _degenerate_threshold=_degenerate_threshold-10E-9) # Combine the path data paths = np.unique( np.vstack((paths, new_paths)), axis=0 ) # Return only the requested number of paths (in case we have extra unique # ones from previous degeneracies) return paths[:num_paths]
[docs]def direct_sampling(R, p): """ Not yet been implemented: Samples the path space directly to reflect the dynamics given by the initial marginal and the transition matrix. Args: R (function/np.ndarray): the (possibly time-dependent) transition matrix. p (np.ndarray): the marginal distribution. Returns: (np.ndarray): the sampled portion of the path space. """ print('Direct sampling has not been implemented yet.')
######################## Entry Code ######################## def main(): print('stochastic.py') W = self_assembly_transition_matrix(alpha=lambda t : np.exp(-65/2*(0.25*np.cos(0.01*t)+0.32+0.1*t))) p = np.array([1 - 2 * MACHINE_EPSILON, MACHINE_EPSILON, MACHINE_EPSILON]) paths = KMC(W, p, 500, 20, seed=0) paths2 = KMC(W, p, 500, 20, seed=0) print( f'paths.shape: {paths.shape}' ) print( f'paths: {paths}' ) print( f'paths - paths2: {paths - paths2}' ) if __name__ == '__main__': main()