Skip to content
This repository was archived by the owner on Feb 8, 2019. It is now read-only.

Commit 3fccf11

Browse files
committed
fix GEARPUMP-152 upgrade Storm support to 1.0.x
1 parent 1b6a234 commit 3fccf11

23 files changed

+184
-76
lines changed

experiments/storm/src/main/java/org/apache/gearpump/experiments/storm/util/TimeCacheMapWrapper.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,8 @@
1818

1919
package org.apache.gearpump.experiments.storm.util;
2020

21-
import backtype.storm.utils.TimeCacheMap;
21+
22+
import org.apache.storm.utils.TimeCacheMap;
2223

2324
/**
2425
* Wrapper class to suppress "deprecation" warning, as scala doesn't support the suppression.

experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/main/GearpumpNimbus.scala

Lines changed: 99 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -21,17 +21,19 @@ package org.apache.gearpump.experiments.storm.main
2121
import java.io.{File, FileOutputStream, FileWriter}
2222
import java.nio.ByteBuffer
2323
import java.nio.channels.{Channels, WritableByteChannel}
24-
import java.util.{HashMap => JHashMap, Map => JMap, UUID}
24+
import java.util.{List => JList, HashMap => JHashMap, Map => JMap, UUID}
25+
import org.apache.storm.Config
26+
import org.apache.storm.generated._
27+
import org.apache.storm.nimbus.NimbusInfo
28+
import org.apache.storm.security.auth.{ReqContext, ThriftConnectionType, ThriftServer}
29+
import org.apache.storm.utils.Utils
30+
2531
import scala.collection.JavaConverters._
2632
import scala.concurrent.duration.Duration
2733
import scala.concurrent.{Await, Future}
2834

2935
import akka.actor.ActorSystem
3036
import com.typesafe.config.ConfigValueFactory
31-
import backtype.storm.Config
32-
import backtype.storm.generated._
33-
import backtype.storm.security.auth.{ThriftConnectionType, ThriftServer}
34-
import backtype.storm.utils.Utils
3537
import org.apache.storm.shade.org.json.simple.JSONValue
3638
import org.apache.storm.shade.org.yaml.snakeyaml.Yaml
3739
import org.slf4j.Logger
@@ -231,7 +233,8 @@ class GearpumpNimbus(clientContext: ClientContext, stormConf: JMap[AnyRef, AnyRe
231233
val topologySummaryList = topologies.map { case (name, _) =>
232234
new TopologySummary(name, name, 0, 0, 0, 0, "")
233235
}.toSeq
234-
new ClusterSummary(List[SupervisorSummary]().asJava, 0, topologySummaryList.asJava)
236+
new ClusterSummary(List[SupervisorSummary]().asJava,
237+
topologySummaryList.asJava, List[NimbusSummary]().asJava)
235238
}
236239

237240
override def beginFileDownload(file: String): String = {
@@ -284,6 +287,96 @@ class GearpumpNimbus(clientContext: ClientContext, stormConf: JMap[AnyRef, AnyRe
284287
new File(jar).delete()
285288
topologies -= name
286289
}
290+
291+
override def updateBlobReplication(key: String, replication: Int): Int = {
292+
// val blobClient = Utils.getNimbusBlobStore(stormConf, NimbusInfo.fromConf(stormConf))
293+
// blobClient.updateBlobReplication(key, replication, ReqContext.context().subject())
294+
throw new UnsupportedOperationException
295+
}
296+
297+
override def setLogConfig(name: String, config: LogConfig): Unit = {
298+
299+
}
300+
301+
override def downloadBlobChunk(session: String): ByteBuffer = {
302+
throw new UnsupportedOperationException
303+
}
304+
305+
override def beginBlobDownload(key: String): BeginDownloadResult = {
306+
throw new UnsupportedOperationException
307+
}
308+
309+
override def cancelBlobUpload(session: String): Unit = {
310+
throw new UnsupportedOperationException
311+
}
312+
313+
override def getTopologyHistory(user: String): TopologyHistoryInfo = {
314+
throw new UnsupportedOperationException
315+
}
316+
317+
override def getTopologyPageInfo(id: String, window: String, is_include_sys: Boolean): TopologyPageInfo = {
318+
throw new UnsupportedOperationException
319+
}
320+
321+
override def getBlobMeta(key: String): ReadableBlobMeta = {
322+
throw new UnsupportedOperationException
323+
}
324+
325+
override def createStateInZookeeper(key: String): Unit = {
326+
327+
}
328+
329+
override def setBlobMeta(key: String, meta: SettableBlobMeta): Unit = {
330+
331+
}
332+
333+
override def getComponentPendingProfileActions(id: String, component_id: String, action: ProfileAction): JList[ProfileRequest] = {
334+
throw new UnsupportedOperationException
335+
}
336+
337+
override def debug(name: String, component: String, enable: Boolean, samplingPercentage: Double): Unit = {
338+
339+
}
340+
341+
override def getComponentPageInfo(topology_id: String, component_id: String, window: String, is_include_sys: Boolean): ComponentPageInfo = {
342+
throw new UnsupportedOperationException
343+
}
344+
345+
override def setWorkerProfiler(id: String, profileRequest: ProfileRequest): Unit = {
346+
347+
}
348+
349+
override def finishBlobUpload(session: String): Unit = {
350+
throw new UnsupportedOperationException
351+
}
352+
353+
override def beginCreateBlob(key: String, meta: SettableBlobMeta): String = {
354+
throw new UnsupportedOperationException
355+
}
356+
357+
override def getBlobReplication(key: String): Int = {
358+
throw new UnsupportedOperationException
359+
}
360+
361+
override def deleteBlob(key: String): Unit = {
362+
throw new UnsupportedOperationException
363+
}
364+
365+
override def listBlobs(session: String): ListBlobsResult = {
366+
throw new UnsupportedOperationException
367+
}
368+
369+
override def getLogConfig(name: String): LogConfig = {
370+
throw new UnsupportedOperationException
371+
}
372+
373+
override def beginUpdateBlob(key: String): String = {
374+
throw new UnsupportedOperationException
375+
}
376+
377+
override def uploadBlobChunk(session: String, chunk: ByteBuffer): Unit = {
378+
throw new UnsupportedOperationException
379+
}
287380
}
288381

289382
case class TopologyData(topology: StormTopology, config: JMap[AnyRef, AnyRef], jar: String)

experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/main/GearpumpStormClient.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,11 @@
1818

1919
package org.apache.gearpump.experiments.storm.main
2020

21-
import backtype.storm.Config
22-
import backtype.storm.utils.Utils
2321
import org.apache.gearpump.cluster.main.{ArgumentsParser, CLIOption}
2422
import org.apache.gearpump.util.Constants._
2523
import org.apache.gearpump.util.{AkkaApp, LogUtil, Util}
24+
import org.apache.storm.Config
25+
import org.apache.storm.utils.Utils
2626

2727
object GearpumpStormClient extends AkkaApp with ArgumentsParser {
2828

experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/processor/StormBoltOutputCollector.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,12 +20,12 @@ package org.apache.gearpump.experiments.storm.processor
2020

2121
import java.util.{Collection => JCollection, List => JList}
2222

23-
import backtype.storm.task.IOutputCollector
24-
import backtype.storm.tuple.Tuple
2523
import org.apache.gearpump.experiments.storm.topology.TimedTuple
2624
import org.apache.gearpump.experiments.storm.util.StormConstants._
2725
import org.apache.gearpump.experiments.storm.util.StormOutputCollector
2826
import org.apache.gearpump.streaming.task.UpdateCheckpointClock
27+
import org.apache.storm.task.IOutputCollector
28+
import org.apache.storm.tuple.Tuple
2929

3030
/**
3131
* this is used by Storm bolt to emit messages

experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/producer/StormSpoutOutputCollector.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,9 @@ package org.apache.gearpump.experiments.storm.producer
2020

2121
import java.util.{List => JList}
2222

23-
import backtype.storm.spout.{ISpout, ISpoutOutputCollector}
2423
import org.apache.gearpump.TimeStamp
2524
import org.apache.gearpump.experiments.storm.util.StormOutputCollector
25+
import org.apache.storm.spout.{ISpoutOutputCollector, ISpout}
2626

2727
case class PendingMessage(id: Object, messageTime: TimeStamp, startTime: TimeStamp)
2828

experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/topology/GearpumpStormComponent.scala

Lines changed: 15 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -25,13 +25,6 @@ import java.util.{HashMap => JHashMap, List => JList, Map => JMap}
2525

2626
import akka.actor.ActorRef
2727
import akka.pattern.ask
28-
import backtype.storm.Config
29-
import backtype.storm.generated.{Bolt, ComponentCommon, SpoutSpec, StormTopology}
30-
import backtype.storm.metric.api.IMetric
31-
import backtype.storm.spout.{ISpout, SpoutOutputCollector}
32-
import backtype.storm.task.{GeneralTopologyContext, IBolt, OutputCollector, TopologyContext}
33-
import backtype.storm.tuple.{Fields, Tuple, TupleImpl}
34-
import backtype.storm.utils.Utils
3528
import clojure.lang.Atom
3629
import org.apache.commons.io.{FileUtils, IOUtils}
3730
import org.apache.gearpump.experiments.storm.processor.StormBoltOutputCollector
@@ -43,6 +36,13 @@ import org.apache.gearpump.streaming.DAG
4336
import org.apache.gearpump.streaming.task.{GetDAG, TaskId, TaskContext, StartTime}
4437
import org.apache.gearpump.util.{Constants, LogUtil}
4538
import org.apache.gearpump.{Message, TimeStamp}
39+
import org.apache.storm.Config
40+
import org.apache.storm.generated.{Bolt, ComponentCommon, SpoutSpec, StormTopology}
41+
import org.apache.storm.metric.api.IMetric
42+
import org.apache.storm.spout.{SpoutOutputCollector, ISpout}
43+
import org.apache.storm.task.{OutputCollector, IBolt, GeneralTopologyContext, TopologyContext}
44+
import org.apache.storm.tuple.{Fields, Tuple, TupleImpl}
45+
import org.apache.storm.utils.Utils
4646
import org.slf4j.Logger
4747

4848
import scala.collection.JavaConverters._
@@ -55,13 +55,15 @@ import scala.concurrent.{Await, Future}
5555
trait GearpumpStormComponent {
5656
/**
5757
* invoked at Task.onStart
58-
* @param startTime task start time
58+
*
59+
* @param startTime task start time
5960
*/
6061
def start(startTime: StartTime): Unit
6162

6263
/**
6364
* invoked at Task.onNext
64-
* @param message incoming message
65+
*
66+
* @param message incoming message
6567
*/
6668
def next(message: Message): Unit
6769

@@ -228,7 +230,8 @@ object GearpumpStormComponent {
228230

229231
/**
230232
* invoked at TICK message when "topology.tick.tuple.freq.secs" is configured
231-
* @param freq tick frequency
233+
*
234+
* @param freq tick frequency
232235
*/
233236
def tick(freq: Int): Unit = {
234237
if (null == tickTuple) {
@@ -241,7 +244,8 @@ object GearpumpStormComponent {
241244
/**
242245
* normalize general config with per component configs
243246
* "topology.transactional.id" and "topology.tick.tuple.freq.secs"
244-
* @param stormConfig general config for all components
247+
*
248+
* @param stormConfig general config for all components
245249
* @param componentCommon common component parts
246250
*/
247251
private def normalizeConfig(stormConfig: Map[AnyRef, AnyRef],

experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/topology/GearpumpStormTopology.scala

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,6 @@ import java.lang.{Iterable => JIterable}
2222
import java.util.{ArrayList => JArrayList, HashMap => JHashMap, List => JList, Map => JMap}
2323

2424
import akka.actor.ActorSystem
25-
import backtype.storm.Config
26-
import backtype.storm.generated._
27-
import backtype.storm.utils.{ThriftTopologyUtils, Utils}
2825
import org.apache.gearpump.cluster.UserConfig
2926
import org.apache.gearpump.experiments.storm.processor.StormProcessor
3027
import org.apache.gearpump.experiments.storm.producer.StormProducer
@@ -34,6 +31,9 @@ import org.apache.gearpump.experiments.storm.util.StormUtil._
3431
import org.apache.gearpump.streaming.Processor
3532
import org.apache.gearpump.streaming.task.Task
3633
import org.apache.gearpump.util.LogUtil
34+
import org.apache.storm.Config
35+
import org.apache.storm.generated.{ComponentCommon, GlobalStreamId, Grouping, Bolt, SpoutSpec, StormTopology}
36+
import org.apache.storm.utils.{ThriftTopologyUtils, Utils}
3737
import org.slf4j.Logger
3838

3939
// TODO: Refactor this file, we should disable using of JavaConversions
@@ -64,7 +64,8 @@ object GearpumpStormTopology {
6464
* 3. provides interface for Gearpump applications to use Storm topology
6565
*
6666
* an implicit ActorSystem is required to create Gearpump processors
67-
* @param name topology name
67+
*
68+
* @param name topology name
6869
* @param topology Storm topology
6970
* @param sysConfig configs from "defaults.yaml" and custom config file
7071
* @param appConfig config submitted from user application
@@ -124,7 +125,8 @@ private[storm] class GearpumpStormTopology(
124125

125126
/**
126127
* creates Gearpump processor from Storm spout
127-
* @param spoutId spout id
128+
*
129+
* @param spoutId spout id
128130
* @param spoutSpec spout spec
129131
* @param stormConfig merged storm config
130132
* @param system actor system
@@ -141,7 +143,8 @@ private[storm] class GearpumpStormTopology(
141143

142144
/**
143145
* creates Gearpump processor from Storm bolt
144-
* @param boltId bolt id
146+
*
147+
* @param boltId bolt id
145148
* @param boltSpec bolt spec
146149
* @param stormConfig merged storm config
147150
* @param system actor system
@@ -192,7 +195,8 @@ private[storm] class GearpumpStormTopology(
192195
* 1. use "topology.tasks" if defined; otherwise use parallelism_hint
193196
* 2. parallelism should not be larger than "topology.max.task.parallelism" if defined
194197
* 3. component config overrides system config
195-
* @param stormConfig System configs without merging "topology.tasks" and
198+
*
199+
* @param stormConfig System configs without merging "topology.tasks" and
196200
* "topology.max.task.parallelism" of component
197201
* @return number of task instances for a component
198202
*/
@@ -223,7 +227,8 @@ private[storm] class GearpumpStormTopology(
223227

224228
/**
225229
* merge component configs "topology.kryo.decorators" and "topology.kryo.register"
226-
* @param componentConfigs list of component configs
230+
*
231+
* @param componentConfigs list of component configs
227232
* @param allConfig existing configs without merging component configs
228233
* @return the two configs merged from all the component configs and existing configs
229234
*/

experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/topology/GearpumpTuple.scala

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,10 @@ package org.apache.gearpump.experiments.storm.topology
2020

2121
import java.util.{List => JList}
2222

23-
import backtype.storm.task.GeneralTopologyContext
24-
import backtype.storm.tuple.{Tuple, TupleImpl}
2523

2624
import org.apache.gearpump.TimeStamp
25+
import org.apache.storm.task.GeneralTopologyContext
26+
import org.apache.storm.tuple.{TupleImpl, Tuple}
2727

2828
/**
2929
* this carries Storm tuple values in the Gearpump world
@@ -36,10 +36,11 @@ private[storm] class GearpumpTuple(
3636
val sourceStreamId: String,
3737
@transient val targetPartitions: Map[String, Array[Int]]) extends Serializable {
3838
/**
39-
* creates a Storm [[backtype.storm.tuple.Tuple]] to be passed to a Storm component
39+
* creates a Storm [[org.apache.storm.tuple.Tuple]] to be passed to a Storm component
4040
* this is needed for each incoming message
41-
* because we cannot get [[backtype.storm.task.GeneralTopologyContext]] at deserialization
42-
* @param topologyContext topology context used for all tasks
41+
* because we cannot get [[org.apache.storm.task.GeneralTopologyContext]] at deserialization
42+
*
43+
* @param topologyContext topology context used for all tasks
4344
* @return a Tuple
4445
*/
4546
def toTuple(topologyContext: GeneralTopologyContext, timestamp: TimeStamp): Tuple = {

experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/util/Grouper.scala

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,12 +19,13 @@
1919
package org.apache.gearpump.experiments.storm.util
2020

2121
import java.util.{List => JList}
22+
import org.apache.storm.generated.GlobalStreamId
23+
import org.apache.storm.grouping.CustomStreamGrouping
24+
import org.apache.storm.task.TopologyContext
25+
import org.apache.storm.tuple.Fields
26+
2227
import scala.util.Random
2328

24-
import backtype.storm.generated.GlobalStreamId
25-
import backtype.storm.grouping.CustomStreamGrouping
26-
import backtype.storm.task.TopologyContext
27-
import backtype.storm.tuple.Fields
2829

2930
/**
3031
* Grouper is identical to that in storm but return gearpump partitions for storm tuple values
@@ -114,7 +115,7 @@ class AllGrouper(numTasks: Int) extends Grouper {
114115
/**
115116
* CustomGrouper allows users to specify grouping strategy
116117
*
117-
* @param grouping see [[backtype.storm.grouping.CustomStreamGrouping]]
118+
* @param grouping see [[org.apache.storm.grouping.CustomStreamGrouping]]
118119
*/
119120
class CustomGrouper(grouping: CustomStreamGrouping) extends Grouper {
120121

experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/util/StormOutputCollector.scala

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,13 +19,14 @@
1919
package org.apache.gearpump.experiments.storm.util
2020

2121
import java.util.{ArrayList => JArrayList, Iterator => JIterator, List => JList, Map => JMap}
22+
import org.apache.storm.generated.{JavaObject, GlobalStreamId, Grouping}
23+
import org.apache.storm.grouping.CustomStreamGrouping
24+
import org.apache.storm.task.TopologyContext
25+
import org.apache.storm.tuple.Fields
26+
import org.apache.storm.utils.Utils
27+
2228
import scala.collection.JavaConverters._
2329

24-
import backtype.storm.generated.{GlobalStreamId, Grouping, JavaObject}
25-
import backtype.storm.grouping.CustomStreamGrouping
26-
import backtype.storm.task.TopologyContext
27-
import backtype.storm.tuple.Fields
28-
import backtype.storm.utils.Utils
2930
import org.slf4j.Logger
3031

3132
import org.apache.gearpump._

0 commit comments

Comments
 (0)