Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ jobs:
target/
migrator/target/scala-2.13/classes/
tests/target/scala-2.13/classes/
spark-kinesis-dynamodb/target/scala-2.13/classes/
project/target/
key: sbt-target-${{ runner.os }}-${{ hashFiles('**/*.scala', '**/*.sbt', 'project/build.properties') }}
restore-keys: |
Expand Down
1 change: 0 additions & 1 deletion .github/workflows/tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,6 @@ jobs:
target/
migrator/target/scala-2.13/classes/
tests/target/scala-2.13/classes/
spark-kinesis-dynamodb/target/scala-2.13/classes/
project/target/
key: sbt-target-${{ runner.os }}-${{ hashFiles('**/*.scala', '**/*.sbt', 'project/build.properties') }}
restore-keys: |
Expand Down
10 changes: 9 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -178,4 +178,12 @@ AGENTS.md
claude.log
claude_history.json
claude_config.json
CLAUDE.md
CLAUDE.md

# Docker test runtime data (written by Scylla containers)
tests/docker/scylla/*
!tests/docker/scylla/.gitkeep
tests/docker/scylla-source/*
!tests/docker/scylla-source/.gitkeep
tests/docker/parquet/*
!tests/docker/parquet/.gitkeep
4 changes: 2 additions & 2 deletions ansible/files/config.dynamodb.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ source:
# if using DynamoDB local - specify endpoint with http:// prefix
# Otherwise, if using aws configure, leave commented out and it will use that
#endpoint:
# host: http://dynamo-local-db-source
# host: dynamo-local-db-source
# port: 8000
#credentials:
# accessKey: empty
Expand Down Expand Up @@ -36,7 +36,7 @@ target:
# The easiest and least performant option is to just point to a single node's IP.
# YOUR-SCYLLA-NODE0-IP scylla
endpoint:
host: http://YOUR_ALTERNATOR_INSTANCE
host: YOUR_ALTERNATOR_INSTANCE
port: 8000
credentials:
accessKey: empty
Expand Down
28 changes: 5 additions & 23 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@ import sbt.librarymanagement.InclExclRule
val awsSdkVersion = "2.23.19"
val sparkVersion = "3.5.8"
val hadoopVersion = "3.3.4"
val dynamodbStreamsKinesisAdapterVersion =
"1.5.4" // Note This version still depends on AWS SDK 1.x, but there is no more recent version that supports AWS SDK v2.

inThisBuild(
List(
Expand All @@ -15,20 +13,6 @@ inThisBuild(
)
)

// Augmentation of spark-streaming-kinesis-asl to also work with DynamoDB Streams
lazy val `spark-kinesis-dynamodb` = project
.in(file("spark-kinesis-dynamodb"))
.settings(
libraryDependencies ++= Seq(
("org.apache.spark" %% "spark-streaming-kinesis-asl" % sparkVersion)
.excludeAll(
InclExclRule("org.apache.spark", s"spark-streaming_${scalaBinaryVersion.value}")
), // For some reason, the Spark dependency is not marked as provided in spark-streaming-kinesis-asl. We exclude it and then add it as provided.
"org.apache.spark" %% "spark-streaming" % sparkVersion % Provided,
"com.amazonaws" % "dynamodb-streams-kinesis-adapter" % dynamodbStreamsKinesisAdapterVersion
)
)

lazy val migrator = (project in file("migrator"))
.enablePlugins(BuildInfoPlugin)
.settings(
Expand All @@ -43,8 +27,7 @@ lazy val migrator = (project in file("migrator"))
Test / fork := false,
scalafmtOnCompile := true,
libraryDependencies ++= Seq(
"org.apache.spark" %% "spark-streaming" % sparkVersion % "provided",
"org.apache.spark" %% "spark-sql" % sparkVersion % "provided",
"org.apache.spark" %% "spark-sql" % sparkVersion % "provided",
("org.apache.hadoop" % "hadoop-aws" % hadoopVersion) // Note: this package still depends on the AWS SDK v1
// Exclude the AWS bundle because it creates many conflicts when generating the assembly
.excludeAll(
Expand All @@ -54,6 +37,7 @@ lazy val migrator = (project in file("migrator"))
"software.amazon.awssdk" % "dynamodb" % awsSdkVersion,
"software.amazon.awssdk" % "s3" % awsSdkVersion,
"software.amazon.awssdk" % "sts" % awsSdkVersion,
"software.amazon.awssdk" % "cloudwatch" % awsSdkVersion,
"com.scylladb" %% "spark-scylladb-connector" % "4.0.0",
"com.github.jnr" % "jnr-posix" % "3.1.19", // Needed by the Spark ScyllaDB connector
"com.scylladb.alternator" % "emr-dynamodb-hadoop" % "5.8.0",
Expand All @@ -71,9 +55,9 @@ lazy val migrator = (project in file("migrator"))
case "mime.types" => MergeStrategy.first
case PathList("META-INF", "io.netty.versions.properties") => MergeStrategy.concat
case PathList("META-INF", "versions", _, "module-info.class") =>
MergeStrategy.discard // OK as long as we dont rely on Java 9+ features such as SPI
MergeStrategy.discard // OK as long as we don't rely on Java 9+ features such as SPI
case "module-info.class" =>
MergeStrategy.discard // OK as long as we dont rely on Java 9+ features such as SPI
MergeStrategy.discard // OK as long as we don't rely on Java 9+ features such as SPI
case x =>
val oldStrategy = (assembly / assemblyMergeStrategy).value
oldStrategy(x)
Expand All @@ -86,15 +70,13 @@ lazy val migrator = (project in file("migrator"))
.runTask(Compile / fullClasspath, Compile / run / mainClass, Compile / run / runner)
.evaluated
)
.dependsOn(`spark-kinesis-dynamodb`)

lazy val tests = project
.in(file("tests"))
.settings(
libraryDependencies ++= Seq(
"software.amazon.awssdk" % "dynamodb" % awsSdkVersion,
"org.apache.spark" %% "spark-sql" % sparkVersion,
"org.apache.spark" %% "spark-streaming" % sparkVersion,
"org.apache.cassandra" % "java-driver-query-builder" % "4.18.0",
"com.github.mjakubowski84" %% "parquet4s-core" % "1.9.4",
"org.apache.hadoop" % "hadoop-client" % hadoopVersion,
Expand All @@ -118,4 +100,4 @@ lazy val tests = project

lazy val root = project
.in(file("."))
.aggregate(migrator, `spark-kinesis-dynamodb`, tests)
.aggregate(migrator, tests)
35 changes: 35 additions & 0 deletions config.yaml.example
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,41 @@ source:
#
# # how many tasks per executor?
# maxMapTasks: 1
#
# # --- Stream replication tuning (only relevant when target.streamChanges is true) ---
#
# # How often to poll DynamoDB Streams for new records (seconds). Default: 5.
# # streamingPollIntervalSeconds: 5
#
# # Maximum consecutive poll failures before stopping stream replication. Default: 50.
# # streamingMaxConsecutiveErrors: 50
#
# # Thread pool size for polling shards in parallel. Default: max(4, available CPUs).
# # streamingPollingPoolSize: 4
#
# # Lease duration in milliseconds. If a worker doesn't renew within this window,
# # other workers can claim the shard. Default: 60000 (60 seconds).
# # streamingLeaseDurationMs: 60000
#
# # Maximum records to fetch per GetRecords call. Default: DynamoDB default (1000).
# # streamingMaxRecordsPerPoll: 1000
#
# # Maximum records processed per second across all shards. Default: unlimited.
# # streamingMaxRecordsPerSecond: 5000
#
# # Publish stream replication metrics to CloudWatch. Default: false.
# # Requires valid AWS credentials in the source region.
# # streamingEnableCloudWatchMetrics: false
#
# # Overall timeout for DynamoDB Streams API calls (seconds). Default: 30.
# # streamApiCallTimeoutSeconds: 30
#
# # Per-attempt timeout for DynamoDB Streams API calls (seconds). Default: 10.
# # streamApiCallAttemptTimeoutSeconds: 10
#
# # Timeout for awaiting parallel shard poll results (seconds). Default: 60.
# # Increase if the polling pool is saturated and polls take longer than expected.
# # streamingPollFutureTimeoutSeconds: 60

# Example for loading from a DynamoDB S3 export (see https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/S3DataExport.Output.html)
# source:
Expand Down
2 changes: 1 addition & 1 deletion docs/source/migrate-from-dynamodb.rst
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ To read from the Alternator, you need to provide an ``endpoint`` instead of a ``
type: dynamodb
table: <table>
endpoint:
host: http://<host>
host: <host>
port: <port>

Where ``<host>`` and ``<port>`` should be replaced with the host name and TCP port of your Alternator instance.
Expand Down
136 changes: 135 additions & 1 deletion docs/source/stream-changes.rst
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ Enable this feature by setting the property ``streamChanges`` to ``true`` in the

In this mode, the migrator has to be interrupted manually with ``Control`` + ``C`` (or by sending a ``SIGINT`` signal to the ``spark-submit`` process). Currently, the created stream is not deleted when the migrator is stopped. You have to delete it manually (e.g. via the AWS Console).

Note that for the migration to be performed without loosing writes, the initial snapshot transfer must complete within 24 hours. Otherwise, some captured changes may be lost due to the retention period of the table’s stream.
Note that for the migration to be performed without losing writes, the initial snapshot transfer must complete within 24 hours. Otherwise, some captured changes may be lost due to the retention period of the table’s stream.

Optionally, you can skip the initial snapshot transfer and only replicate the changed items by setting the property ``skipInitialSnapshotTransfer`` to ``true``:

Expand All @@ -31,3 +31,137 @@ Optionally, you can skip the initial snapshot transfer and only replicate the ch
# ...
streamChanges: true
skipInitialSnapshotTransfer: true

Tuning Stream Replication
-------------------------

The following optional properties can be set in the **source** configuration to tune stream replication behavior:

.. list-table::
:header-rows: 1
:widths: 35 15 50

* - Property
- Default
- Description
* - ``streamingPollIntervalSeconds``
- ``5``
- How often (in seconds) to poll DynamoDB Streams for new records.
* - ``streamingMaxConsecutiveErrors``
- ``50``
- Maximum consecutive poll failures before stopping stream replication.
* - ``streamingPollingPoolSize``
- ``max(4, CPUs)``
- Thread pool size for polling shards in parallel.
* - ``streamingLeaseDurationMs``
- ``60000``
- Lease duration in milliseconds. If a worker doesn't renew within this window, other workers can claim the shard.
* - ``streamingMaxRecordsPerPoll``
- unset (DynamoDB service default: ``1000``)
- Maximum records to fetch per ``GetRecords`` call. When not set, the DynamoDB Streams service default of 1000 is used.
* - ``streamingMaxRecordsPerSecond``
- unlimited
- Maximum records processed per second across all shards. Use this to avoid overwhelming the target.
* - ``streamingEnableCloudWatchMetrics``
- ``false``
- Publish stream replication metrics (records processed, active shards, iterator age) to CloudWatch.
* - ``streamApiCallTimeoutSeconds``
- ``30``
- Overall timeout for DynamoDB Streams API calls (seconds).
* - ``streamApiCallAttemptTimeoutSeconds``
- ``10``
- Per-attempt timeout for DynamoDB Streams API calls (seconds).
* - ``streamingPollFutureTimeoutSeconds``
- ``60``
- Timeout for awaiting parallel shard poll results. Increase if the polling pool is saturated and polls take longer than expected.

Example:

.. code-block:: yaml

source:
type: dynamodb
table: my-table
# ...
streamingPollIntervalSeconds: 2
streamingMaxRecordsPerSecond: 5000
streamingPollingPoolSize: 8

Checkpoint Table
----------------

The migrator creates a DynamoDB table named ``migrator_<tableName>_<hash>`` in the source account to track replication progress. This table stores the last processed sequence number for each shard and is used for lease coordination when multiple workers are running.

The checkpoint table has a single hash key (``leaseKey``) and the following columns:

.. list-table::
:header-rows: 1
:widths: 25 10 65

* - Column
- Type
- Description
* - ``leaseKey``
- S
- Hash key. The DynamoDB Streams shard ID.
* - ``checkpoint``
- S
- Last successfully processed sequence number, or ``SHARD_END`` when the shard is fully consumed.
* - ``leaseOwner``
- S
- Worker ID that currently owns this shard's lease.
* - ``leaseExpiryEpochMs``
- N
- Epoch milliseconds when the lease expires. Set to ``0`` when released.
* - ``parentShardId``
- S
- Parent shard ID (if any), used to ensure child shards are processed after their parent is drained.
* - ``leaseTransferTo``
- S
- Worker ID requesting a graceful lease transfer. Removed when the lease is released.
* - ``leaseCounter``
- N
- Monotonically increasing counter incremented on each lease renewal.

The checkpoint table persists across restarts. If the migrator is restarted, it automatically resumes from the last checkpointed position rather than re-processing the entire stream from the beginning. To force re-processing from ``TRIM_HORIZON``, manually delete the checkpoint table before restarting the migrator.

CloudWatch Metrics
------------------

When ``streamingEnableCloudWatchMetrics`` is enabled, the migrator publishes the following metrics to the ``ScyllaMigrator/StreamReplication`` namespace. All metrics include a ``TableName`` dimension set to the source table name.

.. list-table::
:header-rows: 1
:widths: 30 15 55

* - Metric Name
- Unit
- Description
* - ``RecordsProcessed``
- Count
- Number of records processed since the last publish interval (delta).
* - ``ActiveShards``
- Count
- Number of shards currently owned by this worker.
* - ``MaxIteratorAgeMs``
- Milliseconds
- Age of the oldest record seen in the most recent poll cycle, indicating how far behind the stream consumer is.
* - ``PollDurationMs``
- Milliseconds
- Wall-clock time of the most recent poll cycle.
* - ``WriteFailures``
- Count
- Number of write failures since the last publish interval (delta).
* - ``CheckpointFailures``
- Count
- Number of checkpoint failures since the last publish interval (delta).
* - ``DeadLetterItems``
- Count
- Number of items that exhausted write retries since the last publish interval (delta).

Metrics are published every 60 poll cycles (e.g., every 5 minutes with the default 5-second poll interval).

Reserved Attribute Names
------------------------

The stream replication process uses an internal attribute named ``_dynamo_op_type`` to distinguish between put and delete operations. If a source table has an attribute with this exact name, it will be silently overwritten during stream replication. Avoid using ``_dynamo_op_type`` as an attribute name in tables being migrated.
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
source:
type: dynamodb
endpoint:
host: http://dynamodb
host: dynamodb
port: 8000
region: us-west-1
credentials:
Expand All @@ -12,7 +12,7 @@ source:
target:
type: dynamodb
endpoint:
host: http://scylla
host: scylla
port: 8001
region: us-west-1
credentials:
Expand Down

This file was deleted.

Loading
Loading