Skip to content

Commit a27e280

Browse files
increase notification test reliability by checking for kafka connectors instead of sleep
Issue: ZENKO-5203
1 parent 68e4ca8 commit a27e280

File tree

2 files changed

+75
-4
lines changed

2 files changed

+75
-4
lines changed

tests/ctst/steps/notifications.ts

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import { S3, Utils, AWSVersionObject, NotificationDestination } from 'cli-testin
44
import { Consumer, stringDeserializers } from '@platformatic/kafka';
55
import Zenko from 'world/Zenko';
66
import { putObject } from './utils/utils';
7+
import { waitForBucketInConnectorPipeline } from './utils/kafka';
78

89
const KAFKA_TESTS_TIMEOUT = Number(process.env.KAFKA_TESTS_TIMEOUT) || 60000;
910

@@ -185,8 +186,8 @@ When('i subscribe to {string} notifications for destination {int}',
185186
this.addCommandParameter({ notificationConfiguration: `'${JSON.stringify(destinationConfig)}'` });
186187
}
187188
await S3.putBucketNotificationConfiguration(this.getCommandParameters());
188-
// waiting for oplog populator to take the putNotificationConfiguration into account
189-
await Utils.sleep(10000);
189+
const hosts = this.getSaved<NotificationDestination[]>('notificationDestinations')[destination].hosts;
190+
await waitForBucketInConnectorPipeline(hosts, this.getSaved<string>('bucketName'));
190191
});
191192

192193
When('i subscribe to {string} notifications for destination {int} with {string} filter',
@@ -235,8 +236,8 @@ When('i subscribe to {string} notifications for destination {int} with {string}
235236
this.addCommandParameter({ notificationConfiguration: `'${JSON.stringify(destinationConfig)}'` });
236237
}
237238
await S3.putBucketNotificationConfiguration(this.getCommandParameters());
238-
// waiting for oplog populator to take the putNotificationConfiguration into account
239-
await Utils.sleep(10000);
239+
const hosts = this.getSaved<NotificationDestination[]>('notificationDestinations')[destination].hosts;
240+
await waitForBucketInConnectorPipeline(hosts, this.getSaved<string>('bucketName'));
240241
});
241242

242243
When('i unsubscribe from {string} notifications for destination {int}',

tests/ctst/steps/utils/kafka.ts

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
import { Utils } from 'cli-testing';
2+
3+
function getKafkaConnectUrl(kafkaHosts: string): string {
4+
const releasePrefix = kafkaHosts.split('-base-queue-')[0];
5+
return `http://${releasePrefix}-base-queue-connector:8083/connectors`;
6+
}
7+
8+
interface ConnectorInfo {
9+
info: {
10+
name: string;
11+
config: {
12+
pipeline?: string;
13+
[key: string]: unknown;
14+
};
15+
[key: string]: unknown;
16+
};
17+
}
18+
19+
20+
/**
21+
* Polls the Kafka Connect REST API until the given bucket appears in
22+
* at least one connector's MongoDB change-stream pipeline (`$match` →
23+
* `ns.coll.$in`). This ensures the oplog-populator has propagated a
24+
* new `putBucketNotificationConfiguration` to the connector before the
25+
* test proceeds to trigger events.
26+
*/
27+
export async function waitForBucketInConnectorPipeline(
28+
kafkaHosts: string,
29+
bucketName: string,
30+
timeoutMs = 120000,
31+
intervalMs = 1000,
32+
): Promise<void> {
33+
const url = getKafkaConnectUrl(kafkaHosts);
34+
const deadline = Date.now() + timeoutMs;
35+
let lastConnectorCount = 0;
36+
while (Date.now() < deadline) {
37+
try {
38+
const response = await fetch(`${url}?expand=info`, {
39+
signal: AbortSignal.timeout(5000),
40+
});
41+
const connectors = await response.json() as Record<string, ConnectorInfo>;
42+
lastConnectorCount = Object.keys(connectors).length;
43+
44+
for (const connector of Object.values(connectors)) {
45+
const pipelineStr = connector.info?.config?.pipeline;
46+
if (!pipelineStr) {
47+
continue;
48+
}
49+
try {
50+
const pipeline = JSON.parse(pipelineStr) as Array<Record<string, unknown>>;
51+
const matchStage = pipeline[0]?.$match as Record<string, unknown> | undefined;
52+
const nsColl = matchStage?.['ns.coll'] as Record<string, unknown> | undefined;
53+
const bucketList = nsColl?.$in as string[] | undefined;
54+
if (bucketList?.includes(bucketName)) {
55+
return;
56+
}
57+
} catch {
58+
// pipeline not valid JSON, skip
59+
}
60+
}
61+
} catch {
62+
// Kafka Connect not reachable, retry
63+
}
64+
await Utils.sleep(intervalMs);
65+
}
66+
throw new Error(
67+
`waitForBucketInConnectorPipeline timed out after ${timeoutMs}ms waiting for bucket ` +
68+
`"${bucketName}" in connector pipelines (${lastConnectorCount} connectors checked)`,
69+
);
70+
}

0 commit comments

Comments
 (0)