@@ -42,6 +42,7 @@ import io.airbyte.cdk.load.data.TimestampWithTimezoneValue
4242import io.airbyte.cdk.load.data.UnionType
4343import io.airbyte.cdk.load.data.UnknownType
4444import io.airbyte.cdk.load.data.json.toAirbyteValue
45+ import io.airbyte.cdk.load.dataflow.stats.ObservabilityMetrics
4546import io.airbyte.cdk.load.message.CheckpointMessage
4647import io.airbyte.cdk.load.message.InputGlobalCheckpoint
4748import io.airbyte.cdk.load.message.InputRecord
@@ -67,6 +68,7 @@ import io.airbyte.cdk.load.test.util.destination_process.DestinationUncleanExitE
6768import io.airbyte.cdk.load.util.Jsons
6869import io.airbyte.cdk.load.util.deserializeToNode
6970import io.airbyte.cdk.load.util.serializeToString
71+ import io.airbyte.protocol.models.v0.AdditionalStats
7072import io.airbyte.protocol.models.v0.AirbyteMessage
7173import io.airbyte.protocol.models.v0.AirbyteRecordMessageFileReference
7274import io.airbyte.protocol.models.v0.AirbyteRecordMessageMetaChange
@@ -341,6 +343,7 @@ abstract class BasicFunctionalityIntegrationTest(
341343 dataChannelMedium : DataChannelMedium = DataChannelMedium .STDIO ,
342344 dataChannelFormat : DataChannelFormat = DataChannelFormat .JSONL ,
343345 val testSpeedModeStatsEmission : Boolean = true ,
346+ val includesAdditionalStats : Boolean = true ,
344347) :
345348 IntegrationTest (
346349 additionalMicronautEnvs = additionalMicronautEnvs,
@@ -594,9 +597,10 @@ abstract class BasicFunctionalityIntegrationTest(
594597 ),
595598 outer.additionalProperties,
596599 )
597-
598600 assertEquals(
599- AirbyteStateStats ().withRecordCount(1.0 ),
601+ AirbyteStateStats ()
602+ .withRecordCount(1.0 )
603+ .withAdditionalStats(expectedAdditionalStats()),
600604 outer.destinationStats,
601605 )
602606
@@ -632,7 +636,9 @@ abstract class BasicFunctionalityIntegrationTest(
632636 )
633637
634638 assertEquals(
635- AirbyteStateStats ().withRecordCount(2.0 ),
639+ AirbyteStateStats ()
640+ .withRecordCount(2.0 )
641+ .withAdditionalStats(expectedAdditionalStats()),
636642 outer.destinationStats,
637643 )
638644
@@ -860,7 +866,7 @@ abstract class BasicFunctionalityIntegrationTest(
860866 stateMessagesFromFirstStream[0 ].sourceStats,
861867 )
862868 assertEquals(
863- AirbyteStateStats ().withRecordCount(1.0 ),
869+ AirbyteStateStats ().withRecordCount(1.0 ).withAdditionalStats(expectedAdditionalStats()) ,
864870 stateMessagesFromFirstStream[0 ].destinationStats,
865871 )
866872 assertEquals(
@@ -883,7 +889,7 @@ abstract class BasicFunctionalityIntegrationTest(
883889 stateMessagesFromFirstStream[1 ].sourceStats,
884890 )
885891 assertEquals(
886- AirbyteStateStats ().withRecordCount(1.0 ),
892+ AirbyteStateStats ().withRecordCount(1.0 ).withAdditionalStats(expectedAdditionalStats()) ,
887893 stateMessagesFromFirstStream[1 ].destinationStats,
888894 )
889895 assertEquals(
@@ -912,7 +918,7 @@ abstract class BasicFunctionalityIntegrationTest(
912918 stateMessagesFromSecondStream[0 ].sourceStats,
913919 )
914920 assertEquals(
915- AirbyteStateStats ().withRecordCount(2.0 ),
921+ AirbyteStateStats ().withRecordCount(2.0 ).withAdditionalStats(expectedAdditionalStats()) ,
916922 stateMessagesFromSecondStream[0 ].destinationStats,
917923 )
918924 assertEquals(
@@ -935,7 +941,7 @@ abstract class BasicFunctionalityIntegrationTest(
935941 stateMessagesFromSecondStream[1 ].sourceStats,
936942 )
937943 assertEquals(
938- AirbyteStateStats ().withRecordCount(1.0 ),
944+ AirbyteStateStats ().withRecordCount(1.0 ).withAdditionalStats(expectedAdditionalStats()) ,
939945 stateMessagesFromSecondStream[1 ].destinationStats,
940946 )
941947 assertEquals(
@@ -964,7 +970,7 @@ abstract class BasicFunctionalityIntegrationTest(
964970 stateMessagesFromThirdStream[0 ].sourceStats,
965971 )
966972 assertEquals(
967- AirbyteStateStats ().withRecordCount(1.0 ),
973+ AirbyteStateStats ().withRecordCount(1.0 ).withAdditionalStats(expectedAdditionalStats()) ,
968974 stateMessagesFromThirdStream[0 ].destinationStats,
969975 )
970976 assertEquals(
@@ -987,7 +993,7 @@ abstract class BasicFunctionalityIntegrationTest(
987993 stateMessagesFromSecondStream[1 ].sourceStats,
988994 )
989995 assertEquals(
990- AirbyteStateStats ().withRecordCount(1.0 ),
996+ AirbyteStateStats ().withRecordCount(1.0 ).withAdditionalStats(expectedAdditionalStats()) ,
991997 stateMessagesFromSecondStream[1 ].destinationStats,
992998 )
993999 assertEquals(
@@ -1579,7 +1585,13 @@ abstract class BasicFunctionalityIntegrationTest(
15791585 destinationRecordCount = 1 ,
15801586 checkpointKey = checkpointKeyForMedium(),
15811587 totalRecords = 1L ,
1582- totalBytes = expectedBytes
1588+ totalBytes = expectedBytes,
1589+ additionalStats =
1590+ if (includesAdditionalStats)
1591+ ObservabilityMetrics .entries
1592+ .associate { it.metricName to 0.0 }
1593+ .toMutableMap()
1594+ else mutableMapOf (),
15831595 )
15841596 .asProtocolMessage()
15851597 assertEquals(
@@ -5210,6 +5222,14 @@ abstract class BasicFunctionalityIntegrationTest(
52105222 }
52115223 }
52125224
5225+ private fun expectedAdditionalStats (): AdditionalStats {
5226+ val expectedAdditionalStats = AdditionalStats ()
5227+ ObservabilityMetrics .entries.forEach {
5228+ expectedAdditionalStats.withAdditionalProperty(it.metricName, 0.0 )
5229+ }
5230+ return expectedAdditionalStats
5231+ }
5232+
52135233 protected fun namespaceMapperForMedium (): NamespaceMapper {
52145234 return when (dataChannelMedium) {
52155235 DataChannelMedium .STDIO ->
0 commit comments