Skip to content

Drop AWS SDK v1: replace KCL with direct v2 stream poller#296

Draft
dkropachev wants to merge 1 commit intomasterfrom
drop-aws-sdk-v1
Draft

Drop AWS SDK v1: replace KCL with direct v2 stream poller#296
dkropachev wants to merge 1 commit intomasterfrom
drop-aws-sdk-v1

Conversation

@dkropachev
Copy link
Contributor

@dkropachev dkropachev commented Feb 25, 2026

Summary

Replace the KCL-based DynamoDB Streams replication with a direct AWS SDK v2 poller, eliminating the last reason for depending on AWS SDK v1.

What changed

  • Remove spark-kinesis-dynamodb module (5 files, ~800 lines) and its dynamodb-streams-kinesis-adapter dependency — the sole reason for depending on AWS SDK v1
  • Delete AttributeValueUtils.scala (v1-to-v2 conversion no longer needed)
  • New DynamoStreamPoller (~130 lines) using the v2 DynamoDbStreamsClient directly to poll shards
  • Remove Spark StreamingContext dependency — replaced with a ScheduledExecutorService that polls shards on the driver, since stream items are processed directly without Spark RDDs (v2 AttributeValue is not Serializable)
  • Remove spark-streaming dependency entirely

Stream replication hardening

  • Batch writes: replace individual putItem/deleteItem with BatchWriteItemRequest (up to 25 items/call) with retry for unprocessed items
  • Shard leasing: lease-based coordination using conditional UpdateItem expressions, enabling multi-runner setups with automatic failover on lease expiry
  • Deterministic checkpoint table (migrator_{table}) for cross-run resume and multi-runner coordination
  • Client lifecycle: pass all SDK clients to StreamHandle, close them in stop() after awaitTermination(30s)
  • Retry with exponential backoff for rate-limiting errors (LimitExceededException, ProvisionedThroughputExceededException, InternalServerError)
  • Consecutive error threshold: stops replication after configurable max consecutive failures
  • Periodic summary stats logging (every 60 poll cycles)
  • Thread safety: all mutable state confined to the scheduler thread (initialDelay=0)

New configuration options (all optional, in DynamoDB source settings)

  • streamingPollIntervalSeconds (default: 5)
  • streamingMaxConsecutiveErrors (default: 50)
  • streamingPollingPoolSize (default: max(4, availableProcessors))
  • streamingLeaseDurationMs (default: 60000)

Test coverage (40 tests)

Unit tests (16, no infrastructure needed):

  • recordToItem: INSERT, MODIFY, REMOVE, unknown event
  • retryRandom: success, retry+succeed, exhaust, non-retryable, non-DDB
  • pollShard: success, LimitExceeded retry, ProvisionedThroughput retry, exhaust retries, non-retryable, non-DDB, shard closed

Integration tests (24, need DynamoDB Local + Alternator):

  • Checkpoint table: create idempotency, create when missing
  • tryClaimShard: unclaimed, expired lease with checkpoint, active lease rejection, re-claim own shard
  • renewLeaseAndCheckpoint: with checkpoint, without, stolen lease, expiry
  • Multi-runner: disjoint claims, dead worker reassignment
  • DynamoDB Streams thin-wrappers: getStreamArn, listShards, getShardIterator, getRecords, resume-from-checkpoint, recordToItem with real records
  • Edge cases: consecutive error threshold termination, lost-lease-mid-cycle, run() with renamesMap

Test infrastructure:

  • StreamPollerOps trait extracted from DynamoStreamPoller for testability
  • TestStreamPoller: manual test double with configurable function vars

Notes

  • The only remaining v1 dependency is hadoop-aws (transitive only — no migrator code imports v1)
  • README and CONTRIBUTING.md updated to reflect build.shmake build and docker-build-jar.shmake docker-build-jar

Test plan

  • sbt migrator/compile passes
  • sbt tests/compile passes
  • sbt migrator/assembly builds successfully
  • No com.amazonaws imports remain in Scala sources
  • 16 unit tests pass
  • 24 integration tests pass (DynamoDB Local + Alternator)
  • CI passes

🤖 Generated with Claude Code

@dkropachev dkropachev marked this pull request as ready for review February 25, 2026 13:34
@dkropachev dkropachev marked this pull request as draft February 25, 2026 14:56
@dkropachev dkropachev force-pushed the drop-aws-sdk-v1 branch 4 times, most recently from a3adafb to af6836b Compare March 1, 2026 13:01
Remove the spark-kinesis-dynamodb module (5 files, ~800 lines) and its
dynamodb-streams-kinesis-adapter dependency, which was the sole reason
for depending on AWS SDK v1.

Replace with DynamoStreamPoller (~130 lines) that uses the v2
DynamoDbStreamsClient directly to poll shards. This eliminates KCL's
unnecessary checkpointing, lease management, and CloudWatch metrics.

- Delete spark-kinesis-dynamodb/ module entirely
- Delete AttributeValueUtils.scala (v1→v2 conversion no longer needed)
- Rewrite DynamoStreamReplication to use v2 AttributeValue natively
- Update DynamoStreamReplicationIntegrationTest to use v2 types
- Remove unused import in DynamoDBS3Export.scala
- Clean up build.sbt (remove module, adapter dep, version val)

The only remaining v1 dependency is hadoop-aws (transitive, no code imports).
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant