@@ -15,13 +15,7 @@ import org.apache.hadoop.dynamodb.write.DynamoDBOutputFormat
1515import org .apache .hadoop .dynamodb .{ DynamoDBConstants , DynamoDbClientBuilderTransformer }
1616import org .apache .hadoop .mapred .JobConf
1717import org .apache .log4j .LogManager
18- import software .amazon .awssdk .auth .credentials .{
19- AwsBasicCredentials ,
20- AwsCredentialsProvider ,
21- AwsSessionCredentials ,
22- DefaultCredentialsProvider ,
23- StaticCredentialsProvider
24- }
18+ import software .amazon .awssdk .auth .credentials .AwsCredentialsProvider
2519import software .amazon .awssdk .regions .Region
2620import software .amazon .awssdk .services .dynamodb .{ DynamoDbClient , DynamoDbClientBuilder }
2721import software .amazon .awssdk .core .SdkRequest
@@ -293,13 +287,13 @@ object DynamoUtils {
293287 for (settings <- alternatorSettings) {
294288 val routingScope = (settings.datacenter, settings.rack) match {
295289 case (Some (dc), Some (rack)) =>
296- RackScope .of(dc, rack, DatacenterScope .of(dc, ClusterScope .create()))
290+ Some ( RackScope .of(dc, rack, DatacenterScope .of(dc, ClusterScope .create() )))
297291 case (Some (dc), None ) =>
298- DatacenterScope .of(dc, ClusterScope .create())
299- case _ => null
292+ Some ( DatacenterScope .of(dc, ClusterScope .create() ))
293+ case _ => None
300294 }
301- if (routingScope != null )
302- altBuilder.withRoutingScope(routingScope )
295+ for (scope <- routingScope )
296+ altBuilder.withRoutingScope(scope )
303297 for (interval <- settings.activeRefreshIntervalMs)
304298 altBuilder.withActiveRefreshIntervalMs(interval)
305299 for (interval <- settings.idleRefreshIntervalMs)
@@ -442,47 +436,44 @@ object DynamoUtils {
442436 private var conf : Configuration = null
443437
444438 override def apply (builder : DynamoDbClientBuilder ): DynamoDbClientBuilder = {
439+ val maybeEndpoint =
440+ Option (conf.get(DynamoDBConstants .ENDPOINT )).map(DynamoDBEndpoint .fromRendered)
441+ val maybeRegion = Option (conf.get(DynamoDBConstants .REGION ))
442+ val maybeCreds : Option [AwsCredentialsProvider ] =
443+ (
444+ Option (conf.get(DynamoDBConstants .DYNAMODB_ACCESS_KEY_CONF )),
445+ Option (conf.get(DynamoDBConstants .DYNAMODB_SECRET_KEY_CONF ))
446+ ) match {
447+ case (Some (accessKey), Some (secretKey)) =>
448+ Some (
449+ AWSCredentials (
450+ accessKey,
451+ secretKey,
452+ Option (conf.get(DynamoDBConstants .DYNAMODB_SESSION_TOKEN_CONF ))
453+ ).toProvider
454+ )
455+ case _ => None
456+ }
445457 val effectiveBuilder : DynamoDbClientBuilder =
446- Option (conf.get(DynamoDBConstants .ENDPOINT )) match {
447- case Some (customEndpoint) =>
448- val altBuilder = AlternatorDynamoDbClient .builder()
449- altBuilder.endpointOverride(URI .create(customEndpoint))
450- for (region <- Option (conf.get(DynamoDBConstants .REGION )))
451- altBuilder.region(Region .of(region))
452- (
453- Option (conf.get(DynamoDBConstants .DYNAMODB_ACCESS_KEY_CONF )),
454- Option (conf.get(DynamoDBConstants .DYNAMODB_SECRET_KEY_CONF ))
455- ) match {
456- case (Some (accessKey), Some (secretKey)) =>
457- val awsCreds =
458- Option (conf.get(DynamoDBConstants .DYNAMODB_SESSION_TOKEN_CONF )) match {
459- case Some (token) =>
460- AwsSessionCredentials .create(accessKey, secretKey, token)
461- case None =>
462- AwsBasicCredentials .create(accessKey, secretKey)
463- }
464- altBuilder.credentialsProvider(StaticCredentialsProvider .create(awsCreds))
465- case _ => // No credentials configured - use anonymous (Alternator default)
466- }
467- applyAlternatorSettings(
468- altBuilder,
469- Some (
470- AlternatorSettings (
471- datacenter = Option (conf.get(AlternatorDatacenterConfig )),
472- rack = Option (conf.get(AlternatorRackConfig )),
473- activeRefreshIntervalMs =
474- Option (conf.get(AlternatorActiveRefreshConfig )).map(_.toLong),
475- idleRefreshIntervalMs =
476- Option (conf.get(AlternatorIdleRefreshConfig )).map(_.toLong),
477- compression = Option (conf.get(AlternatorCompressionConfig )).map(_.toBoolean),
478- optimizeHeaders =
479- Option (conf.get(AlternatorOptimizeHeadersConfig )).map(_.toBoolean)
480- )
458+ if (maybeEndpoint.isDefined) {
459+ val altBuilder = AlternatorDynamoDbClient .builder()
460+ AwsUtils .configureClientBuilder(altBuilder, maybeEndpoint, maybeRegion, maybeCreds)
461+ applyAlternatorSettings(
462+ altBuilder,
463+ Some (
464+ AlternatorSettings (
465+ datacenter = Option (conf.get(AlternatorDatacenterConfig )),
466+ rack = Option (conf.get(AlternatorRackConfig )),
467+ activeRefreshIntervalMs =
468+ Option (conf.get(AlternatorActiveRefreshConfig )).map(_.toLong),
469+ idleRefreshIntervalMs = Option (conf.get(AlternatorIdleRefreshConfig )).map(_.toLong),
470+ compression = Option (conf.get(AlternatorCompressionConfig )).map(_.toBoolean),
471+ optimizeHeaders = Option (conf.get(AlternatorOptimizeHeadersConfig )).map(_.toBoolean)
481472 )
482473 )
483- altBuilder
484- case None => builder
485- }
474+ )
475+ altBuilder
476+ } else builder
486477 val overrideConf = ClientOverrideConfiguration .builder()
487478 if (conf.get(RemoveConsumedCapacityConfig , " false" ).toBoolean)
488479 overrideConf.addExecutionInterceptor(new RemoveConsumedCapacityInterceptor )
0 commit comments