Skip to content

Commit 2fbe319

Browse files
author
jmast
committed
add tests for ttl
1 parent f2b60a3 commit 2fbe319

File tree

3 files changed

+156
-7
lines changed

3 files changed

+156
-7
lines changed

persistence-cassandra-it-tests/src/test/scala/com/evolutiongaming/kafka/flow/journal/JournalSpec.scala

Lines changed: 60 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,16 @@
11
package com.evolutiongaming.kafka.flow.journal
22

33
import cats.effect.{IO, Ref}
4+
import cats.syntax.all.*
45
import com.evolutiongaming.kafka.flow.{CassandraSessionStub, CassandraSpec, KafkaKey}
6+
import com.evolutiongaming.scassandra.syntax.*
57
import com.evolutiongaming.skafka.consumer.{ConsumerRecord, WithSize}
68
import com.evolutiongaming.skafka.{Offset, TopicPartition}
79
import scodec.bits.ByteVector
810

11+
import scala.concurrent.duration.*
12+
import scala.jdk.CollectionConverters.*
13+
914
class JournalSpec extends CassandraSpec {
1015

1116
test("queries") {
@@ -23,13 +28,15 @@ class JournalSpec extends CassandraSpec {
2328
journalBeforeTest <- journals.get(key).toList
2429
_ <- journals.persist(key, record)
2530
journalAfterPersist <- journals.get(key).toList
31+
ttls <- getTtls(key)
2632
_ <- journals.delete(key)
2733
journalAfterDelete <- journals.get(key).toList
28-
29-
_ = assert(clue(journalBeforeTest.isEmpty))
30-
_ = assertEquals(clue(journalAfterPersist), List(record))
31-
_ = assert(clue(journalAfterDelete.isEmpty))
32-
} yield ()
34+
} yield {
35+
assert(clue(journalBeforeTest.isEmpty))
36+
assertEquals(clue(journalAfterPersist), List(record))
37+
assert(clue(journalAfterDelete.isEmpty))
38+
assertEquals(clue(ttls), List(none))
39+
}
3340

3441
test.unsafeRunSync()
3542
}
@@ -42,10 +49,56 @@ class JournalSpec extends CassandraSpec {
4249
journals <- CassandraJournals.withSchema(session, cassandra().sync)
4350
_ <- failAfter.set(1)
4451
records <- journals.get(key).toList.attempt
45-
_ = assert(clue(records.isLeft))
46-
} yield ()
52+
} yield assert(clue(records.isLeft))
4753

4854
test.unsafeRunSync()
4955
}
5056

57+
test("ttl") {
58+
val key = KafkaKey("JournalSpec", "integration-tests-1", TopicPartition.empty, "ttl")
59+
val session = cassandra().session
60+
val test: IO[Unit] = for {
61+
journals <- CassandraJournals.withSchema(session, cassandra().sync, ttl = 1.hour.some)
62+
contents <- IO.fromEither(ByteVector.encodeUtf8("record-contents"))
63+
record = ConsumerRecord[String, ByteVector](
64+
topicPartition = TopicPartition.empty,
65+
offset = Offset.min,
66+
timestampAndType = None,
67+
key = Some(WithSize("ttl")),
68+
value = Some(WithSize(contents, 15))
69+
)
70+
_ <- journals.persist(key, record)
71+
journalAfterPersist <- journals.get(key).toList
72+
ttls <- getTtls(key)
73+
} yield {
74+
assertEquals(clue(journalAfterPersist), List(record))
75+
assertEquals(clue(ttls.size), 1)
76+
assert(clue(ttls.head.isDefined))
77+
}
78+
79+
test.unsafeRunSync()
80+
}
81+
82+
private def getTtls(key: KafkaKey): IO[List[Option[Int]]] = {
83+
val session = cassandra().session
84+
for {
85+
prepared <- session.prepare(
86+
s"""SELECT TTL(value) FROM ${CassandraJournals.DefaultTableName} WHERE
87+
| application_id = :application_id
88+
| AND group_id = :group_id
89+
| AND topic = :topic
90+
| AND partition = :partition
91+
| AND key = :key""".stripMargin
92+
)
93+
bound = prepared
94+
.bind()
95+
.encode("application_id", key.applicationId)
96+
.encode("group_id", key.groupId)
97+
.encode("topic", key.topicPartition.topic)
98+
.encode("partition", key.topicPartition.partition.value)
99+
.encode("key", key.key)
100+
ttls <- session.execute(bound)
101+
} yield ttls.all().asScala.map(row => row.decodeAt[Option[Int]](0)).toList
102+
}
103+
51104
}

persistence-cassandra-it-tests/src/test/scala/com/evolutiongaming/kafka/flow/key/KeySpec.scala

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,11 @@ import cats.effect.{IO, Ref}
44
import cats.syntax.all.*
55
import com.evolutiongaming.kafka.flow.cassandra.ConsistencyOverrides
66
import com.evolutiongaming.kafka.flow.{CassandraSessionStub, CassandraSpec, KafkaKey}
7+
import com.evolutiongaming.scassandra.syntax.*
78
import com.evolutiongaming.skafka.{Partition, TopicPartition}
89

910
import scala.concurrent.duration.*
11+
import scala.jdk.CollectionConverters.*
1012

1113
class KeySpec extends CassandraSpec {
1214

@@ -30,6 +32,7 @@ class KeySpec extends CassandraSpec {
3032
_ <- List(key1, key2, key3) traverse_ keys.persist
3133
partition1KeysAfterPersist <- keys.all("KeySpec", "integration-tests-1", partition1).toList
3234
partition2KeysAfterPersist <- keys.all("KeySpec", "integration-tests-1", partition2).toList
35+
ttls <- getTtls(key1)
3336
_ <- List(key1, key2, key3) traverse_ keys.delete
3437
partition1KeysAfterDelete <- keys.all("KeySpec", "integration-tests-1", partition1).toList
3538
partition2KeysAfterDelete <- keys.all("KeySpec", "integration-tests-1", partition2).toList
@@ -40,6 +43,7 @@ class KeySpec extends CassandraSpec {
4043
assert(clue(partition2KeysAfterPersist.length == 2))
4144
assert(clue(partition1KeysAfterDelete.isEmpty))
4245
assert(clue(partition2KeysAfterDelete.isEmpty))
46+
assertEquals(clue(ttls), List(none))
4347
}
4448

4549
test.unsafeRunSync()
@@ -62,4 +66,50 @@ class KeySpec extends CassandraSpec {
6266
test.unsafeRunSync()
6367
}
6468

69+
test("ttl") {
70+
val partition = TopicPartition("topic1", Partition.unsafe(1))
71+
val key = KafkaKey("KeySpec", "integration-tests-1", partition, "queries.key1")
72+
val test: IO[Unit] = for {
73+
keys <- CassandraKeys.withSchema(
74+
cassandra().session,
75+
cassandra().sync,
76+
ConsistencyOverrides.none,
77+
CassandraKeys.DefaultSegments,
78+
ttl = 1.hour.some,
79+
)
80+
_ <- keys.persist(key)
81+
partition1KeysAfterPersist <- keys.all(key.applicationId, key.groupId, key.topicPartition).toList
82+
ttls <- getTtls(key)
83+
} yield {
84+
assertEquals(clue(partition1KeysAfterPersist), List(key))
85+
assertEquals(clue(ttls.size), 1)
86+
assert(clue(ttls.head.isDefined))
87+
}
88+
89+
test.unsafeRunSync()
90+
}
91+
92+
private def getTtls(key: KafkaKey): IO[List[Option[Int]]] = {
93+
val session = cassandra().session
94+
for {
95+
prepared <- session.prepare(
96+
s"""SELECT TTL(metadata) FROM ${CassandraKeys.DefaultTableName} WHERE
97+
| application_id = :application_id
98+
| AND group_id = :group_id
99+
| AND topic = :topic
100+
| AND partition = :partition
101+
| AND key = :key
102+
| ALLOW FILTERING
103+
""".stripMargin
104+
)
105+
bound = prepared
106+
.bind()
107+
.encode("application_id", key.applicationId)
108+
.encode("group_id", key.groupId)
109+
.encode("topic", key.topicPartition.topic)
110+
.encode("partition", key.topicPartition.partition.value)
111+
.encode("key", key.key)
112+
ttls <- session.execute(bound)
113+
} yield ttls.all().asScala.map(row => row.decodeAt[Option[Int]](0)).toList
114+
}
65115
}

persistence-cassandra-it-tests/src/test/scala/com/evolutiongaming/kafka/flow/snapshot/SnapshotSpec.scala

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,14 @@
11
package com.evolutiongaming.kafka.flow.snapshot
22

33
import cats.effect.{IO, Ref}
4+
import cats.syntax.all.*
45
import com.evolutiongaming.kafka.flow.{CassandraSessionStub, CassandraSpec, KafkaKey}
6+
import com.evolutiongaming.scassandra.syntax.*
57
import com.evolutiongaming.skafka.{Offset, TopicPartition}
68

9+
import scala.concurrent.duration.*
10+
import scala.jdk.CollectionConverters.*
11+
712
class SnapshotSpec extends CassandraSpec {
813

914
test("queries") {
@@ -14,12 +19,14 @@ class SnapshotSpec extends CassandraSpec {
1419
snapshotBeforeTest <- snapshots.get(key)
1520
_ <- snapshots.persist(key, snapshot)
1621
snapshotAfterPersist <- snapshots.get(key)
22+
ttls <- getTtls(key)
1723
_ <- snapshots.delete(key)
1824
snapshotAfterDelete <- snapshots.get(key)
1925
} yield {
2026
assert(clue(snapshotBeforeTest.isEmpty))
2127
assertEquals(clue(snapshotAfterPersist), Some(snapshot))
2228
assert(clue(snapshotAfterDelete.isEmpty))
29+
assertEquals(clue(ttls), List(none))
2330
}
2431

2532
test.unsafeRunSync()
@@ -38,4 +45,43 @@ class SnapshotSpec extends CassandraSpec {
3845
test.unsafeRunSync()
3946
}
4047

48+
test("ttl") {
49+
val key = KafkaKey("SnapshotSpec", "integration-tests-1", TopicPartition.empty, "queries")
50+
val snapshot = KafkaSnapshot(offset = Offset.min, value = "snapshot-contents")
51+
val test: IO[Unit] = for {
52+
snapshots <- CassandraSnapshots.withSchema[IO, String](cassandra().session, cassandra().sync, ttl = 1.hour.some)
53+
_ <- snapshots.persist(key, snapshot)
54+
snapshotAfterPersist <- snapshots.get(key)
55+
ttls <- getTtls(key)
56+
} yield {
57+
assertEquals(clue(snapshotAfterPersist), snapshot.some)
58+
assertEquals(clue(ttls.size), 1)
59+
assert(clue(ttls.head.isDefined))
60+
}
61+
62+
test.unsafeRunSync()
63+
}
64+
65+
private def getTtls(key: KafkaKey): IO[List[Option[Int]]] = {
66+
val session = cassandra().session
67+
for {
68+
prepared <- session.prepare(
69+
s"""SELECT TTL(value) FROM ${CassandraSnapshots.DefaultTableName} WHERE
70+
| application_id = :application_id
71+
| AND group_id = :group_id
72+
| AND topic = :topic
73+
| AND partition = :partition
74+
| AND key = :key""".stripMargin
75+
)
76+
bound = prepared
77+
.bind()
78+
.encode("application_id", key.applicationId)
79+
.encode("group_id", key.groupId)
80+
.encode("topic", key.topicPartition.topic)
81+
.encode("partition", key.topicPartition.partition.value)
82+
.encode("key", key.key)
83+
ttls <- session.execute(bound)
84+
} yield ttls.all().asScala.map(row => row.decodeAt[Option[Int]](0)).toList
85+
}
86+
4187
}

0 commit comments

Comments
 (0)