3
3
import com .clickhouse .client .api .ClientConfigProperties ;
4
4
import com .clickhouse .client .api .data_formats .ClickHouseBinaryFormatReader ;
5
5
import com .clickhouse .client .api .internal .ServerSettings ;
6
- import com .clickhouse .client .api .metrics .OperationMetrics ;
7
- import com .clickhouse .client .api .metrics .ServerMetrics ;
8
6
import com .clickhouse .client .api .query .QueryResponse ;
9
7
import com .clickhouse .client .api .query .QuerySettings ;
10
8
import com .clickhouse .client .api .sql .SQLUtils ;
16
14
17
15
import java .sql .ResultSet ;
18
16
import java .sql .SQLException ;
19
- import java .sql .SQLFeatureNotSupportedException ;
20
17
import java .sql .SQLWarning ;
21
18
import java .sql .Statement ;
22
19
import java .util .ArrayList ;
@@ -35,9 +32,9 @@ public class StatementImpl implements Statement, JdbcV2Wrapper {
35
32
36
33
// State
37
34
private volatile boolean closed ;
38
- private final ConcurrentLinkedQueue <ResultSetImpl > resultSets ;
35
+ private final ConcurrentLinkedQueue <ResultSetImpl > resultSets ; // all result sets linked to this statement
39
36
protected ResultSetImpl currentResultSet ;
40
- protected OperationMetrics metrics ;
37
+ protected long currentUpdateCount = - 1 ;
41
38
protected List <String > batch ;
42
39
private String lastStatementSql ;
43
40
private ParsedStatement parsedStatement ;
@@ -54,7 +51,6 @@ public StatementImpl(ConnectionImpl connection) throws SQLException {
54
51
this .connection = connection ;
55
52
this .queryTimeout = 0 ;
56
53
this .closed = false ;
57
- this .metrics = null ;
58
54
this .batch = new ArrayList <>();
59
55
this .maxRows = 0 ;
60
56
this .localSettings = QuerySettings .merge (connection .getDefaultQuerySettings (), new QuerySettings ());
@@ -97,7 +93,9 @@ protected String getLastStatementSql() {
97
93
@ Override
98
94
public ResultSet executeQuery (String sql ) throws SQLException {
99
95
ensureOpen ();
100
- return executeQueryImpl (sql , localSettings );
96
+ currentUpdateCount = -1 ;
97
+ currentResultSet = executeQueryImpl (sql , localSettings );
98
+ return currentResultSet ;
101
99
}
102
100
103
101
private void closeCurrentResultSet () {
@@ -149,9 +147,7 @@ protected ResultSetImpl executeQueryImpl(String sql, QuerySettings settings) thr
149
147
if (reader .getSchema () == null ) {
150
148
throw new SQLException ("Called method expects empty or filled result set but query has returned none. Consider using `java.sql.Statement.execute(java.lang.String)`" , ExceptionUtils .SQL_STATE_CLIENT_ERROR );
151
149
}
152
- metrics = response .getMetrics ();
153
- setCurrentResultSet (new ResultSetImpl (this , response , reader ));
154
- return currentResultSet ;
150
+ return new ResultSetImpl (this , response , reader );
155
151
} catch (Exception e ) {
156
152
if (response != null ) {
157
153
try {
@@ -168,13 +164,10 @@ protected ResultSetImpl executeQueryImpl(String sql, QuerySettings settings) thr
168
164
@ Override
169
165
public int executeUpdate (String sql ) throws SQLException {
170
166
ensureOpen ();
171
- parsedStatement = connection .getSqlParser ().parsedStatement (sql );
172
- int updateCount = executeUpdateImpl (sql , localSettings );
173
- postUpdateActions ();
174
- return updateCount ;
167
+ return (int )executeLargeUpdate (sql );
175
168
}
176
169
177
- protected int executeUpdateImpl (String sql , QuerySettings settings ) throws SQLException {
170
+ protected long executeUpdateImpl (String sql , QuerySettings settings ) throws SQLException {
178
171
ensureOpen ();
179
172
180
173
// Closing before trying to do next request. Otherwise, deadlock because previous connection will not be
@@ -196,9 +189,7 @@ protected int executeUpdateImpl(String sql, QuerySettings settings) throws SQLEx
196
189
int updateCount = 0 ;
197
190
try (QueryResponse response = queryTimeout == 0 ? connection .client .query (lastStatementSql , mergedSettings ).get ()
198
191
: connection .client .query (lastStatementSql , mergedSettings ).get (queryTimeout , TimeUnit .SECONDS )) {
199
- setCurrentResultSet (null );
200
192
updateCount = Math .max (0 , (int ) response .getWrittenRows ()); // when statement alters schema no result rows returned.
201
- metrics = response .getMetrics ();
202
193
lastQueryId = response .getQueryId ();
203
194
} catch (Exception e ) {
204
195
throw ExceptionUtils .toSqlState (e );
@@ -308,28 +299,17 @@ public void setCursorName(String name) throws SQLException {
308
299
ensureOpen ();
309
300
}
310
301
311
- /**
312
- * Remembers current result set to be able to close it later.
313
- * Sets current resultset to a new value
314
- * @param resultSet new current resultset
315
- */
316
- protected void setCurrentResultSet (ResultSetImpl resultSet ) {
317
- ResultSetImpl tmp = currentResultSet ;
318
- currentResultSet = resultSet ;
319
- if (tmp != null ) {
320
- resultSets .add (tmp );
321
- }
322
- }
323
-
324
302
@ Override
325
303
public boolean execute (String sql ) throws SQLException {
326
304
ensureOpen ();
327
305
parsedStatement = connection .getSqlParser ().parsedStatement (sql );
306
+ currentUpdateCount = -1 ;
307
+ currentResultSet = null ;
328
308
if (parsedStatement .isHasResultSet ()) {
329
- executeQueryImpl (sql , localSettings ); // keep open to allow getResultSet()
309
+ currentResultSet = executeQueryImpl (sql , localSettings );
330
310
return true ;
331
311
} else {
332
- executeUpdateImpl (sql , localSettings );
312
+ currentUpdateCount = executeUpdateImpl (sql , localSettings );
333
313
postUpdateActions ();
334
314
return false ;
335
315
}
@@ -339,9 +319,7 @@ public boolean execute(String sql) throws SQLException {
339
319
public ResultSet getResultSet () throws SQLException {
340
320
ensureOpen ();
341
321
342
- ResultSet resultSet = currentResultSet ;
343
- setCurrentResultSet (null );
344
- return resultSet ;
322
+ return currentResultSet ;
345
323
}
346
324
347
325
@ Override
@@ -353,7 +331,7 @@ public int getUpdateCount() throws SQLException {
353
331
@ Override
354
332
public boolean getMoreResults () throws SQLException {
355
333
ensureOpen ();
356
- return false ;
334
+ return getMoreResults ( Statement . CLOSE_CURRENT_RESULT ) ;
357
335
}
358
336
359
337
@ Override
@@ -435,8 +413,17 @@ public QuerySettings getLocalSettings() {
435
413
436
414
@ Override
437
415
public boolean getMoreResults (int current ) throws SQLException {
438
- // TODO: implement query batches. When multiple selects in the batch.
439
- return false ;
416
+ // This method designed to iterate over multiple resultsets after "execute(sql)" method is called
417
+ // But we have at most only one always
418
+ // Then we should close any existing and return false to indicate that no more result are present
419
+
420
+ if (currentResultSet != null && current != Statement .KEEP_CURRENT_RESULT ) {
421
+ currentResultSet .close ();
422
+ }
423
+
424
+ currentResultSet = null ;
425
+ currentUpdateCount = -1 ;
426
+ return false ; // false indicates that no more results (or it is an update count)
440
427
}
441
428
442
429
@ Override
@@ -547,13 +534,7 @@ public boolean isCloseOnCompletion() throws SQLException {
547
534
@ Override
548
535
public long getLargeUpdateCount () throws SQLException {
549
536
ensureOpen ();
550
- if (currentResultSet == null && metrics != null ) {
551
- long updateCount = metrics .getMetric (ServerMetrics .NUM_ROWS_WRITTEN ).getLong ();
552
- metrics = null ;// clear metrics
553
- return updateCount ;
554
- }
555
-
556
- return -1L ;
537
+ return currentUpdateCount ;
557
538
}
558
539
559
540
@ Override
@@ -590,22 +571,25 @@ public long[] executeLargeBatch() throws SQLException {
590
571
591
572
@ Override
592
573
public long executeLargeUpdate (String sql ) throws SQLException {
593
- return executeUpdate (sql );
574
+ parsedStatement = connection .getSqlParser ().parsedStatement (sql );
575
+ long updateCount = executeUpdateImpl (sql , localSettings );
576
+ postUpdateActions ();
577
+ return updateCount ;
594
578
}
595
579
596
580
@ Override
597
581
public long executeLargeUpdate (String sql , int autoGeneratedKeys ) throws SQLException {
598
- return executeUpdate (sql , autoGeneratedKeys );
582
+ return executeLargeUpdate (sql );
599
583
}
600
584
601
585
@ Override
602
586
public long executeLargeUpdate (String sql , int [] columnIndexes ) throws SQLException {
603
- return executeUpdate (sql , columnIndexes );
587
+ return executeLargeUpdate (sql );
604
588
}
605
589
606
590
@ Override
607
591
public long executeLargeUpdate (String sql , String [] columnNames ) throws SQLException {
608
- return executeUpdate (sql , columnNames );
592
+ return executeLargeUpdate (sql );
609
593
}
610
594
611
595
/**
0 commit comments