Skip to content

Commit a3adafb

Browse files
committed
Add test coverage for stream replication (40 tests)
Unit tests (16, no infrastructure needed):
- recordToItem: INSERT, MODIFY, REMOVE, unknown event (4 tests)
- retryRandom: success, retry+succeed, exhaust, non-retryable, non-DDB (5)
- pollShard: success, LimitExceeded retry, ProvisionedThroughput retry, exhaust retries, non-retryable, non-DDB, shard closed (7)

Integration tests (24, need DynamoDB Local + Alternator):
- Checkpoint table: create idempotency, create when missing (2)
- tryClaimShard: unclaimed, expired lease with checkpoint, active lease rejection, re-claim own shard (4)
- renewLeaseAndCheckpoint: with checkpoint, without, stolen lease, expiry (4)
- Multi-runner: disjoint claims, dead worker reassignment (2)
- getStreamArn error when no stream enabled (1)
- DynamoDB Streams thin-wrappers: getStreamArn, listShards, getShardIterator (TRIM_HORIZON + LATEST), getRecords, getShardIteratorAfterSequence, recordToItem with real records, resume-from-checkpoint (8)
- Consecutive error threshold terminates after maxConsecutiveErrors (1)
- Lost-lease-mid-cycle removes shard from tracking (1)
- run() with renamesMap applies column renames (1)

Test infrastructure:
- TestStreamPoller: manual test double for StreamPollerOps with configurable function vars
1 parent 2d2260c commit a3adafb

9 files changed

+1439
-0
lines changed
Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
package com.scylladb.migrator
2+
3+
import com.scylladb.migrator.alternator.MigratorSuiteWithDynamoDBLocal
4+
import software.amazon.awssdk.services.dynamodb.model.{
5+
AttributeDefinition,
6+
CreateTableRequest,
7+
DeleteTableRequest,
8+
DescribeTableRequest,
9+
KeySchemaElement,
10+
KeyType,
11+
ProvisionedThroughput,
12+
ResourceNotFoundException,
13+
ScalarAttributeType
14+
}
15+
16+
/** Integration tests for DynamoStreamPoller. Requires DynamoDB Local on port 8001. */
class DynamoStreamPollerIntegrationTest extends MigratorSuiteWithDynamoDBLocal {
  import scala.util.control.NonFatal

  private val tableName = "StreamPollerTest"

  /** Best-effort drop of the test table: deletes it and waits until it is gone.
    * A missing table (ResourceNotFoundException) is treated as already cleaned up.
    */
  private def dropTableIfExists(): Unit =
    try {
      sourceDDb().deleteTable(DeleteTableRequest.builder().tableName(tableName).build())
      sourceDDb()
        .waiter()
        .waitUntilTableNotExists(DescribeTableRequest.builder().tableName(tableName).build())
    } catch {
      case _: ResourceNotFoundException => ()
    }

  override def beforeEach(context: BeforeEach): Unit = {
    super.beforeEach(context)
    // Each test creates its own table; make sure a previous run left nothing behind.
    dropTableIfExists()
  }

  test("getStreamArn: throws when table has no stream enabled") {
    // Create table without streams: no .streamSpecification(...) on the request,
    // so getStreamArn must fail rather than return a null/empty ARN.
    sourceDDb().createTable(
      CreateTableRequest
        .builder()
        .tableName(tableName)
        .keySchema(KeySchemaElement.builder().attributeName("id").keyType(KeyType.HASH).build())
        .attributeDefinitions(
          AttributeDefinition
            .builder()
            .attributeName("id")
            .attributeType(ScalarAttributeType.S)
            .build()
        )
        .provisionedThroughput(
          ProvisionedThroughput.builder().readCapacityUnits(5L).writeCapacityUnits(5L).build()
        )
        .build()
    )
    sourceDDb()
      .waiter()
      .waitUntilTableExists(DescribeTableRequest.builder().tableName(tableName).build())

    val ex = intercept[RuntimeException] {
      DynamoStreamPoller.getStreamArn(sourceDDb(), tableName)
    }
    assert(ex.getMessage.contains("does not have a stream enabled"))
  }

  override def afterEach(context: AfterEach): Unit = {
    // Best-effort cleanup. Catch NonFatal (not Exception) so fatal errors and
    // thread interruption still propagate instead of being silently swallowed.
    try
      sourceDDb().deleteTable(DeleteTableRequest.builder().tableName(tableName).build())
    catch {
      case NonFatal(_) => ()
    }
    super.afterEach(context)
  }
}
Lines changed: 300 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,300 @@
1+
package com.scylladb.migrator
2+
3+
import com.scylladb.migrator.alternator.MigratorSuiteWithDynamoDBLocal
4+
import software.amazon.awssdk.auth.credentials.{ AwsBasicCredentials, StaticCredentialsProvider }
5+
import software.amazon.awssdk.regions.Region
6+
import software.amazon.awssdk.services.dynamodb.model.{
7+
AttributeDefinition,
8+
AttributeValue,
9+
CreateTableRequest,
10+
DeleteTableRequest,
11+
DescribeTableRequest,
12+
KeySchemaElement,
13+
KeyType,
14+
ProvisionedThroughput,
15+
PutItemRequest,
16+
ResourceNotFoundException,
17+
ScalarAttributeType,
18+
ShardIteratorType,
19+
StreamSpecification,
20+
StreamViewType
21+
}
22+
import software.amazon.awssdk.services.dynamodb.streams.DynamoDbStreamsClient
23+
24+
import java.net.URI
25+
import scala.jdk.CollectionConverters._
26+
27+
/** Integration tests for DynamoStreamPoller thin-wrapper methods using DynamoDB Local streams.
  * Requires DynamoDB Local on port 8001 with streams support.
  */
class DynamoStreamPollerStreamsIntegrationTest extends MigratorSuiteWithDynamoDBLocal {
  import scala.util.control.NonFatal

  private val tableName = "StreamPollerStreamsTest"

  // Dedicated Streams client pointed at the same DynamoDB Local endpoint as the
  // source table client. Region/credentials are placeholders for the emulator.
  private lazy val streamsClient: DynamoDbStreamsClient =
    DynamoDbStreamsClient
      .builder()
      .region(Region.of("dummy"))
      .endpointOverride(new URI("http://localhost:8001"))
      .credentialsProvider(
        StaticCredentialsProvider.create(AwsBasicCredentials.create("dummy", "dummy"))
      )
      .build()

  /** Drops any leftover table, (re)creates it with a NEW_AND_OLD_IMAGES stream
    * enabled, waits until it is active, and returns the table's stream ARN.
    */
  private def createStreamEnabledTable(): String = {
    try {
      sourceDDb().deleteTable(DeleteTableRequest.builder().tableName(tableName).build())
      sourceDDb()
        .waiter()
        .waitUntilTableNotExists(DescribeTableRequest.builder().tableName(tableName).build())
    } catch {
      // Table did not exist — nothing to delete.
      case _: ResourceNotFoundException => ()
    }

    sourceDDb().createTable(
      CreateTableRequest
        .builder()
        .tableName(tableName)
        .keySchema(KeySchemaElement.builder().attributeName("id").keyType(KeyType.HASH).build())
        .attributeDefinitions(
          AttributeDefinition
            .builder()
            .attributeName("id")
            .attributeType(ScalarAttributeType.S)
            .build()
        )
        .provisionedThroughput(
          ProvisionedThroughput.builder().readCapacityUnits(5L).writeCapacityUnits(5L).build()
        )
        .streamSpecification(
          StreamSpecification
            .builder()
            .streamEnabled(true)
            .streamViewType(StreamViewType.NEW_AND_OLD_IMAGES)
            .build()
        )
        .build()
    )
    sourceDDb()
      .waiter()
      .waitUntilTableExists(DescribeTableRequest.builder().tableName(tableName).build())

    // Return the stream ARN
    DynamoStreamPoller.getStreamArn(sourceDDb(), tableName)
  }

  /** Writes a simple two-attribute item, producing one INSERT stream record. */
  private def putItem(id: String, value: String): Unit =
    sourceDDb().putItem(
      PutItemRequest
        .builder()
        .tableName(tableName)
        .item(
          Map(
            "id" -> AttributeValue.fromS(id),
            "value" -> AttributeValue.fromS(value)
          ).asJava
        )
        .build()
    )

  test("getStreamArn: returns valid ARN for stream-enabled table") {
    val streamArn = createStreamEnabledTable()
    assert(streamArn != null)
    assert(streamArn.nonEmpty)
    assert(streamArn.contains(tableName), s"ARN should contain table name: $streamArn")
  }

  test("listShards: returns shards for a stream") {
    val streamArn = createStreamEnabledTable()
    // Insert an item to ensure at least one shard exists
    putItem("ls-1", "val1")

    val shards = DynamoStreamPoller.listShards(streamsClient, streamArn)
    assert(shards.nonEmpty, "Expected at least one shard")
    shards.foreach { shard =>
      assert(shard.shardId() != null)
      assert(shard.shardId().nonEmpty)
    }
  }

  test("getShardIterator: returns iterator with TRIM_HORIZON") {
    val streamArn = createStreamEnabledTable()
    putItem("gi-1", "val1")

    val shards = DynamoStreamPoller.listShards(streamsClient, streamArn)
    assert(shards.nonEmpty)

    val iterator = DynamoStreamPoller.getShardIterator(
      streamsClient,
      streamArn,
      shards.head.shardId(),
      ShardIteratorType.TRIM_HORIZON
    )
    assert(iterator != null)
    assert(iterator.nonEmpty)
  }

  test("getShardIterator: returns iterator with LATEST") {
    val streamArn = createStreamEnabledTable()
    putItem("gi-2", "val2")

    val shards = DynamoStreamPoller.listShards(streamsClient, streamArn)
    assert(shards.nonEmpty)

    val iterator = DynamoStreamPoller.getShardIterator(
      streamsClient,
      streamArn,
      shards.head.shardId(),
      ShardIteratorType.LATEST
    )
    assert(iterator != null)
    assert(iterator.nonEmpty)
  }

  test("getRecords: reads records from a shard") {
    val streamArn = createStreamEnabledTable()
    putItem("gr-1", "val1")
    putItem("gr-2", "val2")

    val shards = DynamoStreamPoller.listShards(streamsClient, streamArn)
    assert(shards.nonEmpty)

    val iterator = DynamoStreamPoller.getShardIterator(
      streamsClient,
      streamArn,
      shards.head.shardId(),
      ShardIteratorType.TRIM_HORIZON
    )

    // The next-iterator half of the pair is not needed here; bind it to _
    // instead of an unused name.
    val (records, _) = DynamoStreamPoller.getRecords(streamsClient, iterator)
    assert(records.nonEmpty, "Expected records from stream")
    records.foreach { record =>
      assert(record.dynamodb() != null)
      assert(record.dynamodb().sequenceNumber() != null)
    }
  }

  test("getShardIteratorAfterSequence: resumes from a sequence number") {
    val streamArn = createStreamEnabledTable()
    putItem("gas-1", "val1")
    putItem("gas-2", "val2")

    val shards = DynamoStreamPoller.listShards(streamsClient, streamArn)
    assert(shards.nonEmpty)
    val shardId = shards.head.shardId()

    // Read first batch to get a sequence number
    val iterator = DynamoStreamPoller.getShardIterator(
      streamsClient,
      streamArn,
      shardId,
      ShardIteratorType.TRIM_HORIZON
    )
    val (records, _) = DynamoStreamPoller.getRecords(streamsClient, iterator)
    assert(records.nonEmpty, "Expected at least one record to get a sequence number")

    val seqNum = records.head.dynamodb().sequenceNumber()

    // Resume after that sequence number
    val afterIter = DynamoStreamPoller.getShardIteratorAfterSequence(
      streamsClient,
      streamArn,
      shardId,
      seqNum
    )
    assert(afterIter != null)
    assert(afterIter.nonEmpty)

    // Reading from the "after" iterator should skip the first record
    val (afterRecords, _) = DynamoStreamPoller.getRecords(streamsClient, afterIter)
    // The records after the first sequence number should not include the first record
    afterRecords.foreach { record =>
      assert(
        record.dynamodb().sequenceNumber() != records.head.dynamodb().sequenceNumber(),
        "After-sequence iterator should skip the record at the given sequence number"
      )
    }
  }

  test("recordToItem: converts real stream records correctly") {
    val streamArn = createStreamEnabledTable()
    putItem("rti-1", "val1")

    val shards = DynamoStreamPoller.listShards(streamsClient, streamArn)
    val iterator = DynamoStreamPoller.getShardIterator(
      streamsClient,
      streamArn,
      shards.head.shardId(),
      ShardIteratorType.TRIM_HORIZON
    )
    val (records, _) = DynamoStreamPoller.getRecords(streamsClient, iterator)
    assert(records.nonEmpty)

    // An INSERT record should be tagged with the put marker in the "_op" column.
    val putMarker = AttributeValue.fromBool(true)
    val deleteMarker = AttributeValue.fromBool(false)
    val result = DynamoStreamPoller.recordToItem(records.head, "_op", putMarker, deleteMarker)
    assert(result.isDefined)
    val item = result.get.asScala.toMap
    assertEquals(item("_op"), putMarker)
    assertEquals(item("id"), AttributeValue.fromS("rti-1"))
  }

  test("resume-from-checkpoint: AFTER_SEQUENCE_NUMBER resumes past checkpointed record") {
    val streamArn = createStreamEnabledTable()
    // Insert 3 items to create stream records
    putItem("rfc-1", "val1")
    putItem("rfc-2", "val2")
    putItem("rfc-3", "val3")

    val shards = DynamoStreamPoller.listShards(streamsClient, streamArn)
    assert(shards.nonEmpty)
    val shardId = shards.head.shardId()

    // Read all records from the beginning
    val iterator = DynamoStreamPoller.getShardIterator(
      streamsClient,
      streamArn,
      shardId,
      ShardIteratorType.TRIM_HORIZON
    )
    val (allRecords, _) = DynamoStreamPoller.getRecords(streamsClient, iterator)
    assert(allRecords.size >= 2, s"Expected at least 2 records, got ${allRecords.size}")

    // Use the first record's sequence number as the "checkpoint"
    val checkpointSeqNum = allRecords.head.dynamodb().sequenceNumber()

    // Resume from the checkpoint using AFTER_SEQUENCE_NUMBER (this is what
    // startStreaming does when tryClaimShard returns a stored checkpoint)
    val resumeIter = DynamoStreamPoller.getShardIteratorAfterSequence(
      streamsClient,
      streamArn,
      shardId,
      checkpointSeqNum
    )
    val (resumedRecords, _) = DynamoStreamPoller.getRecords(streamsClient, resumeIter)

    // Verify none of the resumed records have the checkpointed sequence number
    resumedRecords.foreach { record =>
      assert(
        record.dynamodb().sequenceNumber() != checkpointSeqNum,
        s"Resumed records should not include the checkpointed sequence number $checkpointSeqNum"
      )
    }

    // Verify we got fewer records than the full read (we skipped at least the first one)
    assert(
      resumedRecords.size < allRecords.size,
      s"Resumed read (${resumedRecords.size}) should have fewer records than full read (${allRecords.size})"
    )
  }

  override def afterEach(context: AfterEach): Unit = {
    // Best-effort cleanup. Catch NonFatal (not Exception) so fatal errors and
    // thread interruption still propagate instead of being silently swallowed.
    try
      sourceDDb().deleteTable(DeleteTableRequest.builder().tableName(tableName).build())
    catch { case NonFatal(_) => () }
    super.afterEach(context)
  }

  override def afterAll(): Unit = {
    // NOTE(review): closing forces initialization of the lazy client if no test
    // ran; harmless here since every test uses it.
    streamsClient.close()
    super.afterAll()
  }
}

0 commit comments

Comments
 (0)