from copy import copy
import math
import operator
import sys
import typing
import warnings
from deprecated.sphinx import deprecated
from interval_search import binary_search
import opytional as opyt
from ..stratum_retention_strategy.stratum_retention_algorithms import (
perfect_resolution_algo,
)
from ._HereditaryStratum import HereditaryStratum
from .stratum_ordered_stores import HereditaryStratumOrderedStoreList
from .stratum_ordered_stores._detail import HereditaryStratumOrderedStoreBase
# define type alias for ordered stores
OrderedStore = typing.Union[
typing.Callable[..., HereditaryStratumOrderedStoreBase],
typing.Tuple[HereditaryStratumOrderedStoreBase, int],
None,
]
class HereditaryStratigraphicColumn:
"""Genetic annotation to enable phylogenetic inference.
Primary end-user facing interface for hstrat library. Should be bundled with
digital genomes and propagated via the CloneDescendant method when passing
those genomes from parent to offspring. Provides basis for phylogenetic
analysis of distributed digital evolution populations.
Naming conventions are derived by analogy to Geological "Stratigraphy"
(i.e., <https://en.wikipedia.org/wiki/Stratigraphy>). The "hereditary
stratigraphy" system provided by this software works by associating an
identifier, referred to as a "stratum," with each elapsed generation along
a line of descent. This allows two "columns" to be aligned to detect the
generation of their most recent common ancestor: strata before the MRCA
will be identical and strata after will differ.
Stratum retention policy and stratum differentia bit width can be configured
to tune the time and space complexity of the column, trading-off with
uncertainty induced on estimates of phylogenetic distance back to the most
recent common ancestor of two columns.
Arbitrary user-defined data can be associated with strata by optional
argument to the CloneDescendant method. (Note that a first stratum is
deposited during column initialization, so an optional annotation argument
may also be provided then.)
"""
__slots__ = (
"_always_store_rank_in_stratum",
"_stratum_differentia_bit_width",
"_num_strata_deposited",
"_stratum_ordered_store",
"_stratum_retention_policy",
)
# if True, strata will be constructed with deposition rank stored even if
# the stratum retention condemner does not require it
_always_store_rank_in_stratum: bool
# how many bits wide of differentia should the deposited strata be
# constructed with?
_stratum_differentia_bit_width: int
# counter tracking the number of strata deposited
# incremented *after* a deposition and its coinciding purge are complete
_num_strata_deposited: int
# data structure storing retained strata
_stratum_ordered_store: typing.Any
# functor specifying stratum retention policy
_stratum_retention_policy: typing.Any
[docs]
def __init__(
self: "HereditaryStratigraphicColumn",
stratum_retention_policy: typing.Any = perfect_resolution_algo.Policy(),
*,
always_store_rank_in_stratum: bool = True,
stratum_differentia_bit_width: int = 64,
initial_stratum_annotation: typing.Optional[typing.Any] = None,
stratum_ordered_store: OrderedStore = None,
stratum_ordered_store_factory: OrderedStore = None, # deprecated
):
"""Initialize column to track a new line of descent.
Deposits a first stratum, so GetNumStrataDeposited() will return 1 after
initialization even though the user has not yet called DepositStratum().
Parameters
----------
stratum_retention_policy : any
Policy struct that implements stratum retention policy by specifying
the set of strata ranks that should be pruned from a hereditary
stratigraphic column when the nth stratum is deposited.
always_store_rank_in_stratum : bool, optional
Should the deposition rank be stored as a data member of generated
strata, even if not strictly necessary?
stratum_differentia_bit_width : int, optional
The bit width of the generated differentia. Default 64, allowing
for 2^64 distinct values.
initial_stratum_annotation: any, optional
Optional object to store as an annotation. Allows arbitrary user-
provided data to be associated with the first stratum deposition in the
line of descent.
stratum_ordered_store: callable or tuple of store and count, optional
One of:
* callable to generate a container that implements the necessary
interface to store strata within the column; can be configured for
performance reasons, but has no semantic effect.
* instance of one aforementioned container along with a deposition count
* None, in which case a default-initialized container will be used
stratum_ordered_store_factory: deprecated, alias of stratum_ordered_store.
Notes
-----
If no condemner or predicate functor specifying a stratum retention
policy is provided, the perfect resolution policy where all strata are
retained is used.
"""
self._always_store_rank_in_stratum = always_store_rank_in_stratum
self._stratum_differentia_bit_width = stratum_differentia_bit_width
self._stratum_retention_policy = stratum_retention_policy
if stratum_ordered_store_factory is not None:
warnings.warn(
"""stratum_ordered_store_factory kwarg is deprecated.
Please use stratum_ordered_store kwarg instead.""",
DeprecationWarning,
)
# disallow mixed use of deprecated and replacement
assert stratum_ordered_store is None
stratum_ordered_store = stratum_ordered_store_factory
if stratum_ordered_store is None:
# if no hstrat ordered store is specified, we use a list
stratum_ordered_store = HereditaryStratumOrderedStoreList
if callable(stratum_ordered_store):
# ordered store is actually an ordered store factory
self._stratum_ordered_store = stratum_ordered_store()
self._num_strata_deposited = 0
self.DepositStratum(annotation=initial_stratum_annotation)
elif isinstance(
stratum_ordered_store[0], HereditaryStratumOrderedStoreBase
):
# ordered store is already an instance of an ordered store
(
self._stratum_ordered_store,
self._num_strata_deposited,
) = stratum_ordered_store
else:
raise ValueError(
"""stratum_ordered_store is of invalid type; \
should be callable or tuple(callable instance, deposition count)"""
)
def __eq__(
self: "HereditaryStratigraphicColumn",
other: typing.Any,
) -> bool:
"""Compare for value-wise equality."""
# adapted from https://stackoverflow.com/a/4522896
return (
isinstance(
other,
self.__class__,
)
and self.__slots__ == other.__slots__
and all(
getter(self) == getter(other)
for getter in [
operator.attrgetter(attr) for attr in self.__slots__
]
)
)
def _CanOmitStratumDepositionRank(
self: "HereditaryStratigraphicColumn",
) -> bool:
"""Does the column's associated stratum retention policy provide
calculation of the rank of a stratum as a function of its position?"""
return self._stratum_retention_policy.CalcRankAtColumnIndex is not None
def _ShouldOmitStratumDepositionRank(
self: "HereditaryStratigraphicColumn",
) -> bool:
"""Decide if deposition rank should be stored in stratum.
Implementation detail to inspect configured stratum retention policy
and manual override to decide whether deposition rank should be stored
as a data member of generated strata.
Note that strata are not required to be stored if the stratum retention
policy allows for the rank of a stratum to be calculated as a function
of its position within the column and the number of strata deposited
onto the column. However, it may be beneficial to store the stratum
anyways for performance reasons if this calculation is expensive.
"""
return (
self._CanOmitStratumDepositionRank()
and not self._always_store_rank_in_stratum
)
def _CreateStratum(
self: "HereditaryStratigraphicColumn",
deposition_rank: int,
annotation: typing.Optional[typing.Any] = None,
differentia: typing.Optional[int] = None,
) -> HereditaryStratum:
"""Create a hereditary stratum with stored configuration attributes."""
return HereditaryStratum(
annotation=annotation,
deposition_rank=(
# don't store deposition rank if we know how to
# calculate it from stratum's position in column
None
if self._ShouldOmitStratumDepositionRank()
else deposition_rank
),
differentia_bit_width=self._stratum_differentia_bit_width,
differentia=differentia,
)
[docs]
def DepositStratum(
self: "HereditaryStratigraphicColumn",
annotation: typing.Optional[typing.Any] = None,
) -> None:
"""Elapse a generation.
Parameters
----------
annotation: any, optional
Optional object to store as an annotation. Allows arbitrary user-
provided data to be associated with this stratum deposition in the
line of descent.
"""
new_stratum = self._CreateStratum(
deposition_rank=self._num_strata_deposited,
annotation=annotation,
)
self._stratum_ordered_store.DepositStratum(
rank=self._num_strata_deposited,
stratum=new_stratum,
)
self._PurgeColumn()
self._num_strata_deposited += 1
[docs]
def DepositStrata(
self: "HereditaryStratigraphicColumn",
num_stratum_depositions: int,
) -> None:
"""Elapse n generations.
Parameters
----------
num_stratum_depositions: int
How many generations to elapse?
"""
# fallback to naive approach if IterRetainedRanks not available
policy = self._stratum_retention_policy
if policy.IterRetainedRanks is None or num_stratum_depositions <= 1:
for _ in range(num_stratum_depositions):
self.DepositStratum()
return
prior_deposit_count = self.GetNumStrataDeposited()
target_deposit_count = prior_deposit_count + num_stratum_depositions
# for python 3.7+, dictionaries are guaranteed insertion ordered
assert sys.version_info >= (3, 7)
target_retained_ranks = {
rank: None
for rank in policy.IterRetainedRanks(target_deposit_count)
}
# delete no-longer-needed ranks
self._stratum_ordered_store.DelRanks(
ranks=(
rank
for rank in self.IterRetainedRanks()
if rank not in target_retained_ranks
),
get_column_index_of_rank=self.GetColumnIndexOfRank,
)
# add new retained ranks
for rank in target_retained_ranks:
if rank >= prior_deposit_count:
self._stratum_ordered_store.DepositStratum(
rank=rank,
stratum=self._CreateStratum(rank),
)
# update generation counter
self._num_strata_deposited += num_stratum_depositions
def _PurgeColumn(self: "HereditaryStratigraphicColumn") -> None:
"""Discard stored strata according to the configured retention policy.
Implementation detail. Called after a new stratum has been appended to
the column's store but before it is considered fully deposited (i.e.,
it is reflected in the column's internal deposition counter).
"""
condemned_ranks = self._stratum_retention_policy.GenDropRanks(
num_stratum_depositions_completed=self.GetNumStrataDeposited(),
retained_ranks=self.IterRetainedRanks(),
)
self._stratum_ordered_store.DelRanks(
ranks=condemned_ranks,
get_column_index_of_rank=self.GetColumnIndexOfRank,
)
[docs]
def GetNextRank(self: "HereditaryStratigraphicColumn") -> int:
"""Get the next rank to be deposited on the column.
Returns
-------
int
The next rank to be deposited on the column.
"""
return self.GetNumStrataDeposited()
[docs]
def IterRetainedRanks(
self: "HereditaryStratigraphicColumn",
) -> typing.Iterator[int]:
"""Iterate over deposition ranks of strata stored in the column.
The store may be altered during iteration without iterator
invalidation, although subsequent updates will not be reflected in the
iterator.
"""
if self._ShouldOmitStratumDepositionRank():
if hasattr(self._stratum_retention_policy, "IterRetainedRanks"):
yield from self._stratum_retention_policy.IterRetainedRanks(
self.GetNumStrataDeposited()
)
else:
for idx in range(self.GetNumStrataRetained()):
yield self.GetRankAtColumnIndex(idx)
else:
yield from self._stratum_ordered_store.IterRetainedRanks()
[docs]
def IterRetainedStrata(
self: "HereditaryStratigraphicColumn",
) -> typing.Iterator[HereditaryStratum]:
"""Iterate over strata stored in the column.
Strata yielded from most ancient to most recent.
"""
yield from self._stratum_ordered_store.IterRetainedStrata()
[docs]
def IterRetainedDifferentia(
self: "HereditaryStratigraphicColumn",
) -> typing.Iterator[int]:
"""Iterate over differentia of strata stored in the column.
Differentia yielded from most ancient to most recent.
"""
return map(lambda x: x.GetDifferentia(), self.IterRetainedStrata())
[docs]
def IterRankDifferentiaZip(
self: "HereditaryStratigraphicColumn",
copyable: bool = False,
) -> typing.Iterator[typing.Tuple[int, int]]:
"""Iterate over ranks of retained strata and their differentia.
If `copyable`, return an iterator that can be copied to produce a new
fully-independent iterator at the same position.
Equivalent to `zip(col.IterRetainedRanks(),
col.IterRetainedDifferentia())`, but may be more efficient.
"""
res = self._stratum_ordered_store.IterRankDifferentiaZip(
get_rank_at_column_index=(
self.GetRankAtColumnIndex
if self._ShouldOmitStratumDepositionRank()
else None
),
)
if copyable:
res = iter([*res])
return res
[docs]
def HasAnyAnnotations(
self: "HereditaryStratigraphicColumn",
) -> bool:
"""Do any retained strata have annotations?"""
return any(
stratum.GetAnnotation() is not None
for stratum in self._stratum_ordered_store.IterRetainedStrata()
)
[docs]
def GetNumStrataRetained(self: "HereditaryStratigraphicColumn") -> int:
"""How many strata are currently stored within the column?
May be fewer than the number of strata deposited if strata have been
discarded as part of the configured stratum retention policy.
"""
return self._stratum_ordered_store.GetNumStrataRetained()
[docs]
def GetNumStrataDeposited(self: "HereditaryStratigraphicColumn") -> int:
"""How many strata have been deposited on the column?
Note that a first stratum is deposited on the column during
initialization.
"""
return self._num_strata_deposited
[docs]
def GetStratumAtColumnIndex(
self: "HereditaryStratigraphicColumn",
index: int,
) -> HereditaryStratum:
"""Get the stratum positioned at index i among retained strata.
Index order is from most ancient (index 0) to most recent.
"""
return self._stratum_ordered_store.GetStratumAtColumnIndex(
index,
get_rank_at_column_index=(
self.GetRankAtColumnIndex
if self._ShouldOmitStratumDepositionRank()
else None
),
)
[docs]
def GetStratumAtRank(
self: "HereditaryStratigraphicColumn",
rank: int,
) -> typing.Optional[HereditaryStratum]:
"""Get the stratum deposited at generation g.
Returns None if stratum with rank g is not retained.
"""
return opyt.apply_if(
self.GetColumnIndexOfRank(rank),
self.GetStratumAtColumnIndex,
)
[docs]
def GetRankAtColumnIndex(
self: "HereditaryStratigraphicColumn",
index: int,
) -> int:
"""Map column position to generation of deposition.
What is the deposition rank of the stratum positioned at index i
among retained strata? Index order is from most ancient (index 0) to
most recent.
"""
if self._ShouldOmitStratumDepositionRank():
return self._stratum_retention_policy.CalcRankAtColumnIndex(
index=index,
num_strata_deposited=self.GetNumStrataDeposited(),
)
else:
# fall back to store lookup
return self._stratum_ordered_store.GetRankAtColumnIndex(index)
[docs]
def GetColumnIndexOfRank(
self: "HereditaryStratigraphicColumn",
rank: int,
) -> typing.Optional[int]:
"""Map generation of deposition to column position.
What is the index position within retained strata of the stratum
deposited at rank r? Returns None if no stratum with rank r is present
within the store.
"""
if self._ShouldOmitStratumDepositionRank():
assert self.GetNumStrataRetained()
res_idx = binary_search(
lambda idx: self.GetRankAtColumnIndex(idx) >= rank,
0,
self.GetNumStrataRetained() - 1,
)
if res_idx is None:
return None
elif self.GetRankAtColumnIndex(res_idx) == rank:
return res_idx
else:
return None
else:
# fall back to store lookup
return self._stratum_ordered_store.GetColumnIndexOfRank(rank=rank)
[docs]
def GetNumDiscardedStrata(
self: "HereditaryStratigraphicColumn",
) -> int:
"""How many deposited strata have been discarded?
Determined by number of generations elapsed and the configured column
retention policy.
"""
return self.GetNumStrataDeposited() - self.GetNumStrataRetained()
[docs]
def GetStratumDifferentiaBitWidth(
self: "HereditaryStratigraphicColumn",
) -> int:
"""How many bits wide are the differentia of strata?"""
return self._stratum_differentia_bit_width
[docs]
def HasDiscardedStrata(
self: "HereditaryStratigraphicColumn",
) -> bool:
"""Have any deposited strata been discarded?"""
return self.GetNumDiscardedStrata() > 0
[docs]
@deprecated(
version="1.6.0",
reason="Use calc_probability_differentia_collision_between",
)
def CalcProbabilityDifferentiaCollision(
self: "HereditaryStratigraphicColumn",
) -> float:
"""How likely are differentia collisions?
Calculates the probability of two randomly-differentiated differentia
being identical by coincidence.
"""
return 1.0 / 2**self._stratum_differentia_bit_width
[docs]
@deprecated(
version="1.6.0",
reason="Use "
"calc_min_implausible_spurious_differentia_collisions_between",
)
def CalcMinImplausibleSpuriousConsecutiveDifferentiaCollisions(
self: "HereditaryStratigraphicColumn",
significance_level: float,
) -> int:
"""Determine amount of evidence required to indicate shared ancestry.
Calculates how many differentia collisions are required to reject the
null hypothesis that columns do not share common ancestry at those
ranks at significance level significance_level.
"""
assert 0.0 <= significance_level <= 1.0
log_base = self.CalcProbabilityDifferentiaCollision()
return int(math.ceil(math.log(significance_level, log_base)))
[docs]
def Clone(
self: "HereditaryStratigraphicColumn",
) -> "HereditaryStratigraphicColumn":
"""Create an independent copy of the column.
Contains identical data but may be freely altered without affecting
data within this column.
"""
# shallow copy
result = copy(self)
# do semi-shallow duplication on select elements
result._stratum_ordered_store = self._stratum_ordered_store.Clone()
return result
[docs]
def CloneDescendant(
self: "HereditaryStratigraphicColumn",
stratum_annotation: typing.Optional[typing.Any] = None,
) -> "HereditaryStratigraphicColumn":
"""Return a cloned column that has had an additional stratum deposited.
Does not alter self.
Parameters
----------
stratum_annotation: any, optional
Optional object to store as an annotation. Allows arbitrary user-
provided data to be associated with this stratum deposition in the
line of descent.
"""
res = self.Clone()
res.DepositStratum(annotation=stratum_annotation)
return res
[docs]
def CloneNthDescendant(
self: "HereditaryStratigraphicColumn",
num_stratum_depositions: int,
) -> "HereditaryStratigraphicColumn":
"""Return a cloned column that has had n additional strata deposited.
Does not alter self.
Parameters
----------
num_stratum_depositions: int
How many generations should clone column be descended?
"""
res = self.Clone()
res.DepositStrata(num_stratum_depositions)
return res