8
8
import com .clickhouse .client .api .query .QueryResponse ;
9
9
import com .clickhouse .client .api .query .QuerySettings ;
10
10
import com .clickhouse .client .api .sql .SQLUtils ;
11
+ import com .clickhouse .jdbc .internal .DriverProperties ;
11
12
import com .clickhouse .jdbc .internal .ExceptionUtils ;
12
13
import com .clickhouse .jdbc .internal .ParsedStatement ;
13
14
import org .slf4j .Logger ;
14
15
import org .slf4j .LoggerFactory ;
15
16
16
- import java .lang .ref .WeakReference ;
17
17
import java .sql .ResultSet ;
18
18
import java .sql .SQLException ;
19
19
import java .sql .SQLFeatureNotSupportedException ;
@@ -35,14 +35,19 @@ public class StatementImpl implements Statement, JdbcV2Wrapper {
35
35
36
36
// State
37
37
private volatile boolean closed ;
38
- private final ConcurrentLinkedQueue <WeakReference < ResultSetImpl > > resultSets ;
38
+ private final ConcurrentLinkedQueue <ResultSetImpl > resultSets ;
39
39
protected ResultSetImpl currentResultSet ;
40
40
protected OperationMetrics metrics ;
41
41
protected List <String > batch ;
42
42
private String lastStatementSql ;
43
43
private ParsedStatement parsedStatement ;
44
44
protected volatile String lastQueryId ;
45
45
private long maxRows ;
46
+ private boolean closeOnCompletion ;
47
+ private boolean resultSetAutoClose ;
48
+ private int maxFieldSize ;
49
+
50
+ // settings local to a statement
46
51
protected QuerySettings localSettings ;
47
52
48
53
public StatementImpl (ConnectionImpl connection ) throws SQLException {
@@ -54,6 +59,7 @@ public StatementImpl(ConnectionImpl connection) throws SQLException {
54
59
this .maxRows = 0 ;
55
60
this .localSettings = QuerySettings .merge (connection .getDefaultQuerySettings (), new QuerySettings ());
56
61
this .resultSets = new ConcurrentLinkedQueue <>();
62
+ this .resultSetAutoClose = connection .getJdbcConfig ().isSet (DriverProperties .RESULTSET_AUTO_CLOSE );
57
63
}
58
64
59
65
protected void ensureOpen () throws SQLException {
@@ -111,30 +117,24 @@ private void closeCurrentResultSet() {
111
117
protected ResultSetImpl executeQueryImpl (String sql , QuerySettings settings ) throws SQLException {
112
118
ensureOpen ();
113
119
114
- // TODO: method should throw exception if no result set returned
115
-
116
120
// Closing before trying to do next request. Otherwise, deadlock because previous connection will not be
117
121
// release before this one completes.
118
- closeCurrentResultSet ();
119
-
120
- QuerySettings mergedSettings = QuerySettings .merge (connection .getDefaultQuerySettings (), settings );
121
- if (maxRows > 0 ) {
122
- mergedSettings .setOption (ClientConfigProperties .serverSetting (ServerSettings .MAX_RESULT_ROWS ), maxRows );
123
- mergedSettings .setOption (ClientConfigProperties .serverSetting (ServerSettings .RESULT_OVERFLOW_MODE ), "break" );
122
+ if (resultSetAutoClose ) {
123
+ closeCurrentResultSet ();
124
124
}
125
125
126
- if (mergedSettings .getQueryId () != null ) {
127
- lastQueryId = mergedSettings .getQueryId ();
128
- } else {
129
- lastQueryId = UUID .randomUUID ().toString ();
130
- mergedSettings .setQueryId (lastQueryId );
126
+ QuerySettings mergedSettings = QuerySettings .merge (settings , new QuerySettings ());
127
+ if (mergedSettings .getQueryId () == null ) {
128
+ final String queryId = UUID .randomUUID ().toString ();
129
+ mergedSettings .setQueryId (queryId );
131
130
}
131
+ lastQueryId = mergedSettings .getQueryId ();
132
132
LOG .debug ("Query ID: {}" , lastQueryId );
133
133
134
+ QueryResponse response = null ;
134
135
try {
135
136
lastStatementSql = parseJdbcEscapeSyntax (sql );
136
137
LOG .trace ("SQL Query: {}" , lastStatementSql ); // this is not secure for create statements because of passwords
137
- QueryResponse response ;
138
138
if (queryTimeout == 0 ) {
139
139
response = connection .client .query (lastStatementSql , mergedSettings ).get ();
140
140
} else {
@@ -146,11 +146,21 @@ protected ResultSetImpl executeQueryImpl(String sql, QuerySettings settings) thr
146
146
ExceptionUtils .SQL_STATE_CLIENT_ERROR );
147
147
}
148
148
ClickHouseBinaryFormatReader reader = connection .client .newBinaryFormatReader (response );
149
-
149
+ if (reader .getSchema () == null ) {
150
+ throw new SQLException ("Called method expects empty or filled result set but query has returned none. Consider using `java.sql.Statement.execute(java.lang.String)`" , ExceptionUtils .SQL_STATE_CLIENT_ERROR );
151
+ }
150
152
metrics = response .getMetrics ();
151
153
setCurrentResultSet (new ResultSetImpl (this , response , reader ));
152
154
return currentResultSet ;
153
155
} catch (Exception e ) {
156
+ if (response != null ) {
157
+ try {
158
+ response .close ();
159
+ } catch (Exception ex ) {
160
+ LOG .warn ("Failed to close response after exception" , e );
161
+ }
162
+ }
163
+ onResultSetClosed (null );
154
164
throw ExceptionUtils .toSqlState (e );
155
165
}
156
166
}
@@ -167,19 +177,19 @@ public int executeUpdate(String sql) throws SQLException {
167
177
protected int executeUpdateImpl (String sql , QuerySettings settings ) throws SQLException {
168
178
ensureOpen ();
169
179
170
- // TODO: method should throw exception if result set returned
171
180
// Closing before trying to do next request. Otherwise, deadlock because previous connection will not be
172
181
// release before this one completes.
173
- closeCurrentResultSet ();
182
+ if (resultSetAutoClose ) {
183
+ closeCurrentResultSet ();
184
+ }
174
185
175
186
QuerySettings mergedSettings = QuerySettings .merge (connection .getDefaultQuerySettings (), settings );
176
187
177
- if (mergedSettings .getQueryId () != null ) {
178
- lastQueryId = mergedSettings .getQueryId ();
179
- } else {
180
- lastQueryId = UUID .randomUUID ().toString ();
181
- mergedSettings .setQueryId (lastQueryId );
188
+ if (mergedSettings .getQueryId () == null ) {
189
+ final String queryId = UUID .randomUUID ().toString ();
190
+ mergedSettings .setQueryId (queryId );
182
191
}
192
+ lastQueryId = mergedSettings .getQueryId ();
183
193
184
194
lastStatementSql = parseJdbcEscapeSyntax (sql );
185
195
LOG .trace ("SQL Query: {}" , lastStatementSql );
@@ -212,9 +222,8 @@ protected void postUpdateActions() {
212
222
public void close () throws SQLException {
213
223
closed = true ;
214
224
closeCurrentResultSet ();
215
- for (WeakReference <ResultSetImpl > refRs : resultSets ) {
216
- ResultSetImpl resultSet = refRs .get ();
217
- if (resultSet != null ) {
225
+ for (ResultSetImpl resultSet : resultSets ) {
226
+ if (resultSet != null && !resultSet .isClosed ()) {
218
227
try {
219
228
resultSet .close ();
220
229
} catch (Exception e ) {
@@ -227,15 +236,16 @@ public void close() throws SQLException {
227
236
@ Override
228
237
public int getMaxFieldSize () throws SQLException {
229
238
ensureOpen ();
230
- return 0 ;
239
+ return this . maxFieldSize ;
231
240
}
232
241
233
242
@ Override
234
243
public void setMaxFieldSize (int max ) throws SQLException {
235
244
ensureOpen ();
236
- if (! connection . config . isIgnoreUnsupportedRequests () ) {
237
- throw new SQLFeatureNotSupportedException ( "Set max field size is not supported." , ExceptionUtils . SQL_STATE_FEATURE_NOT_SUPPORTED );
245
+ if (max <= 0 ) {
246
+ throw new SQLException ( " max should be a positive integer." );
238
247
}
248
+ this .maxFieldSize = max ;
239
249
}
240
250
241
251
@ Override
@@ -307,7 +317,7 @@ protected void setCurrentResultSet(ResultSetImpl resultSet) {
307
317
ResultSetImpl tmp = currentResultSet ;
308
318
currentResultSet = resultSet ;
309
319
if (tmp != null ) {
310
- resultSets .add (new WeakReference <>( tmp ) );
320
+ resultSets .add (tmp );
311
321
}
312
322
}
313
323
@@ -355,8 +365,8 @@ public boolean getMoreResults() throws SQLException {
355
365
@ Override
356
366
public void setFetchDirection (int direction ) throws SQLException {
357
367
ensureOpen ();
358
- if (! connection . config . isIgnoreUnsupportedRequests () ) {
359
- throw new SQLFeatureNotSupportedException ( "Set fetch direction is not supported." , ExceptionUtils . SQL_STATE_FEATURE_NOT_SUPPORTED );
368
+ if (direction != ResultSet . FETCH_FORWARD && direction != ResultSet . FETCH_REVERSE && direction != ResultSet . FETCH_UNKNOWN ) {
369
+ throw new SQLException ( "Invalid fetch direction: " + direction + ". Should be one of ResultSet.FETCH_FORWARD, ResultSet.FETCH_REVERSE, or ResultSet.FETCH_UNKNOW" );
360
370
}
361
371
}
362
372
@@ -420,6 +430,15 @@ public ConnectionImpl getConnection() throws SQLException {
420
430
return connection ;
421
431
}
422
432
433
+ /**
434
+ * Returns instance of local settings. Can be used to override settings.
435
+ *
436
+ * @return QuerySettings that is used as base for each request.
437
+ */
438
+ public QuerySettings getLocalSettings () {
439
+ return localSettings ;
440
+ }
441
+
423
442
@ Override
424
443
public boolean getMoreResults (int current ) throws SQLException {
425
444
// TODO: implement query batches. When multiple selects in the batch.
@@ -509,11 +528,26 @@ public boolean isPoolable() throws SQLException {
509
528
@ Override
510
529
public void closeOnCompletion () throws SQLException {
511
530
ensureOpen ();
531
+ this .closeOnCompletion = true ;
532
+ }
533
+
534
+ // called each time query is complete or result set is closed
535
+ public void onResultSetClosed (ResultSetImpl resultSet ) throws SQLException {
536
+ if (resultSet != null ) {
537
+ this .resultSets .remove (resultSet );
538
+ }
539
+
540
+ if (this .closeOnCompletion ) {
541
+ if ((resultSets .isEmpty ()) && (currentResultSet == null || currentResultSet .isClosed ())) {
542
+ // last result set is closed.
543
+ this .closed = true ;
544
+ }
545
+ }
512
546
}
513
547
514
548
@ Override
515
549
public boolean isCloseOnCompletion () throws SQLException {
516
- return false ;
550
+ return this . closeOnCompletion ;
517
551
}
518
552
519
553
@ Override
@@ -526,12 +560,20 @@ public long getLargeUpdateCount() throws SQLException {
526
560
public void setLargeMaxRows (long max ) throws SQLException {
527
561
ensureOpen ();
528
562
maxRows = max ;
563
+ // This method override user set overflow mode on purpose:
564
+ // 1. Spec clearly states that after calling this method with a limit > 0 all rows over limit are dropped.
565
+ // 2. Calling this method should not cause throwing exception for future queries what only `break` can guarantee
566
+ // 3. If user wants different behavior then they are can use connection properties.
529
567
if (max > 0 ) {
530
568
localSettings .setOption (ClientConfigProperties .serverSetting (ServerSettings .MAX_RESULT_ROWS ), maxRows );
531
- localSettings .setOption (ClientConfigProperties .serverSetting (ServerSettings .RESULT_OVERFLOW_MODE ), "break" );
569
+ localSettings .setOption (ClientConfigProperties .serverSetting (ServerSettings .RESULT_OVERFLOW_MODE ),
570
+ ServerSettings .RESULT_OVERFLOW_MODE_BREAK );
532
571
} else {
533
- localSettings .resetOption (ClientConfigProperties .serverSetting (ServerSettings .MAX_RESULT_ROWS ));
534
- localSettings .resetOption (ClientConfigProperties .serverSetting (ServerSettings .RESULT_OVERFLOW_MODE ));
572
+ // overriding potential client settings (set thru connection setup)
573
+ // there is no no limit value so we use very large limit.
574
+ localSettings .setOption (ClientConfigProperties .serverSetting (ServerSettings .MAX_RESULT_ROWS ), Long .MAX_VALUE );
575
+ localSettings .setOption (ClientConfigProperties .serverSetting (ServerSettings .RESULT_OVERFLOW_MODE ),
576
+ ServerSettings .RESULT_OVERFLOW_MODE_BREAK );
535
577
}
536
578
}
537
579
@@ -573,6 +615,4 @@ public long executeLargeUpdate(String sql, String[] columnNames) throws SQLExcep
573
615
public String getLastQueryId () {
574
616
return lastQueryId ;
575
617
}
576
-
577
-
578
618
}
0 commit comments