#!/usr/bin/env python3
"""Entropy and information theory related calculations.
**Author: Jonathan Delgado**
"""
######################## Imports ########################
import numpy as np
import stp
######################## Helper functions ########################
def _eps_filter(x):
""" Checks if the value is within machine-epsilon of zero and maps it to
zero if it is the case. Useful for removing negative values in entropies that should otherwise be zero.
Args:
x (float): value to be checked.
Returns:
(float): x if the value is not within machine epsilon of zero, 0 otherwise.
"""
return x if not np.isclose(x, 0, atol=9*10E-15) else 0
######################## Entropy calculations ########################
[docs]def entropy(p):
""" Calculates the Shannon entropy for a marginal distribution.
Args:
p (np.ndarray): the marginal distribution.
Returns:
(float): the entropy of p
"""
# Since zeros do not contribute to the Shannon entropy by definition, we
# ignore them to avoid any errors/warnings.
p = p[p != 0]
H = -np.dot(p, np.log(p))
# Filter against machine epsilon
return _eps_filter(H)
[docs]def delta_entropy(R, p):
""" Calculates the discrete time change in entropy using the entropy of p
evolved with R, minus the entropy of p.
Args:
R (np.ndarray): the transition matrix.
p (np.ndarray): the marginal distribution.
Returns:
(float): the change in entropy
"""
return entropy(step(R, p)) - entropy(p)
[docs]def relative_entropy(p, q):
""" Calculates the Kullback-Leibler divergence, which is nonnegative and
vanishes if and only if the distributions coincide.
Args:
p, q (np.ndarray): the probability distributions.
Returns:
(float): the relative entropy.
"""
if p.shape[0] != q.shape[0]:
print('Dimensions of vectors are not equal. Cannot find relative entropy.')
sys.exit()
# Any values where p is zero are defined to be zero and hence do not
# contribute to the relative entropy
# By masking q as well we automatically skip the values that were supposed
# to vanish with p avoiding any misalignment issues
# Note that by masking q only where p is zero doesn't remove
# any mismatching meaning it will still be infinite (as it should be)
# in the case where q has a zero that p does not.
p_filtered = p[p != 0]
log_ratio = np.log(p_filtered / q[p != 0])
return np.dot(p_filtered, log_ratio)
[docs]def entropy_production(matrix, p, discrete=True):
""" Calculates the entropy production for either discrete or continuous
time.
Args:
matrix (np.ndarray): the stochastic matrix, either a discrete time transition matrix or a continuous time rate matrix.
p (np.ndarray): the marginal distribution
Kwargs:
discrete (bool): True if we are calculating the discrete time entropy production (nats), False if we are calculating it in continuous time (nats/time).
Returns:
(float/np.inf): the entropy production
"""
log_product = matrix * np.log( matrix / matrix.T )
# The entropy term only exists in the case of discrete time
# it vanishes when we calculate the continuous time EP,
# by multiplying by the boolean we include it only when
# necessary
EP = np.dot(log_product.sum(axis=0), p) - (entropy(p) * discrete) \
- np.dot(stp.step(matrix, p), np.log(p))
return EP
[docs]def entropy_flow(R, p):
""" Calculates the discrete time entropy flow. This has not been
generalized to handle the continuous time entropy flow yet.
Args:
R (np.ndarray): the discrete time transition matrix
p (np.ndarray): the marginal distribution
Returns:
(float): the entropy flow
"""
# Vectorized calculation
log_product = R * np.log( R / R.T )
p_step = step(R, p)
EF = -np.dot(log_product.sum(axis=0), p) + entropy(p_step) \
+ np.dot(p_step, np.log(p))
return EF
######################## Entropy rates ########################
[docs]def entropy_rate(R):
""" Calculates the asymptotic entropy rate for the provided transition
matrix. If the matrix is time-inhomogeneous then we return a function that generates the entropy_rate as a function of n by calculating the systems limiting distribution for each n.
Args:
R (np.ndarray/function): the transition matrix.
Returns:
(float/function): the entropy velocity.
"""
if callable(R):
return lambda n : entropy_rate(R(n))
pst = stp.get_stationary_distribution(R, discrete=True)
RProduct = (R * np.log(R)).sum(axis=0)
return -np.dot(pst, RProduct)
######################## Information Space Objects ########################
[docs]class InfoSpace:
""" Information space. Holds collections of paths that traverse states in a
state space as a matrix, and the probability of each of those paths.
Provides functionality on this path space such as providing path entropies.
Attributes:
paths: the matrix of paths.
probabilities: a list of probabilities each path.
num_paths: the number of paths considered.
path_length: the length of the paths considered.
probabilities: a matrix where the (i,j)th element is the probability of observing the first j states of the ith path.
entropies: a list of path entropies for each path
total_probability: the sum of the probabilities of each path.
"""
def __init__(self, paths, p_matrix):
""" Initializes the InfoSpace object.
Args:
paths (np.ndarray): a matrix of paths where the (i,j)th element corresponds to the jth symbol of the ith path.
p_matrix (np.ndarray): a matrix of probabilities where the (i,j)th element corresponds to the probability of observing the ith path for the first j+1 (zero-indexing) symbols.
"""
self._paths = np.array(paths)
# Matrix of probabilities corresponding to the probability for the path
# at each moment.
self._p_matrix = np.array(p_matrix)
if self._p_matrix.size != 0:
# The information space is not empty
self._probabilities = self._p_matrix[:, -1]
else:
# There is zero probability here.
self._probabilities = 0
#------------- Properties -------------#
@property
def paths(self):
return self._paths
@property
def num_paths(self):
return self.paths.shape[0]
@property
def path_length(self):
return self.paths.shape[1]
@property
def probabilities(self):
return self._probabilities
@property
def entropies(self):
""" Returns a list of path entropies for each corresponding path
probability.
"""
try:
return self._entropies
except AttributeError:
# It's never been calculated before
self._entropies = -np.log(self.probabilities)
return self._entropies
@property
def total_probability(self):
try:
return self.probabilities.sum()
except AttributeError:
# Space is empty
return 0
#------------- Static methods -------------#
[docs] @staticmethod
def shorten(infospace, path_length, return_index=False):
""" Takes an Information Space and shortens it. Since unique paths of
length n, may be degenerate when truncated to paths of length m < n, we need to check for degeneracies and filter them out in both paths and probabilities.
Args:
infospace (InfoSpace): the information space to shorten.
path_length (int): the path length the information space should be shortened to.
Kwargs:
return_index (bool): returns the indices of the non-degenerate paths for the given path length using the original matrix. Useful for filtering other quantities of interest that may not be attached to this object.
Returns:
(InfoSpace): the shortened InfoSpace.
"""
if path_length < 1:
raise ValueError(f'Invalid path length: {path_length}. Path length must be an integer greater than 0.')
elif path_length > infospace.path_length:
raise ValueError(f'Cannot shorten an InformationSpace from length: {infospace.path_length} -> {path_length}.')
if infospace.paths.size == 0:
# This is an empty information space
return infospace if not return_index else (infospace, [])
# Truncate the path matrix
paths = infospace.paths[:, :path_length]
# Return index will provide the path indices of the non-degenerate paths
_, indices = np.unique(paths, axis=0, return_index=True)
# Sort the indices
indices = sorted(indices)
# Filter out the paths. Not taken from np.unique to ensure the correct
# ordering.
paths = paths[indices, :]
# Truncate the probability matrix
p_matrix = infospace._p_matrix[:, :path_length]
# Filter the probabilities matrix
p_matrix = p_matrix[indices, :]
infospace = InfoSpace(paths, p_matrix)
return infospace if not return_index else infospace, indices
[docs]class PartitionedInfoSpace(InfoSpace):
""" Partitioned Information Space. Constructs a typical set on an
information space to partition it into a typical information space and an atypical one.
Holds path probabilities, typical paths, atypical paths, atypical path probabilities and more. This object will use a provided (often sampled) path space to partition the space into a collection of typical and atypical paths depending on the dynamics provided. Will also track other quantities of interest such as the upper and lower bounds on the path probabilities required for the paths to be considered typical.
Attributes:
paths: the matrix of paths.
probabilities: a list of probabilities each path.
num_paths: the number of paths considered.
path_length: the length of the paths considered.
probabilities: a matrix where the (i,j)th element is the probability of observing the first j states of the ith path.
entropies: a list of path entropies for each path.
entropy_rates: a list of the entropy rates for each various path length. This will be the center of the epsilon-neighborhood for path entropies to qualify paths as typical for.
epsilon: the widths of the neighborhood used for paths to be considered typical for each path length.
upper/lower: the upper/lower bounds as measured in nats. This means that a path is typical if and only if its path entropy rate is within these bounds.
typicalities: a matrix where the (i,j)th element is a boolean determining whether the ith path is typical after j+1 steps.
ts: the typical set.
ats: the atypical set.
"""
def __init__(self, entropy_rates, epsilon, paths=None, p_matrix=None, typical_space=None, atypical_space=None):
""" Generates the PartitionedInfoSpace.
Args:
entropy_rates (np.ndarray): a list of the entropy rates for each various path length. This will be the center of the epsilon-neighborhood for path entropies to qualify paths as typical for.
epsilon (np.ndarray): the widths of the neighborhood used for paths to be considered typical for each path length.
Kwargs:
paths (np.ndarray/None): the entire sampled path space, the union of the typical and atypical spaces. If not provided these spaces will be merged to generate it.
p_matrix (np.ndarray/None): the entire matrix of probabilities for each path and each path length. If not provided, this will be generated by merging the p_matrix of the typical and atypical spaces.
typical_space (InfoSpace/None): the typical set on this space. If None, partitions the provided path space.
atypical_space (InfoSpace): the atypical set on this space. If None, partitions the provided path space.
"""
# Bool if the space simply needs to be partitioned
must_partition = (paths is None) or (p_matrix is None)
# Bool if the space simply needs to be merged since it's already been
# partitioned into a typical and atypical space
must_union = (typical_space is None) or (atypical_space is None)
if must_partition and must_union:
# We need either the paths AND the p_matrix or the tupical/atypical
# spaces to partition/union the spaces respectively.
raise TypeError('In sufficient information provided to partition/union the Information Space. We need either paths with their probabilities or the already partitioned spaces.')
if must_partition:
# Partition the paths and probability matrix into a typical and
# atypical space
# Need to generate the upper/lower bounds for the partitioning
# of the spaces
self._lower = entropy_rates - epsilon
self._upper = entropy_rates + epsilon
ts_paths = []; ts_p_matrix = []
ats_paths = []; ats_p_matrix = []
for path, path_index in enumerate(paths):
path_prob = p_matrix[path_index]
# The path entropy rate for direct comparison with the
# upper/lower bounds
path_entropy_rate = -np.log(path_prob[-1]) / path_length
is_typical = (
(self.lower[-1] <= path_entropy_rate)
and (path_entropy_rate <= self._upper)
)
if is_typical:
ts_paths.append(path)
ts_p_matrix.append(path_prob)
else:
ats_paths.append(path)
ats_p_matrix.append(path_prob)
typical_space = InfoSpace(ts_paths, ts_p_matrix)
atypical_space = InfoSpace(ats_paths, ats_p_matrix)
elif must_union:
# Union the path data
ts_empty = (typical_space.paths.size == 0)
ats_empty = (atypical_space.paths.size == 0)
if not ts_empty and not ats_empty:
# Both are nonempty
paths = np.vstack( (typical_space.paths, atypical_space.paths) )
p_matrix = np.vstack(
(typical_space._p_matrix, atypical_space._p_matrix)
)
elif ts_empty:
# Only the typical_space is empty
paths = atypical_space.paths
p_matrix = atypical_space._p_matrix
else:
# Only the atypical_space is empty
paths = typical_space.paths
p_matrix = typical_space._p_matrix
### Storing properties ###
self._paths = paths
self._p_matrix = p_matrix
self._probabilities = self._p_matrix[:, -1]
self._entropy_rates = entropy_rates
# Generalize the epsilon to a path_length dependent epsilon for
# potential generalizations in child classes.
if isinstance(epsilon, list):
epsilon = np.array(epsilon)
if not isinstance(epsilon, np.ndarray):
# We were only provided a float
epsilon = np.full(self.path_length, epsilon)
self._epsilon = epsilon
self._ts = typical_space
self._ats = atypical_space
#------------- Properties -------------#
@property
def entropy_rates(self):
return self._entropy_rates
@property
def epsilon(self):
return self._epsilon
@property
def upper(self):
try:
return self._upper
except AttributeError:
# It's never been calculated before
self._upper = self.entropy_rates + self.epsilon
return self._upper
@property
def lower(self):
try:
return self._lower
except AttributeError:
# It's never been calculated before.
self._lower = self.entropy_rates - self.epsilon
return self._lower
@property
def typicalities(self):
""" Returns the matrix of typicalities. """
try:
return self._typicalities
except AttributeError:
# It's never been calculated before
typicalities = []
ns = np.arange(1, self.path_length + 1)
path_entropy_rates = -np.log(self._p_matrix) / ns
self._typicalities = (
(self.lower <= path_entropy_rates)
& (path_entropy_rates <= self.upper)
)
return self._typicalities
@property
def ats(self):
return self._ats
@property
def ts(self):
return self._ts
#------------- Static methods -------------#
[docs] @staticmethod
def shorten(pinfospace, path_length, return_index=False):
""" Takes a PartitionedInformationSpace and shortens it. Since unique
paths of length n, may be degenerate when truncated to paths of length m < n, we need to check for degeneracies and filter them out in both paths and probabilities.
Args:
pinfospace (PartitionedInfoSpace): the partitioned information space to shorten.
path_length (int): the path length the information space should be shortened to.
Kwargs:
return_index (bool): returns the indices of the non-degenerate paths for the given path length using the original matrix. Useful for filtering other quantities of interest that may not be attached to this object.
Returns:
(PartitionedInfoSpace): the shortened PartitionedInfoSpace.
"""
# Hold the current information space to access properties
old_pinfospace = pinfospace
# Call parent method
# Paths and p_matrix will be handled here along with any other
# properties shared with parent. Sorted indices of non-degenerate
# paths will be calculated here too.
pinfospace, indices = InfoSpace.shorten(old_pinfospace, path_length, return_index=True)
# Finish the rest of this object's specific properties
# Truncate the entropy_rates
entropy_rates = old_pinfospace.entropy_rates[:path_length]
# Truncate the epsilon
epsilon = old_pinfospace.epsilon[:path_length]
# Truncate the typicalities matrix
# Necessary to re-partition the space.
# Filter out the typicalities matrix
typicalities = old_pinfospace.typicalities[indices, :path_length]
### Partitioning ###
ts_paths, ts_p_matrix = [], []
ats_paths, ats_p_matrix = [], []
paths = pinfospace.paths
p_matrix = pinfospace._p_matrix
for path_index, is_typical in enumerate(typicalities[:, -1]):
path = paths[path_index]
probs = p_matrix[path_index]
if is_typical:
ts_paths.append(path)
ts_p_matrix.append(probs)
else:
ats_paths.append(path)
ats_p_matrix.append(probs)
# The partitioned spaces
ts = InfoSpace(ts_paths, ts_p_matrix)
ats = InfoSpace(ats_paths, ats_p_matrix)
pinfospace = PartitionedInfoSpace(entropy_rates=entropy_rates, epsilon=epsilon, paths=paths, p_matrix=p_matrix, typical_space=ts, atypical_space=ats)
# Save the pre-generated property
pinfospace._typicalities = typicalities
return pinfospace if not return_index else pinfospace, indices
[docs] @staticmethod
def partition_space(R, p, paths, epsilon=0.5, return_p=False):
""" Partitions a path space using the dynamics provided.
Args:
R (np.ndarray/function): the transition matrix, time-dependent if provided as a function.
p (np.ndarray): the initial marginal distribution.
paths (np.ndarray): the portion of the path space to use.
Kwargs:
epsilon (float/np.ndarray): the radius/radii of the epsilon neighborhood to consider paths to be typical within.
return_p (bool): False, return only the PartitionedInfoSpace, True returns both the PartitionedInfoSpace and a list of the marginal vs time.
Returns:
(ParitionedInfoSpace/2-tuple): the PartitionedInfoSpace (PIS) or the PIS and a list of the marginal versus observation step if return_p is True.
"""
#------------- Data preparation -------------#
# Convert the transition matrix to add time-dependence as a constant
# matrix if a constant matrix was provided
if not callable(R):
# Not being saved as an attribute since this is not easily
# recoverable by being saved to a file.
# Emphasize saving properties that can be saved/loaded.
oldR = R
R = lambda n : oldR
num_paths, path_length = paths.shape
p_matrix = np.zeros(paths.shape)
# Initialize the marginal distribution data
for x, path in enumerate(paths):
# Just equal to the initial marginal
p_matrix[x, 0] = p[path[0]]
# Used for the bounds
entropy_rates = np.array([
entropy_rate(R(i))
for i in range(path_length)
])
# The marginal versus time
if return_p: p_vs_time = [p]
#------------- Data gathering -------------#
# bar = gui.ProgressBar(path_length * num_paths, width=300, title='Gathering data...')
### Quantities versus time ###
for current_path_length in range(2, path_length + 1):
# The data index
i = current_path_length - 1
# Since the marginals are zero-indexed as are the paths
step_index = current_path_length - 2
currentR = R(current_path_length - 1)
# Propagate the marginal one step and save it separately
# for quantities like the temporal coarse graining term
pstep = stp.step(currentR, p)
### Path probability calculations ###
for x, path in enumerate(paths):
current_state = path[step_index]
jump_state = path[step_index + 1]
# Forward calculations
# Recursive calculation to save time
last_joint = p_matrix[x, i - 1]
jump_prob = currentR[jump_state, current_state]
p_matrix[x, i] = last_joint * jump_prob
# If updated in each iteration, slows down the simulation
# drastically
# bar.update(amount=num_paths)
if return_p: p_vs_time.append(pstep)
# Finished data gathering for this iteration, propagate marginal
# forward in time
p = pstep
# bar.finish()
### Partitioning ###
upper = entropy_rates + epsilon
lower = entropy_rates - epsilon
ts_paths, ts_p_matrix = [], []
ats_paths, ats_p_matrix = [], []
path_entropy_rates = -np.log(p_matrix[:, -1]) / path_length
# Identify the paths that are typical and atypical
for path_index, path_entropy_rate in enumerate(path_entropy_rates):
# Can't create typicality matrix since partitioning it will
# break the ordering
# Determines whether this path is ultimately typical
is_typical = (
(lower[-1] <= path_entropy_rate)
and (path_entropy_rate <= upper[-1])
)
probs = p_matrix[path_index]
if is_typical:
ts_paths.append(path)
ts_p_matrix.append(probs)
else:
ats_paths.append(path)
ats_p_matrix.append(probs)
# The partitioned spaces
ts = InfoSpace(ts_paths, ts_p_matrix)
ats = InfoSpace(ats_paths, ats_p_matrix)
pinfospace = PartitionedInfoSpace(
entropy_rates=entropy_rates,
epsilon=epsilon,
paths=paths,
p_matrix=p_matrix,
typical_space=ts,
atypical_space=ats
)
# Set pre-calculated properties
pinfospace._upper = upper
pinfospace._lower = lower
return (pinfospace, p_vs_time) if return_p else pinfospace
######################## Entry ########################
def main():
print('info.py')
### Testing ###
p = stp.rand_p(3)
R = stp.self_assembly_transition_matrix()
paths = stp.complete_path_space(3, 4)
pinfospace = PartitionedInfoSpace.partition_space(R, p, paths)
print( f'pinfospace.total_probability: {pinfospace.total_probability}' )
print(pinfospace.ats.num_paths)
if __name__ == '__main__':
main()