@@ -11,10 +11,12 @@ import com.google.common.collect.Sets
1111import dev.failsafe.Failsafe
1212import dev.failsafe.RetryPolicy
1313import dev.failsafe.function.CheckedRunnable
14+ import io.airbyte.api.client.model.generated.ConnectionEventsRequestBody
1415import io.airbyte.api.client.model.generated.ConnectionScheduleData
1516import io.airbyte.api.client.model.generated.ConnectionScheduleDataBasicSchedule
1617import io.airbyte.api.client.model.generated.DestinationDefinitionIdRequestBody
1718import io.airbyte.api.client.model.generated.DestinationSyncMode
19+ import io.airbyte.api.client.model.generated.JobIdRequestBody
1820import io.airbyte.api.client.model.generated.JobRead
1921import io.airbyte.api.client.model.generated.JobStatus
2022import io.airbyte.api.client.model.generated.SourceDefinitionIdRequestBody
@@ -24,6 +26,7 @@ import io.airbyte.api.client.model.generated.SyncMode
2426import io.airbyte.api.client.model.generated.WorkspaceCreate
2527import io.airbyte.commons.DEFAULT_ORGANIZATION_ID
2628import io.airbyte.commons.json.Jsons
29+ import io.airbyte.featureflag.tests.TestFlagsSetter
2730import io.airbyte.test.utils.AcceptanceTestHarness
2831import io.airbyte.test.utils.AcceptanceTestUtils
2932import io.airbyte.test.utils.AcceptanceTestUtils.createAirbyteApiClient
@@ -131,6 +134,55 @@ class AcceptanceTestsResources {
131134
132135 LOGGER .info(STATE_AFTER_SYNC_ONE , testHarness.getConnectionState(connectionId))
133136
137+ // postgres_init.sql inserts 5 records. Assert that we wrote stats correctly.
138+ // (this is a bit sketchy, in that theoretically the source could emit a state message,
139+ // then fail an attempt, and a subsequent attempt would then not read all the records.
140+ // But with just 5 records, that seems unlikely.)
141+ val lastAttempt =
142+ testHarness
143+ .getJobInfoRead(connectionSyncRead1.job.id)
144+ .attempts
145+ .last()
146+ .attempt
147+ testHarness.apiClient.jobsApi.getJobDebugInfo(JobIdRequestBody (connectionSyncRead1.job.id))
148+ Assertions .assertAll(
149+ " totalStats were incorrect" ,
150+ { Assertions .assertEquals(5 , lastAttempt.totalStats!! .recordsEmitted, " totalStats.recordsEmitted was incorrect" ) },
151+ { Assertions .assertEquals(118 , lastAttempt.totalStats!! .bytesEmitted, " totalStats.bytesEmitted was incorrect" ) },
152+ { Assertions .assertEquals(1 , lastAttempt.totalStats!! .stateMessagesEmitted, " totalStats.stateMessagesEmitted was incorrect" ) },
153+ // the API doesn't return records/bytes committed on totalStats, so don't assert against them
154+ // { Assertions.assertEquals(5, lastAttempt.totalStats!!.recordsCommitted, "totalStats.recordsCommitted was incorrect") },
155+ // { Assertions.assertEquals(118, lastAttempt.totalStats!!.bytesCommitted, "totalStats.bytesCommitted was incorrect") },
156+ )
157+ Assertions .assertEquals(1 , lastAttempt.streamStats!! .size, " Expected to see stats for exactly one stream. Got ${lastAttempt.streamStats} " )
158+ val lastAttemptStreamStats = lastAttempt.streamStats!! .first()
159+ Assertions .assertAll(
160+ " streamStats were incorrect" ,
161+ { Assertions .assertEquals(" id_and_name" , lastAttemptStreamStats.streamName) },
162+ { Assertions .assertNull(lastAttemptStreamStats.streamNamespace) },
163+ )
164+ Assertions .assertAll(
165+ " streamStats were incorrect" ,
166+ { Assertions .assertEquals(5 , lastAttemptStreamStats.stats.recordsEmitted, " streamStats.recordsEmitted was incorrect" ) },
167+ { Assertions .assertEquals(118 , lastAttemptStreamStats.stats.bytesEmitted, " streamStats.bytesEmitted was incorrect" ) },
168+ { Assertions .assertEquals(5 , lastAttemptStreamStats.stats.recordsCommitted, " streamStats.recordsCommitted was incorrect" ) },
169+ // the API doesn't return stateMessagesEmitted / bytesCommitted on streamStats, so don't assert against them
170+ // { Assertions.assertEquals(1, lastAttemptStreamStats.stats.stateMessagesEmitted, "streamStats.stateMessagesEmitted was incorrect") },
171+ // { Assertions.assertEquals(118, lastAttemptStreamStats.stats.bytesCommitted, "streamStats.bytesCommitted was incorrect") },
172+ )
173+ // this was the only way I found to get to bytesLoaded (conceptually equivalent to bytesCommitted)
174+ val lastConnectionEventSummary =
175+ testHarness
176+ .apiClient
177+ .connectionApi
178+ .listConnectionEvents(ConnectionEventsRequestBody (connectionId))
179+ .events
180+ .first()
181+ // summary is declared as Any, so we need to explicitly cast here.
182+ .summary as Map <String , Int >
183+ // would you believe "bytesLoaded" isn't declared as a constant anywhere?
184+ Assertions .assertEquals(118 , lastConnectionEventSummary[" bytesLoaded" ])
185+
134186 val src = testHarness.getSourceDatabase()
135187 val dst = testHarness.getDestinationDatabase()
136188 assertSourceAndDestinationDbRawRecordsInSync(
@@ -348,6 +400,7 @@ class AcceptanceTestsResources {
348400 AcceptanceTestUtils .getAirbyteApiUrl(),
349401 Map .of(GATEWAY_AUTH_HEADER , CLOUD_API_USER_HEADER_VALUE ),
350402 )
403+ val testFlagsSetter = TestFlagsSetter (AIRBYTE_SERVER_HOST )
351404
352405 // If a workspace id is passed, use that. Otherwise, create a new workspace.
353406 // NOTE: we want to sometimes use a pre-configured workspace e.g., if we run against a production
@@ -393,7 +446,7 @@ class AcceptanceTestsResources {
393446 LOGGER .info(" pg source definition: {}" , sourceDef.dockerImageTag)
394447 LOGGER .info(" pg destination definition: {}" , destinationDef.dockerImageTag)
395448
396- testHarness = AcceptanceTestHarness (apiClient = airbyteApiClient, defaultWorkspaceId = workspaceId)
449+ testHarness = AcceptanceTestHarness (apiClient = airbyteApiClient, defaultWorkspaceId = workspaceId, testFlagsSetter = testFlagsSetter )
397450
398451 testHarness.ensureCleanSlate()
399452 }
@@ -424,6 +477,7 @@ class AcceptanceTestsResources {
424477 // NOTE: this is just a base64 encoding of a jwt representing a test user in some deployments.
425478 const val CLOUD_API_USER_HEADER_VALUE : String = " eyJ1c2VyX2lkIjogImNsb3VkLWFwaSIsICJlbWFpbF92ZXJpZmllZCI6ICJ0cnVlIn0K"
426479 const val AIRBYTE_ACCEPTANCE_TEST_WORKSPACE_ID : String = " AIRBYTE_ACCEPTANCE_TEST_WORKSPACE_ID"
480+ val AIRBYTE_SERVER_HOST : String = Optional .ofNullable(System .getenv(" AIRBYTE_SERVER_HOST" )).orElse(" http://localhost:8001" )
427481 val POSTGRES_SOURCE_DEF_ID : UUID = UUID .fromString(" decd338e-5647-4c0b-adf4-da0e75f5a750" )
428482 val POSTGRES_DEST_DEF_ID : UUID = UUID .fromString(" 25c5221d-dce2-4163-ade9-739ef790f503" )
429483 const val KUBE : String = " KUBE"
0 commit comments