#!/opt/homebrew/Caskroom/miniconda/base/envs/main/bin/python
"""Handles generating random objects to use in various calculations.
**Author: Jonathan Delgado**
"""
#------------- Imports -------------#
import sys
import numpy as np
import scipy.linalg
import bisect # for binary search for KMC algorithm
from functools import cache # memoization for transition matrices
#--- Custom imports ---#
#------------- Fields -------------#
# The random number generator to be used
# _RNG = np.random.default_rng(seed=0) # Seeded one for testing
_RNG = np.random.default_rng()
# We may want to replace zeros by machine epsilon to avoid division by zero
# errors when calculating quantities of interest
MACHINE_EPSILON = np.finfo(float).eps
#======================== Constructors ========================#
[docs]
def rand_p(n=2, zeros=0):
""" Creates a random probability distribution, currently implemented only
with uniform sampling.
Kwargs:
n (int): the dimension of the desired distribution
zeros (int): the number of desired zeros to be injected into the distribution. Allows for ease of testing edge cases.
Returns:
(np.ndarray): the random distribution
"""
# Error checking for input amounts
if zeros > n - 1:
print('Too many zeros, will not allow for normalization.')
sys.exit()
elif zeros < 0:
print('Invalid amount of zeros. Should be nonnegative.')
sys.exit()
# Creates a random distribution of length n
p = _RNG.random(n)
# Set the first m entries equal to 0, where m is the amount of zeros
# requested
if zeros != 0:
p[:zeros] = 0
# Shuffle the contents to avoid just having the same first entries as 0
np.random.shuffle(p)
# Return the distribution normalized by its 1-norm
return p / np.sum(p)
def _matrix_to_rate_matrix(matrix):
""" Converts a square matrix into a rate matrix by normalizing the columns. """
np.fill_diagonal(matrix, 0)
# Normalize the columns to sum to zero.
np.fill_diagonal(matrix, -matrix.sum(axis=0))
return matrix
[docs]
def normal_rand_rate_matrix(n=2, mean=0, std=1, seed=None):
""" Generates a random, time-independent, n x n rate matrix consisting of
probabilities per unit time. Column normalized.
Kwargs:
n (int): the number of states the matrix will correspond to. Relates to the dimensions of the matrix.
seed (None/int): the seed for sampling. Unseeded (uses module-level rng) if None.
Returns:
(np.ndarray): the rate matrix.
"""
# Use module-level rng if no seed is provided, else, seed one
rng = _RNG if seed is None else np.random.default_rng(seed=seed)
return _matrix_to_rate_matrix( rng.normal(loc=mean, scale=std, size=(n, n)) )
[docs]
def rate_to_transition_matrix(W, time_step):
""" Converts a rate matrix to a transition matrix assuming a constant
control parameter during the duration of time_step.
Args:
W (np.ndarray): the rate matrix
time_step (float): the time step
Returns:
(np.ndarray): the transition matrix
"""
if callable(W):
# Time-dependent rate matrix, call this function at each moment in time
return cache(lambda t: rate_to_transition_matrix(W(t), time_step))
return scipy.linalg.expm(W * time_step)
[docs]
def rand_transition_matrix(n=2, time_step=None):
""" Generates a random, time-independent, discrete time, transition matrix.
If a time_step is provided, then a rate matrix is generated first, to couple the time_step parameter to the rate matrix. Otherwise it is generated directly.
Kwargs:
n (int): the number of states the matrix will correspond to. Relates to the dimensions of the matrix.
time_step (float): the time step.
Returns:
(np.ndarray): the n x n transition matrix
"""
if time_step is not None:
return rate_to_transition_matrix(rand_rate_matrix(n), time_step)
R = _RNG.random((n, n))
return R / R.sum(axis=0)
[docs]
def self_assembly_rate_matrix(alpha=1, c=1, M=1):
""" Generates a self-assembly rate matrix for a 3-state system following:
https://aip.scitation.org/doi/10.1063/1.3662140.
Kwargs:
alpha (float/function): the energy/temperature coupled parameter as a float or a function which returns alpha as a function of time. In the latter case the returned rate matrix will be a function which provides a np.ndarray as a function of time. The energy is the negative of the "optimally bound level of energy".
c (float): "concentration-like variable".
M (int): the degeneracy of the misbound level.
Returns:
(np.ndarray/function): the time-independent rate matrix as a numpy array in the case where the temperature is constant. Otherwise returns a function corresponding to the time-dependent rate matrix.
"""
if not callable(alpha):
if alpha < 0:
raise ValueError('Must provide a nonnegative alpha parameter.')
if alpha > 1:
raise ValueError('α must be less than 1 to represent a physical system since α = exp(-E/2T) and E/2T is a nonnegative quantity.')
if callable(alpha):
# T is a time-dependent temperature
# Provide the time-dependent rate matrix
return lambda t : self_assembly_rate_matrix(alpha=alpha(t), c=c, M=M)
# if not isinstance(M, int):
# raise ValueError('The degeneracy of the misbound level, M, must be an integer.')
#--- Main body ---#
W = np.array([
[ -c * (M + 1), alpha, alpha**2 ],
[ c * M, -alpha, 0 ],
[ c, 0, -alpha**2 ]
])
return W
[docs]
def self_assembly_transition_matrix(alpha=1, c=1, M=1, time_step=1):
""" Generates a self-assembly, discrete time, transition matrix for a
3-state system following: https://aip.scitation.org/doi/10.1063/1.3662140. Done assuming any external control parameter is fixed for the duration of the time_step. This matrix is step dependent, so conversions will be done to time to calculate the current temperature.
Kwargs:
alpha (float/function): the energy/temperature coupled parameter as a float or a function which returns alpha as a function of time. In the latter case the returned transition matrix will be a function which provides a np.ndarray as a function of time. The energy is the negative of the "optimally bound level of energy". This external control parameter will be fixed for the duration of the time_step.
c (float): "concentration-like variable"
M (int): the degeneracy of the misbound level
time_step (float): the time step
Returns:
(np.ndarray/function): the transition matrix
"""
return rate_to_transition_matrix(
self_assembly_rate_matrix(alpha=alpha, c=c, M=M),
time_step
)
#======================== Probability Operations ========================#
[docs]
def step(matrix, p):
""" Evolves a probability distribution one step forward by computing the
matrix multiplication between matrix and p. In the case of the matrix being a rate matrix the output is the time-derivative of p.
Args:
matrix (np.ndarray): the transition or rate matrix
p (np.ndarray): the marginal distribution
Returns:
(np.ndarray): the evolved marginal
"""
return matrix @ p
[docs]
def get_stationary_distribution(matrix, discrete=True):
""" Calculates the stationary distribution of a transition or rate matrix.
Credit: https://stackoverflow.com/questions/31791728/python-code-explanation-for-stationary-distribution-of-a-markov-chain.
Args:
matrix (np.ndarray/function): the transition or rate matrix
Kwargs:
discrete (bool): True if the provided matrix is a discrete time transition matrix, False if the provided matrix is a continuous time rate matrix
Returns:
(np.ndarray/function): the limiting distribution (as a function of time in the case where the matrix is too)
"""
if callable(matrix):
# Return the limiting distribution as a function of time
return lambda t : get_stationary_distribution(matrix(t), discrete=discrete)
if not discrete:
# Use the simplified method for the rate matrix, not as robust
# Make sure to check output
_, eigenvec = np.linalg.eig(matrix)
print('Returning experimental right eigenvector to the rate matrix. Always check the vector is correct!')
return np.abs(eigenvec[:, 0])
S, U = np.linalg.eig(matrix)
# Added the need to take the real part to avoid +0j added to each term
# and raising a complex error
# Added absolute value since I was occasionally getting the negative p
stationary = np.absolute(
np.array(U[:, np.where(np.abs(S - 1.) < 1e-8)[0][0]].flat).real
)
return stationary / np.sum(stationary)
[docs]
def get_path_probability(R, p, path):
""" Calculates the probability of observing a provided path using the a
(potentially time-inhomogeneous) transition matrix, the initial marginal distribution, and the path itself.
Args:
R (function/np.ndarray): the transition matrix. A function of the observation step if the matrix is time-inhomogeneous. That is, R = R(n). In which case Pr( x <- y ) = R(1)[x,y] * p[y]. Provide a numpy matrix in the case of a time-homogeneous transition matrix.
p (np.ndarray): the marginal distribution.
path (list/np.ndarray): the path.
Returns:
(float): the probability of observing this path.
"""
# Check if R is time-independent, if so, convert it to a function
# of the observation step that simply returns the constant matrix.
if not callable(R):
matrix = R
R = cache(lambda _: matrix)
# Comprehension to collect transition probabilities for vectorized product
total_jump_prob = np.array([
R(i + 1)[path[i+1], path[i]]
for i in range( len(path) - 1 )
]).prod()
return total_jump_prob * p[path[0]]
#======================== Path space sampling ========================#
[docs]
def complete_path_space(n, path_length):
""" Generates the entire path space as a matrix with each row corresponding
to a path and each column corresponding to an observation step.
Args:
n (int): the number of states in the state space
path_length (int): the length of each path
Returns:
(np.ndarray): the entire path space. Will be a matrix of size: n^path_length x path_length.
"""
paths = []
num_paths = n**path_length
for i in range(num_paths):
path = np.base_repr(i, base=n)
# Fixes constant width path to match the path length. Otherwise
# 1 and 221 are distinct path lengths when the former should read as 001
path = ( path_length - len(path) )*'0' + path
paths.append(
# Convert the number from decimal to base n.
# This will be in a string format, turn each digit into a separate
# int in a list, this will be a path.
[ int(state) for state in path ]
)
return np.array(paths)
[docs]
def KMC(W, p, num_paths, path_length, time_step=1, seed=None):
""" A Rejection-free Kinetic Monte Carlo (KMC) algorithm for simulating
the discrete time evolution of a system, where some processes can occur with known (continuous time) rates W = W(t). The discrete time dynamics will be computed using the continuous time rate matrix, W, by freezing the control parameter for moments of time, time_step. From: https://en.wikipedia.org/wiki/Kinetic_Monte_Carlo.
This algorithm can be improved by not freezing the control parameter, and parallelization, and by leveraging the first-order Markov assumption by generating the next step for all paths currently in the same state.
Args:
W (np.ndarray/function): rate matrix for np.ndarray. If function is provided then W is the function of time that provides a rate matrix (W = W(t)) for each moment in time. Will be used to implement driven systems. If W is just a rate matrix then W(t) is just the time-homogeneous rate matrix.
p (np.ndarray): the initial marginal distribution.
num_paths (int): the number of paths to sample.
path_length (int): the length of each path.
Kwargs:
time_step (float): the interval between changing of rate matrix (changing of control parameter), and the delay between observations.
seed (None/int): the seed for sampling. Unseeded (uses module-level rng) if None.
Returns:
(np.ndarray): matrix of paths sampled via KMC.
"""
#------------- Helper functions -------------#
def _KMC_jump(cumulatives, state):
""" Helper: takes cumulatives, a state, and generates
the next likely jump via the KMC algorithm and the time it takes to observe the jump. Useful for discretizing the KMC.
Args:
cumulatives (np.ndarray): the KMC's cumulatives.
state (int): the current state.
Returns:
(2-tuple): (next_state, delta time)
"""
# https://en.wikipedia.org/wiki/Kinetic_Monte_Carlo.
# The cumulative relevant to the current state
relevant_cumulative = cumulatives[:, state]
u = rng.uniform(0, 1)
scaled_total_rate = u * cumulatives[-1, state]
next_state = bisect.bisect_left(relevant_cumulative, scaled_total_rate)
u = rng.uniform(0, 1)
jump_time = -np.log(u) / relevant_cumulative[-1]
return next_state, jump_time
#------------- Initialization -------------#
# Use module-level rng if no seed is provided, else, seed one
rng = _RNG if seed is None else np.random.default_rng(seed=seed)
if not callable(W):
# Hold the rate matrix in memory.
constW = W
# Let the time-dependent rate matrix just be the constant matrix.
W = lambda t: constW
# Read off the size of the state space.
n = len(W(0))
state_options = np.arange(n)
# Initialize the matrix storing the paths
paths = np.full( (num_paths, path_length), -1, dtype='int64' )
# Set the first column of the matrix to be initial state weighted by p.
paths[:, 0] = rng.choice(state_options, p=p, size=num_paths)
# Fill each column
for k in range(1, path_length):
# This will find the current time of the control parameter to fix
# the rate matrix since this is discrete time.
current_time = k * time_step
# Fix the rate matrix
current_W = W(current_time)
#------------- Main algorithm -------------#
# We need to remove the normalization for the algorithm and just
# look at the positive rates.
posW = current_W.copy()
np.fill_diagonal(posW, 0)
# Matrix of partial sums, last one being the total rate
# Each row corresponds to the same final index for the partial sum
# Each column corresponds to which rates were summed together
cumulatives = np.array([
[ np.sum(posW[:i+1, x]) for i in range(n) ]
for x in range(n)
]).T
# Run through the path matrix.
for path in paths:
current_state = path[k - 1]
# The state we are currently in is the last nonnegative
# element of our path list (since the path matrix is
# initialized with -1)
# current_state = path[path >= 0][-1]
total_jump_times = 0
# Prepare for the while loop
next_state = current_state
# Keep evolving the state until it fits what would be observed
# in a discrete time process.
while total_jump_times < time_step:
potential_jump, jump_time = _KMC_jump(cumulatives, next_state)
# Note that if the next jump takes longer than our
# time_step we won't observe it. Since we will
# make the measurement before this happens.
if total_jump_times + jump_time >= time_step:
break
total_jump_times += jump_time
next_state = potential_jump
path[k] = next_state
return np.unique(paths, axis=0)
[docs]
def direct_sampling(R, p):
""" Not yet been implemented: Samples the path space directly to reflect
the dynamics given by the initial marginal and the transition matrix.
Args:
R (function/np.ndarray): the (possibly time-dependent) transition matrix.
p (np.ndarray): the marginal distribution.
Returns:
(np.ndarray): the sampled portion of the path space.
"""
raise NotImplementedError('Direct sampling has not been implemented yet.')
######################## Entry Code ########################
def main():
print('stochastic.py')
W = self_assembly_transition_matrix(alpha=lambda t : np.exp(-65/2*(0.25*np.cos(0.01*t)+0.32+0.1*t)))
p = np.array([1 - 2 * MACHINE_EPSILON, MACHINE_EPSILON, MACHINE_EPSILON])
paths = KMC(W, p, 500, 20, seed=0)
paths2 = KMC(W, p, 500, 20, seed=0)
print( f'paths.shape: {paths.shape}' )
print( f'paths: {paths}' )
print( f'paths - paths2: {paths - paths2}' )
if __name__ == '__main__':
try:
main()
except KeyboardInterrupt as e:
print('Keyboard interrupt.')