@@ -56,7 +56,7 @@ void testStreamCloseWithThreadInterrupt() throws Exception {
5656 String token = System .getenv ("TESTING_INFLUXDB_TOKEN" );
5757 String database = System .getenv ("TESTING_INFLUXDB_DATABASE" );
5858 String measurement = "memory_leak_test_" + System .currentTimeMillis ();
59-
59+ String sql = String . format ( "SELECT * FROM %s" , measurement );
6060
6161 // Prepare config
6262 ClientConfig config = new ClientConfig .Builder ()
@@ -66,23 +66,40 @@ void testStreamCloseWithThreadInterrupt() throws Exception {
6666 .writeNoSync (true )
6767 .build ();
6868
69- // Write test data
7069 try (InfluxDBClient client = InfluxDBClient .getInstance (config )) {
70+ // Write test data
7171 LOG .info ("Writing test data..." );
72- for (int i = 0 ; i < 100 ; i ++) {
72+ for (int i = 0 ; i < 3 ; i ++) {
7373 client .writeRecord (String .format ("%s,id=%04d temp=%f" ,
7474 measurement , i , 20.0 + Math .random () * 10 ));
7575 }
76- }
7776
78- TimeUnit .MILLISECONDS .sleep (500 );
77+ // Wait for data to be queryable (CI environments can be slower)
78+ LOG .info ("Waiting for data to be available..." );
79+ int attempts = 0 ;
80+ boolean hasData = false ;
81+ while (attempts < 10 && !hasData ) {
82+ try (Stream <PointValues > testStream = client .queryPoints (sql )) {
83+ hasData = testStream .findFirst ().isPresent ();
84+ }
85+ if (!hasData ) {
86+ LOG .info ("Data not yet available, waiting... (attempt " + (attempts + 1 ) + "/10)" );
87+ TimeUnit .MILLISECONDS .sleep (500 );
88+ attempts ++;
89+ }
90+ }
91+
92+ if (!hasData ) {
93+ Assertions .fail ("No data available after writing and waiting " + (attempts * 500 ) + "ms" );
94+ }
95+ LOG .info ("Data is available, starting test..." );
96+
97+ }
7998
8099 // Query data
81100 InfluxDBClient client = InfluxDBClient .getInstance (config );
82101 //noinspection TryFinallyCanBeTryWithResources
83102 try {
84- String sql = String .format ("SELECT * FROM %s" , measurement );
85-
86103 // Synchronization to ensure we interrupt during consumption
87104 CountDownLatch consumingStarted = new CountDownLatch (1 );
88105 AtomicInteger rowsProcessed = new AtomicInteger (0 );
@@ -117,7 +134,6 @@ void testStreamCloseWithThreadInterrupt() throws Exception {
117134 }
118135 });
119136
120- LOG .info ("Starting consumer thread..." );
121137 queryThread .start ();
122138
123139 // Wait for thread to start consuming
0 commit comments