44import static com .databricks .jdbc .integration .IntegrationTestUtil .getValidJDBCConnection ;
55import static com .github .tomakehurst .wiremock .client .WireMock .getRequestedFor ;
66import static com .github .tomakehurst .wiremock .client .WireMock .urlPathMatching ;
7- import static org .junit .jupiter .api .Assertions .assertEquals ;
8- import static org .junit .jupiter .api .Assertions .assertTrue ;
7+ import static org .junit .jupiter .api .Assertions .*;
98
109import com .databricks .jdbc .api .impl .DatabricksResultSet ;
1110import com .databricks .jdbc .api .impl .DatabricksResultSetMetaData ;
1514import java .sql .SQLException ;
1615import java .sql .Statement ;
1716import java .util .Properties ;
17+ import java .util .concurrent .atomic .AtomicReference ;
1818import org .junit .jupiter .api .Test ;
1919
2020/** Test SQL execution with results spanning multiple chunks. */
2121public class MultiChunkExecutionIntegrationTests extends AbstractFakeServiceIntegrationTests {
2222
2323 @ Test
24- void testMultiChunkSelect () throws SQLException {
24+ void testMultiChunkSelect () throws SQLException , InterruptedException {
2525 final String table = "samples.tpch.lineitem" ;
2626
2727 // To save on the size of stub mappings, the test uses just enough rows to span multiple chunks.
@@ -36,38 +36,62 @@ void testMultiChunkSelect() throws SQLException {
3636 final Statement statement = connection .createStatement ();
3737 statement .setMaxRows (maxRows );
3838
39- try (ResultSet rs = statement .executeQuery (sql )) {
40- DatabricksResultSetMetaData metaData = (DatabricksResultSetMetaData ) rs .getMetaData ();
39+ final AtomicReference <Throwable > threadException = new AtomicReference <>();
4140
42- int rowCount = 0 ;
43- while (rs .next ()) {
44- rowCount ++;
45- }
41+ // Iterate through the result set in a different thread to surface any 1st-level thread-safety
42+ // issues
43+ Thread thread =
44+ new Thread (
45+ () -> {
46+ try (ResultSet rs = statement .executeQuery (sql )) {
47+ DatabricksResultSetMetaData metaData =
48+ (DatabricksResultSetMetaData ) rs .getMetaData ();
49+
50+ int rowCount = 0 ;
51+ while (rs .next ()) {
52+ rowCount ++;
53+ }
54+
55+ // The result should have the same number of rows as the limit
56+ assertEquals (maxRows , rowCount );
57+ assertEquals (maxRows , metaData .getTotalRows ());
58+
59+ // The result should be split into multiple chunks
60+ assertTrue (metaData .getChunkCount () > 1 , "Chunk count should be greater than 1" );
61+
62+ // The number of cloud fetch calls should be equal to the number of chunks
63+ final int cloudFetchCalls =
64+ getCloudFetchApiExtension ()
65+ .countRequestsMatching (getRequestedFor (urlPathMatching (".*" )).build ())
66+ .getCount ();
67+ // cloud fetch calls can be retried
68+ assertTrue (cloudFetchCalls >= metaData .getChunkCount ());
69+
70+ if (isSqlExecSdkClient ()) {
71+ // Number of requests to fetch external links should be one less than the total
72+ // number of chunks as first chunk link is already fetched
73+ final String statementId = ((DatabricksResultSet ) rs ).getStatementId ();
74+ final String resultChunkPathRegex =
75+ String .format (RESULT_CHUNK_PATH , statementId , ".*" );
76+ getDatabricksApiExtension ()
77+ .verify (
78+ (int ) (metaData .getChunkCount () - 1 ),
79+ getRequestedFor (urlPathMatching (resultChunkPathRegex )));
80+ }
81+ } catch (Throwable e ) {
82+ threadException .set (e );
83+ }
84+ });
85+
86+ thread .start ();
87+ thread .join (10_000 );
4688
47- // The result should have the same number of rows as the limit
48- assertEquals (maxRows , rowCount );
49- assertEquals (maxRows , metaData .getTotalRows ());
50-
51- // The result should be split into multiple chunks
52- assertTrue (metaData .getChunkCount () > 1 , "Chunk count should be greater than 1" );
53-
54- // The number of cloud fetch calls should be equal to the number of chunks
55- final int cloudFetchCalls =
56- getCloudFetchApiExtension ()
57- .countRequestsMatching (getRequestedFor (urlPathMatching (".*" )).build ())
58- .getCount ();
59- // cloud fetch calls can be retried
60- assertTrue (cloudFetchCalls >= metaData .getChunkCount ());
61-
62- if (isSqlExecSdkClient ()) {
63- // Number of requests to fetch external links should be one less than the total number of
64- // chunks as first chunk link is already fetched
65- final String statementId = ((DatabricksResultSet ) rs ).getStatementId ();
66- final String resultChunkPathRegex = String .format (RESULT_CHUNK_PATH , statementId , ".*" );
67- getDatabricksApiExtension ()
68- .verify (
69- (int ) (metaData .getChunkCount () - 1 ),
70- getRequestedFor (urlPathMatching (resultChunkPathRegex )));
89+ // Check if the thread had an exception
90+ if (threadException .get () != null ) {
91+ if (threadException .get () instanceof AssertionError ) {
92+ throw (AssertionError ) threadException .get ();
93+ } else {
94+ fail ("Test thread failed with exception: " + threadException .get ());
7195 }
7296 }
7397
0 commit comments