123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350 |
- from sympy.concrete.summations import (Sum, summation)
- from sympy.core.basic import Basic
- from sympy.core.cache import cacheit
- from sympy.core.function import Lambda
- from sympy.core.numbers import I
- from sympy.core.relational import (Eq, Ne)
- from sympy.core.singleton import S
- from sympy.core.symbol import (Dummy, symbols)
- from sympy.core.sympify import sympify
- from sympy.functions.combinatorial.factorials import factorial
- from sympy.functions.elementary.exponential import exp
- from sympy.functions.elementary.integers import floor
- from sympy.functions.elementary.piecewise import Piecewise
- from sympy.logic.boolalg import And
- from sympy.polys.polytools import poly
- from sympy.series.series import series
- from sympy.polys.polyerrors import PolynomialError
- from sympy.stats.crv import reduce_rational_inequalities_wrap
- from sympy.stats.rv import (NamedArgsMixin, SinglePSpace, SingleDomain,
- random_symbols, PSpace, ConditionalDomain, RandomDomain,
- ProductDomain, Distribution)
- from sympy.stats.symbolic_probability import Probability
- from sympy.sets.fancysets import Range, FiniteSet
- from sympy.sets.sets import Union
- from sympy.sets.contains import Contains
- from sympy.utilities import filldedent
- from sympy.core.sympify import _sympify
- class DiscreteDistribution(Distribution):
- def __call__(self, *args):
- return self.pdf(*args)
- class SingleDiscreteDistribution(DiscreteDistribution, NamedArgsMixin):
- """ Discrete distribution of a single variable.
- Serves as superclass for PoissonDistribution etc....
- Provides methods for pdf, cdf, and sampling
- See Also:
- sympy.stats.crv_types.*
- """
- set = S.Integers
- def __new__(cls, *args):
- args = list(map(sympify, args))
- return Basic.__new__(cls, *args)
- @staticmethod
- def check(*args):
- pass
- @cacheit
- def compute_cdf(self, **kwargs):
- """ Compute the CDF from the PDF.
- Returns a Lambda.
- """
- x = symbols('x', integer=True, cls=Dummy)
- z = symbols('z', real=True, cls=Dummy)
- left_bound = self.set.inf
- # CDF is integral of PDF from left bound to z
- pdf = self.pdf(x)
- cdf = summation(pdf, (x, left_bound, floor(z)), **kwargs)
- # CDF Ensure that CDF left of left_bound is zero
- cdf = Piecewise((cdf, z >= left_bound), (0, True))
- return Lambda(z, cdf)
- def _cdf(self, x):
- return None
- def cdf(self, x, **kwargs):
- """ Cumulative density function """
- if not kwargs:
- cdf = self._cdf(x)
- if cdf is not None:
- return cdf
- return self.compute_cdf(**kwargs)(x)
- @cacheit
- def compute_characteristic_function(self, **kwargs):
- """ Compute the characteristic function from the PDF.
- Returns a Lambda.
- """
- x, t = symbols('x, t', real=True, cls=Dummy)
- pdf = self.pdf(x)
- cf = summation(exp(I*t*x)*pdf, (x, self.set.inf, self.set.sup))
- return Lambda(t, cf)
- def _characteristic_function(self, t):
- return None
- def characteristic_function(self, t, **kwargs):
- """ Characteristic function """
- if not kwargs:
- cf = self._characteristic_function(t)
- if cf is not None:
- return cf
- return self.compute_characteristic_function(**kwargs)(t)
- @cacheit
- def compute_moment_generating_function(self, **kwargs):
- t = Dummy('t', real=True)
- x = Dummy('x', integer=True)
- pdf = self.pdf(x)
- mgf = summation(exp(t*x)*pdf, (x, self.set.inf, self.set.sup))
- return Lambda(t, mgf)
- def _moment_generating_function(self, t):
- return None
- def moment_generating_function(self, t, **kwargs):
- if not kwargs:
- mgf = self._moment_generating_function(t)
- if mgf is not None:
- return mgf
- return self.compute_moment_generating_function(**kwargs)(t)
- @cacheit
- def compute_quantile(self, **kwargs):
- """ Compute the Quantile from the PDF.
- Returns a Lambda.
- """
- x = Dummy('x', integer=True)
- p = Dummy('p', real=True)
- left_bound = self.set.inf
- pdf = self.pdf(x)
- cdf = summation(pdf, (x, left_bound, x), **kwargs)
- set = ((x, p <= cdf), )
- return Lambda(p, Piecewise(*set))
- def _quantile(self, x):
- return None
- def quantile(self, x, **kwargs):
- """ Cumulative density function """
- if not kwargs:
- quantile = self._quantile(x)
- if quantile is not None:
- return quantile
- return self.compute_quantile(**kwargs)(x)
- def expectation(self, expr, var, evaluate=True, **kwargs):
- """ Expectation of expression over distribution """
- # TODO: support discrete sets with non integer stepsizes
- if evaluate:
- try:
- p = poly(expr, var)
- t = Dummy('t', real=True)
- mgf = self.moment_generating_function(t)
- deg = p.degree()
- taylor = poly(series(mgf, t, 0, deg + 1).removeO(), t)
- result = 0
- for k in range(deg+1):
- result += p.coeff_monomial(var ** k) * taylor.coeff_monomial(t ** k) * factorial(k)
- return result
- except PolynomialError:
- return summation(expr * self.pdf(var),
- (var, self.set.inf, self.set.sup), **kwargs)
- else:
- return Sum(expr * self.pdf(var),
- (var, self.set.inf, self.set.sup), **kwargs)
- def __call__(self, *args):
- return self.pdf(*args)
- class DiscreteDomain(RandomDomain):
- """
- A domain with discrete support with step size one.
- Represented using symbols and Range.
- """
- is_Discrete = True
- class SingleDiscreteDomain(DiscreteDomain, SingleDomain):
- def as_boolean(self):
- return Contains(self.symbol, self.set)
- class ConditionalDiscreteDomain(DiscreteDomain, ConditionalDomain):
- """
- Domain with discrete support of step size one, that is restricted by
- some condition.
- """
- @property
- def set(self):
- rv = self.symbols
- if len(self.symbols) > 1:
- raise NotImplementedError(filldedent('''
- Multivariate conditional domains are not yet implemented.'''))
- rv = list(rv)[0]
- return reduce_rational_inequalities_wrap(self.condition,
- rv).intersect(self.fulldomain.set)
- class DiscretePSpace(PSpace):
- is_real = True
- is_Discrete = True
- @property
- def pdf(self):
- return self.density(*self.symbols)
- def where(self, condition):
- rvs = random_symbols(condition)
- assert all(r.symbol in self.symbols for r in rvs)
- if len(rvs) > 1:
- raise NotImplementedError(filldedent('''Multivariate discrete
- random variables are not yet supported.'''))
- conditional_domain = reduce_rational_inequalities_wrap(condition,
- rvs[0])
- conditional_domain = conditional_domain.intersect(self.domain.set)
- return SingleDiscreteDomain(rvs[0].symbol, conditional_domain)
- def probability(self, condition):
- complement = isinstance(condition, Ne)
- if complement:
- condition = Eq(condition.args[0], condition.args[1])
- try:
- _domain = self.where(condition).set
- if condition == False or _domain is S.EmptySet:
- return S.Zero
- if condition == True or _domain == self.domain.set:
- return S.One
- prob = self.eval_prob(_domain)
- except NotImplementedError:
- from sympy.stats.rv import density
- expr = condition.lhs - condition.rhs
- dens = density(expr)
- if not isinstance(dens, DiscreteDistribution):
- from sympy.stats.drv_types import DiscreteDistributionHandmade
- dens = DiscreteDistributionHandmade(dens)
- z = Dummy('z', real=True)
- space = SingleDiscretePSpace(z, dens)
- prob = space.probability(condition.__class__(space.value, 0))
- if prob is None:
- prob = Probability(condition)
- return prob if not complement else S.One - prob
- def eval_prob(self, _domain):
- sym = list(self.symbols)[0]
- if isinstance(_domain, Range):
- n = symbols('n', integer=True)
- inf, sup, step = (r for r in _domain.args)
- summand = ((self.pdf).replace(
- sym, n*step))
- rv = summation(summand,
- (n, inf/step, (sup)/step - 1)).doit()
- return rv
- elif isinstance(_domain, FiniteSet):
- pdf = Lambda(sym, self.pdf)
- rv = sum(pdf(x) for x in _domain)
- return rv
- elif isinstance(_domain, Union):
- rv = sum(self.eval_prob(x) for x in _domain.args)
- return rv
- def conditional_space(self, condition):
- # XXX: Converting from set to tuple. The order matters to Lambda
- # though so we should be starting with a set...
- density = Lambda(tuple(self.symbols), self.pdf/self.probability(condition))
- condition = condition.xreplace({rv: rv.symbol for rv in self.values})
- domain = ConditionalDiscreteDomain(self.domain, condition)
- return DiscretePSpace(domain, density)
- class ProductDiscreteDomain(ProductDomain, DiscreteDomain):
- def as_boolean(self):
- return And(*[domain.as_boolean for domain in self.domains])
- class SingleDiscretePSpace(DiscretePSpace, SinglePSpace):
- """ Discrete probability space over a single univariate variable """
- is_real = True
- @property
- def set(self):
- return self.distribution.set
- @property
- def domain(self):
- return SingleDiscreteDomain(self.symbol, self.set)
- def sample(self, size=(), library='scipy', seed=None):
- """
- Internal sample method.
- Returns dictionary mapping RandomSymbol to realization value.
- """
- return {self.value: self.distribution.sample(size, library=library, seed=seed)}
- def compute_expectation(self, expr, rvs=None, evaluate=True, **kwargs):
- rvs = rvs or (self.value,)
- if self.value not in rvs:
- return expr
- expr = _sympify(expr)
- expr = expr.xreplace({rv: rv.symbol for rv in rvs})
- x = self.value.symbol
- try:
- return self.distribution.expectation(expr, x, evaluate=evaluate,
- **kwargs)
- except NotImplementedError:
- return Sum(expr * self.pdf, (x, self.set.inf, self.set.sup),
- **kwargs)
- def compute_cdf(self, expr, **kwargs):
- if expr == self.value:
- x = Dummy("x", real=True)
- return Lambda(x, self.distribution.cdf(x, **kwargs))
- else:
- raise NotImplementedError()
- def compute_density(self, expr, **kwargs):
- if expr == self.value:
- return self.distribution
- raise NotImplementedError()
- def compute_characteristic_function(self, expr, **kwargs):
- if expr == self.value:
- t = Dummy("t", real=True)
- return Lambda(t, self.distribution.characteristic_function(t, **kwargs))
- else:
- raise NotImplementedError()
- def compute_moment_generating_function(self, expr, **kwargs):
- if expr == self.value:
- t = Dummy("t", real=True)
- return Lambda(t, self.distribution.moment_generating_function(t, **kwargs))
- else:
- raise NotImplementedError()
- def compute_quantile(self, expr, **kwargs):
- if expr == self.value:
- p = Dummy("p", real=True)
- return Lambda(p, self.distribution.quantile(p, **kwargs))
- else:
- raise NotImplementedError()
|