Skip to content

Commit b659439

Browse files
Enrico Olivelli authored and SteNicholas committed
[CELEBORN-2254] Fix support for S3 and add a simple integration test
### What changes were proposed in this pull request? * Fix creating files to S3 (and other DFS) * Add integration test for Spark and S3 (using Minio) * in CI some jobs will run with the AWS profile because this way we can activate the new integration test (that needs the S3 client dependencies) ### Why are the changes needed? See https://issues.apache.org/jira/browse/CELEBORN-2254. ### Does this PR resolve a correctness bug? No ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? * I have added an integration test * I have this patch on our internal fork, to make Celeborn run on k8s with S3 Closes apache#3592 from eolivelli/CELEBORN-2254-apache. Authored-by: Enrico Olivelli <enrico@beast.io> Signed-off-by: SteNicholas <programgeek@163.com>
1 parent 2097ad0 commit b659439

File tree

11 files changed

+198
-16
lines changed

11 files changed

+198
-16
lines changed

.github/workflows/maven.yml

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -133,7 +133,8 @@ jobs:
133133
run: |
134134
SPARK_BINARY_VERSION=${{ matrix.spark }}
135135
SPARK_MAJOR_VERSION=${SPARK_BINARY_VERSION%%.*}
136-
PROFILES="-Pgoogle-mirror,spark-${{ matrix.spark }}"
136+
# enable AWS profile to run end-to-end tests with S3 storage
137+
PROFILES="-Pgoogle-mirror,aws,spark-${{ matrix.spark }}"
137138
TEST_MODULES="client-spark/common,client-spark/spark-${SPARK_MAJOR_VERSION},client-spark/spark-${SPARK_MAJOR_VERSION}-columnar-common,client-spark/spark-${SPARK_MAJOR_VERSION}-shaded,tests/spark-it"
138139
build/mvn $PROFILES -pl $TEST_MODULES -am clean install -DskipTests
139140
build/mvn $PROFILES -pl $TEST_MODULES -Dspark.shuffle.sort.io.plugin.class=${{ matrix.shuffle-plugin-class }} test
@@ -170,7 +171,8 @@ jobs:
170171
run: |
171172
SPARK_BINARY_VERSION=${{ matrix.spark }}
172173
SPARK_MAJOR_VERSION=${SPARK_BINARY_VERSION%%.*}
173-
PROFILES="-Pgoogle-mirror,spark-${{ matrix.spark }}"
174+
# enable AWS profile to run end-to-end tests with S3 storage
175+
PROFILES="-Pgoogle-mirror,aws,spark-${{ matrix.spark }}"
174176
TEST_MODULES="client-spark/common,client-spark/spark-3,client-spark/spark-3-columnar-common,client-spark/spark-${SPARK_MAJOR_VERSION}-shaded,tests/spark-it"
175177
build/mvn $PROFILES -pl $TEST_MODULES -am clean install -DskipTests
176178
build/mvn $PROFILES -pl $TEST_MODULES -Dspark.shuffle.sort.io.plugin.class=${{ matrix.shuffle-plugin-class }} test

client/src/main/java/org/apache/celeborn/client/ShuffleClient.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
import java.io.IOException;
2121
import java.util.ArrayList;
22+
import java.util.Collections;
2223
import java.util.Map;
2324
import java.util.Optional;
2425
import java.util.concurrent.ConcurrentHashMap;
@@ -122,6 +123,7 @@ public static Map<StorageInfo.Type, FileSystem> getHadoopFs(CelebornConf conf) {
122123
hadoopFs = CelebornHadoopUtils.getHadoopFS(conf);
123124
} catch (Exception e) {
124125
logger.error("Celeborn initialize DFS failed.", e);
126+
hadoopFs = Collections.emptyMap();
125127
}
126128
}
127129
}

client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import io.netty.buffer.Unpooled;
3737
import org.apache.commons.lang3.StringUtils;
3838
import org.apache.commons.lang3.tuple.Pair;
39+
import org.apache.hadoop.fs.FileSystem;
3940
import org.slf4j.Logger;
4041
import org.slf4j.LoggerFactory;
4142

@@ -231,6 +232,13 @@ public ShuffleClientImpl(String appUniqueId, CelebornConf conf, UserIdentifier u
231232

232233
reviveManager = new ReviveManager(this, conf);
233234

235+
if (conf.hasS3Storage()) {
236+
Map<StorageInfo.Type, FileSystem> hadoopFs = getHadoopFs(conf);
237+
FileSystem s3client = hadoopFs.get(StorageInfo.Type.S3);
238+
logger.info("S3 client: {}", s3client);
239+
if (s3client == null)
240+
throw new IllegalStateException("S3 type is required but the S3 client was not created");
241+
}
234242
logger.info("Created ShuffleClientImpl, appUniqueId: {}", appUniqueId);
235243
}
236244

common/src/main/scala/org/apache/celeborn/common/util/CelebornHadoopUtils.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,7 @@ object CelebornHadoopUtils extends Logging {
9999
dirs.foreach {
100100
case (storageType, dir) => {
101101
val path = new Path(dir)
102+
logInfo(s"Creating HadoopFS for type $storageType at path $path");
102103
hadoopFs.put(storageType, path.getFileSystem(hadoopConf))
103104
}
104105
})

multipart-uploader/multipart-uploader-s3/src/main/java/org/apache/celeborn/S3MultipartUploadHandler.java

Lines changed: 23 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -27,12 +27,14 @@
2727
import com.amazonaws.AmazonClientException;
2828
import com.amazonaws.ClientConfiguration;
2929
import com.amazonaws.auth.EnvironmentVariableCredentialsProvider;
30+
import com.amazonaws.client.builder.AwsClientBuilder;
3031
import com.amazonaws.event.ProgressListener;
3132
import com.amazonaws.retry.PredefinedBackoffStrategies;
3233
import com.amazonaws.retry.PredefinedRetryPolicies;
3334
import com.amazonaws.retry.RetryPolicy;
3435
import com.amazonaws.services.s3.AmazonS3;
3536
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
37+
import com.amazonaws.services.s3.model.*;
3638
import com.amazonaws.services.s3.model.AbortMultipartUploadRequest;
3739
import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest;
3840
import com.amazonaws.services.s3.model.CompleteMultipartUploadResult;
@@ -61,15 +63,15 @@ public class S3MultipartUploadHandler implements MultipartUploadHandler {
6163

6264
private String uploadId;
6365

64-
private AmazonS3 s3Client;
66+
private final AmazonS3 s3Client;
6567

66-
private String key;
68+
private final String key;
6769

68-
private String bucketName;
70+
private final String bucketName;
6971

70-
private Integer s3MultiplePartUploadMaxRetries;
71-
private Integer baseDelay;
72-
private Integer maxBackoff;
72+
private final Integer s3MultiplePartUploadMaxRetries;
73+
private final Integer baseDelay;
74+
private final Integer maxBackoff;
7375

7476
public S3MultipartUploadHandler(
7577
FileSystem hadoopFs,
@@ -103,12 +105,23 @@ public S3MultipartUploadHandler(
103105
new ClientConfiguration()
104106
.withRetryPolicy(retryPolicy)
105107
.withMaxErrorRetry(s3MultiplePartUploadMaxRetries);
106-
this.s3Client =
108+
AmazonS3ClientBuilder builder =
107109
AmazonS3ClientBuilder.standard()
108110
.withCredentials(providers)
109-
.withRegion(conf.get(Constants.AWS_REGION))
110-
.withClientConfiguration(clientConfig)
111-
.build();
111+
.withClientConfiguration(clientConfig);
112+
// for MinIO
113+
String endpoint = conf.get("fs.s3a.endpoint");
114+
if (endpoint != null && !endpoint.isEmpty()) {
115+
builder =
116+
builder
117+
.withEndpointConfiguration(
118+
new AwsClientBuilder.EndpointConfiguration(
119+
endpoint, conf.get(Constants.AWS_REGION)))
120+
.withPathStyleAccessEnabled(conf.getBoolean("fs.s3a.path.style.access", false));
121+
} else {
122+
builder = builder.withRegion(conf.get(Constants.AWS_REGION));
123+
}
124+
this.s3Client = builder.build();
112125
this.key = key;
113126
}
114127

pom.xml

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -155,6 +155,7 @@
155155
<maven.plugin.silencer.version>1.7.19</maven.plugin.silencer.version>
156156
<maven.plugin.resources.version>3.3.1</maven.plugin.resources.version>
157157
<openapi.generator.version>7.8.0</openapi.generator.version>
158+
<testcontainers-minio.version>1.21.4</testcontainers-minio.version>
158159

159160
<!-- Allow modules to enable / disable certain build plugins easily. -->
160161
<testJarPhase>prepare-package</testJarPhase>
@@ -525,6 +526,12 @@
525526
<version>${jakarta.ws.rs-api.version}</version>
526527
</dependency>
527528

529+
<dependency>
530+
<groupId>org.testcontainers</groupId>
531+
<artifactId>minio</artifactId>
532+
<version>${testcontainers-minio.version}</version>
533+
</dependency>
534+
528535
<dependency>
529536
<groupId>org.openapitools</groupId>
530537
<artifactId>jackson-databind-nullable</artifactId>

project/CelebornBuild.scala

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,9 @@ object Dependencies {
9090
val picocliVersion = "4.7.6"
9191
val jmhVersion = "1.37"
9292

93+
// S3 integration tests with Minio
94+
val testContainerMinioVersion = "1.21.4"
95+
9396
// For SSL support
9497
val bouncycastleVersion = "1.77"
9598

@@ -213,6 +216,8 @@ object Dependencies {
213216
val jakartaAnnotationApi = "jakarta.annotation" % "jakarta.annotation-api" % jakartaAnnotationApiVersion
214217
val jakartaWsRsApi = "jakarta.ws.rs" % "jakarta.ws.rs-api" % jakartaWsRsApiVersion
215218

219+
val testContainerMinio = "org.testcontainers" % "minio" % testContainerMinioVersion
220+
216221
// Test dependencies
217222
// https://www.scala-sbt.org/1.x/docs/Testing.html
218223
val junitInterface = "com.github.sbt" % "junit-interface" % junitInterfaceVersion
@@ -425,7 +430,8 @@ object CelebornCommonSettings {
425430
Dependencies.scalatest % "test",
426431
Dependencies.junit % "test",
427432
// https://www.scala-sbt.org/1.x/docs/Testing.html
428-
Dependencies.junitInterface % "test")
433+
Dependencies.junitInterface % "test",
434+
Dependencies.testContainerMinio % "test")
429435
}
430436

431437
object CelebornBuild extends sbt.internal.BuildDef {

tests/spark-it/pom.xml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -182,6 +182,11 @@
182182
<artifactId>jakarta.servlet-api</artifactId>
183183
<scope>test</scope>
184184
</dependency>
185+
<dependency>
186+
<groupId>org.testcontainers</groupId>
187+
<artifactId>minio</artifactId>
188+
<scope>test</scope>
189+
</dependency>
185190
</dependencies>
186191
<profiles>
187192
<profile>
Lines changed: 133 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,133 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.celeborn.tests.spark
19+
20+
import org.apache.spark.SparkConf
21+
import org.apache.spark.sql.SparkSession
22+
import org.scalatest.BeforeAndAfterEach
23+
import org.scalatest.funsuite.AnyFunSuite
24+
import org.testcontainers.containers.MinIOContainer
25+
26+
import org.apache.celeborn.client.ShuffleClient
27+
import org.apache.celeborn.common.CelebornConf
28+
import org.apache.celeborn.common.protocol.ShuffleMode
29+
30+
class BasicEndToEndTieredStorageTest extends AnyFunSuite
31+
with SparkTestBase
32+
with BeforeAndAfterEach {
33+
34+
var container: MinIOContainer = null;
35+
val skipAWSTest = !isClassPresent("org.apache.hadoop.fs.s3a.S3AFileSystem")
36+
37+
def isClassPresent(className: String): Boolean = {
38+
try {
39+
Class.forName(className)
40+
true
41+
} catch {
42+
case _: ClassNotFoundException => false
43+
}
44+
}
45+
46+
override def beforeAll(): Unit = {
47+
48+
if (skipAWSTest)
49+
return
50+
51+
container = new MinIOContainer("minio/minio:RELEASE.2023-09-04T19-57-37Z");
52+
container.start()
53+
54+
// create bucket using Minio command line tool
55+
container.execInContainer(
56+
"mc",
57+
"alias",
58+
"set",
59+
"dockerminio",
60+
"http://minio:9000",
61+
container.getUserName,
62+
container.getPassword)
63+
container.execInContainer("mc", "mb", "dockerminio/sample-bucket")
64+
65+
System.setProperty("aws.accessKeyId", container.getUserName)
66+
System.setProperty("aws.secretKey", container.getPassword)
67+
68+
val s3url = container.getS3URL
69+
val augmentedConfiguration = Map(
70+
CelebornConf.ACTIVE_STORAGE_TYPES.key -> "MEMORY,S3",
71+
CelebornConf.WORKER_STORAGE_CREATE_FILE_POLICY.key -> "MEMORY,S3",
72+
CelebornConf.WORKER_STORAGE_EVICT_POLICY.key -> "MEMORY|S3",
73+
"celeborn.hadoop.fs.s3a.endpoint" -> s"$s3url",
74+
"celeborn.hadoop.fs.s3a.aws.credentials.provider" -> "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider",
75+
"celeborn.hadoop.fs.s3a.access.key" -> container.getUserName,
76+
"celeborn.hadoop.fs.s3a.secret.key" -> container.getPassword,
77+
"celeborn.hadoop.fs.s3a.path.style.access" -> "true",
78+
CelebornConf.S3_DIR.key -> "s3://sample-bucket/test/celeborn",
79+
CelebornConf.S3_ENDPOINT_REGION.key -> "dummy-region")
80+
81+
setupMiniClusterWithRandomPorts(
82+
masterConf = augmentedConfiguration,
83+
workerConf = augmentedConfiguration,
84+
workerNum = 1)
85+
}
86+
87+
override def beforeEach(): Unit = {
88+
ShuffleClient.reset()
89+
}
90+
91+
override def afterAll(): Unit = {
92+
System.clearProperty("aws.accessKeyId")
93+
System.clearProperty("aws.secretKey")
94+
if (container != null) {
95+
container.close()
96+
super.afterAll()
97+
}
98+
}
99+
100+
override def updateSparkConf(sparkConf: SparkConf, mode: ShuffleMode): SparkConf = {
101+
val s3url = container.getS3URL
102+
val newConf = sparkConf
103+
.set("spark." + CelebornConf.ACTIVE_STORAGE_TYPES.key, "MEMORY,S3")
104+
.set("spark." + CelebornConf.S3_DIR.key, "s3://sample-bucket/test/celeborn")
105+
.set("spark." + CelebornConf.S3_ENDPOINT_REGION.key, "dummy-region")
106+
.set("spark.celeborn.hadoop.fs.s3a.endpoint", s"$s3url")
107+
.set(
108+
"spark.celeborn.hadoop.fs.s3a.aws.credentials.provider",
109+
"org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider")
110+
.set("spark.celeborn.hadoop.fs.s3a.access.key", container.getUserName)
111+
.set("spark.celeborn.hadoop.fs.s3a.secret.key", container.getPassword)
112+
.set("spark.celeborn.hadoop.fs.s3a.path.style.access", "true")
113+
114+
super.updateSparkConf(newConf, mode)
115+
}
116+
117+
test("celeborn spark integration test - s3") {
118+
assume(
119+
!skipAWSTest,
120+
"Skipping test because AWS Hadoop client is not in the classpath (enable with -Paws)")
121+
122+
val s3url = container.getS3URL
123+
log.info(s"s3url $s3url");
124+
val sparkConf = new SparkConf().setAppName("celeborn-demo").setMaster("local[2]")
125+
val celebornSparkSession = SparkSession.builder()
126+
.config(updateSparkConf(sparkConf, ShuffleMode.HASH))
127+
.getOrCreate()
128+
groupBy(celebornSparkSession)
129+
130+
celebornSparkSession.stop()
131+
}
132+
133+
}

worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ import org.apache.celeborn.common.meta.{DeviceInfo, DiskFileInfo, DiskInfo, Disk
4343
import org.apache.celeborn.common.metrics.source.{AbstractSource, ThreadPoolSource}
4444
import org.apache.celeborn.common.network.util.{NettyUtils, TransportConf}
4545
import org.apache.celeborn.common.protocol.{PartitionLocation, PartitionSplitMode, PartitionType, StorageInfo}
46+
import org.apache.celeborn.common.protocol.StorageInfo.Type
4647
import org.apache.celeborn.common.quota.ResourceConsumption
4748
import org.apache.celeborn.common.util.{CelebornExitKind, CelebornHadoopUtils, CollectionUtils, DiskUtils, JavaUtils, PbSerDeUtils, ThreadUtils, Utils}
4849
import org.apache.celeborn.service.deploy.worker._
@@ -1081,6 +1082,7 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs
10811082
partitionType: PartitionType,
10821083
partitionSplitEnabled: Boolean): (Flusher, DiskFileInfo, File) = {
10831084
val suggestedMountPoint = location.getStorageInfo.getMountPoint
1085+
val storageType = location.getStorageInfo.getType
10841086
var retryCount = 0
10851087
var exception: IOException = null
10861088
val shuffleKey = Utils.makeShuffleKey(appId, shuffleId)
@@ -1102,7 +1104,7 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs
11021104
throw new IOException(s"No available disks! suggested mountPoint $suggestedMountPoint")
11031105
}
11041106

1105-
if (dirs.isEmpty && location.getStorageInfo.HDFSAvailable()) {
1107+
if (storageType == Type.HDFS && location.getStorageInfo.HDFSAvailable()) {
11061108
val shuffleDir =
11071109
new Path(new Path(hdfsDir, conf.workerWorkingDir), s"$appId/$shuffleId")
11081110
FileSystem.mkdirs(
@@ -1120,9 +1122,10 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs
11201122
fileName,
11211123
hdfsFileInfo)
11221124
return (hdfsFlusher.get, hdfsFileInfo, null)
1123-
} else if (dirs.isEmpty && location.getStorageInfo.S3Available()) {
1125+
} else if (storageType == Type.S3 && location.getStorageInfo.S3Available()) {
11241126
val shuffleDir =
11251127
new Path(new Path(s3Dir, conf.workerWorkingDir), s"$appId/$shuffleId")
1128+
logDebug(s"trying to create S3 file at $shuffleDir");
11261129
FileSystem.mkdirs(
11271130
StorageManager.hadoopFs.get(StorageInfo.Type.S3),
11281131
shuffleDir,
@@ -1138,7 +1141,7 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs
11381141
fileName,
11391142
s3FileInfo)
11401143
return (s3Flusher.get, s3FileInfo, null)
1141-
} else if (dirs.isEmpty && location.getStorageInfo.OSSAvailable()) {
1144+
} else if (storageType == Type.OSS && location.getStorageInfo.OSSAvailable()) {
11421145
val shuffleDir =
11431146
new Path(new Path(ossDir, conf.workerWorkingDir), s"$appId/$shuffleId")
11441147
FileSystem.mkdirs(

0 commit comments

Comments
 (0)