44
55package io.airbyte.test.acceptance
66
7+ import com.fasterxml.jackson.databind.JsonNode
8+ import com.fasterxml.jackson.databind.node.ObjectNode
79import com.google.common.collect.ImmutableMap
810import dev.failsafe.Failsafe
911import dev.failsafe.RetryPolicy
1012import dev.failsafe.function.CheckedSupplier
1113import io.airbyte.api.client.model.generated.CheckConnectionRead
14+ import io.airbyte.api.client.model.generated.ConfiguredStreamMapper
1215import io.airbyte.api.client.model.generated.ConnectionScheduleData
1316import io.airbyte.api.client.model.generated.ConnectionScheduleDataCron
1417import io.airbyte.api.client.model.generated.ConnectionScheduleType
1518import io.airbyte.api.client.model.generated.DestinationSyncMode
1619import io.airbyte.api.client.model.generated.JobStatus
20+ import io.airbyte.api.client.model.generated.SelectedFieldInfo
21+ import io.airbyte.api.client.model.generated.StreamMapperType
1722import io.airbyte.api.client.model.generated.StreamStatusJobType
1823import io.airbyte.api.client.model.generated.StreamStatusRunState
1924import io.airbyte.api.client.model.generated.SyncMode
@@ -26,6 +31,7 @@ import io.airbyte.test.utils.AcceptanceTestUtils.IS_GKE
2631import io.airbyte.test.utils.AcceptanceTestUtils.modifyCatalog
2732import io.airbyte.test.utils.Asserts.assertSourceAndDestinationDbRawRecordsInSync
2833import io.airbyte.test.utils.Asserts.assertStreamStatuses
34+ import io.airbyte.test.utils.Databases
2935import io.airbyte.test.utils.Databases.listAllTables
3036import io.airbyte.test.utils.Databases.retrieveRecordsFromDatabase
3137import io.airbyte.test.utils.TestConnectionCreate
@@ -40,10 +46,12 @@ import org.junit.jupiter.api.parallel.ExecutionMode
4046import org.slf4j.Logger
4147import org.slf4j.LoggerFactory
4248import java.io.IOException
49+ import java.security.KeyPairGenerator
4350import java.time.Duration
4451import java.util.Optional
4552import java.util.Set
4653import java.util.UUID
54+ import javax.crypto.Cipher
4755
4856// TODO switch all the tests back to normal (i.e. non-parameterized) after the sync workflow v2 rollout
4957
@@ -236,8 +244,13 @@ internal abstract class SyncAcceptanceTests(
236244 }
237245 }
238246
247+ /**
248+ * This test also exercises column selection and mappers.
249+ */
239250 @Test
240251 @Throws(Exception ::class )
252+ // Needed for the experimental stdlib hex helpers used below: `keyPair.public.encoded.toHexString()` and `hexToByteArray()`
253+ @OptIn(ExperimentalStdlibApi ::class )
241254 fun testCronSync () {
242255 testHarness.withFlag(UseSyncV2 , Workspace (workspaceId), value = useV2).use {
243256 val sourceId = testHarness.createPostgresSource().sourceId
@@ -252,20 +265,77 @@ internal abstract class SyncAcceptanceTests(
252265 )
253266 val srcSyncMode = SyncMode .FULL_REFRESH
254267 val dstSyncMode = DestinationSyncMode .OVERWRITE
268+
269+ val keyPair =
270+ KeyPairGenerator
271+ .getInstance(" RSA" )
272+ .also { it.initialize(2048 ) }
273+ .generateKeyPair()
274+
255275 val catalog =
256276 modifyCatalog(
257- discoverResult.catalog,
258- Optional .of(srcSyncMode),
259- Optional .of(dstSyncMode),
260- Optional .empty(),
261- Optional .empty(),
262- Optional .of(true ),
263- Optional .empty(),
264- Optional .empty(),
265- Optional .empty(),
266- Optional .empty(),
267- Optional .empty(),
268- Optional .empty(),
277+ originalCatalog = discoverResult.catalog,
278+ replacementSourceSyncMode = Optional .of(srcSyncMode),
279+ replacementDestinationSyncMode = Optional .of(dstSyncMode),
280+ replacementSelected = Optional .of(true ),
281+ replacementFieldSelectionEnabled = Optional .of(true ),
282+ // Remove the `id` field, keep the `name` field
283+ replacementSelectedFields = Optional .of(listOf (SelectedFieldInfo (listOf (" name" )))),
284+ mappers =
285+ listOf (
286+ // Drop all records except "sherif"
287+ ConfiguredStreamMapper (
288+ StreamMapperType .ROW_MINUS_FILTERING ,
289+ Jsons .deserialize(
290+ """
291+ {
292+ "conditions": {
293+ "comparisonValue": "sherif",
294+ "fieldName": "name",
295+ "type": "EQUAL"
296+ }
297+ }
298+ """ .trimIndent(),
299+ ),
300+ ),
301+ // Run a chain of mappers against the `name` field: rename it, hash the renamed field, then encrypt the hashed field
302+ ConfiguredStreamMapper (
303+ StreamMapperType .FIELD_MINUS_RENAMING ,
304+ Jsons .deserialize(
305+ """
306+ {
307+ "newFieldName": "name_renamed",
308+ "originalFieldName": "name"
309+ }
310+ """ .trimIndent(),
311+ ),
312+ ),
313+ ConfiguredStreamMapper (
314+ StreamMapperType .HASHING ,
315+ Jsons .deserialize(
316+ """
317+ {
318+ "method": "SHA-256",
319+ "targetField": "name_renamed",
320+ "fieldNameSuffix": "_hashed"
321+ }
322+ """ .trimIndent(),
323+ ),
324+ ),
325+ ConfiguredStreamMapper (
326+ StreamMapperType .ENCRYPTION ,
327+ Jsons .deserialize(
328+ """
329+ {
330+ "algorithm": "RSA",
331+ "fieldNameSuffix": "_encrypted",
332+ "publicKey": "${keyPair.public.encoded.toHexString()} ",
333+ "targetField": "name_renamed_hashed"
334+ }
335+ """ .trimIndent(),
336+ ),
337+ ),
338+ ),
269339 )
270340 val conn =
271341 testHarness.createConnection(
@@ -300,16 +370,37 @@ internal abstract class SyncAcceptanceTests(
300370 ),
301371 ).withMaxRetries(AcceptanceTestsResources .MAX_TRIES )
302372 .build(),
303- ).get<String >(
304- CheckedSupplier <String > {
305- assertSourceAndDestinationDbRawRecordsInSync(
306- testHarness.getSourceDatabase(),
307- testHarness.getDestinationDatabase(),
308- AcceptanceTestHarness .PUBLIC_SCHEMA_NAME ,
309- conn.namespaceFormat!! ,
310- false ,
311- AcceptanceTestsResources .WITHOUT_SCD_TABLE ,
373+ ).get(
374+ CheckedSupplier {
375+ // Can't use any of the utility assertions, because RSA encryption is nondeterministic.
376+ // So we'll do this manually.
377+ val destinationRecords: List <JsonNode > =
378+ Databases .retrieveRawDestinationRecords(
379+ testHarness.getDestinationDatabase(),
380+ conn.namespaceFormat!! ,
381+ AcceptanceTestHarness .STREAM_NAME ,
382+ )
383+
384+ Assertions .assertEquals(1 , destinationRecords.size, " Expected to see exactly one record, got $destinationRecords " )
385+ val onlyRecord = destinationRecords.first() as ObjectNode
386+ Assertions .assertEquals(
387+ listOf (" name_renamed_hashed_encrypted" ),
388+ onlyRecord.fieldNames().asSequence().toList(),
389+ " Expected record to contain a single field `name_renamed_hashed_encrypted`, got $onlyRecord " ,
312390 )
391+ val encryptedBytes = onlyRecord[" name_renamed_hashed_encrypted" ].textValue().hexToByteArray()
392+ val decrypted =
393+ Cipher
394+ .getInstance(" RSA" )
395+ .also { it.init (Cipher .DECRYPT_MODE , keyPair.private) }
396+ .doFinal(encryptedBytes)
397+ .toString(Charsets .UTF_8 )
398+ Assertions .assertEquals(
399+ " 1ba0292c60f8c80a467157c332f641de05256388dff757bdb773987a39ac35e0" ,
400+ decrypted,
401+ """ Expected decrypted value to equal sha256("sherif")""" ,
402+ )
403+
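313404 " success" // Sentinel result, reached only when every assertion above passed. NOTE(review): the earlier remark that retryWithJitter returns null after exhausted retries looks stale, since this call appears to go through Failsafe; confirm what happens when retries are exhausted.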
314405 },
315406 )
0 commit comments