
Commit 4cc0ec5

Drop AWS SDK v1: replace KCL-based streaming with direct v2 poller
Remove the spark-kinesis-dynamodb module (5 files, ~800 lines) and its dynamodb-streams-kinesis-adapter dependency, which was the sole reason for depending on AWS SDK v1. Replace it with DynamoStreamPoller (~130 lines), which uses the v2 DynamoDbStreamsClient directly to poll shards. This eliminates KCL's unnecessary checkpointing, lease management, and CloudWatch metrics.

- Delete the spark-kinesis-dynamodb/ module entirely
- Delete AttributeValueUtils.scala (v1→v2 conversion no longer needed)
- Rewrite DynamoStreamReplication to use v2 AttributeValue natively
- Update DynamoStreamReplicationIntegrationTest to use v2 types
- Remove an unused import in DynamoDBS3Export.scala
- Clean up build.sbt (remove the module, the adapter dependency, and its version val)

The only remaining v1 dependency is hadoop-aws (transitive, no code imports).
1 parent 59867e0 commit 4cc0ec5
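The actual DynamoStreamPoller source is not shown in this commit view. As a rough illustration of the shape such a direct poller takes, here is a hypothetical sketch of the poll loop with the consecutive-error cutoff that the new configuration options describe (all names here are illustrative, not the real implementation):

```scala
// Hypothetical sketch of a direct poll loop with a consecutive-error cutoff.
// The real DynamoStreamPoller calls the v2 DynamoDbStreamsClient inside `fetch`.
object PollLoopSketch {

  /** Polls `fetch` until `maxPolls` iterations have run or `maxConsecutiveErrors`
    * failures occur in a row. Returns the number of successful polls. */
  def run(fetch: () => Either[String, Int],
          maxConsecutiveErrors: Int,
          maxPolls: Int): Int = {
    var consecutiveErrors = 0
    var successfulPolls = 0
    var polls = 0
    while (polls < maxPolls && consecutiveErrors < maxConsecutiveErrors) {
      fetch() match {
        case Right(_) =>
          consecutiveErrors = 0 // any success resets the error streak
          successfulPolls += 1
        case Left(_) =>
          consecutiveErrors += 1
      }
      polls += 1
    }
    successfulPolls
  }
}
```

The key property is that transient failures do not kill replication: only an uninterrupted streak of failures (``streamingMaxConsecutiveErrors`` in the new config) stops the loop.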

File tree

79 files changed: +7351 −1416 lines changed


.github/workflows/build.yml

Lines changed: 0 additions & 1 deletion

@@ -21,7 +21,6 @@ jobs:
             target/
             migrator/target/scala-2.13/classes/
             tests/target/scala-2.13/classes/
-            spark-kinesis-dynamodb/target/scala-2.13/classes/
             project/target/
           key: sbt-target-${{ runner.os }}-${{ hashFiles('**/*.scala', '**/*.sbt', 'project/build.properties') }}
           restore-keys: |

.github/workflows/tests.yml

Lines changed: 0 additions & 1 deletion

@@ -59,7 +59,6 @@ jobs:
             target/
             migrator/target/scala-2.13/classes/
             tests/target/scala-2.13/classes/
-            spark-kinesis-dynamodb/target/scala-2.13/classes/
             project/target/
           key: sbt-target-${{ runner.os }}-${{ hashFiles('**/*.scala', '**/*.sbt', 'project/build.properties') }}
           restore-keys: |

.gitignore

Lines changed: 9 additions & 1 deletion

@@ -178,4 +178,12 @@ AGENTS.md
 claude.log
 claude_history.json
 claude_config.json
-CLAUDE.md
+CLAUDE.md
+
+# Docker test runtime data (written by Scylla containers)
+tests/docker/scylla/*
+!tests/docker/scylla/.gitkeep
+tests/docker/scylla-source/*
+!tests/docker/scylla-source/.gitkeep
+tests/docker/parquet/*
+!tests/docker/parquet/.gitkeep

ansible/files/config.dynamodb.yml

Lines changed: 2 additions & 2 deletions

@@ -4,7 +4,7 @@ source:
   # if using DynamoDB local - specify endpoint with http:// prefix
   # Otherwise, if using aws configure, leave commented out and it will use that
   #endpoint:
-  #  host: http://dynamo-local-db-source
+  #  host: dynamo-local-db-source
   #  port: 8000
   #credentials:
   #  accessKey: empty
@@ -36,7 +36,7 @@ target:
   # The easiest and least performant option is just point to a single nodes IP.
   # YOUR-SCYLLA-NODE0-IP scylla
   endpoint:
-    host: http://YOUR_ALTERNATOR_INSTANCE
+    host: YOUR_ALTERNATOR_INSTANCE
     port: 8000
   credentials:
     accessKey: empty

build.sbt

Lines changed: 5 additions & 23 deletions

@@ -4,8 +4,6 @@ import sbt.librarymanagement.InclExclRule
 val awsSdkVersion = "2.23.19"
 val sparkVersion = "3.5.8"
 val hadoopVersion = "3.3.4"
-val dynamodbStreamsKinesisAdapterVersion =
-  "1.5.4" // Note This version still depends on AWS SDK 1.x, but there is no more recent version that supports AWS SDK v2.
 
 inThisBuild(
   List(
@@ -15,20 +13,6 @@ inThisBuild(
   )
 )
 
-// Augmentation of spark-streaming-kinesis-asl to also work with DynamoDB Streams
-lazy val `spark-kinesis-dynamodb` = project
-  .in(file("spark-kinesis-dynamodb"))
-  .settings(
-    libraryDependencies ++= Seq(
-      ("org.apache.spark" %% "spark-streaming-kinesis-asl" % sparkVersion)
-        .excludeAll(
-          InclExclRule("org.apache.spark", s"spark-streaming_${scalaBinaryVersion.value}")
-        ), // For some reason, the Spark dependency is not marked as provided in spark-streaming-kinesis-asl. We exclude it and then add it as provided.
-      "org.apache.spark" %% "spark-streaming" % sparkVersion % Provided,
-      "com.amazonaws" % "dynamodb-streams-kinesis-adapter" % dynamodbStreamsKinesisAdapterVersion
-    )
-  )
-
 lazy val migrator = (project in file("migrator"))
   .enablePlugins(BuildInfoPlugin)
   .settings(
@@ -43,8 +27,7 @@ lazy val migrator = (project in file("migrator"))
     Test / fork := false,
     scalafmtOnCompile := true,
     libraryDependencies ++= Seq(
-      "org.apache.spark" %% "spark-streaming" % sparkVersion % "provided",
-      "org.apache.spark" %% "spark-sql" % sparkVersion % "provided",
+      "org.apache.spark" %% "spark-sql" % sparkVersion % "provided",
       ("org.apache.hadoop" % "hadoop-aws" % hadoopVersion) // Note: this package still depends on the AWS SDK v1
         // Exclude the AWS bundle because it creates many conflicts when generating the assembly
         .excludeAll(
@@ -54,6 +37,7 @@ lazy val migrator = (project in file("migrator"))
       "software.amazon.awssdk" % "dynamodb" % awsSdkVersion,
       "software.amazon.awssdk" % "s3" % awsSdkVersion,
       "software.amazon.awssdk" % "sts" % awsSdkVersion,
+      "software.amazon.awssdk" % "cloudwatch" % awsSdkVersion,
       "com.scylladb" %% "spark-scylladb-connector" % "4.0.0",
       "com.github.jnr" % "jnr-posix" % "3.1.19", // Needed by the Spark ScyllaDB connector
       "com.scylladb.alternator" % "emr-dynamodb-hadoop" % "5.8.0",
@@ -71,9 +55,9 @@ lazy val migrator = (project in file("migrator"))
       case "mime.types" => MergeStrategy.first
       case PathList("META-INF", "io.netty.versions.properties") => MergeStrategy.concat
       case PathList("META-INF", "versions", _, "module-info.class") =>
-        MergeStrategy.discard // OK as long as we dont rely on Java 9+ features such as SPI
+        MergeStrategy.discard // OK as long as we don't rely on Java 9+ features such as SPI
       case "module-info.class" =>
-        MergeStrategy.discard // OK as long as we dont rely on Java 9+ features such as SPI
+        MergeStrategy.discard // OK as long as we don't rely on Java 9+ features such as SPI
       case x =>
         val oldStrategy = (assembly / assemblyMergeStrategy).value
         oldStrategy(x)
@@ -86,15 +70,13 @@ lazy val migrator = (project in file("migrator"))
       .runTask(Compile / fullClasspath, Compile / run / mainClass, Compile / run / runner)
       .evaluated
   )
-  .dependsOn(`spark-kinesis-dynamodb`)
 
 lazy val tests = project
   .in(file("tests"))
   .settings(
     libraryDependencies ++= Seq(
       "software.amazon.awssdk" % "dynamodb" % awsSdkVersion,
       "org.apache.spark" %% "spark-sql" % sparkVersion,
-      "org.apache.spark" %% "spark-streaming" % sparkVersion,
       "org.apache.cassandra" % "java-driver-query-builder" % "4.18.0",
       "com.github.mjakubowski84" %% "parquet4s-core" % "1.9.4",
       "org.apache.hadoop" % "hadoop-client" % hadoopVersion,
@@ -118,4 +100,4 @@ lazy val tests = project
 
 lazy val root = project
   .in(file("."))
-  .aggregate(migrator, `spark-kinesis-dynamodb`, tests)
+  .aggregate(migrator, tests)

config.yaml.example

Lines changed: 35 additions & 0 deletions

@@ -104,6 +104,41 @@ source:
 #
 # # how many tasks per executor?
 # maxMapTasks: 1
+#
+# # --- Stream replication tuning (only relevant when target.streamChanges is true) ---
+#
+# # How often to poll DynamoDB Streams for new records (seconds). Default: 5.
+# # streamingPollIntervalSeconds: 5
+#
+# # Maximum consecutive poll failures before stopping stream replication. Default: 50.
+# # streamingMaxConsecutiveErrors: 50
+#
+# # Thread pool size for polling shards in parallel. Default: max(4, available CPUs).
+# # streamingPollingPoolSize: 4
+#
+# # Lease duration in milliseconds. If a worker doesn't renew within this window,
+# # other workers can claim the shard. Default: 60000 (60 seconds).
+# # streamingLeaseDurationMs: 60000
+#
+# # Maximum records to fetch per GetRecords call. Default: DynamoDB default (1000).
+# # streamingMaxRecordsPerPoll: 1000
+#
+# # Maximum records processed per second across all shards. Default: unlimited.
+# # streamingMaxRecordsPerSecond: 5000
+#
+# # Publish stream replication metrics to CloudWatch. Default: false.
+# # Requires valid AWS credentials in the source region.
+# # streamingEnableCloudWatchMetrics: false
+#
+# # Overall timeout for DynamoDB Streams API calls (seconds). Default: 30.
+# # streamApiCallTimeoutSeconds: 30
+#
+# # Per-attempt timeout for DynamoDB Streams API calls (seconds). Default: 10.
+# # streamApiCallAttemptTimeoutSeconds: 10
+#
+# # Timeout for awaiting parallel shard poll results (seconds). Default: 60.
+# # Increase if the polling pool is saturated and polls take longer than expected.
+# # streamingPollFutureTimeoutSeconds: 60
 
 # Example for loading from a DynamoDB S3 export (see https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/S3DataExport.Output.html)
 # source:

docs/source/migrate-from-dynamodb.rst

Lines changed: 1 addition & 1 deletion

@@ -38,7 +38,7 @@ To read from the Alternator, you need to provide an ``endpoint`` instead of a ``
      type: dynamodb
      table: <table>
      endpoint:
-       host: http://<host>
+       host: <host>
        port: <port>
 
 Where ``<host>`` and ``<port>`` should be replaced with the host name and TCP port of your Alternator instance.

docs/source/stream-changes.rst

Lines changed: 135 additions & 1 deletion

@@ -20,7 +20,7 @@ Enable this feature by setting the property ``streamChanges`` to ``true`` in the
 
 In this mode, the migrator has to be interrupted manually with ``Control`` + ``C`` (or by sending a ``SIGINT`` signal to the ``spark-submit`` process). Currently, the created stream is not deleted when the migrator is stopped. You have to delete it manually (e.g. via the AWS Console).
 
-Note that for the migration to be performed without loosing writes, the initial snapshot transfer must complete within 24 hours. Otherwise, some captured changes may be lost due to the retention period of the table’s stream.
+Note that for the migration to be performed without losing writes, the initial snapshot transfer must complete within 24 hours. Otherwise, some captured changes may be lost due to the retention period of the table’s stream.
 
 Optionally, you can skip the initial snapshot transfer and only replicate the changed items by setting the property ``skipInitialSnapshotTransfer`` to ``true``:
 
@@ -31,3 +31,137 @@ Optionally, you can skip the initial snapshot transfer and only replicate the ch
      # ...
      streamChanges: true
      skipInitialSnapshotTransfer: true
+
+Tuning Stream Replication
+-------------------------
+
+The following optional properties can be set in the **source** configuration to tune stream replication behavior:
+
+.. list-table::
+   :header-rows: 1
+   :widths: 35 15 50
+
+   * - Property
+     - Default
+     - Description
+   * - ``streamingPollIntervalSeconds``
+     - ``5``
+     - How often (in seconds) to poll DynamoDB Streams for new records.
+   * - ``streamingMaxConsecutiveErrors``
+     - ``50``
+     - Maximum consecutive poll failures before stopping stream replication.
+   * - ``streamingPollingPoolSize``
+     - ``max(4, CPUs)``
+     - Thread pool size for polling shards in parallel.
+   * - ``streamingLeaseDurationMs``
+     - ``60000``
+     - Lease duration in milliseconds. If a worker doesn't renew within this window, other workers can claim the shard.
+   * - ``streamingMaxRecordsPerPoll``
+     - unset (DynamoDB service default: ``1000``)
+     - Maximum records to fetch per ``GetRecords`` call. When not set, the DynamoDB Streams service default of 1000 is used.
+   * - ``streamingMaxRecordsPerSecond``
+     - unlimited
+     - Maximum records processed per second across all shards. Use this to avoid overwhelming the target.
+   * - ``streamingEnableCloudWatchMetrics``
+     - ``false``
+     - Publish stream replication metrics (records processed, active shards, iterator age) to CloudWatch.
+   * - ``streamApiCallTimeoutSeconds``
+     - ``30``
+     - Overall timeout for DynamoDB Streams API calls (seconds).
+   * - ``streamApiCallAttemptTimeoutSeconds``
+     - ``10``
+     - Per-attempt timeout for DynamoDB Streams API calls (seconds).
+   * - ``streamingPollFutureTimeoutSeconds``
+     - ``60``
+     - Timeout for awaiting parallel shard poll results. Increase if the polling pool is saturated and polls take longer than expected.
+
+Example:
+
+.. code-block:: yaml
+
+   source:
+     type: dynamodb
+     table: my-table
+     # ...
+     streamingPollIntervalSeconds: 2
+     streamingMaxRecordsPerSecond: 5000
+     streamingPollingPoolSize: 8
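The ``streamingMaxRecordsPerSecond`` cap described in the new documentation is a classic rate-limiting problem. The migrator's actual throttling code is not shown in this commit view; a minimal token-bucket sketch (all names hypothetical) of how such a cap can be enforced across shards looks like this:

```scala
// Illustrative token bucket for a records-per-second cap, not the migrator's
// actual implementation. Tokens refill continuously, capped at one second's
// budget, so short bursts are allowed but the long-run rate is bounded.
final class TokenBucket(maxPerSecond: Long) {
  private var available = maxPerSecond.toDouble
  private var lastRefillNanos = System.nanoTime()

  /** Try to take `n` tokens; returns true if the caller may proceed now. */
  def tryAcquire(n: Long): Boolean = synchronized {
    val now = System.nanoTime()
    val elapsedSeconds = (now - lastRefillNanos) / 1e9
    lastRefillNanos = now
    // Refill proportionally to elapsed time, never exceeding the burst budget.
    available = math.min(maxPerSecond.toDouble, available + elapsedSeconds * maxPerSecond)
    if (available >= n) { available -= n; true } else false
  }
}
```

A poller thread that gets `false` back would typically sleep briefly and retry, which is what makes the cap apply "across all shards" when they share one bucket.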
+Checkpoint Table
+----------------
+
+The migrator creates a DynamoDB table named ``migrator_<tableName>_<hash>`` in the source account to track replication progress. This table stores the last processed sequence number for each shard and is used for lease coordination when multiple workers are running.
+
+The checkpoint table has a single hash key (``leaseKey``) and the following columns:
+
+.. list-table::
+   :header-rows: 1
+   :widths: 25 10 65
+
+   * - Column
+     - Type
+     - Description
+   * - ``leaseKey``
+     - S
+     - Hash key. The DynamoDB Streams shard ID.
+   * - ``checkpoint``
+     - S
+     - Last successfully processed sequence number, or ``SHARD_END`` when the shard is fully consumed.
+   * - ``leaseOwner``
+     - S
+     - Worker ID that currently owns this shard's lease.
+   * - ``leaseExpiryEpochMs``
+     - N
+     - Epoch milliseconds when the lease expires. Set to ``0`` when released.
+   * - ``parentShardId``
+     - S
+     - Parent shard ID (if any), used to ensure child shards are processed after their parent is drained.
+   * - ``leaseTransferTo``
+     - S
+     - Worker ID requesting a graceful lease transfer. Removed when the lease is released.
+   * - ``leaseCounter``
+     - N
+     - Monotonically increasing counter incremented on each lease renewal.
+
+The checkpoint table persists across restarts. If the migrator is restarted, it automatically resumes from the last checkpointed position rather than re-processing the entire stream from the beginning. To force re-processing from ``TRIM_HORIZON``, manually delete the checkpoint table before restarting the migrator.
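The documented ``leaseOwner``/``leaseExpiryEpochMs`` columns imply a claim rule for workers competing over shards. The real migrator enforces this with a conditional write on the checkpoint table; the decision itself can be sketched as pure logic (names hypothetical):

```scala
// Hypothetical sketch of the lease-claim decision implied by the documented
// checkpoint-table schema. In practice this condition is evaluated atomically
// by a DynamoDB conditional update, not in application code alone.
final case class Lease(leaseKey: String, leaseOwner: Option[String], leaseExpiryEpochMs: Long)

object LeaseSketch {
  /** A worker may claim a shard if the lease is unowned, already its own,
    * or expired (the owner failed to renew within the lease duration). */
  def mayClaim(lease: Lease, workerId: String, nowEpochMs: Long): Boolean =
    lease.leaseOwner match {
      case None                     => true
      case Some(o) if o == workerId => true
      case Some(_)                  => nowEpochMs >= lease.leaseExpiryEpochMs
    }
}
```

This is why ``streamingLeaseDurationMs`` matters: a crashed worker's shards become claimable only after its leases expire, so a shorter duration means faster failover at the cost of more renewal traffic.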
+CloudWatch Metrics
+------------------
+
+When ``streamingEnableCloudWatchMetrics`` is enabled, the migrator publishes the following metrics to the ``ScyllaMigrator/StreamReplication`` namespace. All metrics include a ``TableName`` dimension set to the source table name.
+
+.. list-table::
+   :header-rows: 1
+   :widths: 30 15 55
+
+   * - Metric Name
+     - Unit
+     - Description
+   * - ``RecordsProcessed``
+     - Count
+     - Number of records processed since the last publish interval (delta).
+   * - ``ActiveShards``
+     - Count
+     - Number of shards currently owned by this worker.
+   * - ``MaxIteratorAgeMs``
+     - Milliseconds
+     - Age of the oldest record seen in the most recent poll cycle, indicating how far behind the stream consumer is.
+   * - ``PollDurationMs``
+     - Milliseconds
+     - Wall-clock time of the most recent poll cycle.
+   * - ``WriteFailures``
+     - Count
+     - Number of write failures since the last publish interval (delta).
+   * - ``CheckpointFailures``
+     - Count
+     - Number of checkpoint failures since the last publish interval (delta).
+   * - ``DeadLetterItems``
+     - Count
+     - Number of items that exhausted write retries since the last publish interval (delta).
+
+Metrics are published every 60 poll cycles (e.g., every 5 minutes with the default 5-second poll interval).
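Several of the metrics above are documented as "delta" values: the internal counter is cumulative, and each publish reports only the growth since the previous publish. A minimal sketch of that bookkeeping (hypothetical names, not the migrator's actual metrics code):

```scala
// Illustrative delta-counter sketch for metrics like RecordsProcessed:
// `add` accumulates during poll cycles, and `publishDelta` returns only the
// increase since the previous publish, so each CloudWatch datapoint covers
// exactly one publish interval.
final class DeltaCounter {
  private var total = 0L
  private var lastPublished = 0L

  def add(n: Long): Unit = synchronized { total += n }

  /** Value to report for this publish interval: growth since the last one. */
  def publishDelta(): Long = synchronized {
    val delta = total - lastPublished
    lastPublished = total
    delta
  }
}
```

Summing the published datapoints over time therefore recovers the cumulative total, which is the usual reason to publish deltas rather than raw counters.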
+Reserved Attribute Names
+------------------------
+
+The stream replication process uses an internal attribute named ``_dynamo_op_type`` to distinguish between put and delete operations. If a source table has an attribute with this exact name, it will be silently overwritten during stream replication. Avoid using ``_dynamo_op_type`` as an attribute name in tables being migrated.
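To illustrate why the attribute is reserved, here is a hypothetical sketch (simplified to string-valued items; the real code works on v2 ``AttributeValue`` maps) of the tag-then-strip pattern the documentation describes:

```scala
// Hypothetical sketch of the tag-then-strip pattern behind the reserved
// ``_dynamo_op_type`` attribute. Tagging uses Map `+`, which silently
// replaces any user attribute of the same name — hence the warning above.
object OpTypeSketch {
  val OpTypeAttr = "_dynamo_op_type"

  /** Tag a streamed item with its operation ("PUT" or "DELETE"). */
  def tag(item: Map[String, String], opType: String): Map[String, String] =
    item + (OpTypeAttr -> opType) // overwrites any existing value

  /** Recover the operation and the item without the internal tag. */
  def untag(item: Map[String, String]): (Option[String], Map[String, String]) =
    (item.get(OpTypeAttr), item - OpTypeAttr)
}
```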

docs/source/tutorials/dynamodb-to-scylladb-alternator/spark-data/config.yaml

Lines changed: 2 additions & 2 deletions

@@ -1,7 +1,7 @@
 source:
   type: dynamodb
   endpoint:
-    host: http://dynamodb
+    host: dynamodb
     port: 8000
   region: us-west-1
   credentials:
@@ -12,7 +12,7 @@ source:
 target:
   type: dynamodb
   endpoint:
-    host: http://scylla
+    host: scylla
     port: 8001
   region: us-west-1
   credentials:

migrator/src/main/scala/com/scylladb/migrator/AttributeValueUtils.scala

Lines changed: 0 additions & 44 deletions
This file was deleted.
