Skip to content

Commit f4cfb4b

Browse files
Bulk Load CDK: Staging refactor + tests; multi-sync ITs correctly wait for ack (#48608)
Co-authored-by: Edward Gao <edward.gao@airbyte.io>
1 parent 86a0d52 commit f4cfb4b

File tree

28 files changed

+615
-319
lines changed

28 files changed

+615
-319
lines changed

airbyte-cdk/bulk/core/base/src/main/resources/application.yaml

Lines changed: 2 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -8,3 +8,5 @@ airbyte:
88
flush:
99
rate-ms: 900000 # 15 minutes
1010
window-ms: 900000 # 15 minutes
11+
destination:
12+
record-batch-size: ${AIRBYTE_DESTINATION_RECORD_BATCH_SIZE:209715200}

airbyte-cdk/bulk/core/load/src/integrationTest/kotlin/io/airbyte/cdk/load/mock_integration_test/MockBasicFunctionalityIntegrationTest.kt

Lines changed: 0 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -9,7 +9,6 @@ import io.airbyte.cdk.load.test.util.NoopExpectedRecordMapper
99
import io.airbyte.cdk.load.test.util.NoopNameMapper
1010
import io.airbyte.cdk.load.write.BasicFunctionalityIntegrationTest
1111
import io.airbyte.cdk.load.write.Untyped
12-
import org.junit.jupiter.api.Disabled
1312
import org.junit.jupiter.api.Test
1413

1514
class MockBasicFunctionalityIntegrationTest :
@@ -34,7 +33,6 @@ class MockBasicFunctionalityIntegrationTest :
3433
}
3534

3635
@Test
37-
@Disabled
3836
override fun testMidSyncCheckpointingStreamState() {
3937
super.testMidSyncCheckpointingStreamState()
4038
}

airbyte-cdk/bulk/core/load/src/integrationTest/kotlin/io/airbyte/cdk/load/mock_integration_test/MockDestinationBackend.kt

Lines changed: 6 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -13,9 +13,11 @@ import io.airbyte.cdk.load.test.util.DestinationDataDumper
1313
import io.airbyte.cdk.load.test.util.OutputRecord
1414
import io.airbyte.cdk.load.test.util.RecordDiffer
1515
import java.util.concurrent.ConcurrentHashMap
16+
import java.util.concurrent.ConcurrentLinkedQueue
1617

1718
object MockDestinationBackend {
18-
private val files: MutableMap<String, MutableList<OutputRecord>> = ConcurrentHashMap()
19+
private val files: ConcurrentHashMap<String, ConcurrentLinkedQueue<OutputRecord>> =
20+
ConcurrentHashMap()
1921

2022
fun insert(filename: String, vararg records: OutputRecord) {
2123
getFile(filename).addAll(records)
@@ -96,7 +98,7 @@ object MockDestinationBackend {
9698
}
9799

98100
fun readFile(filename: String): List<OutputRecord> {
99-
return getFile(filename)
101+
return getFile(filename).toList()
100102
}
101103

102104
fun deleteOldRecords(filename: String, minGenerationId: Long) {
@@ -105,8 +107,8 @@ object MockDestinationBackend {
105107
}
106108
}
107109

108-
private fun getFile(filename: String): MutableList<OutputRecord> {
109-
return files.getOrPut(filename) { mutableListOf() }
110+
private fun getFile(filename: String): ConcurrentLinkedQueue<OutputRecord> {
111+
return files.getOrPut(filename) { ConcurrentLinkedQueue() }
110112
}
111113
}
112114

airbyte-cdk/bulk/core/load/src/integrationTest/kotlin/io/airbyte/cdk/load/mock_integration_test/MockDestinationConfiguration.kt

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -11,8 +11,8 @@ import io.micronaut.context.annotation.Factory
1111
import jakarta.inject.Singleton
1212

1313
class MockDestinationConfiguration : DestinationConfiguration() {
14-
// override to 10KB instead of 200MB
15-
override val recordBatchSizeBytes = 10 * 1024L
14+
// Micro-batch for testing.
15+
override val recordBatchSizeBytes = 1L
1616
}
1717

1818
@Singleton

airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/file/TimeProvider.kt

Lines changed: 5 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -8,13 +8,18 @@ import io.micronaut.context.annotation.Secondary
88
import jakarta.inject.Singleton
99

1010
interface TimeProvider {
11+
fun syncTimeMillis(): Long
1112
fun currentTimeMillis(): Long
1213
suspend fun delay(ms: Long)
1314
}
1415

1516
@Singleton
1617
@Secondary
1718
class DefaultTimeProvider : TimeProvider {
19+
private val creationTime = System.currentTimeMillis()
20+
21+
override fun syncTimeMillis(): Long = creationTime
22+
1823
override fun currentTimeMillis(): Long {
1924
return System.currentTimeMillis()
2025
}

airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/state/SyncManager.kt

Lines changed: 3 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -13,6 +13,7 @@ import io.micronaut.context.annotation.Secondary
1313
import jakarta.inject.Singleton
1414
import java.util.concurrent.ConcurrentHashMap
1515
import kotlinx.coroutines.CompletableDeferred
16+
import kotlinx.coroutines.withTimeoutOrNull
1617

1718
sealed interface SyncResult
1819

@@ -79,7 +80,8 @@ class DefaultSyncManager(
7980
stream: DestinationStream.Descriptor
8081
): StreamLoader? {
8182
val completable = streamLoaders[stream]
82-
return completable?.let { if (it.isCompleted) it.await() else null }
83+
// `.isCompleted` does not work as expected here.
84+
return completable?.let { withTimeoutOrNull(1000L) { it.await() } }
8385
}
8486

8587
override suspend fun awaitAllStreamsCompletedSuccessfully(): Boolean {

airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/DestinationTaskExceptionHandler.kt

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -130,7 +130,7 @@ T : ScopedTask {
130130
log.warn { "Stream task $innerTask was cancelled." }
131131
throw e
132132
} catch (e: Exception) {
133-
log.error { "Caught exception in sync task $innerTask: $e" }
133+
log.error { "Caught exception in stream task $innerTask: $e" }
134134
handleStreamFailure(stream, e)
135135
}
136136
}

airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/test/util/RecordDifferTest.kt

Lines changed: 115 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -5,6 +5,7 @@
55
package io.airbyte.cdk.load.test.util
66

77
import java.time.OffsetDateTime
8+
import kotlin.test.assertEquals
89
import org.junit.jupiter.api.Assertions
910
import org.junit.jupiter.api.Test
1011

@@ -118,4 +119,118 @@ class RecordDifferTest {
118119
diff
119120
)
120121
}
122+
123+
/** Verify that the differ can sort records which are identical other than the PK */
124+
@Test
125+
fun testSortPk() {
126+
val diff =
127+
RecordDiffer(primaryKey = listOf(listOf("id")), cursor = null)
128+
.diffRecords(
129+
listOf(
130+
OutputRecord(
131+
extractedAt = 0,
132+
generationId = 0,
133+
data = mapOf("id" to 1, "name" to "foo"),
134+
airbyteMeta = null,
135+
),
136+
OutputRecord(
137+
extractedAt = 0,
138+
generationId = 0,
139+
data = mapOf("id" to 2, "name" to "bar"),
140+
airbyteMeta = null,
141+
),
142+
),
143+
listOf(
144+
OutputRecord(
145+
extractedAt = 0,
146+
generationId = 0,
147+
data = mapOf("id" to 2, "name" to "bar"),
148+
airbyteMeta = null,
149+
),
150+
OutputRecord(
151+
extractedAt = 0,
152+
generationId = 0,
153+
data = mapOf("id" to 1, "name" to "foo"),
154+
airbyteMeta = null,
155+
),
156+
)
157+
)
158+
assertEquals(null, diff)
159+
}
160+
161+
/** Verify that the differ can sort records which are identical other than the cursor */
162+
@Test
163+
fun testSortCursor() {
164+
val diff =
165+
RecordDiffer(primaryKey = listOf(listOf("id")), cursor = listOf("updated_at"))
166+
.diffRecords(
167+
listOf(
168+
OutputRecord(
169+
extractedAt = 0,
170+
generationId = 0,
171+
data = mapOf("id" to 1, "name" to "foo", "updated_at" to 1),
172+
airbyteMeta = null,
173+
),
174+
OutputRecord(
175+
extractedAt = 0,
176+
generationId = 0,
177+
data = mapOf("id" to 1, "name" to "bar", "updated_at" to 2),
178+
airbyteMeta = null,
179+
),
180+
),
181+
listOf(
182+
OutputRecord(
183+
extractedAt = 0,
184+
generationId = 0,
185+
data = mapOf("id" to 1, "name" to "bar", "updated_at" to 2),
186+
airbyteMeta = null,
187+
),
188+
OutputRecord(
189+
extractedAt = 0,
190+
generationId = 0,
191+
data = mapOf("id" to 1, "name" to "foo", "updated_at" to 1),
192+
airbyteMeta = null,
193+
),
194+
),
195+
)
196+
assertEquals(null, diff)
197+
}
198+
199+
/** Verify that the differ can sort records which are identical other than extractedAt */
200+
@Test
201+
fun testSortExtractedAt() {
202+
val diff =
203+
RecordDiffer(primaryKey = listOf(listOf("id")), cursor = null)
204+
.diffRecords(
205+
listOf(
206+
OutputRecord(
207+
extractedAt = 0,
208+
generationId = 0,
209+
data = mapOf("id" to 1, "name" to "foo"),
210+
airbyteMeta = null,
211+
),
212+
OutputRecord(
213+
extractedAt = 1,
214+
generationId = 0,
215+
data = mapOf("id" to 1, "name" to "bar"),
216+
airbyteMeta = null,
217+
),
218+
),
219+
listOf(
220+
OutputRecord(
221+
extractedAt = 1,
222+
generationId = 0,
223+
data = mapOf("id" to 1, "name" to "bar"),
224+
airbyteMeta = null,
225+
),
226+
OutputRecord(
227+
extractedAt = 0,
228+
generationId = 0,
229+
data = mapOf("id" to 1, "name" to "foo"),
230+
airbyteMeta = null,
231+
),
232+
)
233+
)
234+
assertEquals(null, diff)
235+
}
121236
}

airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/file/MockTimeProvider.kt

Lines changed: 13 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -13,17 +13,26 @@ import java.util.concurrent.atomic.AtomicLong
1313
@Primary
1414
@Requires(env = ["MockTimeProvider"])
1515
open class MockTimeProvider : TimeProvider {
16+
private var syncTime = AtomicLong(0)
1617
private var currentTime = AtomicLong(0)
1718

18-
override fun currentTimeMillis(): Long {
19-
return currentTime.get()
20-
}
21-
2219
fun setCurrentTime(currentTime: Long) {
2320
this.currentTime.set(currentTime)
2421
}
2522

23+
fun setSyncTime(currentTime: Long) {
24+
this.syncTime.set(currentTime)
25+
}
26+
27+
override fun currentTimeMillis(): Long {
28+
return currentTime.get()
29+
}
30+
2631
override suspend fun delay(ms: Long) {
2732
currentTime.addAndGet(ms)
2833
}
34+
35+
override fun syncTimeMillis(): Long {
36+
return syncTime.get()
37+
}
2938
}

airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/test/util/IntegrationTest.kt

Lines changed: 72 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -9,9 +9,14 @@ import io.airbyte.cdk.command.ConfigurationSpecification
99
import io.airbyte.cdk.load.command.DestinationCatalog
1010
import io.airbyte.cdk.load.command.DestinationStream
1111
import io.airbyte.cdk.load.message.DestinationMessage
12+
import io.airbyte.cdk.load.message.DestinationRecord
1213
import io.airbyte.cdk.load.message.DestinationRecordStreamComplete
14+
import io.airbyte.cdk.load.message.StreamCheckpoint
1315
import io.airbyte.cdk.load.test.util.destination_process.DestinationProcessFactory
16+
import io.airbyte.cdk.load.test.util.destination_process.DestinationUncleanExitException
17+
import io.airbyte.cdk.load.test.util.destination_process.NonDockerizedDestination
1418
import io.airbyte.protocol.models.v0.AirbyteMessage
19+
import io.airbyte.protocol.models.v0.AirbyteStateMessage
1520
import io.airbyte.protocol.models.v0.AirbyteStreamStatusTraceMessage.AirbyteStreamStatus
1621
import java.time.Instant
1722
import java.time.LocalDateTime
@@ -20,13 +25,15 @@ import java.time.format.DateTimeFormatter
2025
import java.util.concurrent.atomic.AtomicBoolean
2126
import kotlin.test.fail
2227
import kotlinx.coroutines.Dispatchers
28+
import kotlinx.coroutines.async
2329
import kotlinx.coroutines.launch
2430
import kotlinx.coroutines.runBlocking
2531
import org.apache.commons.lang3.RandomStringUtils
2632
import org.junit.jupiter.api.AfterEach
2733
import org.junit.jupiter.api.BeforeAll
2834
import org.junit.jupiter.api.BeforeEach
2935
import org.junit.jupiter.api.TestInfo
36+
import org.junit.jupiter.api.assertThrows
3037
import org.junit.jupiter.api.extension.ExtendWith
3138
import org.junit.jupiter.api.parallel.Execution
3239
import org.junit.jupiter.api.parallel.ExecutionMode
@@ -91,6 +98,7 @@ abstract class IntegrationTest(
9198
primaryKey: List<List<String>>,
9299
cursor: List<String>?,
93100
reason: String? = null,
101+
allowUnexpectedRecord: Boolean = false,
94102
) {
95103
val actualRecords: List<OutputRecord> = dataDumper.dumpRecords(config, stream)
96104
val expectedRecords: List<OutputRecord> =
@@ -100,6 +108,7 @@ abstract class IntegrationTest(
100108
primaryKey = primaryKey.map { nameMapper.mapFieldName(it) },
101109
cursor = cursor?.let { nameMapper.mapFieldName(it) },
102110
nullEqualsUnset = nullEqualsUnset,
111+
allowUnexpectedRecord = allowUnexpectedRecord,
103112
)
104113
.diffRecords(expectedRecords, actualRecords)
105114
?.let {
@@ -175,6 +184,69 @@ abstract class IntegrationTest(
175184
}
176185
}
177186

187+
/**
188+
* Run a sync until it acknowledges the given state message, then kill the sync. This method is
189+
* useful for tests that want to verify recovery-from-failure cases, e.g. truncate refresh
190+
* behaviors.
191+
*
192+
* A common pattern is to call [runSyncUntilStateAck], and then call `dumpAndDiffRecords(...,
193+
* allowUnexpectedRecord = true)` to verify that [records] were written to the destination.
194+
*/
195+
fun runSyncUntilStateAck(
196+
configContents: String,
197+
stream: DestinationStream,
198+
records: List<DestinationRecord>,
199+
inputStateMessage: StreamCheckpoint,
200+
allowGracefulShutdown: Boolean,
201+
): AirbyteStateMessage {
202+
val destination =
203+
destinationProcessFactory.createDestinationProcess(
204+
"write",
205+
configContents,
206+
DestinationCatalog(listOf(stream)).asProtocolObject(),
207+
)
208+
return runBlocking(Dispatchers.IO) {
209+
launch {
210+
// expect an exception. we're sending a stream incomplete or killing the
211+
// destination, so it's expected to crash
212+
// TODO: This is a hack, not sure what's going on
213+
if (destination is NonDockerizedDestination) {
214+
assertThrows<DestinationUncleanExitException> { destination.run() }
215+
} else {
216+
destination.run()
217+
}
218+
}
219+
records.forEach { destination.sendMessage(it.asProtocolMessage()) }
220+
destination.sendMessage(inputStateMessage.asProtocolMessage())
221+
222+
val deferred = async {
223+
val outputStateMessage: AirbyteStateMessage
224+
while (true) {
225+
destination.sendMessage("")
226+
val returnedMessages = destination.readMessages()
227+
if (returnedMessages.any { it.type == AirbyteMessage.Type.STATE }) {
228+
outputStateMessage =
229+
returnedMessages
230+
.filter { it.type == AirbyteMessage.Type.STATE }
231+
.map { it.state }
232+
.first()
233+
break
234+
}
235+
}
236+
outputStateMessage
237+
}
238+
val outputStateMessage = deferred.await()
239+
if (allowGracefulShutdown) {
240+
destination.sendMessage("{\"unparseable")
241+
destination.shutdown()
242+
} else {
243+
destination.kill()
244+
}
245+
246+
outputStateMessage
247+
}
248+
}
249+
178250
companion object {
179251
private val hasRunCleaner = AtomicBoolean(false)
180252

0 commit comments

Comments
 (0)