@@ -11,18 +11,20 @@ class SourceStorageFactory(
11
11
private val tempPath : Path ,
12
12
) {
13
13
private val s3SourceClient: MinioClient ? = if (resourceConfig.sourceType == ResourceType .S3 ) {
14
- requireNotNull(resourceConfig.s3).createS3Client()
14
+ requireNotNull(resourceConfig.s3) { " Missing S3 configuration" }
15
+ .createS3Client()
15
16
} else null
16
17
17
18
private val azureSourceClient: BlobServiceClient ? =
18
19
if (resourceConfig.sourceType == ResourceType .AZURE ) {
19
- requireNotNull(resourceConfig.azure).createAzureClient()
20
+ requireNotNull(resourceConfig.azure) { " Missing Azure configuration" }
21
+ .createAzureClient()
20
22
} else null
21
23
22
24
fun createSourceStorage () = when (resourceConfig.sourceType) {
23
25
ResourceType .S3 -> {
24
- val s3Config = requireNotNull(resourceConfig.s3)
25
- val minioClient = requireNotNull(s3SourceClient)
26
+ val s3Config = requireNotNull(resourceConfig.s3) { " Missing S3 configuration for source storage " }
27
+ val minioClient = requireNotNull(s3SourceClient) { " Missing S3 client configuration for source storage " }
26
28
S3SourceStorage (minioClient, s3Config, tempPath)
27
29
}
28
30
ResourceType .HDFS -> {
@@ -34,8 +36,8 @@ class SourceStorageFactory(
34
36
createSourceStorage.invoke(factory) as SourceStorage
35
37
}
36
38
ResourceType .AZURE -> {
37
- val azureClient = requireNotNull(azureSourceClient)
38
- val azureConfig = requireNotNull(resourceConfig.azure)
39
+ val azureClient = requireNotNull(azureSourceClient) { " Missing Azure client configuration for source storage " }
40
+ val azureConfig = requireNotNull(resourceConfig.azure) { " Missing Azure configuration for source storage " }
39
41
AzureSourceStorage (azureClient, azureConfig, tempPath)
40
42
}
41
43
else -> throw IllegalStateException (" Cannot create kafka storage for type ${resourceConfig.sourceType} " )
0 commit comments