@@ -5,8 +5,59 @@ import { Consumer, stringDeserializers } from '@platformatic/kafka';
55import Zenko from 'world/Zenko' ;
66import { putObject } from './utils/utils' ;
77import { waitForBucketInConnectorPipeline } from './utils/kafka' ;
8+ import { createKubeAppsV1Client } from './utils/kubernetes' ;
89
910const KAFKA_TESTS_TIMEOUT = Number ( process . env . KAFKA_TESTS_TIMEOUT ) || 60000 ;
11+ const BACKBEAT_DEPENDENCY_ANNOTATION = 'operator.zenko.io/dependencies' ;
12+ const BACKBEAT_DATA_SERVICES = [ 'backbeat-config' ] ;
13+
14+ /**
15+ * Logs the current status of all backbeat-related deployments without blocking.
16+ * Used to confirm whether pods are mid-rollout when notification events are triggered.
17+ */
18+ async function logBackbeatDeploymentStatus ( world : Zenko , namespace = 'default' ) : Promise < void > {
19+ try {
20+ const appsClient = createKubeAppsV1Client ( world ) ;
21+ const allDeployments = await appsClient . listNamespacedDeployment ( { namespace } ) ;
22+
23+ const backbeatDeployments = allDeployments . items . filter ( dep => {
24+ const annotations = dep . metadata ?. annotations ;
25+ return annotations &&
26+ BACKBEAT_DATA_SERVICES . some ( svc => annotations [ BACKBEAT_DEPENDENCY_ANNOTATION ] ?. includes ( svc ) ) ;
27+ } ) ;
28+
29+ const statusSummary = backbeatDeployments . map ( dep => {
30+ const name = dep . metadata ?. name ;
31+ const s = dep . status ;
32+ const ready = s ?. readyReplicas ?? 0 ;
33+ const updated = s ?. updatedReplicas ?? 0 ;
34+ const available = s ?. availableReplicas ?? 0 ;
35+ const desired = s ?. replicas ?? 0 ;
36+ const stable = ready === desired && updated === desired && available === desired ;
37+ return { name, desired, ready, updated, available, stable } ;
38+ } ) ;
39+
40+ const unstable = statusSummary . filter ( s => ! s . stable ) ;
41+
42+ // eslint-disable-next-line no-console
43+ console . log ( 'Backbeat deployment status snapshot' , {
44+ total : statusSummary . length ,
45+ unstableCount : unstable . length ,
46+ deployments : statusSummary ,
47+ } ) ;
48+
49+ if ( unstable . length > 0 ) {
50+ // eslint-disable-next-line no-console
51+ console . warn ( 'Backbeat deployments NOT stable at event trigger time' , {
52+ unstable,
53+ } ) ;
54+ }
55+ } catch ( err ) {
56+ // eslint-disable-next-line no-console
57+ console . error ( 'Failed to check backbeat deployment status' , { err } ) ;
58+ }
59+ }
60+
1061
1162const allNotificationTypes = [
1263 's3:ObjectCreated:Put' ,
@@ -272,6 +323,7 @@ When('i unsubscribe from {string} notifications for destination {int}',
272323 await Utils . sleep ( 10000 ) ;
273324 } ) ;
274325
326+
275327When ( 'a {string} event is triggered {string} {string}' ,
276328 async function ( this : Zenko , notificationType : string , enable : string , filterType : string ) {
277329 this . resetCommand ( ) ;
@@ -283,6 +335,7 @@ When('a {string} event is triggered {string} {string}',
283335 `${ objName } ${ this . getSaved < string > ( 'objectNameSufix' ) } ` ;
284336 }
285337 this . addToSaved ( 'objectName' , objName ) ;
338+ await logBackbeatDeploymentStatus ( this ) ;
286339 switch ( notificationType ) {
287340 case 's3:ObjectCreated:Put' :
288341 await putObject ( this , objName ) ;
0 commit comments