This repository was archived by the owner on Feb 8, 2019. It is now read-only.
41 changes: 41 additions & 0 deletions examples/streaming/streamingkmeans/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
Streaming k-means clustering
==============================
## Introduction
This application follows the streaming k-means clustering algorithm from Spark; for details, see
<https://databricks.com/blog/2015/01/28/introducing-streaming-k-means-in-spark-1-2.html>.

The data source is `RandomRBFGenerator`, which is based on the generator in Huawei's `StreamDM` <https://github.com/huawei-noah/streamDM>.

## Gearpump topology
The Gearpump topology is as follows:

![kmeans](https://cloud.githubusercontent.com/assets/5796671/14097520/93a2b498-f5a4-11e5-8df8-ef2b62c3b5ff.PNG)

The `Source Processor` produces points over time and broadcasts each point to the `Distribution Processor`.
The `Distribution Processor` has k tasks, each of which stores one center and the points assigned to it.
When a `Distribution Processor` task receives a point from the `Source Processor`, it calculates the distance from the point to its center and sends the distance, together with the point and its `taskId`, to the `Collection Processor`.
When the `Collection Processor` receives distances from the `Distribution Processor`, it counts the points seen so far, decides whether it is time to update the centers, chooses the smallest distance, and then sends the point together with the `taskId` of the corresponding `Distribution Processor` task through the broadcast partitioner.
When the `Distribution Processor` receives the result message, the task with the matching `taskId` accumulates the point. If the message signals that it is time to update the centers, every task updates its own center.

The procedure is streaming, so the cluster centers change over time.
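
As a rough illustration of the assignment step described above, the following sketch computes each center's distance to a point and picks the nearest one in plain Scala, without any Gearpump classes. The object and method names (`StreamingKMeansSketch`, `distance`, `nearest`, `assign`) are hypothetical and are not part of this example's source; in the real topology the distance computation is spread over the k `Distribution Processor` tasks and the minimum is chosen by the `Collection Processor`.

```scala
object StreamingKMeansSketch {
  // Euclidean distance between a point and a center of the same dimension.
  def distance(point: List[Double], center: Array[Double]): Double =
    math.sqrt(point.zip(center.toList).map { case (p, c) => (p - c) * (p - c) }.sum)

  // Index of the nearest center; this mirrors choosing the smallest distance
  // among the k reports, one per Distribution Processor task.
  def nearest(point: List[Double], centers: Array[Array[Double]]): Int =
    centers.indices.minBy(i => distance(point, centers(i)))

  // Groups points by the index of their nearest center, i.e. the points each
  // task would accumulate before the next center update.
  def assign(points: Seq[List[Double]],
             centers: Array[Array[Double]]): Map[Int, Seq[List[Double]]] =
    points.groupBy(p => nearest(p, centers))
}
```

For example, with centers `(0, 0)` and `(10, 10)`, `nearest(List(1.0, 1.0), centers)` selects index 0 and `nearest(List(9.0, 8.0), centers)` selects index 1.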

## How to use it
You can run this application with the following command:

```
bin/gear app -jar examples/streamingkmeans-2.11-0.7.7-SNAPSHOT-assembly.jar io.gearpump.streaming.examples.streamingkmeans.StreamingKmeansExample
```

Optionally, you can configure the clustering task with the following options:

```
-k <how many clusters (k in kmeans)>
-dimension <dimension of a point>
-maxBatch <number of data points per batch for DataSourceProcessor>
-maxNumber <number of data points per clustering procedure>
-decayFactor <decay factor for clustering, used when updating centers>
```
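
To give a feel for what `-decayFactor` does, here is a minimal sketch of the center update as it is written in `ClusterDistribution.updateCenter`, restated as a pure function. The object name `DecayUpdateSketch` and the parameter name `batchMean` are illustrative assumptions; the arithmetic follows the expression used in this example's source.

```scala
object DecayUpdateSketch {
  // center:         previous center estimate
  // previousNumber: points assigned to this center in earlier batches
  // batchMean:      mean of the points assigned in the current batch
  // currentNumber:  number of points assigned in the current batch
  // decayFactor:    weight applied to the previous estimate
  def updateCenter(center: Array[Double],
                   previousNumber: Int,
                   batchMean: Array[Double],
                   currentNumber: Int,
                   decayFactor: Double): Array[Double] =
    if (currentNumber == 0) {
      center // nothing was assigned in this batch, keep the old center
    } else {
      Array.tabulate(center.length) { i =>
        (center(i) * previousNumber * decayFactor + batchMean(i) * currentNumber) /
          (previousNumber + currentNumber)
      }
    }
}
```

With a one-dimensional center at 2.0 backed by 10 earlier points, a batch of 10 points with mean 4.0 moves the center to 3.0 when `decayFactor` is 1.0 and to 2.5 when it is 0.5.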

## Evaluation
The `Distribution Processor` has k tasks, each of which stores one cluster center.
Each task logs its cluster center whenever the centers are updated.
@@ -0,0 +1,6 @@
gearpump {
  serializers {
    "io.gearpump.streaming.examples.streamingkmeans.InputMessage" = ""
    "io.gearpump.streaming.examples.streamingkmeans.ResultMessage" = ""
  }
}
@@ -0,0 +1,66 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.gearpump.streaming.examples.streamingkmeans

import io.gearpump.Message
import io.gearpump.cluster.UserConfig
import io.gearpump.streaming.task.{StartTime, Task, TaskContext}

class ClusterCollection(taskContext: TaskContext, conf: UserConfig) extends Task(taskContext, conf) {
  import taskContext.output

  private val k = conf.getInt("k").get
  private val maxNumber = conf.getInt("maxNumber").get

  // Nearest center found so far for the point currently being processed.
  private[streamingkmeans] var minTaskId = 0
  private[streamingkmeans] var minDistance = Double.MaxValue
  private[streamingkmeans] var minDistPoint: List[Double] = null

  // Distances received for the current point, and points handled since the last center update.
  private[streamingkmeans] var currentNumber = 0
  private[streamingkmeans] var totalNumber = 0

  override def onStart(startTime: StartTime): Unit = super.onStart(startTime)

  override def onNext(msg: Message): Unit = {
    if (null == msg) {
      return
    }

    val (taskId, distance, point) = msg.msg.asInstanceOf[(Int, Double, List[Double])]
    if (distance < minDistance) {
      minDistance = distance
      minDistPoint = point
      minTaskId = taskId
    }

    currentNumber += 1
    if (k == currentNumber) {
      // All k tasks have reported their distance for this point.
      currentNumber = 0
      totalNumber += 1
      val doCluster = maxNumber == totalNumber
      if (doCluster) {
        totalNumber = 0
      }
      output(new Message(new ResultMessage(minTaskId, minDistPoint, doCluster)))
      // Reset so the next point is not compared against this point's minimum.
      minDistance = Double.MaxValue
    }
  }

  override def onStop(): Unit = super.onStop()
}
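
The selection performed by `ClusterCollection` can also be viewed as a pure function: every consecutive group of k `(taskId, distance, point)` reports belongs to one point, and the report with the smallest distance decides which task accumulates that point. The sketch below is an illustration under that assumption (reports for one point arrive as one contiguous group); `MinDistanceSketch` and `nearestPerRound` are hypothetical names, not part of this pull request.

```scala
object MinDistanceSketch {
  // For each complete round of k reports, returns the taskId with the
  // smallest distance together with the point of that round.
  // Each round is evaluated independently of the previous ones.
  def nearestPerRound(reports: Seq[(Int, Double, List[Double])],
                      k: Int): List[(Int, List[Double])] =
    reports.grouped(k).filter(_.size == k).map { round =>
      val (taskId, _, point) = round.minBy(_._2)
      (taskId, point)
    }.toList
}
```

Note that a later round can have a larger minimum than an earlier one, so the minimum has to be computed per round rather than carried across rounds.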
@@ -0,0 +1,143 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.gearpump.streaming.examples.streamingkmeans

import java.util.concurrent.LinkedBlockingQueue

import io.gearpump.Message
import io.gearpump.cluster.UserConfig
import io.gearpump.streaming.task.{StartTime, Task, TaskContext}

import scala.collection.mutable
import scala.util.Random

class ClusterDistribution(taskContext: TaskContext, conf: UserConfig) extends Task(taskContext, conf) {
  import taskContext.output

  private[streamingkmeans] val dataQueue: LinkedBlockingQueue[List[Double]] = new LinkedBlockingQueue[List[Double]]()
  private[streamingkmeans] var isBegin: Boolean = true

  private val decayFactor = conf.getDouble("decayFactor").get
  private val dimension = conf.getInt("dimension").get

  private[streamingkmeans] val center: Array[Double] = new Array[Double](dimension)
  private[streamingkmeans] val points: mutable.MutableList[List[Double]] = new mutable.MutableList()
  private[streamingkmeans] var previousNumber = 0
  private[streamingkmeans] var currentNumber = 0

  /**
   * Initializes the center randomly.
   */
  private[streamingkmeans] def initCenter(): Unit = {
    val random = new Random()
    for (i <- center.indices) {
      center.update(i, random.nextGaussian())
    }
  }

  /**
   * The update algorithm uses the "mini-batch" KMeans rule,
   * generalized to incorporate forgetfulness (i.e. decay).
   * The update rule (for each cluster) is:
   *
   * {{{
   * c_t+1 = [(c_t * n_t * a) + (x_t * m_t)] / [n_t + m_t]
   * n_t+1 = n_t * a + m_t
   * }}}
   *
   * Where c_t is the previously estimated centroid for that cluster,
   * n_t is the number of points assigned to it thus far, x_t is the centroid
   * estimated on the current batch, and m_t is the number of points assigned
   * to that centroid in the current batch.
   *
   * The decay factor 'a' scales the contribution of the clusters as estimated thus far,
   * by applying a as a discount weighting on the current point when evaluating
   * new incoming data. If a=1, all batches are weighted equally. If a=0, new centroids
   * are determined entirely by recent data. Lower values correspond to
   * more forgetting.
   */
  private[streamingkmeans] def updateCenter(): Unit = {
    if (0 == currentNumber) {
      return
    }

    // Mean of the points assigned to this task in the current batch.
    val newCenter: Array[Double] = new Array[Double](dimension)
    for (i <- newCenter.indices) {
      var sum = 0.0
      for (point <- points) {
        sum += point(i)
      }
      sum /= currentNumber
      newCenter.update(i, sum)
    }

    for (i <- center.indices) {
      center.update(i,
        (center(i) * previousNumber * decayFactor + newCenter(i) * currentNumber)
          / (previousNumber + currentNumber))
    }
  }

  // Euclidean distance from the given point to this task's center.
  private[streamingkmeans] def getDistance(point: List[Double]): Double = {
    var distance = 0.0
    for (i <- 0 until dimension) {
      distance += ((point(i) - center(i)) * (point(i) - center(i)))
    }
    Math.sqrt(distance)
  }

  override def onStart(startTime: StartTime): Unit = {
    initCenter()
  }

  override def onNext(msg: Message): Unit = {
    if (null == msg) {
      return
    }

    val message = msg.msg.asInstanceOf[ClusterMessage]

    message match {
      case InputMessage(point) =>
        if (isBegin) {
          isBegin = false
          output(new Message((taskContext.taskId.index, getDistance(point), point)))
        } else {
          dataQueue.put(point)
@manuzhang manuzhang May 3, 2016

Using the non-blocking `offer` would be better.

        }
      case ResultMessage(taskId, point, doCluster) =>
        if (taskContext.taskId.index == taskId) {
          points += point
          currentNumber += 1
        }
        if (doCluster) {
          updateCenter()
          LOG.info(s"task ${taskContext.taskId.index}, center ${center.mkString(",")}")
          points.clear()
          previousNumber += currentNumber
          currentNumber = 0
        }
        val newPoint = dataQueue.take()
This is a blocking call. We advise against blocking in a Task; you may use `poll` instead.

        output(new Message((taskContext.taskId.index, getDistance(newPoint), newPoint)))
    }
  }

  override def onStop(): Unit = super.onStop()
}
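
The two review comments above suggest replacing the blocking `put` and `take` calls with `offer` and `poll`. The following is a minimal sketch of how those `java.util.concurrent.LinkedBlockingQueue` methods behave; the wrapper names `NonBlockingQueueSketch`, `enqueue`, and `dequeue` are hypothetical, and how the task should react to a full or empty queue is left open here.

```scala
import java.util.concurrent.LinkedBlockingQueue

object NonBlockingQueueSketch {
  // offer returns immediately; it yields false if a bounded queue is full
  // instead of waiting for space as put does.
  def enqueue[T <: AnyRef](queue: LinkedBlockingQueue[T], item: T): Boolean =
    queue.offer(item)

  // poll returns immediately; it yields null if the queue is empty instead
  // of waiting for an element as take does. Option(...) maps null to None.
  def dequeue[T <: AnyRef](queue: LinkedBlockingQueue[T]): Option[T] =
    Option(queue.poll())
}
```

In a task, a `None` result from `dequeue` would mean that no buffered point is available yet, so the task could simply return and wait for the next message rather than block its thread.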
@@ -0,0 +1,23 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.gearpump.streaming.examples.streamingkmeans

trait ClusterMessage extends Serializable
case class InputMessage(point: List[Double]) extends ClusterMessage
case class ResultMessage(taskId: Int, point: List[Double], doCluster: Boolean) extends ClusterMessage