Skip to content

Commit 6f8eb26

Browse files
committed
Merge pull request #14 from Shiti/develop
fixed errors in timestamp support and added tests
2 parents 0fb7974 + 05f6646 commit 6f8eb26

File tree

5 files changed

+89
-12
lines changed

5 files changed

+89
-12
lines changed

build.sbt

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -1,6 +1,6 @@
11
name := "kafka-connect-cassandra"
22

3-
version := "0.0.4"
3+
version := "0.0.5"
44

55
crossScalaVersions := Seq("2.11.7", "2.10.6")
66

src/it/resources/setup.cql

Lines changed: 8 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -56,3 +56,11 @@ CREATE TABLE githubstats.monthly_commits (
5656
year INT,
5757
PRIMARY KEY ((user), year, month)
5858
) WITH CLUSTERING ORDER BY (year DESC, month DESC);
59+
60+
CREATE TABLE IF NOT EXISTS test.event_store(
61+
app_id text,
62+
event_type text,
63+
subscription_type text,
64+
event_ts timestamp,
65+
PRIMARY KEY((app_id, event_type), event_ts)
66+
);

src/it/scala/com/tuplejump/kafka/connect/cassandra/CassandraSourceTaskSpec.scala

Lines changed: 65 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -19,9 +19,12 @@
1919

2020
package com.tuplejump.kafka.connect.cassandra
2121

22-
import scala.collection.JavaConverters._
22+
import java.util.concurrent.TimeUnit
23+
2324
import org.apache.kafka.connect.source.SourceTaskContext
2425

26+
import scala.collection.JavaConverters._
27+
2528
class CassandraSourceTaskSpec extends AbstractFlatSpec {
2629

2730
val query = "SELECT * FROM test.playlists"
@@ -39,7 +42,7 @@ class CassandraSourceTaskSpec extends AbstractFlatSpec {
3942
sourceTask.stop()
4043
}
4144

42-
it should "fetch records from cassandra" in {
45+
it should "fetch records from cassandra in bulk" in {
4346
val sourceTask = new CassandraSourceTask()
4447
val mockContext = mock[SourceTaskContext]
4548

@@ -53,4 +56,64 @@ class CassandraSourceTaskSpec extends AbstractFlatSpec {
5356
sourceTask.stop()
5457
}
5558

59+
def insertStmt(time: Long): String = {
60+
"INSERT INTO test.event_store(app_id,event_type,subscription_type,event_ts) " +
61+
s"VALUES ('website','renewal','annual',$time)"
62+
}
63+
64+
it should "fetch only new records from cassandra" in {
65+
val timeBasedQuery =
66+
"""SELECT * FROM test.event_store WHERE app_id='website' AND event_type='renewal'
67+
| AND event_ts >= previousTime()""".stripMargin
68+
69+
val topic = "events"
70+
val cassandraSourceConfig = sourceConfig(timeBasedQuery, topic)
71+
72+
val sourceTask = new CassandraSourceTask()
73+
val mockContext = mock[SourceTaskContext]
74+
75+
sourceTask.initialize(mockContext)
76+
sourceTask.start(cassandraSourceConfig.asJava)
77+
78+
val oneHrAgo = System.currentTimeMillis() - TimeUnit.HOURS.toMillis(1)
79+
sourceTask.session.execute(insertStmt(oneHrAgo))
80+
81+
sourceTask.poll().size() should be(0)
82+
83+
val oneHrLater = System.currentTimeMillis() + TimeUnit.HOURS.toMillis(1)
84+
sourceTask.session.execute(insertStmt(oneHrLater))
85+
86+
val result = sourceTask.poll()
87+
88+
result.size() should be(1)
89+
90+
sourceTask.stop()
91+
}
92+
93+
it should "fetch records from cassandra in given pollInterval" in {
94+
val timeBasedQuery =
95+
"""SELECT * FROM test.event_store WHERE app_id='website' AND event_type='renewal'
96+
| AND event_ts >= previousTime() AND event_ts <= currentTime()""".stripMargin
97+
98+
val topic = "events"
99+
val cassandraSourceConfig = sourceConfig(timeBasedQuery, topic)
100+
101+
val sourceTask = new CassandraSourceTask()
102+
val mockContext = mock[SourceTaskContext]
103+
104+
sourceTask.initialize(mockContext)
105+
sourceTask.start(cassandraSourceConfig.asJava)
106+
107+
val oneHrLater = System.currentTimeMillis() + TimeUnit.HOURS.toMillis(1)
108+
sourceTask.session.execute(insertStmt(oneHrLater))
109+
110+
val fewSecLater = System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(2)
111+
sourceTask.session.execute(insertStmt(fewSecLater))
112+
113+
val result = sourceTask.poll()
114+
115+
result.size() should be(1)
116+
117+
sourceTask.stop()
118+
}
56119
}

src/main/scala/com/tuplejump/kafka/connect/cassandra/CassandraSourceTask.scala

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -57,8 +57,8 @@ class CassandraSourceTask extends SourceTask with TaskLifecycle {
5757
//TODO remove https://github.com/tuplejump/kafka-connector/issues/9
5858
Thread.sleep(source.pollInterval)
5959

60-
source slide timestamp
61-
read(source)
60+
val updatedSource = source slide timestamp
61+
read(updatedSource)
6262
case source =>
6363
read(source)
6464
}

src/main/scala/com/tuplejump/kafka/connect/cassandra/Configuration.scala

Lines changed: 13 additions & 7 deletions
Original file line number | Diff line number | Diff line change
@@ -31,7 +31,7 @@ import Configuration._
3131
*
3232
* TODO CassandraConnectionConfig
3333
*/
34-
private[kafka] final class Configuration private(val config: immutable.Map[String,String],
34+
private[kafka] final class Configuration private(val config: immutable.Map[String, String],
3535
val source: Option[SourceConfig],
3636
val sink: immutable.List[SinkConfig]) {
3737

@@ -70,8 +70,12 @@ object Configuration {
7070

7171
final val PreviousTime = "previousTime()"
7272

73+
final val PreviousTimeRegex = "previousTime\\(\\)"
74+
7375
final val CurrentTime = "currentTime()"
7476

77+
final val CurrentTimeRegex = "currentTime\\(\\)"
78+
7579
val Empty = new Configuration(Map.empty, SourceConfig.Empty, Nil)
7680

7781
/** Returns a new [[com.tuplejump.kafka.connect.cassandra.Configuration]]. */
@@ -127,8 +131,9 @@ object Configuration {
127131
def slide(now: Long): SourceConfig =
128132
if (timeseries) {
129133
val timestamp = now - pollInterval
130-
copy(query = query.replaceAll(PreviousTime, s"$timestamp")
131-
.replaceAll(CurrentTime, s"$now"))
134+
val updatedQuery: Query = query.replaceAll(PreviousTimeRegex, s"$timestamp")
135+
.replaceAll(CurrentTimeRegex, s"$now")
136+
copy(query = updatedQuery)
132137
} else this
133138

134139
def timeseries: Boolean =
@@ -140,20 +145,20 @@ object Configuration {
140145

141146
val Empty = SourceConfig(Map.empty)
142147

143-
def apply(config: Map[String,String]): Option[SourceConfig] =
148+
def apply(config: Map[String, String]): Option[SourceConfig] =
144149
for {
145150
topic <- get(config, TopicKey)
146151
query <- get(config, QueryKey)
147152
} yield SourceConfig(topic, query, pollInterval(config), None, None)
148153

149-
private def pollInterval(config: Map[String,String]): Long =
154+
private def pollInterval(config: Map[String, String]): Long =
150155
get(config, PollInterval).map(_.toLong).getOrElse(DefaultPollInterval)
151156
}
152157

153158
/** A Kafka [[CassandraSink]] and [[CassandraSinkTask]] configuration.
154159
* INTERNAL API.
155160
*
156-
* @param topic the kafka `topic` name
161+
* @param topic the kafka `topic` name
157162
* @param namespace the cassandra `keyspace.table`
158163
*/
159164
private[kafka] final case class SinkConfig(val topic: TopicName,
@@ -218,7 +223,7 @@ object Configuration {
218223
val ConsistencyLevelKey = "cassandra.output.consistency.level"
219224
val DefaultConsistencyLevel = ConsistencyLevel.LOCAL_QUORUM
220225

221-
/** The maximum total size of the batch in bytes. Overridden by BatchSizeRowsParam. */
226+
/** The maximum total size of the batch in bytes. Overridden by BatchSizeRowsParam. */
222227
val BatchSizeBytesKey = "cassandra.output.batch.size.bytes"
223228
val BatchBufferSize = 1024
224229

@@ -227,4 +232,5 @@ object Configuration {
227232
val DefaultParallelismLevel = "5"
228233

229234
}
235+
230236
}

0 commit comments

Comments
 (0)