Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ jobs:
- uses: actions/setup-java@v5
with:
distribution: temurin
java-version: 8
java-version: 11
cache: sbt
- uses: sbt/setup-sbt@v1
- name: Build migrator
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/tests-aws.yml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ jobs:
- uses: actions/setup-java@v5
with:
distribution: temurin
java-version: 8
java-version: 11
cache: sbt
- uses: sbt/setup-sbt@v1
- name: Download assembly JAR
Expand Down
6 changes: 3 additions & 3 deletions .github/workflows/tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ jobs:
- uses: actions/setup-java@v5
with:
distribution: temurin
java-version: 8
java-version: 11
cache: sbt
- uses: sbt/setup-sbt@v1
- name: Check formatting
Expand All @@ -50,7 +50,7 @@ jobs:
- uses: actions/setup-java@v5
with:
distribution: temurin
java-version: 8
java-version: 11
cache: sbt
- uses: sbt/setup-sbt@v1
- name: Download assembly JAR
Expand All @@ -70,7 +70,7 @@ jobs:
- uses: actions/setup-java@v5
with:
distribution: temurin
java-version: 8
java-version: 11
cache: sbt
- uses: sbt/setup-sbt@v1
- name: Download assembly JAR
Expand Down
2 changes: 1 addition & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ lazy val migrator = (project in file("migrator"))
"com.scylladb" %% "spark-scylladb-connector" % "4.0.0",
"com.github.jnr" % "jnr-posix" % "3.1.19", // Needed by the Spark ScyllaDB connector
"com.scylladb.alternator" % "emr-dynamodb-hadoop" % "5.8.0",
"com.scylladb.alternator" % "load-balancing" % "1.0.0",
"com.scylladb.alternator" % "load-balancing" % "2.0.1",
"io.circe" %% "circe-generic" % "0.14.7",
"io.circe" %% "circe-parser" % "0.14.7",
"io.circe" %% "circe-yaml" % "0.15.1",
Expand Down
15 changes: 15 additions & 0 deletions config.yaml.example
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,21 @@ target:
# # Optional - when streamChanges is true, skip the initial snapshot transfer and only stream changes.
# # This setting is ignored if streamChanges is false.
# #skipInitialSnapshotTransfer: false
#
# # Optional - Alternator-specific settings (only used when endpoint is set):
# #alternator:
# # # Preferred datacenter for DC-aware routing (falls back to all nodes if unavailable)
# # datacenter: dc1
# # # Preferred rack for rack-aware routing (requires datacenter, falls back to DC then all nodes)
# # rack: rack1
# # # Interval (ms) for refreshing the node list while the client is active (default: 1000)
# # activeRefreshIntervalMs: 1000
# # # Interval (ms) for refreshing the node list while the client is idle (default: 60000)
# # idleRefreshIntervalMs: 60000
# # # Enable GZIP compression of request bodies (default: false)
# # compression: false
# # # Optimize HTTP headers to reduce traffic overhead (default: false)
# # optimizeHeaders: false

# Savepoints are configuration files (like this one), saved by the migrator as it
# runs. Their purpose is to skip token ranges that have already been copied. This
Expand Down
128 changes: 116 additions & 12 deletions migrator/src/main/scala/com/scylladb/migrator/DynamoUtils.scala
Original file line number Diff line number Diff line change
@@ -1,14 +1,22 @@
package com.scylladb.migrator

import com.scylladb.alternator.AlternatorEndpointProvider
import com.scylladb.migrator.config.{ DynamoDBEndpoint, SourceSettings, TargetSettings }
import com.scylladb.alternator.AlternatorDynamoDbClient
import com.scylladb.alternator.routing.{ ClusterScope, DatacenterScope, RackScope }
import com.scylladb.alternator.RequestCompressionAlgorithm
import com.scylladb.migrator.config.{
AlternatorSettings,
DynamoDBEndpoint,
SourceSettings,
TargetSettings
}
import org.apache.hadoop.conf.{ Configurable, Configuration }
import org.apache.hadoop.dynamodb.read.DynamoDBInputFormat
import org.apache.hadoop.dynamodb.write.DynamoDBOutputFormat
import org.apache.hadoop.dynamodb.{ DynamoDBConstants, DynamoDbClientBuilderTransformer }
import org.apache.hadoop.mapred.JobConf
import org.apache.log4j.LogManager
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider
import software.amazon.awssdk.regions.Region
import software.amazon.awssdk.services.dynamodb.{ DynamoDbClient, DynamoDbClientBuilder }
import software.amazon.awssdk.core.SdkRequest
import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration
Expand Down Expand Up @@ -50,6 +58,13 @@ import scala.jdk.OptionConverters._
object DynamoUtils {
val log = LogManager.getLogger("com.scylladb.migrator.DynamoUtils")
private val RemoveConsumedCapacityConfig = "scylla.migrator.remove_consumed_capacity"
private val AlternatorDatacenterConfig = "scylla.migrator.alternator.datacenter"
private val AlternatorRackConfig = "scylla.migrator.alternator.rack"
private val AlternatorActiveRefreshConfig =
"scylla.migrator.alternator.active_refresh_interval_ms"
private val AlternatorIdleRefreshConfig = "scylla.migrator.alternator.idle_refresh_interval_ms"
private val AlternatorCompressionConfig = "scylla.migrator.alternator.compression"
private val AlternatorOptimizeHeadersConfig = "scylla.migrator.alternator.optimize_headers"

class RemoveConsumedCapacityInterceptor extends ExecutionInterceptor {
override def modifyRequest(ctx: Context.ModifyRequest, attrs: ExecutionAttributes): SdkRequest =
Expand Down Expand Up @@ -82,7 +97,8 @@ object DynamoUtils {
target.region,
if (target.removeConsumedCapacity.getOrElse(false))
Seq(new RemoveConsumedCapacityInterceptor)
else Nil
else Nil,
target.alternator
)

log.info("Checking for table existence at destination")
Expand Down Expand Up @@ -196,7 +212,8 @@ object DynamoUtils {
source.region,
if (source.removeConsumedCapacity.getOrElse(false))
Seq(new RemoveConsumedCapacityInterceptor)
else Nil
else Nil,
source.alternator
)
val sourceStreamsClient =
buildDynamoStreamsClient(
Expand Down Expand Up @@ -247,15 +264,46 @@ object DynamoUtils {
endpoint: Option[DynamoDBEndpoint],
creds: Option[AwsCredentialsProvider],
region: Option[String],
interceptors: Seq[ExecutionInterceptor]
interceptors: Seq[ExecutionInterceptor],
alternatorSettings: Option[AlternatorSettings] = None
): DynamoDbClient = {
val baseBuilder: DynamoDbClientBuilder =
if (endpoint.isDefined) {
val altBuilder = AlternatorDynamoDbClient.builder()
applyAlternatorSettings(altBuilder, alternatorSettings)
altBuilder
} else DynamoDbClient.builder()
val builder =
AwsUtils.configureClientBuilder(DynamoDbClient.builder(), endpoint, region, creds)
AwsUtils.configureClientBuilder(baseBuilder, endpoint, region, creds)
val conf = ClientOverrideConfiguration.builder()
interceptors.foreach(conf.addExecutionInterceptor)
builder.overrideConfiguration(conf.build()).build()
}

/** Apply the optional Alternator-specific tuning options to the given client builder.
  *
  * A rack-aware routing scope requires a datacenter; a datacenter alone yields a
  * DC-aware scope; neither means no scope is installed (the builder default).
  * Compression and header optimization are only enabled when explicitly set to `true`.
  */
private def applyAlternatorSettings(
  altBuilder: AlternatorDynamoDbClient.AlternatorDynamoDbClientBuilder,
  alternatorSettings: Option[AlternatorSettings]
): Unit =
  alternatorSettings.foreach { settings =>
    val routingScope = settings.datacenter match {
      case Some(dc) =>
        val dcScope = DatacenterScope.of(dc, ClusterScope.create())
        settings.rack match {
          case Some(rack) => Some(RackScope.of(dc, rack, dcScope))
          case None       => Some(dcScope)
        }
      case None => None
    }
    routingScope.foreach(scope => altBuilder.withRoutingScope(scope))
    settings.activeRefreshIntervalMs.foreach(ms => altBuilder.withActiveRefreshIntervalMs(ms))
    settings.idleRefreshIntervalMs.foreach(ms => altBuilder.withIdleRefreshIntervalMs(ms))
    if (settings.compression.contains(true))
      altBuilder.withCompressionAlgorithm(RequestCompressionAlgorithm.GZIP)
    if (settings.optimizeHeaders.contains(true))
      altBuilder.withOptimizeHeaders(true)
  }

def buildDynamoStreamsClient(
endpoint: Option[DynamoDBEndpoint],
creds: Option[AwsCredentialsProvider],
Expand Down Expand Up @@ -300,7 +348,8 @@ object DynamoUtils {
maybeScanSegments: Option[Int],
maybeMaxMapTasks: Option[Int],
maybeAwsCredentials: Option[AWSCredentials],
removeConsumedCapacity: Boolean = false
removeConsumedCapacity: Boolean = false,
alternatorSettings: Option[AlternatorSettings] = None
): Unit = {
for (region <- maybeRegion) {
log.info(s"Using AWS region: ${region}")
Expand Down Expand Up @@ -332,6 +381,27 @@ object DynamoUtils {

jobConf.set("mapred.output.format.class", classOf[DynamoDBOutputFormat].getName)
jobConf.set("mapred.input.format.class", classOf[DynamoDBInputFormat].getName)

for (settings <- alternatorSettings) {
setOptionalConf(jobConf, AlternatorDatacenterConfig, settings.datacenter)
setOptionalConf(jobConf, AlternatorRackConfig, settings.rack)
setOptionalConf(
jobConf,
AlternatorActiveRefreshConfig,
settings.activeRefreshIntervalMs.map(_.toString)
)
setOptionalConf(
jobConf,
AlternatorIdleRefreshConfig,
settings.idleRefreshIntervalMs.map(_.toString)
)
setOptionalConf(jobConf, AlternatorCompressionConfig, settings.compression.map(_.toString))
setOptionalConf(
jobConf,
AlternatorOptimizeHeadersConfig,
settings.optimizeHeaders.map(_.toString)
)
}
}

/** @return
Expand Down Expand Up @@ -366,14 +436,48 @@ object DynamoUtils {
private var conf: Configuration = null

override def apply(builder: DynamoDbClientBuilder): DynamoDbClientBuilder = {
  // Pull the connection parameters that were serialized into the Hadoop job configuration.
  val endpointOpt =
    Option(conf.get(DynamoDBConstants.ENDPOINT)).map(DynamoDBEndpoint.fromRendered)
  val regionOpt = Option(conf.get(DynamoDBConstants.REGION))
  // Static credentials are only usable when both the access key and the secret key are present;
  // the session token is optional on top of them.
  val credsOpt: Option[AwsCredentialsProvider] =
    for {
      accessKey <- Option(conf.get(DynamoDBConstants.DYNAMODB_ACCESS_KEY_CONF))
      secretKey <- Option(conf.get(DynamoDBConstants.DYNAMODB_SECRET_KEY_CONF))
    } yield
      AWSCredentials(
        accessKey,
        secretKey,
        Option(conf.get(DynamoDBConstants.DYNAMODB_SESSION_TOKEN_CONF))
      ).toProvider
  // A custom endpoint means we talk to Alternator: swap in the Alternator-aware client
  // builder and apply the routing/refresh/compression settings carried by the job conf.
  val configuredBuilder: DynamoDbClientBuilder = endpointOpt match {
    case Some(_) =>
      val altBuilder = AlternatorDynamoDbClient.builder()
      AwsUtils.configureClientBuilder(altBuilder, endpointOpt, regionOpt, credsOpt)
      applyAlternatorSettings(
        altBuilder,
        Some(
          AlternatorSettings(
            datacenter = Option(conf.get(AlternatorDatacenterConfig)),
            rack = Option(conf.get(AlternatorRackConfig)),
            activeRefreshIntervalMs =
              Option(conf.get(AlternatorActiveRefreshConfig)).map(_.toLong),
            idleRefreshIntervalMs = Option(conf.get(AlternatorIdleRefreshConfig)).map(_.toLong),
            compression = Option(conf.get(AlternatorCompressionConfig)).map(_.toBoolean),
            optimizeHeaders = Option(conf.get(AlternatorOptimizeHeadersConfig)).map(_.toBoolean)
          )
        )
      )
      altBuilder
    case None => builder
  }
  // The override configuration is applied to whichever builder was selected above.
  val overrideConf = ClientOverrideConfiguration.builder()
  if (conf.get(RemoveConsumedCapacityConfig, "false").toBoolean)
    overrideConf.addExecutionInterceptor(new RemoveConsumedCapacityInterceptor)
  configuredBuilder.overrideConfiguration(overrideConf.build())
}

override def setConf(configuration: Configuration): Unit =
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package com.scylladb.migrator.config

import io.circe.{ Decoder, Encoder }
import io.circe.generic.semiauto.{ deriveDecoder, deriveEncoder }

/** Alternator-specific client settings, only used when a custom `endpoint` is configured.
  *
  * All fields are optional; an absent field leaves the client's default behavior in place.
  *
  * @param datacenter              Preferred datacenter for DC-aware routing (falls back to
  *                                all nodes if unavailable)
  * @param rack                    Preferred rack for rack-aware routing (requires
  *                                `datacenter`; falls back to DC, then all nodes)
  * @param activeRefreshIntervalMs Interval (ms) for refreshing the node list while the
  *                                client is active
  * @param idleRefreshIntervalMs   Interval (ms) for refreshing the node list while the
  *                                client is idle
  * @param compression             Enable GZIP compression of request bodies
  * @param optimizeHeaders         Optimize HTTP headers to reduce traffic overhead
  */
case class AlternatorSettings(
datacenter: Option[String] = None,
rack: Option[String] = None,
activeRefreshIntervalMs: Option[Long] = None,
idleRefreshIntervalMs: Option[Long] = None,
compression: Option[Boolean] = None,
optimizeHeaders: Option[Boolean] = None
)

/** circe codecs so [[AlternatorSettings]] can be decoded from / encoded to the
  * migrator's configuration file; derived from the case class fields. */
object AlternatorSettings {
implicit val decoder: Decoder[AlternatorSettings] = deriveDecoder
implicit val encoder: Encoder[AlternatorSettings] = deriveEncoder
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,14 @@ case class DynamoDBEndpoint(host: String, port: Int) {
object DynamoDBEndpoint {
  // circe codecs for reading/writing the endpoint from the configuration file.
  implicit val encoder: Encoder[DynamoDBEndpoint] = deriveEncoder[DynamoDBEndpoint]
  implicit val decoder: Decoder[DynamoDBEndpoint] = deriveDecoder[DynamoDBEndpoint]

  /** Parse a string produced by [[DynamoDBEndpoint.renderEndpoint]] back into a
    * [[DynamoDBEndpoint]].
    *
    * The LAST ':' separates host from port, so hosts that themselves contain ':'
    * (e.g. a scheme prefix such as `http://host`) are handled correctly.
    *
    * @throws IllegalArgumentException if `rendered` contains no ':' separator
    * @throws NumberFormatException    if the text after the last ':' is not a valid integer
    */
  def fromRendered(rendered: String): DynamoDBEndpoint = {
    val colonIdx = rendered.lastIndexOf(':')
    // Fail fast with a clear message instead of the opaque StringIndexOutOfBoundsException
    // that substring(0, -1) would raise when no separator is present.
    require(colonIdx >= 0, s"Invalid rendered endpoint '$rendered': expected '<host>:<port>'")
    DynamoDBEndpoint(rendered.substring(0, colonIdx), rendered.substring(colonIdx + 1).toInt)
  }
}

sealed trait SourceSettings
Expand Down Expand Up @@ -42,7 +50,8 @@ object SourceSettings {
readThroughput: Option[Int],
throughputReadPercent: Option[Float],
maxMapTasks: Option[Int],
removeConsumedCapacity: Option[Boolean] = None
removeConsumedCapacity: Option[Boolean] = None,
alternator: Option[AlternatorSettings] = None
) extends SourceSettings {
lazy val finalCredentials: Option[com.scylladb.migrator.AWSCredentials] =
AwsUtils.computeFinalCredentials(credentials, endpoint, region)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ object TargetSettings {
streamChanges: Boolean,
skipInitialSnapshotTransfer: Option[Boolean],
removeConsumedCapacity: Option[Boolean] = Some(true),
billingMode: Option[BillingMode] = None
billingMode: Option[BillingMode] = None,
alternator: Option[AlternatorSettings] = None
) extends TargetSettings {
lazy val finalCredentials: Option[com.scylladb.migrator.AWSCredentials] =
AwsUtils.computeFinalCredentials(credentials, endpoint, region)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package com.scylladb.migrator.readers

import com.scylladb.migrator.{ AWSCredentials, DynamoUtils }
import com.scylladb.migrator.DynamoUtils.{ setDynamoDBJobConf, setOptionalConf }
import com.scylladb.migrator.config.{ DynamoDBEndpoint, SourceSettings }
import com.scylladb.migrator.config.{ AlternatorSettings, DynamoDBEndpoint, SourceSettings }
import org.apache.hadoop.dynamodb.read.DynamoDBInputFormat
import org.apache.hadoop.dynamodb.{ DynamoDBConstants, DynamoDBItemWritable }
import org.apache.hadoop.io.Text
Expand Down Expand Up @@ -38,7 +38,8 @@ object DynamoDB {
source.readThroughput,
source.throughputReadPercent,
skipSegments,
source.removeConsumedCapacity.getOrElse(false)
source.removeConsumedCapacity.getOrElse(false),
source.alternator
)

/** Overload of `readRDD` that does not depend on `SourceSettings.DynamoDB`
Expand All @@ -54,7 +55,8 @@ object DynamoDB {
readThroughput: Option[Int],
throughputReadPercent: Option[Float],
skipSegments: Option[Set[Int]],
removeConsumedCapacity: Boolean = false
removeConsumedCapacity: Boolean = false,
alternatorSettings: Option[AlternatorSettings] = None
): (RDD[(Text, DynamoDBItemWritable)], TableDescription) = {

val dynamoDbClient =
Expand All @@ -64,7 +66,8 @@ object DynamoDB {
region,
if (removeConsumedCapacity)
Seq(new DynamoUtils.RemoveConsumedCapacityInterceptor)
else Nil
else Nil,
alternatorSettings
)

val tableDescription =
Expand Down Expand Up @@ -93,7 +96,8 @@ object DynamoDB {
tableDescription,
maybeTtlDescription,
skipSegments,
removeConsumedCapacity
removeConsumedCapacity,
alternatorSettings
)

val rdd =
Expand All @@ -119,7 +123,8 @@ object DynamoDB {
description: TableDescription,
maybeTtlDescription: Option[TimeToLiveDescription],
skipSegments: Option[Set[Int]],
removeConsumedCapacity: Boolean
removeConsumedCapacity: Boolean,
alternatorSettings: Option[AlternatorSettings] = None
): JobConf = {
val maybeItemCount = Option(description.itemCount).map(_.toLong)
val maybeAvgItemSize =
Expand All @@ -138,7 +143,8 @@ object DynamoDB {
scanSegments,
maxMapTasks,
credentials,
removeConsumedCapacity
removeConsumedCapacity,
alternatorSettings
)
jobConf.set(DynamoDBConstants.INPUT_TABLE_NAME, table)
setOptionalConf(jobConf, DynamoDBConstants.ITEM_COUNT, maybeItemCount.map(_.toString))
Expand Down
Loading