
Commit d5b51fe

Junyu Chen and sarutak committed
[SPARK-45720][BUILD][DSTREAM][KINESIS] Upgrade KCL to 2.7.2 and remove AWS SDK for Java 1.x dependency
### What changes were proposed in this pull request?

This PR proposes to upgrade KCL to 2.7.2, based on junyuc25's [PR](#44211) with some updates. By upgrading KCL, we can remove the AWS SDK for Java 1.x dependency.

* Basic migration guide:
  * https://docs.aws.amazon.com/streams/latest/dev/kcl-migration.html

### Why are the changes needed?

* KCL 1.x will reach end-of-life on January 30, 2026.
  * https://docs.aws.amazon.com/streams/latest/dev/kcl-version-lifecycle-policy.html
* Currently, Spark depends on both AWS SDK for Java 1.x and 2.x. The 1.x dependency can be removed by this PR.

### Does this PR introduce _any_ user-facing change?

No behavior change is expected.

### How was this patch tested?

Confirmed that all Kinesis tests pass with the following commands.

* SBT

```
$ ENABLE_KINESIS_TESTS=1 nohup ./build/sbt -Pkinesis-asl 'streaming-kinesis-asl/test'
```

* Maven

```
$ ENABLE_KINESIS_TESTS=1 build/mvn -Pkinesis-asl -Dtest=org.apache.spark.streaming.kinesis.JavaKinesisInputDStreamBuilderSuite -DwildcardSuites=org.apache.spark.streaming.kinesis test
```

Also confirmed that the existing examples work.

```
# Need to run `build/sbt -Pkinesis-asl package` beforehand

# Producer
$ bin/run-example streaming.KinesisWordProducerASL kinesis-example-stream https://kinesis.us-west-2.amazonaws.com 10 5

# Consumer
$ bin/run-example streaming.KinesisWordCountASL my-stream-app kinesis-example-stream https://kinesis.us-west-2.amazonaws.com
```

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #53256 from sarutak/upgrade-aws-sdk2.

Lead-authored-by: Junyu Chen <[email protected]>
Co-authored-by: Kousuke Saruta <[email protected]>
Signed-off-by: Kousuke Saruta <[email protected]>
1 parent d0cbad5 · commit d5b51fe
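The per-file diffs below apply one recurring AWS SDK v1 → v2 pattern: the mutable `AmazonKinesisClient` configured with `setEndpoint` becomes an immutable `KinesisClient` builder with an explicit HTTP client, and request objects are built rather than mutated with `withXxx` setters. Below is a minimal Scala sketch of that pattern, reusing the example stream and endpoint from the test commands above; it is illustrative only, not code added by this PR.

```scala
import java.net.URI

import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider
import software.amazon.awssdk.http.apache.ApacheHttpClient
import software.amazon.awssdk.services.kinesis.KinesisClient
import software.amazon.awssdk.services.kinesis.model.DescribeStreamRequest

object KinesisV2ClientSketch {
  def main(args: Array[String]): Unit = {
    // SDK v1: new AmazonKinesisClient(new DefaultAWSCredentialsProviderChain()) + setEndpoint(...)
    // SDK v2: immutable builder with an endpoint override and an explicit HTTP client.
    val kinesisClient = KinesisClient.builder()
      .credentialsProvider(DefaultCredentialsProvider.create())
      .endpointOverride(URI.create("https://kinesis.us-west-2.amazonaws.com"))
      .httpClientBuilder(ApacheHttpClient.builder())
      .build()

    // SDK v2 request objects are built, not configured with withXxx/setXxx mutators.
    val request = DescribeStreamRequest.builder()
      .streamName("kinesis-example-stream")
      .build()

    // Response accessors drop the "get" prefix: streamDescription().shards()
    // instead of getStreamDescription().getShards().
    val numShards = kinesisClient.describeStream(request).streamDescription().shards().size()
    println(s"kinesis-example-stream has $numShards shard(s)")
  }
}
```

Running the sketch requires the same credential setup the examples describe (environment variables, Java system properties, or a credential profile).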

File tree

20 files changed: +565 -373 lines

connector/kinesis-asl/pom.xml

Lines changed: 54 additions & 4 deletions

@@ -54,14 +54,64 @@
       <scope>test</scope>
     </dependency>
     <dependency>
-      <groupId>com.amazonaws</groupId>
+      <groupId>software.amazon.kinesis</groupId>
       <artifactId>amazon-kinesis-client</artifactId>
       <version>${aws.kinesis.client.version}</version>
+      <exclusions>
+        <!--
+          mbknor-jackson-jsonschema is necessary at runtime only if JSON format schema is
+          registered using GlueSchemaRegistry. kinesis-asl currently doesn't use this feature
+          so it should be safe to exclude the dependency.
+        -->
+        <exclusion>
+          <groupId>com.kjetland</groupId>
+          <artifactId>mbknor-jackson-jsonschema_2.12</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.lz4</groupId>
+          <artifactId>lz4-java</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>software.amazon.awssdk</groupId>
+      <artifactId>auth</artifactId>
+      <version>${aws.java.sdk.v2.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>software.amazon.awssdk</groupId>
+      <artifactId>sts</artifactId>
+      <version>${aws.java.sdk.v2.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>software.amazon.awssdk</groupId>
+      <artifactId>apache-client</artifactId>
+      <version>${aws.java.sdk.v2.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>software.amazon.awssdk</groupId>
+      <artifactId>regions</artifactId>
+      <version>${aws.java.sdk.v2.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>software.amazon.awssdk</groupId>
+      <artifactId>dynamodb</artifactId>
+      <version>${aws.java.sdk.v2.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>software.amazon.awssdk</groupId>
+      <artifactId>kinesis</artifactId>
+      <version>${aws.java.sdk.v2.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>software.amazon.awssdk</groupId>
+      <artifactId>cloudwatch</artifactId>
+      <version>${aws.java.sdk.v2.version}</version>
     </dependency>
     <dependency>
-      <groupId>com.amazonaws</groupId>
-      <artifactId>aws-java-sdk-sts</artifactId>
-      <version>${aws.java.sdk.version}</version>
+      <groupId>software.amazon.awssdk</groupId>
+      <artifactId>sdk-core</artifactId>
+      <version>${aws.java.sdk.v2.version}</version>
     </dependency>
     <dependency>
       <groupId>software.amazon.kinesis</groupId>
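A note on the exclusions above: per the comment in the diff, mbknor-jackson-jsonschema is only needed at runtime when JSON-format schemas are registered through GlueSchemaRegistry, which kinesis-asl does not do. An application that does rely on that feature would have to put the artifact back on its own classpath; below is a hedged sbt-style sketch (coordinates taken from the exclusion above; the version is a placeholder, not something this PR specifies).

```scala
// build.sbt fragment (sketch, not part of this PR): re-add the JSON schema generator
// only if your application registers JSON-format schemas via GlueSchemaRegistry.
// Replace <version> with whatever your KCL / Glue Schema Registry setup requires.
libraryDependencies += "com.kjetland" % "mbknor-jackson-jsonschema_2.12" % "<version>"
```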

connector/kinesis-asl/src/main/java/org/apache/spark/examples/streaming/JavaKinesisWordCountASL.java

Lines changed: 18 additions & 7 deletions

@@ -16,6 +16,7 @@
  */
 package org.apache.spark.examples.streaming;
 
+import java.net.URI;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -38,8 +39,10 @@
 import scala.Tuple2;
 import scala.reflect.ClassTag$;
 
-import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
-import com.amazonaws.services.kinesis.AmazonKinesisClient;
+import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
+import software.amazon.awssdk.http.apache.ApacheHttpClient;
+import software.amazon.awssdk.services.kinesis.KinesisClient;
+import software.amazon.awssdk.services.kinesis.model.DescribeStreamRequest;
 
 /**
  * Consumes messages from a Amazon Kinesis streams and does wordcount.
@@ -66,7 +69,7 @@
  * There is a companion helper class called KinesisWordProducerASL which puts dummy data
  * onto the Kinesis stream.
  *
- * This code uses the DefaultAWSCredentialsProviderChain to find credentials
+ * This code uses the DefaultCredentialsProvider to find credentials
  * in the following order:
  *    Environment Variables - AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY
  *    Java System Properties - aws.accessKeyId and aws.secretKey
@@ -106,11 +109,19 @@ public static void main(String[] args) throws Exception {
     String endpointUrl = args[2];
 
     // Create a Kinesis client in order to determine the number of shards for the given stream
-    AmazonKinesisClient kinesisClient =
-        new AmazonKinesisClient(new DefaultAWSCredentialsProviderChain());
-    kinesisClient.setEndpoint(endpointUrl);
+    KinesisClient kinesisClient =
+        KinesisClient.builder()
+            .credentialsProvider(DefaultCredentialsProvider.create())
+            .endpointOverride(URI.create(endpointUrl))
+            .httpClientBuilder(ApacheHttpClient.builder())
+            .build();
+
+    DescribeStreamRequest describeStreamRequest =
+        DescribeStreamRequest.builder()
+            .streamName(streamName)
+            .build();
     int numShards =
-        kinesisClient.describeStream(streamName).getStreamDescription().getShards().size();
+        kinesisClient.describeStream(describeStreamRequest).streamDescription().shards().size();
 
 
     // In this example, we're going to create 1 Kinesis Receiver/input DStream for each shard.

connector/kinesis-asl/src/main/java/org/apache/spark/streaming/kinesis/KinesisInitialPositions.java

Lines changed: 1 addition & 1 deletion

@@ -16,7 +16,7 @@
  */
 package org.apache.spark.streaming.kinesis;
 
-import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
+import software.amazon.kinesis.common.InitialPositionInStream;
 
 import java.io.Serializable;
 import java.util.Date;

connector/kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisExampleUtils.scala

Lines changed: 5 additions & 5 deletions

@@ -19,16 +19,16 @@ package org.apache.spark.examples.streaming
 
 import scala.jdk.CollectionConverters._
 
-import com.amazonaws.regions.RegionUtils
-import com.amazonaws.services.kinesis.AmazonKinesis
+import software.amazon.awssdk.regions.servicemetadata.KinesisServiceMetadata
 
 private[streaming] object KinesisExampleUtils {
   def getRegionNameByEndpoint(endpoint: String): String = {
     val uri = new java.net.URI(endpoint)
-    RegionUtils.getRegionsForService(AmazonKinesis.ENDPOINT_PREFIX)
+    val kinesisServiceMetadata = new KinesisServiceMetadata()
+    kinesisServiceMetadata.regions
       .asScala
-      .find(_.getAvailableEndpoints.asScala.toSeq.contains(uri.getHost))
-      .map(_.getName)
+      .find(r => kinesisServiceMetadata.endpointFor(r).toString.equals(uri.getHost))
+      .map(_.id)
       .getOrElse(
         throw new IllegalArgumentException(s"Could not resolve region for endpoint: $endpoint"))
   }
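The region lookup above swaps SDK v1's RegionUtils/AmazonKinesis.ENDPOINT_PREFIX for SDK v2 service metadata. Here is a small standalone Scala sketch of how that lookup behaves, using the endpoint from the example commands in the commit message; the printed result is an expectation based on the SDK v2 KinesisServiceMetadata API, not output recorded in this PR.

```scala
import java.net.URI

import scala.jdk.CollectionConverters._

import software.amazon.awssdk.regions.servicemetadata.KinesisServiceMetadata

// Standalone sketch of the lookup performed by the updated KinesisExampleUtils.getRegionNameByEndpoint.
object RegionLookupSketch {
  def main(args: Array[String]): Unit = {
    val endpoint = "https://kinesis.us-west-2.amazonaws.com"
    val host = new URI(endpoint).getHost

    val metadata = new KinesisServiceMetadata()
    // Compare the endpoint host against each region's Kinesis endpoint and keep the region id.
    val regionName = metadata.regions.asScala
      .find(r => metadata.endpointFor(r).toString == host)
      .map(_.id)

    println(regionName) // expected: Some(us-west-2)
  }
}
```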

connector/kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCountASL.scala

Lines changed: 32 additions & 15 deletions

@@ -18,15 +18,18 @@
 // scalastyle:off println
 package org.apache.spark.examples.streaming
 
+import java.net.URI
 import java.nio.ByteBuffer
 
 import scala.util.Random
 
-import com.amazonaws.auth.DefaultAWSCredentialsProviderChain
-import com.amazonaws.services.kinesis.AmazonKinesisClient
-import com.amazonaws.services.kinesis.model.PutRecordRequest
 import org.apache.logging.log4j.Level
 import org.apache.logging.log4j.core.config.Configurator
+import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider
+import software.amazon.awssdk.core.SdkBytes
+import software.amazon.awssdk.http.apache.ApacheHttpClient
+import software.amazon.awssdk.services.kinesis.KinesisClient
+import software.amazon.awssdk.services.kinesis.model.{DescribeStreamRequest, PutRecordRequest}
 
 import org.apache.spark.SparkConf
 import org.apache.spark.internal.Logging
@@ -101,13 +104,22 @@ object KinesisWordCountASL extends Logging {
 
     // Determine the number of shards from the stream using the low-level Kinesis Client
    // from the AWS Java SDK.
-    val credentials = new DefaultAWSCredentialsProviderChain().getCredentials()
-    require(credentials != null,
+    val credentialsProvider = DefaultCredentialsProvider.create
+    require(credentialsProvider.resolveCredentials() != null,
       "No AWS credentials found. Please specify credentials using one of the methods specified " +
-        "in http://docs.aws.amazon.com/AWSSdkDocsJava/latest/DeveloperGuide/credentials.html")
-    val kinesisClient = new AmazonKinesisClient(credentials)
-    kinesisClient.setEndpoint(endpointUrl)
-    val numShards = kinesisClient.describeStream(streamName).getStreamDescription().getShards().size
+        "in https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/credentials.html")
+    val kinesisClient = KinesisClient.builder()
+      .credentialsProvider(credentialsProvider)
+      .endpointOverride(URI.create(endpointUrl))
+      .httpClientBuilder(ApacheHttpClient.builder())
+      .build()
+    val describeStreamRequest = DescribeStreamRequest.builder()
+      .streamName(streamName)
+      .build()
+    val numShards = kinesisClient.describeStream(describeStreamRequest)
+      .streamDescription
+      .shards
+      .size
 
 
     // In this example, we're going to create 1 Kinesis Receiver/input DStream for each shard.
@@ -221,8 +233,11 @@ object KinesisWordProducerASL {
     val totals = scala.collection.mutable.Map[String, Int]()
 
     // Create the low-level Kinesis Client from the AWS Java SDK.
-    val kinesisClient = new AmazonKinesisClient(new DefaultAWSCredentialsProviderChain())
-    kinesisClient.setEndpoint(endpoint)
+    val kinesisClient = KinesisClient.builder()
+      .credentialsProvider(DefaultCredentialsProvider.create())
+      .endpointOverride(URI.create(endpoint))
+      .httpClientBuilder(ApacheHttpClient.builder())
+      .build()
 
     println(s"Putting records onto stream $stream and endpoint $endpoint at a rate of" +
       s" $recordsPerSecond records per second and $wordsPerRecord words per record")
@@ -247,12 +262,14 @@ object KinesisWordProducerASL {
       val partitionKey = s"partitionKey-$recordNum"
 
       // Create a PutRecordRequest with an Array[Byte] version of the data
-      val putRecordRequest = new PutRecordRequest().withStreamName(stream)
-        .withPartitionKey(partitionKey)
-        .withData(ByteBuffer.wrap(data.getBytes()))
+      val putRecordRequest = PutRecordRequest.builder()
+        .streamName(stream)
+        .partitionKey(partitionKey)
+        .data(SdkBytes.fromByteBuffer(ByteBuffer.wrap(data.getBytes())))
+        .build()
 
       // Put the record onto the stream and capture the PutRecordResult
-      val putRecordResult = kinesisClient.putRecord(putRecordRequest)
+      kinesisClient.putRecord(putRecordRequest)
     }
 
     // Sleep for a second
