
Commit 0152efd

Mixture Models API
- Mixtures for continuous random variables
- Stochastic Mixture Models top level trait
- Gaussian Process Mixture model implementation
1 parent b3932de commit 0152efd

6 files changed: +175 additions, -30 deletions

dynaml-core/src/main/scala-2.11/io/github/mandar2812/dynaml/algebra/PartitionedVector.scala

Lines changed: 26 additions & 11 deletions
@@ -7,18 +7,25 @@ import org.apache.spark.rdd.RDD
 import scala.collection.immutable.NumericRange
 
 /**
-  * @author mandar2812 date 13/10/2016.
   * A distributed vector that is stored in blocks.
   * @param data The underlying [[RDD]] which should consist of
   *             block indices and a breeze [[DenseVector]] containing
   *             all the elements in the said block.
-  */
+  * @param num_rows Number of elements as a [[Long]], in case not specified
+  *                 is calculated on instance creation.
+  * @param num_row_blocks Number of blocks, in case not specified
+  *                       is calculated on instance creation.
+  * @author mandar2812 date 13/10/2016.
+  *
+  * */
 private[dynaml] class PartitionedVector(data: Stream[(Long, DenseVector[Double])],
                                         num_rows: Long = -1L,
                                         num_row_blocks: Long = -1L)
   extends AbstractPartitionedVector[DenseVector[Double]](data, num_row_blocks)
   with NumericOps[PartitionedVector] {
 
+  self =>
+
   lazy val rows: Long = if(num_rows == -1L) data.map(_._2.length).sum.toLong else num_rows
 
   override lazy val cols: Long = 1L
@@ -36,6 +43,8 @@ private[dynaml] class PartitionedVector(data: Stream[(Long, DenseVector[Double])
 
   def toBreezeVector = DenseVector.vertcat(data.sortBy(_._1).map(_._2):_*)
 
+  def toStream = PartitionedVector.toStream(self)
+
   def reverse: PartitionedVector = map(c => (rowBlocks - 1L - c._1, DenseVector(c._2.toArray.reverse)))
 
 }
@@ -46,7 +55,7 @@ object PartitionedVector {
   /**
     * Create a [[PartitionedVector]] given the input blocks.
     *
-    */
+    * */
   def apply(data: Stream[(Long, DenseVector[Double])], num_rows: Long = -1L): PartitionedVector = {
 
     val nC = if(num_rows == -1L) data.map(_._2.length).sum else num_rows
@@ -58,7 +67,7 @@ object PartitionedVector {
   /**
     * Create a [[PartitionedVector]] from a tabulation function
     *
-    */
+    * */
   def apply(length: Long, numElementsPerBlock: Int, tabFunc: (Long) => Double): PartitionedVector = {
     val num_blocks: Long = math.ceil(length.toDouble/numElementsPerBlock).toLong
     val blockIndices = 0L until num_blocks
@@ -76,7 +85,7 @@ object PartitionedVector {
     * @param length The size of the stream
     * @param num_elements_per_block The size of each block
     * @return A [[PartitionedVector]] instance.
-    */
+    * */
   def apply(d: Stream[Double], length: Long, num_elements_per_block: Int): PartitionedVector = {
     val num_blocks: Long = math.ceil(length.toDouble/num_elements_per_block).toLong
     val data = d.grouped(num_elements_per_block)
@@ -92,7 +101,7 @@ object PartitionedVector {
     * @param v input vector
     * @param num_elements_per_block The size of each block
     * @return A [[PartitionedVector]] instance.
-    */
+    * */
   def apply(v: DenseVector[Double], num_elements_per_block: Int): PartitionedVector = {
     val blocks = v.toArray
       .grouped(num_elements_per_block)
@@ -106,7 +115,7 @@ object PartitionedVector {
 
   /**
     * Vertically merge a number of partitioned vectors.
-    */
+    * */
   def vertcat(vectors: PartitionedVector*): PartitionedVector = {
     //sanity check
     assert(vectors.map(_.colBlocks).distinct.length == 1,
@@ -121,22 +130,28 @@ object PartitionedVector {
 
   /**
     * Populate a partitioned vector with zeros.
-    */
+    * */
   def zeros(numElements: Long, numElementsPerBlock: Int): PartitionedVector =
     PartitionedVector(numElements, numElementsPerBlock, _ => 0.0)
 
   /**
     * Populate a partitioned vector with ones.
-    */
+    * */
   def ones(numElements: Long, numElementsPerBlock: Int): PartitionedVector =
     PartitionedVector(numElements, numElementsPerBlock, _ => 1.0)
 
   /**
     * Populate a partitioned vector with I.I.D samples from a
     * specified [[RandomVariable]]
-    */
+    * */
   def rand(numElements: Long, numElementsPerBlock: Int, r: RandomVariable[Double]): PartitionedVector =
-    PartitionedVector(numElements, numElementsPerBlock, _ => r.sample())
+    PartitionedVector(numElements, numElementsPerBlock, _ => r.draw)
+
+  /**
+    * Convert a [[PartitionedVector]] to a Scala [[Stream]].
+    * */
+  def toStream(pvec: PartitionedVector): Stream[Double] =
+    pvec._data.map(_._2.toArray.toStream).reduceLeft((a, b) => a ++ b)
 
 
 }
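A quick usage sketch (not part of the commit) of the new toStream conversion, together with the existing tabulation factory shown above. Since PartitionedVector is private[dynaml], the sketch assumes it runs from a hypothetical package under io.github.mandar2812.dynaml.

package io.github.mandar2812.dynaml.examples

import io.github.mandar2812.dynaml.algebra.PartitionedVector

object PartitionedVectorToStreamDemo extends App {

  // 1000 elements split into blocks of 100, filled by a tabulation function.
  val v: PartitionedVector = PartitionedVector(1000L, 100, i => math.sin(i.toDouble))

  // New in this commit: flatten the blocks back into an ordinary Scala Stream.
  val flat: Stream[Double] = v.toStream

  assert(flat.length == 1000)
}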

dynaml-core/src/main/scala-2.11/io/github/mandar2812/dynaml/models/StochasticProcessModel.scala

Lines changed: 22 additions & 10 deletions
@@ -20,19 +20,19 @@ package io.github.mandar2812.dynaml.models
 
 import io.github.mandar2812.dynaml.kernels.CovarianceFunction
 import io.github.mandar2812.dynaml.pipes.DataPipe
-import io.github.mandar2812.dynaml.probability.ContinuousRandomVariable
+import io.github.mandar2812.dynaml.probability.{ContinuousMixtureRV, ContinuousRandomVariable}
 import org.apache.log4j.Logger
 
 /**
-  * date 26/08/16.
   * High Level description of a stochastic process based predictive model.
   *
-  * @author mandar2812
   * @tparam T The underlying data structure storing the training & test data.
   * @tparam I The type of the index set (i.e. Double for time series, DenseVector for GP regression)
   * @tparam Y The type of the output label
   * @tparam W Implementing class of the posterior distribution
-  */
+  * @author mandar2812 date 26/08/16.
+  *
+  * */
 trait StochasticProcessModel[T, I, Y, W] extends Model[T, I, Y] {
 
   /** Calculates posterior predictive distribution for
@@ -62,15 +62,14 @@ trait StochasticProcessModel[T, I, Y, W] extends Model[T, I, Y] {
 }
 
 /**
-  * @author mandar2812
-  *
   * Processes which can be specified by upto second order statistics i.e. mean and covariance
   * @tparam T The underlying data structure storing the training & test data.
   * @tparam I The type of the index set (i.e. Double for time series, DenseVector for GP regression)
   * @tparam Y The type of the output label
   * @tparam K The type returned by the kernel function.
   * @tparam M The data structure holding the kernel/covariance matrix
   * @tparam W Implementing class of the posterior distribution
+  * @author mandar2812
   *
   * */
 trait SecondOrderProcessModel[T, I, Y, K, M, W] extends StochasticProcessModel[T, I, Y, W] {
@@ -92,13 +91,13 @@ trait SecondOrderProcessModel[T, I, Y, K, M, W] extends StochasticProcessModel[T
 }
 
 /**
-  * @author mandar2812 date: 11/10/2016
-  *
   * Blueprint for a continuous valued stochastic process, abstracts away the behavior
   * common to sub-classes such as [[io.github.mandar2812.dynaml.models.gp.GPRegression]],
   * [[io.github.mandar2812.dynaml.models.stp.StudentTRegression]] and others.
   *
-  */
+  * @author mandar2812 date: 11/10/2016
+  *
+  * */
 abstract class ContinuousProcessModel[T, I, Y, W <: ContinuousRandomVariable[_]]
   extends StochasticProcessModel[T, I, Y, W] {
 
@@ -138,4 +137,17 @@ abstract class ContinuousProcessModel[T, I, Y, W <: ContinuousRandomVariable[_]]
       i._2._4))
   }
 
-}
+}
+
+/**
+  * A process which is a multinomial mixture of
+  * component processes.
+  * @tparam I The type of the index set (i.e. Double for time series, DenseVector for GP regression)
+  * @tparam Y The type of the output label
+  * @tparam W Implementing class of the posterior distribution,
+  *           should inherit from [[ContinuousMixtureRV]]
+  * @author mandar2812 date 14/06/2017
+  * */
+abstract class StochasticProcessMixtureModel[
+T, I, Y, W <: ContinuousMixtureRV[_, _]] extends
+  ContinuousProcessModel[T, I, Y, W]

dynaml-core/src/main/scala-2.11/io/github/mandar2812/dynaml/models/gp/AbstractGPRegressionModel.scala

Lines changed: 5 additions & 4 deletions
@@ -74,6 +74,8 @@ abstract class AbstractGPRegressionModel[T, I: ClassTag](
 
   override protected val g: T = data
 
+  def _data = g
+
   val npoints = num
 
   protected var blockSize = 1000
@@ -309,13 +311,12 @@ abstract class AbstractGPRegressionModel[T, I: ClassTag](
     val varD: PartitionedVector = bdiag(postcov)
     val stdDev = varD._data.map(c => (c._1, sqrt(c._2))).map(_._2.toArray.toStream).reduceLeft((a, b) => a ++ b)*/
 
-    val mean = posterior.mu._data.map(_._2.toArray.toStream).reduceLeft((a, b) => a ++ b)
+    val mean = posterior.mu.toStream
 
     val (lower, upper) = posterior.underlyingDist.confidenceInterval(sigma.toDouble)
 
-    val lowerErrorBars = lower._data.map(_._2.toArray.toStream).reduceLeft((a, b) => a++b)
-    val upperErrorBars = upper._data.map(_._2.toArray.toStream).reduceLeft((a, b) => a++b)
-
+    val lowerErrorBars = lower.toStream
+    val upperErrorBars = upper.toStream
 
     logger.info("Generating error bars")
     //val preds = (mean zip stdDev).map(j => (j._1, j._1 - sigma*j._2, j._1 + sigma*j._2))
dynaml-core/src/main/scala-2.11/io/github/mandar2812/dynaml/models/gp/GaussianProcessMixture.scala (new file)

Lines changed: 102 additions & 0 deletions

@@ -0,0 +1,102 @@
+package io.github.mandar2812.dynaml.models.gp
+
+import breeze.linalg.DenseVector
+import io.github.mandar2812.dynaml.algebra.PartitionedVector
+import io.github.mandar2812.dynaml.models.StochasticProcessMixtureModel
+import io.github.mandar2812.dynaml.probability.{ContinuousDistrMixture, MultGaussianPRV}
+import io.github.mandar2812.dynaml.algebra.PartitionedMatrixOps._
+import org.apache.log4j.Logger
+
+import scala.reflect.ClassTag
+
+/**
+  *
+  * Represents a multinomial mixture of GP models
+  * @tparam I The index set (input domain) over which each component GP is
+  *           defined.
+  *
+  * @author mandar2812 date 14/06/2017.
+  * */
+class GaussianProcessMixture[I: ClassTag](
+  val component_processes: Seq[AbstractGPRegressionModel[_, I]],
+  val weights: DenseVector[Double]) extends
+  StochasticProcessMixtureModel[
+    Seq[(I, Double)], I, Double,
+    ContinuousDistrMixture[
+      PartitionedVector,
+      MultGaussianPRV]] {
+
+  private val logger = Logger.getLogger(this.getClass)
+
+  /**
+    *
+    * Calculates posterior predictive distribution for
+    * a particular set of test data points.
+    *
+    * @param test A Sequence or Sequence like data structure
+    *             storing the values of the input patters.
+    * */
+  override def predictiveDistribution[U <: Seq[I]](test: U) =
+    ContinuousDistrMixture[PartitionedVector, MultGaussianPRV](
+      component_processes.map(_.predictiveDistribution(test)),
+      weights)
+
+
+  /**
+    * The training data
+    * */
+  override protected val g: Seq[(I, Double)] = Seq()
+
+  /**
+    * Draw three predictions from the posterior predictive distribution
+    * 1) Mean or MAP estimate Y
+    * 2) Y- : The lower error bar estimate (mean - sigma*stdDeviation)
+    * 3) Y+ : The upper error bar. (mean + sigma*stdDeviation)
+    * */
+  override def predictionWithErrorBars[U <: Seq[I]](testData: U, sigma: Int) = {
+
+    val posterior_components = component_processes.map(_.predictiveDistribution(testData))
+
+    val post_means = posterior_components.map(_.mu)
+
+    //._data.map(_._2.toArray.toStream).reduceLeft((a, b) => a ++ b)
+
+    val error_bars_components = posterior_components.map(
+      _.underlyingDist.confidenceInterval(sigma.toDouble)
+    )
+
+    val weightsArr = weights.toArray
+
+    val mean = post_means.zip(weightsArr).map(c => c._1*c._2).reduce((a, b) => a+b).toStream
+
+    val combined_error_bars_vec = error_bars_components.zip(weightsArr)
+      .map(c => (c._1._1*c._2, c._1._2*c._2))
+      .reduce((a,b) => (a._1+b._1, a._2+b._2))
+
+    val (lowerErrorBars, upperErrorBars) = (
+      combined_error_bars_vec._1.toStream,
+      combined_error_bars_vec._2.toStream)
+
+
+    logger.info("Generating error bars")
+    //val preds = (mean zip stdDev).map(j => (j._1, j._1 - sigma*j._2, j._1 + sigma*j._2))
+    val preds = mean.zip(lowerErrorBars.zip(upperErrorBars)).map(t => (t._1, t._2._1, t._2._2))
+    (testData zip preds).map(i => (i._1, i._2._1, i._2._2, i._2._3))
+  }
+
+
+  /**
+    * Convert from the underlying data structure to
+    * Seq[(I, Y)] where I is the index set of the GP
+    * and Y is the value/label type.
+    * */
+  override def dataAsSeq(data: Seq[(I, Double)]) = data
+
+  /**
+    * Predict the value of the
+    * target variable given a
+    * point.
+    *
+    * */
+  override def predict(point: I) = predictionWithErrorBars(Seq(point), 1).head._2
+}
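A usage sketch (not part of the commit) of the new GaussianProcessMixture class. The component models gp1 and gp2 and the test inputs are hypothetical placeholders for GP regression models trained elsewhere; only GaussianProcessMixture, AbstractGPRegressionModel and the argument types come from this diff.

import breeze.linalg.DenseVector
import io.github.mandar2812.dynaml.models.gp.{AbstractGPRegressionModel, GaussianProcessMixture}

// Hypothetical, pre-trained component GPs over a DenseVector[Double] index set.
val gp1: AbstractGPRegressionModel[_, DenseVector[Double]] = ???
val gp2: AbstractGPRegressionModel[_, DenseVector[Double]] = ???
val testPoints: Seq[DenseVector[Double]] = ???

// Two component mixture with multinomial weights.
val gpMixture = new GaussianProcessMixture[DenseVector[Double]](
  Seq(gp1, gp2),
  DenseVector(0.6, 0.4))

// Posterior predictive distribution: a ContinuousDistrMixture over the component posteriors.
val posterior = gpMixture.predictiveDistribution(testPoints)

// (input, mean, lower, upper) tuples; means and error bars are the weighted sums
// of the component quantities, as implemented in predictionWithErrorBars above.
val predictions = gpMixture.predictionWithErrorBars(testPoints, 1)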

dynaml-core/src/main/scala-2.11/io/github/mandar2812/dynaml/models/stp/AbstractSTPRegressionModel.scala

Lines changed: 3 additions & 3 deletions
@@ -223,12 +223,12 @@ abstract class AbstractSTPRegressionModel[T, I](
     val varD: PartitionedVector = bdiag(postcov)
     val stdDev = varD._data.map(c => (c._1, sqrt(c._2))).map(_._2.toArray.toStream).reduceLeft((a, b) => a ++ b)*/
 
-    val mean = posterior.mean._data.map(_._2.toArray.toStream).reduceLeft((a, b) => a ++ b)
+    val mean = posterior.mean.toStream
 
     val (lower, upper) = posterior.underlyingDist.confidenceInterval(sigma.toDouble)
 
-    val lowerErrorBars = lower._data.map(_._2.toArray.toStream).reduceLeft((a, b) => a++b)
-    val upperErrorBars = upper._data.map(_._2.toArray.toStream).reduceLeft((a, b) => a++b)
+    val lowerErrorBars = lower.toStream
+    val upperErrorBars = upper.toStream
 
 
     logger.info("Generating error bars")

dynaml-core/src/main/scala-2.11/io/github/mandar2812/dynaml/probability/MixtureRV.scala

Lines changed: 17 additions & 2 deletions
@@ -36,18 +36,26 @@ object MixtureRV {
 
 }
 
+/**
+  * A random variable mixture over a continuous domain
+  * @author mandar2812 date 14/06/2017
+  * */
 trait ContinuousMixtureRV[Domain, BaseRV <: ContinuousRandomVariable[Domain]] extends
   ContinuousRandomVariable[Domain] with
   MixtureRV[Domain, BaseRV]
 
-
+/**
+  * A random variable mixture over a continuous domain,
+  * having a computable probability distribution
+  * @author mandar2812 date 14/06/2017
+  * */
 class ContinuousDistrMixture[
 Domain, BaseRV <: ContinuousDistrRV[Domain]](
   distributions: Seq[BaseRV],
   selector: MultinomialRV) extends
   ContinuousMixtureRV[Domain, BaseRV] with
   ContinuousDistrRV[Domain] {
-
+
   override val mixture_selector = selector
 
   override val components = distributions
@@ -57,3 +65,10 @@ Domain, BaseRV <: ContinuousDistrRV[Domain]](
     selector.underlyingDist.params)
 }
 
+object ContinuousDistrMixture {
+
+  def apply[Domain, BaseRV <: ContinuousDistrRV[Domain]](
+    distributions: Seq[BaseRV],
+    selector: DenseVector[Double]): ContinuousDistrMixture[Domain, BaseRV] =
+    new ContinuousDistrMixture(distributions, MultinomialRV(selector))
+}
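A short sketch (not part of the commit) of the new ContinuousDistrMixture.apply: a two component mixture of scalar Gaussians. It assumes GaussianRV from io.github.mandar2812.dynaml.probability is available as a ContinuousDistrRV[Double]; the weights vector is wrapped into the MultinomialRV selector by the companion object.

import breeze.linalg.DenseVector
import io.github.mandar2812.dynaml.probability.{ContinuousDistrMixture, GaussianRV}

val mixture = ContinuousDistrMixture[Double, GaussianRV](
  Seq(GaussianRV(-1.0, 0.5), GaussianRV(2.0, 1.0)),
  DenseVector(0.3, 0.7))

// Sampling first picks a component via the multinomial selector,
// then draws from the chosen Gaussian.
val x: Double = mixture.draw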
