123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223 |
- from sympy.concrete.summations import Sum
- from sympy.core.basic import Basic
- from sympy.core.function import Lambda
- from sympy.core.symbol import Dummy
- from sympy.integrals.integrals import Integral
- from sympy.stats.rv import (NamedArgsMixin, random_symbols, _symbol_converter,
- PSpace, RandomSymbol, is_random, Distribution)
- from sympy.stats.crv import ContinuousDistribution, SingleContinuousPSpace
- from sympy.stats.drv import DiscreteDistribution, SingleDiscretePSpace
- from sympy.stats.frv import SingleFiniteDistribution, SingleFinitePSpace
- from sympy.stats.crv_types import ContinuousDistributionHandmade
- from sympy.stats.drv_types import DiscreteDistributionHandmade
- from sympy.stats.frv_types import FiniteDistributionHandmade
- class CompoundPSpace(PSpace):
- """
- A temporary Probability Space for the Compound Distribution. After
- Marginalization, this returns the corresponding Probability Space of the
- parent distribution.
- """
- def __new__(cls, s, distribution):
- s = _symbol_converter(s)
- if isinstance(distribution, ContinuousDistribution):
- return SingleContinuousPSpace(s, distribution)
- if isinstance(distribution, DiscreteDistribution):
- return SingleDiscretePSpace(s, distribution)
- if isinstance(distribution, SingleFiniteDistribution):
- return SingleFinitePSpace(s, distribution)
- if not isinstance(distribution, CompoundDistribution):
- raise ValueError("%s should be an isinstance of "
- "CompoundDistribution"%(distribution))
- return Basic.__new__(cls, s, distribution)
- @property
- def value(self):
- return RandomSymbol(self.symbol, self)
- @property
- def symbol(self):
- return self.args[0]
- @property
- def is_Continuous(self):
- return self.distribution.is_Continuous
- @property
- def is_Finite(self):
- return self.distribution.is_Finite
- @property
- def is_Discrete(self):
- return self.distribution.is_Discrete
- @property
- def distribution(self):
- return self.args[1]
- @property
- def pdf(self):
- return self.distribution.pdf(self.symbol)
- @property
- def set(self):
- return self.distribution.set
- @property
- def domain(self):
- return self._get_newpspace().domain
- def _get_newpspace(self, evaluate=False):
- x = Dummy('x')
- parent_dist = self.distribution.args[0]
- func = Lambda(x, self.distribution.pdf(x, evaluate))
- new_pspace = self._transform_pspace(self.symbol, parent_dist, func)
- if new_pspace is not None:
- return new_pspace
- message = ("Compound Distribution for %s is not implemeted yet" % str(parent_dist))
- raise NotImplementedError(message)
- def _transform_pspace(self, sym, dist, pdf):
- """
- This function returns the new pspace of the distribution using handmade
- Distributions and their corresponding pspace.
- """
- pdf = Lambda(sym, pdf(sym))
- _set = dist.set
- if isinstance(dist, ContinuousDistribution):
- return SingleContinuousPSpace(sym, ContinuousDistributionHandmade(pdf, _set))
- elif isinstance(dist, DiscreteDistribution):
- return SingleDiscretePSpace(sym, DiscreteDistributionHandmade(pdf, _set))
- elif isinstance(dist, SingleFiniteDistribution):
- dens = {k: pdf(k) for k in _set}
- return SingleFinitePSpace(sym, FiniteDistributionHandmade(dens))
- def compute_density(self, expr, *, compound_evaluate=True, **kwargs):
- new_pspace = self._get_newpspace(compound_evaluate)
- expr = expr.subs({self.value: new_pspace.value})
- return new_pspace.compute_density(expr, **kwargs)
- def compute_cdf(self, expr, *, compound_evaluate=True, **kwargs):
- new_pspace = self._get_newpspace(compound_evaluate)
- expr = expr.subs({self.value: new_pspace.value})
- return new_pspace.compute_cdf(expr, **kwargs)
- def compute_expectation(self, expr, rvs=None, evaluate=False, **kwargs):
- new_pspace = self._get_newpspace(evaluate)
- expr = expr.subs({self.value: new_pspace.value})
- if rvs:
- rvs = rvs.subs({self.value: new_pspace.value})
- if isinstance(new_pspace, SingleFinitePSpace):
- return new_pspace.compute_expectation(expr, rvs, **kwargs)
- return new_pspace.compute_expectation(expr, rvs, evaluate, **kwargs)
- def probability(self, condition, *, compound_evaluate=True, **kwargs):
- new_pspace = self._get_newpspace(compound_evaluate)
- condition = condition.subs({self.value: new_pspace.value})
- return new_pspace.probability(condition)
- def conditional_space(self, condition, *, compound_evaluate=True, **kwargs):
- new_pspace = self._get_newpspace(compound_evaluate)
- condition = condition.subs({self.value: new_pspace.value})
- return new_pspace.conditional_space(condition)
- class CompoundDistribution(Distribution, NamedArgsMixin):
- """
- Class for Compound Distributions.
- Parameters
- ==========
- dist : Distribution
- Distribution must contain a random parameter
- Examples
- ========
- >>> from sympy.stats.compound_rv import CompoundDistribution
- >>> from sympy.stats.crv_types import NormalDistribution
- >>> from sympy.stats import Normal
- >>> from sympy.abc import x
- >>> X = Normal('X', 2, 4)
- >>> N = NormalDistribution(X, 4)
- >>> C = CompoundDistribution(N)
- >>> C.set
- Interval(-oo, oo)
- >>> C.pdf(x, evaluate=True).simplify()
- exp(-x**2/64 + x/16 - 1/16)/(8*sqrt(pi))
- References
- ==========
- .. [1] https://en.wikipedia.org/wiki/Compound_probability_distribution
- """
- def __new__(cls, dist):
- if not isinstance(dist, (ContinuousDistribution,
- SingleFiniteDistribution, DiscreteDistribution)):
- message = "Compound Distribution for %s is not implemeted yet" % str(dist)
- raise NotImplementedError(message)
- if not cls._compound_check(dist):
- return dist
- return Basic.__new__(cls, dist)
- @property
- def set(self):
- return self.args[0].set
- @property
- def is_Continuous(self):
- return isinstance(self.args[0], ContinuousDistribution)
- @property
- def is_Finite(self):
- return isinstance(self.args[0], SingleFiniteDistribution)
- @property
- def is_Discrete(self):
- return isinstance(self.args[0], DiscreteDistribution)
- def pdf(self, x, evaluate=False):
- dist = self.args[0]
- randoms = [rv for rv in dist.args if is_random(rv)]
- if isinstance(dist, SingleFiniteDistribution):
- y = Dummy('y', integer=True, negative=False)
- expr = dist.pmf(y)
- else:
- y = Dummy('y')
- expr = dist.pdf(y)
- for rv in randoms:
- expr = self._marginalise(expr, rv, evaluate)
- return Lambda(y, expr)(x)
- def _marginalise(self, expr, rv, evaluate):
- if isinstance(rv.pspace.distribution, SingleFiniteDistribution):
- rv_dens = rv.pspace.distribution.pmf(rv)
- else:
- rv_dens = rv.pspace.distribution.pdf(rv)
- rv_dom = rv.pspace.domain.set
- if rv.pspace.is_Discrete or rv.pspace.is_Finite:
- expr = Sum(expr*rv_dens, (rv, rv_dom._inf,
- rv_dom._sup))
- else:
- expr = Integral(expr*rv_dens, (rv, rv_dom._inf,
- rv_dom._sup))
- if evaluate:
- return expr.doit()
- return expr
- @classmethod
- def _compound_check(self, dist):
- """
- Checks if the given distribution contains random parameters.
- """
- randoms = []
- for arg in dist.args:
- randoms.extend(random_symbols(arg))
- if len(randoms) == 0:
- return False
- return True
|