123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512 |
- """
- Finite Discrete Random Variables Module
- See Also
- ========
- sympy.stats.frv_types
- sympy.stats.rv
- sympy.stats.crv
- """
- from itertools import product
- from sympy.concrete.summations import Sum
- from sympy.core.basic import Basic
- from sympy.core.cache import cacheit
- from sympy.core.function import Lambda
- from sympy.core.mul import Mul
- from sympy.core.numbers import (I, nan)
- from sympy.core.relational import Eq
- from sympy.core.singleton import S
- from sympy.core.symbol import (Dummy, Symbol)
- from sympy.core.sympify import sympify
- from sympy.functions.elementary.exponential import exp
- from sympy.functions.elementary.piecewise import Piecewise
- from sympy.logic.boolalg import (And, Or)
- from sympy.sets.sets import Intersection
- from sympy.core.containers import Dict
- from sympy.core.logic import Logic
- from sympy.core.relational import Relational
- from sympy.core.sympify import _sympify
- from sympy.sets.sets import FiniteSet
- from sympy.stats.rv import (RandomDomain, ProductDomain, ConditionalDomain,
- PSpace, IndependentProductPSpace, SinglePSpace, random_symbols,
- sumsets, rv_subs, NamedArgsMixin, Density, Distribution)
- class FiniteDensity(dict):
- """
- A domain with Finite Density.
- """
- def __call__(self, item):
- """
- Make instance of a class callable.
- If item belongs to current instance of a class, return it.
- Otherwise, return 0.
- """
- item = sympify(item)
- if item in self:
- return self[item]
- else:
- return 0
- @property
- def dict(self):
- """
- Return item as dictionary.
- """
- return dict(self)
- class FiniteDomain(RandomDomain):
- """
- A domain with discrete finite support
- Represented using a FiniteSet.
- """
- is_Finite = True
- @property
- def symbols(self):
- return FiniteSet(sym for sym, val in self.elements)
- @property
- def elements(self):
- return self.args[0]
- @property
- def dict(self):
- return FiniteSet(*[Dict(dict(el)) for el in self.elements])
- def __contains__(self, other):
- return other in self.elements
- def __iter__(self):
- return self.elements.__iter__()
- def as_boolean(self):
- return Or(*[And(*[Eq(sym, val) for sym, val in item]) for item in self])
- class SingleFiniteDomain(FiniteDomain):
- """
- A FiniteDomain over a single symbol/set
- Example: The possibilities of a *single* die roll.
- """
- def __new__(cls, symbol, set):
- if not isinstance(set, FiniteSet) and \
- not isinstance(set, Intersection):
- set = FiniteSet(*set)
- return Basic.__new__(cls, symbol, set)
- @property
- def symbol(self):
- return self.args[0]
- @property
- def symbols(self):
- return FiniteSet(self.symbol)
- @property
- def set(self):
- return self.args[1]
- @property
- def elements(self):
- return FiniteSet(*[frozenset(((self.symbol, elem), )) for elem in self.set])
- def __iter__(self):
- return (frozenset(((self.symbol, elem),)) for elem in self.set)
- def __contains__(self, other):
- sym, val = tuple(other)[0]
- return sym == self.symbol and val in self.set
- class ProductFiniteDomain(ProductDomain, FiniteDomain):
- """
- A Finite domain consisting of several other FiniteDomains
- Example: The possibilities of the rolls of three independent dice
- """
- def __iter__(self):
- proditer = product(*self.domains)
- return (sumsets(items) for items in proditer)
- @property
- def elements(self):
- return FiniteSet(*self)
- class ConditionalFiniteDomain(ConditionalDomain, ProductFiniteDomain):
- """
- A FiniteDomain that has been restricted by a condition
- Example: The possibilities of a die roll under the condition that the
- roll is even.
- """
- def __new__(cls, domain, condition):
- """
- Create a new instance of ConditionalFiniteDomain class
- """
- if condition is True:
- return domain
- cond = rv_subs(condition)
- return Basic.__new__(cls, domain, cond)
- def _test(self, elem):
- """
- Test the value. If value is boolean, return it. If value is equality
- relational (two objects are equal), return it with left-hand side
- being equal to right-hand side. Otherwise, raise ValueError exception.
- """
- val = self.condition.xreplace(dict(elem))
- if val in [True, False]:
- return val
- elif val.is_Equality:
- return val.lhs == val.rhs
- raise ValueError("Undecidable if %s" % str(val))
- def __contains__(self, other):
- return other in self.fulldomain and self._test(other)
- def __iter__(self):
- return (elem for elem in self.fulldomain if self._test(elem))
- @property
- def set(self):
- if isinstance(self.fulldomain, SingleFiniteDomain):
- return FiniteSet(*[elem for elem in self.fulldomain.set
- if frozenset(((self.fulldomain.symbol, elem),)) in self])
- else:
- raise NotImplementedError(
- "Not implemented on multi-dimensional conditional domain")
- def as_boolean(self):
- return FiniteDomain.as_boolean(self)
- class SingleFiniteDistribution(Distribution, NamedArgsMixin):
- def __new__(cls, *args):
- args = list(map(sympify, args))
- return Basic.__new__(cls, *args)
- @staticmethod
- def check(*args):
- pass
- @property # type: ignore
- @cacheit
- def dict(self):
- if self.is_symbolic:
- return Density(self)
- return {k: self.pmf(k) for k in self.set}
- def pmf(self, *args): # to be overridden by specific distribution
- raise NotImplementedError()
- @property
- def set(self): # to be overridden by specific distribution
- raise NotImplementedError()
- values = property(lambda self: self.dict.values)
- items = property(lambda self: self.dict.items)
- is_symbolic = property(lambda self: False)
- __iter__ = property(lambda self: self.dict.__iter__)
- __getitem__ = property(lambda self: self.dict.__getitem__)
- def __call__(self, *args):
- return self.pmf(*args)
- def __contains__(self, other):
- return other in self.set
- #=============================================
- #========= Probability Space ===============
- #=============================================
- class FinitePSpace(PSpace):
- """
- A Finite Probability Space
- Represents the probabilities of a finite number of events.
- """
- is_Finite = True
- def __new__(cls, domain, density):
- density = {sympify(key): sympify(val)
- for key, val in density.items()}
- public_density = Dict(density)
- obj = PSpace.__new__(cls, domain, public_density)
- obj._density = density
- return obj
- def prob_of(self, elem):
- elem = sympify(elem)
- density = self._density
- if isinstance(list(density.keys())[0], FiniteSet):
- return density.get(elem, S.Zero)
- return density.get(tuple(elem)[0][1], S.Zero)
- def where(self, condition):
- assert all(r.symbol in self.symbols for r in random_symbols(condition))
- return ConditionalFiniteDomain(self.domain, condition)
- def compute_density(self, expr):
- expr = rv_subs(expr, self.values)
- d = FiniteDensity()
- for elem in self.domain:
- val = expr.xreplace(dict(elem))
- prob = self.prob_of(elem)
- d[val] = d.get(val, S.Zero) + prob
- return d
- @cacheit
- def compute_cdf(self, expr):
- d = self.compute_density(expr)
- cum_prob = S.Zero
- cdf = []
- for key in sorted(d):
- prob = d[key]
- cum_prob += prob
- cdf.append((key, cum_prob))
- return dict(cdf)
- @cacheit
- def sorted_cdf(self, expr, python_float=False):
- cdf = self.compute_cdf(expr)
- items = list(cdf.items())
- sorted_items = sorted(items, key=lambda val_cumprob: val_cumprob[1])
- if python_float:
- sorted_items = [(v, float(cum_prob))
- for v, cum_prob in sorted_items]
- return sorted_items
- @cacheit
- def compute_characteristic_function(self, expr):
- d = self.compute_density(expr)
- t = Dummy('t', real=True)
- return Lambda(t, sum(exp(I*k*t)*v for k,v in d.items()))
- @cacheit
- def compute_moment_generating_function(self, expr):
- d = self.compute_density(expr)
- t = Dummy('t', real=True)
- return Lambda(t, sum(exp(k*t)*v for k,v in d.items()))
- def compute_expectation(self, expr, rvs=None, **kwargs):
- rvs = rvs or self.values
- expr = rv_subs(expr, rvs)
- probs = [self.prob_of(elem) for elem in self.domain]
- if isinstance(expr, (Logic, Relational)):
- parse_domain = [tuple(elem)[0][1] for elem in self.domain]
- bools = [expr.xreplace(dict(elem)) for elem in self.domain]
- else:
- parse_domain = [expr.xreplace(dict(elem)) for elem in self.domain]
- bools = [True for elem in self.domain]
- return sum([Piecewise((prob * elem, blv), (S.Zero, True))
- for prob, elem, blv in zip(probs, parse_domain, bools)])
- def compute_quantile(self, expr):
- cdf = self.compute_cdf(expr)
- p = Dummy('p', real=True)
- set = ((nan, (p < 0) | (p > 1)),)
- for key, value in cdf.items():
- set = set + ((key, p <= value), )
- return Lambda(p, Piecewise(*set))
- def probability(self, condition):
- cond_symbols = frozenset(rs.symbol for rs in random_symbols(condition))
- cond = rv_subs(condition)
- if not cond_symbols.issubset(self.symbols):
- raise ValueError("Cannot compare foreign random symbols, %s"
- %(str(cond_symbols - self.symbols)))
- if isinstance(condition, Relational) and \
- (not cond.free_symbols.issubset(self.domain.free_symbols)):
- rv = condition.lhs if isinstance(condition.rhs, Symbol) else condition.rhs
- return sum(Piecewise(
- (self.prob_of(elem), condition.subs(rv, list(elem)[0][1])),
- (S.Zero, True)) for elem in self.domain)
- return sympify(sum(self.prob_of(elem) for elem in self.where(condition)))
- def conditional_space(self, condition):
- domain = self.where(condition)
- prob = self.probability(condition)
- density = {key: val / prob
- for key, val in self._density.items() if domain._test(key)}
- return FinitePSpace(domain, density)
- def sample(self, size=(), library='scipy', seed=None):
- """
- Internal sample method
- Returns dictionary mapping RandomSymbol to realization value.
- """
- return {self.value: self.distribution.sample(size, library, seed)}
- class SingleFinitePSpace(SinglePSpace, FinitePSpace):
- """
- A single finite probability space
- Represents the probabilities of a set of random events that can be
- attributed to a single variable/symbol.
- This class is implemented by many of the standard FiniteRV types such as
- Die, Bernoulli, Coin, etc....
- """
- @property
- def domain(self):
- return SingleFiniteDomain(self.symbol, self.distribution.set)
- @property
- def _is_symbolic(self):
- """
- Helper property to check if the distribution
- of the random variable is having symbolic
- dimension.
- """
- return self.distribution.is_symbolic
- @property
- def distribution(self):
- return self.args[1]
- def pmf(self, expr):
- return self.distribution.pmf(expr)
- @property # type: ignore
- @cacheit
- def _density(self):
- return {FiniteSet((self.symbol, val)): prob
- for val, prob in self.distribution.dict.items()}
- @cacheit
- def compute_characteristic_function(self, expr):
- if self._is_symbolic:
- d = self.compute_density(expr)
- t = Dummy('t', real=True)
- ki = Dummy('ki')
- return Lambda(t, Sum(d(ki)*exp(I*ki*t), (ki, self.args[1].low, self.args[1].high)))
- expr = rv_subs(expr, self.values)
- return FinitePSpace(self.domain, self.distribution).compute_characteristic_function(expr)
- @cacheit
- def compute_moment_generating_function(self, expr):
- if self._is_symbolic:
- d = self.compute_density(expr)
- t = Dummy('t', real=True)
- ki = Dummy('ki')
- return Lambda(t, Sum(d(ki)*exp(ki*t), (ki, self.args[1].low, self.args[1].high)))
- expr = rv_subs(expr, self.values)
- return FinitePSpace(self.domain, self.distribution).compute_moment_generating_function(expr)
- def compute_quantile(self, expr):
- if self._is_symbolic:
- raise NotImplementedError("Computing quantile for random variables "
- "with symbolic dimension because the bounds of searching the required "
- "value is undetermined.")
- expr = rv_subs(expr, self.values)
- return FinitePSpace(self.domain, self.distribution).compute_quantile(expr)
- def compute_density(self, expr):
- if self._is_symbolic:
- rv = list(random_symbols(expr))[0]
- k = Dummy('k', integer=True)
- cond = True if not isinstance(expr, (Relational, Logic)) \
- else expr.subs(rv, k)
- return Lambda(k,
- Piecewise((self.pmf(k), And(k >= self.args[1].low,
- k <= self.args[1].high, cond)), (S.Zero, True)))
- expr = rv_subs(expr, self.values)
- return FinitePSpace(self.domain, self.distribution).compute_density(expr)
- def compute_cdf(self, expr):
- if self._is_symbolic:
- d = self.compute_density(expr)
- k = Dummy('k')
- ki = Dummy('ki')
- return Lambda(k, Sum(d(ki), (ki, self.args[1].low, k)))
- expr = rv_subs(expr, self.values)
- return FinitePSpace(self.domain, self.distribution).compute_cdf(expr)
- def compute_expectation(self, expr, rvs=None, **kwargs):
- if self._is_symbolic:
- rv = random_symbols(expr)[0]
- k = Dummy('k', integer=True)
- expr = expr.subs(rv, k)
- cond = True if not isinstance(expr, (Relational, Logic)) \
- else expr
- func = self.pmf(k) * k if cond != True else self.pmf(k) * expr
- return Sum(Piecewise((func, cond), (S.Zero, True)),
- (k, self.distribution.low, self.distribution.high)).doit()
- expr = _sympify(expr)
- expr = rv_subs(expr, rvs)
- return FinitePSpace(self.domain, self.distribution).compute_expectation(expr, rvs, **kwargs)
- def probability(self, condition):
- if self._is_symbolic:
- #TODO: Implement the mechanism for handling queries for symbolic sized distributions.
- raise NotImplementedError("Currently, probability queries are not "
- "supported for random variables with symbolic sized distributions.")
- condition = rv_subs(condition)
- return FinitePSpace(self.domain, self.distribution).probability(condition)
- def conditional_space(self, condition):
- """
- This method is used for transferring the
- computation to probability method because
- conditional space of random variables with
- symbolic dimensions is currently not possible.
- """
- if self._is_symbolic:
- self
- domain = self.where(condition)
- prob = self.probability(condition)
- density = {key: val / prob
- for key, val in self._density.items() if domain._test(key)}
- return FinitePSpace(domain, density)
- class ProductFinitePSpace(IndependentProductPSpace, FinitePSpace):
- """
- A collection of several independent finite probability spaces
- """
- @property
- def domain(self):
- return ProductFiniteDomain(*[space.domain for space in self.spaces])
- @property # type: ignore
- @cacheit
- def _density(self):
- proditer = product(*[iter(space._density.items())
- for space in self.spaces])
- d = {}
- for items in proditer:
- elems, probs = list(zip(*items))
- elem = sumsets(elems)
- prob = Mul(*probs)
- d[elem] = d.get(elem, S.Zero) + prob
- return Dict(d)
- @property # type: ignore
- @cacheit
- def density(self):
- return Dict(self._density)
- def probability(self, condition):
- return FinitePSpace.probability(self, condition)
- def compute_density(self, expr):
- return FinitePSpace.compute_density(self, expr)
|