Skip to content

Commit bfd6a14

Browse files
authored
Merge pull request #295 from lensesio-dev/public-pr-2017
provide a custom file namer
2 parents 66e3f94 + 9d4e485 commit bfd6a14

File tree

17 files changed

+410
-174
lines changed

17 files changed

+410
-174
lines changed

kafka-connect-aws-s3/src/it/scala/io/lenses/streamreactor/connect/aws/s3/sink/S3AvroWriterManagerTest.scala

Lines changed: 61 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ import io.lenses.streamreactor.connect.cloud.common.sink.conversion.SinkData
5353
import io.lenses.streamreactor.connect.cloud.common.sink.conversion.StructSinkData
5454
import io.lenses.streamreactor.connect.cloud.common.sink.naming.CloudKeyNamer
5555
import io.lenses.streamreactor.connect.cloud.common.sink.naming.FileNamer
56+
import io.lenses.streamreactor.connect.cloud.common.sink.naming.FileNamerConfig
5657
import io.lenses.streamreactor.connect.cloud.common.sink.naming.OffsetFileNamer
5758
import io.lenses.streamreactor.connect.cloud.common.utils.SampleData.UsersSchemaDecimal
5859
import org.apache.avro.generic.GenericRecord
@@ -117,11 +118,18 @@ class S3AvroWriterManagerTest extends AnyFlatSpec with Matchers with S3ProxyCont
117118
)
118119

119120
"avro sink" should "write 2 records to avro format in s3" in {
120-
val sink = writerManagerCreator.from(avroConfig(new OffsetFileNamer(
121-
identity[String],
122-
AvroFormatSelection.extension,
123-
None,
124-
)))._2
121+
val sink = writerManagerCreator.from(
122+
avroConfig(
123+
new OffsetFileNamer(
124+
FileNamerConfig(
125+
partitionPaddingStrategy = NoOpPaddingStrategy,
126+
offsetPaddingStrategy = NoOpPaddingStrategy,
127+
extension = AvroFormatSelection.extension,
128+
suffix = None,
129+
),
130+
),
131+
),
132+
)._2
125133
firstUsers.zipWithIndex.foreach {
126134
case (struct: Struct, index: Int) =>
127135
val writeRes = sink.write(
@@ -154,11 +162,18 @@ class S3AvroWriterManagerTest extends AnyFlatSpec with Matchers with S3ProxyCont
154162
}
155163

156164
"avro sink" should "write 2 records to avro format in s3 and add a suffix to the key generated" in {
157-
val sink = writerManagerCreator.from(avroConfig(new OffsetFileNamer(
158-
identity[String],
159-
AvroFormatSelection.extension,
160-
Some("-my-suffix"),
161-
)))._2
165+
val sink = writerManagerCreator.from(
166+
avroConfig(
167+
new OffsetFileNamer(
168+
FileNamerConfig(
169+
partitionPaddingStrategy = NoOpPaddingStrategy,
170+
offsetPaddingStrategy = NoOpPaddingStrategy,
171+
extension = AvroFormatSelection.extension,
172+
suffix = Some("-my-suffix"),
173+
),
174+
),
175+
),
176+
)._2
162177
firstUsers.zipWithIndex.foreach {
163178
case (struct: Struct, index: Int) =>
164179
val writeRes = sink.write(
@@ -191,11 +206,18 @@ class S3AvroWriterManagerTest extends AnyFlatSpec with Matchers with S3ProxyCont
191206
}
192207

193208
"avro sink" should "write multiple files and keeping the earliest timestamp" in {
194-
val sink = writerManagerCreator.from(avroConfig(new OffsetFileNamer(
195-
identity[String],
196-
AvroFormatSelection.extension,
197-
None,
198-
)))._2
209+
val sink = writerManagerCreator.from(
210+
avroConfig(
211+
new OffsetFileNamer(
212+
FileNamerConfig(
213+
partitionPaddingStrategy = NoOpPaddingStrategy,
214+
offsetPaddingStrategy = NoOpPaddingStrategy,
215+
extension = AvroFormatSelection.extension,
216+
suffix = None,
217+
),
218+
),
219+
),
220+
)._2
199221
firstUsers.zip(List(0 -> 100, 1 -> 99, 2 -> 101, 3 -> 102)).foreach {
200222
case (struct: Struct, (index: Int, timestamp: Int)) =>
201223
val writeRes = sink.write(
@@ -227,11 +249,18 @@ class S3AvroWriterManagerTest extends AnyFlatSpec with Matchers with S3ProxyCont
227249
}
228250

229251
"avro sink" should "write BigDecimal" in {
230-
val sink = writerManagerCreator.from(avroConfig(new OffsetFileNamer(
231-
identity[String],
232-
AvroFormatSelection.extension,
233-
None,
234-
)))._2
252+
val sink = writerManagerCreator.from(
253+
avroConfig(
254+
new OffsetFileNamer(
255+
FileNamerConfig(
256+
partitionPaddingStrategy = NoOpPaddingStrategy,
257+
offsetPaddingStrategy = NoOpPaddingStrategy,
258+
extension = AvroFormatSelection.extension,
259+
suffix = None,
260+
),
261+
),
262+
),
263+
)._2
235264
val usersWithDecimal1 =
236265
new Struct(UsersSchemaDecimal)
237266
.put("name", "sam")
@@ -307,11 +336,18 @@ class S3AvroWriterManagerTest extends AnyFlatSpec with Matchers with S3ProxyCont
307336
new Struct(secondSchema).put("name", "coco").put("designation", null).put("salary", 395.44),
308337
)
309338

310-
val sink = writerManagerCreator.from(avroConfig(new OffsetFileNamer(
311-
identity[String],
312-
AvroFormatSelection.extension,
313-
None,
314-
)))._2
339+
val sink = writerManagerCreator.from(
340+
avroConfig(
341+
new OffsetFileNamer(
342+
FileNamerConfig(
343+
partitionPaddingStrategy = NoOpPaddingStrategy,
344+
offsetPaddingStrategy = NoOpPaddingStrategy,
345+
extension = AvroFormatSelection.extension,
346+
suffix = None,
347+
),
348+
),
349+
),
350+
)._2
315351
firstUsers.concat(usersWithNewSchema).zipWithIndex.foreach {
316352
case (user, index) =>
317353
sink.write(

kafka-connect-aws-s3/src/it/scala/io/lenses/streamreactor/connect/aws/s3/sink/S3JsonWriterManagerTest.scala

Lines changed: 25 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ import io.lenses.streamreactor.connect.cloud.common.sink.conversion.NullSinkData
5353
import io.lenses.streamreactor.connect.cloud.common.sink.conversion.SinkData
5454
import io.lenses.streamreactor.connect.cloud.common.sink.conversion.StructSinkData
5555
import io.lenses.streamreactor.connect.cloud.common.sink.naming.CloudKeyNamer
56+
import io.lenses.streamreactor.connect.cloud.common.sink.naming.FileNamerConfig
5657
import io.lenses.streamreactor.connect.cloud.common.sink.naming.OffsetFileNamer
5758
import io.lenses.streamreactor.connect.cloud.common.utils.ITSampleSchemaAndData.firstUsers
5859
import io.lenses.streamreactor.connect.cloud.common.utils.ITSampleSchemaAndData.users
@@ -94,9 +95,12 @@ class S3JsonWriterManagerTest extends AnyFlatSpec with Matchers with S3ProxyCont
9495
JsonFormatSelection,
9596
defaultPartitionSelection(Values),
9697
new OffsetFileNamer(
97-
identity[String],
98-
JsonFormatSelection.extension,
99-
None,
98+
FileNamerConfig(
99+
partitionPaddingStrategy = NoOpPaddingStrategy,
100+
offsetPaddingStrategy = NoOpPaddingStrategy,
101+
extension = JsonFormatSelection.extension,
102+
suffix = None,
103+
),
100104
),
101105
new PaddingService(Map[String, PaddingStrategy](
102106
"partition" -> NoOpPaddingStrategy,
@@ -164,9 +168,12 @@ class S3JsonWriterManagerTest extends AnyFlatSpec with Matchers with S3ProxyCont
164168
AvroFormatSelection,
165169
defaultPartitionSelection(Values),
166170
new OffsetFileNamer(
167-
identity[String],
168-
JsonFormatSelection.extension,
169-
None,
171+
FileNamerConfig(
172+
partitionPaddingStrategy = NoOpPaddingStrategy,
173+
offsetPaddingStrategy = NoOpPaddingStrategy,
174+
extension = JsonFormatSelection.extension,
175+
suffix = None,
176+
),
170177
),
171178
new PaddingService(Map[String, PaddingStrategy](
172179
"partition" -> NoOpPaddingStrategy,
@@ -236,9 +243,12 @@ class S3JsonWriterManagerTest extends AnyFlatSpec with Matchers with S3ProxyCont
236243
AvroFormatSelection,
237244
defaultPartitionSelection(Values),
238245
new OffsetFileNamer(
239-
identity[String],
240-
JsonFormatSelection.extension,
241-
None,
246+
FileNamerConfig(
247+
partitionPaddingStrategy = NoOpPaddingStrategy,
248+
offsetPaddingStrategy = NoOpPaddingStrategy,
249+
extension = JsonFormatSelection.extension,
250+
suffix = None,
251+
),
242252
),
243253
new PaddingService(Map[String, PaddingStrategy](
244254
"partition" -> NoOpPaddingStrategy,
@@ -315,9 +325,12 @@ class S3JsonWriterManagerTest extends AnyFlatSpec with Matchers with S3ProxyCont
315325
AvroFormatSelection,
316326
defaultPartitionSelection(Values),
317327
new OffsetFileNamer(
318-
identity[String],
319-
JsonFormatSelection.extension,
320-
None,
328+
FileNamerConfig(
329+
partitionPaddingStrategy = NoOpPaddingStrategy,
330+
offsetPaddingStrategy = NoOpPaddingStrategy,
331+
extension = JsonFormatSelection.extension,
332+
suffix = None,
333+
),
321334
),
322335
new PaddingService(Map[String, PaddingStrategy](
323336
"partition" -> NoOpPaddingStrategy,

kafka-connect-aws-s3/src/it/scala/io/lenses/streamreactor/connect/aws/s3/sink/S3ParquetWriterManagerTest.scala

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ import io.lenses.streamreactor.connect.cloud.common.sink.conversion.NullSinkData
5252
import io.lenses.streamreactor.connect.cloud.common.sink.conversion.SinkData
5353
import io.lenses.streamreactor.connect.cloud.common.sink.conversion.StructSinkData
5454
import io.lenses.streamreactor.connect.cloud.common.sink.naming.CloudKeyNamer
55+
import io.lenses.streamreactor.connect.cloud.common.sink.naming.FileNamerConfig
5556
import io.lenses.streamreactor.connect.cloud.common.sink.naming.OffsetFileNamer
5657
import org.apache.avro.generic.GenericRecord
5758
import org.apache.kafka.connect.data.Schema
@@ -90,9 +91,12 @@ class S3ParquetWriterManagerTest extends AnyFlatSpec with Matchers with S3ProxyC
9091
ParquetFormatSelection,
9192
defaultPartitionSelection(Values),
9293
new OffsetFileNamer(
93-
identity[String],
94-
ParquetFormatSelection.extension,
95-
None,
94+
FileNamerConfig(
95+
partitionPaddingStrategy = NoOpPaddingStrategy,
96+
offsetPaddingStrategy = NoOpPaddingStrategy,
97+
extension = ParquetFormatSelection.extension,
98+
suffix = None,
99+
),
96100
),
97101
new PaddingService(Map[String, PaddingStrategy](
98102
"partition" -> NoOpPaddingStrategy,

kafka-connect-aws-s3/src/test/scala/io/lenses/streamreactor/connect/aws/s3/sink/config/S3SinkConfigDefBuilderTest.scala

Lines changed: 32 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,13 @@ class S3SinkConfigDefBuilderTest extends AnyFlatSpec with MockitoSugar with Matc
9898
)
9999

100100
config.CloudSinkBucketOptions(connectorTaskId, S3SinkConfigDefBuilder(props)).value.map(_.dataStorage) should be(
101-
List(DataStorageSettings(true, true, true, false, false)),
101+
List(DataStorageSettings(envelope = true,
102+
key = true,
103+
value = true,
104+
metadata = false,
105+
headers = false,
106+
customNamerFactory = None,
107+
)),
102108
)
103109

104110
}
@@ -122,7 +128,10 @@ class S3SinkConfigDefBuilderTest extends AnyFlatSpec with MockitoSugar with Matc
122128
)
123129

124130
config.CloudSinkBucketOptions(connectorTaskId, S3SinkConfigDefBuilder(props)).value.map(_.dataStorage) should be(
125-
List(DataStorageSettings(true, true, true, false, false), DataStorageSettings(true, true, true, false, true)),
131+
List(
132+
DataStorageSettings(envelope = true, key = true, value = true, metadata = false, headers = false, None),
133+
DataStorageSettings(envelope = true, key = true, value = true, metadata = false, headers = true, None),
134+
),
126135
)
127136

128137
}
@@ -194,7 +203,13 @@ class S3SinkConfigDefBuilderTest extends AnyFlatSpec with MockitoSugar with Matc
194203
)
195204

196205
config.CloudSinkBucketOptions(connectorTaskId, S3SinkConfigDefBuilder(props)).value.map(_.dataStorage) should be(
197-
List(DataStorageSettings(envelope = true, key = true, value = true, metadata = false, headers = false)),
206+
List(DataStorageSettings(envelope = true,
207+
key = true,
208+
value = true,
209+
metadata = false,
210+
headers = false,
211+
customNamerFactory = None,
212+
)),
198213
)
199214
}
200215

@@ -204,7 +219,13 @@ class S3SinkConfigDefBuilderTest extends AnyFlatSpec with MockitoSugar with Matc
204219
)
205220

206221
config.CloudSinkBucketOptions(connectorTaskId, S3SinkConfigDefBuilder(props)).value.map(_.dataStorage) should be(
207-
List(DataStorageSettings(envelope = true, key = true, value = true, metadata = false, headers = false)),
222+
List(DataStorageSettings(envelope = true,
223+
key = true,
224+
value = true,
225+
metadata = false,
226+
headers = false,
227+
customNamerFactory = None,
228+
)),
208229
)
209230
}
210231

@@ -214,7 +235,13 @@ class S3SinkConfigDefBuilderTest extends AnyFlatSpec with MockitoSugar with Matc
214235
)
215236

216237
config.CloudSinkBucketOptions(connectorTaskId, S3SinkConfigDefBuilder(props)).value.map(_.dataStorage) should be(
217-
List(DataStorageSettings(envelope = true, key = true, value = true, metadata = true, headers = true)),
238+
List(DataStorageSettings(envelope = true,
239+
key = true,
240+
value = true,
241+
metadata = true,
242+
headers = true,
243+
customNamerFactory = None,
244+
)),
218245
)
219246
}
220247

0 commit comments

Comments
 (0)