@@ -43,8 +43,6 @@ protected Mutation(ClickHouseRequest<?> request, boolean sealed) {
4343
4444 this .options .putAll (request .options );
4545 this .settings .putAll (request .settings );
46-
47- this .sessionId = request .sessionId ;
4846 }
4947
5048 @ Override
@@ -90,13 +88,13 @@ public Mutation data(String file, ClickHouseCompression compression) {
9088
9189 final ClickHouseRequest <?> self = this ;
9290 final String fileName = ClickHouseChecker .nonEmpty (file , "File" );
93- this .input = ClickHouseDeferredValue .of (() -> {
91+ this .input = changeProperty ( PROP_DATA , this . input , ClickHouseDeferredValue .of (() -> {
9492 try {
9593 return ClickHouseInputStream .of (new FileInputStream (fileName ), 123 , compression );
9694 } catch (FileNotFoundException e ) {
9795 throw new IllegalArgumentException (e );
9896 }
99- });
97+ })) ;
10098 return this ;
10199 }
102100
@@ -119,7 +117,8 @@ public Mutation data(InputStream input) {
119117 public Mutation data (ClickHouseInputStream input ) {
120118 checkSealed ();
121119
122- this .input = ClickHouseDeferredValue .of (input , ClickHouseInputStream .class );
120+ this .input = changeProperty (PROP_DATA , this .input ,
121+ ClickHouseDeferredValue .of (input , ClickHouseInputStream .class ));
123122
124123 return this ;
125124 }
@@ -133,7 +132,7 @@ public Mutation data(ClickHouseInputStream input) {
133132 public Mutation data (ClickHouseDeferredValue <ClickHouseInputStream > input ) {
134133 checkSealed ();
135134
136- this .input = input ;
135+ this .input = changeProperty ( PROP_DATA , this . input , input ) ;
137136
138137 return this ;
139138 }
@@ -162,16 +161,7 @@ public ClickHouseResponse sendAndWait() throws ClickHouseException {
162161 @ Override
163162 public Mutation table (String table , String queryId ) {
164163 checkSealed ();
165-
166- this .queryId = queryId ;
167-
168- String sql = "INSERT INTO " + ClickHouseChecker .nonBlank (table , "table" );
169- if (!sql .equals (this .sql )) {
170- this .sql = sql ;
171- this .preparedQuery = null ;
172- resetCache ();
173- }
174-
164+ super .query ("INSERT INTO " + ClickHouseChecker .nonBlank (table , "table" ), queryId );
175165 return this ;
176166 }
177167
@@ -190,7 +180,6 @@ public Mutation seal() {
190180
191181 req .input = input ;
192182 req .queryId = queryId ;
193- req .sessionId = sessionId ;
194183 req .sql = sql ;
195184
196185 req .preparedQuery = preparedQuery ;
@@ -202,6 +191,11 @@ public Mutation seal() {
202191
203192 private static final long serialVersionUID = 4990313525960702287L ;
204193
194+ static final String PROP_DATA = "data" ;
195+ static final String PROP_PREPARED_QUERY = "preparedQuery" ;
196+ static final String PROP_QUERY = "query" ;
197+ static final String PROP_QUERY_ID = "queryId" ;
198+
205199 private final boolean sealed ;
206200
207201 private transient ClickHouseClient client ;
@@ -216,7 +210,6 @@ public Mutation seal() {
216210
217211 protected transient ClickHouseDeferredValue <ClickHouseInputStream > input ;
218212 protected String queryId ;
219- protected String sessionId ;
220213 protected String sql ;
221214 protected ClickHouseParameterizedQuery preparedQuery ;
222215
@@ -245,6 +238,13 @@ protected ClickHouseRequest(ClickHouseClient client, Function<ClickHouseNodeSele
245238 this .namedParameters = new HashMap <>();
246239 }
247240
241+ protected <T > T changeProperty (String property , T oldValue , T newValue ) {
242+ if (changeListener != null && !Objects .equals (oldValue , newValue )) {
243+ changeListener .propertyChanged (this , property , oldValue , newValue );
244+ }
245+ return newValue ;
246+ }
247+
248248 protected void checkSealed () {
249249 if (sealed ) {
250250 throw new IllegalStateException ("Sealed request is immutable" );
@@ -291,7 +291,6 @@ public ClickHouseRequest<SelfT> copy() {
291291 req .namedParameters .putAll (namedParameters );
292292 req .input = input ;
293293 req .queryId = queryId ;
294- req .sessionId = sessionId ;
295294 req .sql = sql ;
296295 req .preparedQuery = preparedQuery ;
297296 return req ;
@@ -412,7 +411,8 @@ public Optional<String> getQueryId() {
412411 */
413412 public ClickHouseParameterizedQuery getPreparedQuery () {
414413 if (preparedQuery == null ) {
415- preparedQuery = ClickHouseParameterizedQuery .of (getConfig (), getQuery ());
414+ preparedQuery = changeProperty (PROP_PREPARED_QUERY , preparedQuery ,
415+ ClickHouseParameterizedQuery .of (getConfig (), getQuery ()));
416416 }
417417
418418 return preparedQuery ;
@@ -433,6 +433,7 @@ public Map<String, Object> getSettings() {
433433 * @return session id
434434 */
435435 public Optional <String > getSessionId () {
436+ String sessionId = (String ) getConfig ().getOption (ClickHouseClientOption .SESSION_ID );
436437 return ClickHouseChecker .isNullOrEmpty (sessionId ) ? Optional .empty () : Optional .of (sessionId );
437438 }
438439
@@ -681,12 +682,7 @@ public SelfT external(Collection<ClickHouseExternalTable> tables) {
681682 @ SuppressWarnings ("unchecked" )
682683 public SelfT format (ClickHouseFormat format ) {
683684 checkSealed ();
684-
685- if (format == null ) {
686- removeOption (ClickHouseClientOption .FORMAT );
687- } else {
688- option (ClickHouseClientOption .FORMAT , format );
689- }
685+ option (ClickHouseClientOption .FORMAT , format );
690686 return (SelfT ) this ;
691687 }
692688
@@ -739,11 +735,7 @@ public SelfT options(Map<ClickHouseOption, Serializable> options) {
739735 m .putAll (this .options );
740736 if (options != null ) {
741737 for (Entry <ClickHouseOption , Serializable > e : options .entrySet ()) {
742- if (e .getValue () == null ) {
743- removeOption (e .getKey ());
744- } else {
745- option (e .getKey (), e .getValue ());
746- }
738+ option (e .getKey (), e .getValue ());
747739 m .remove (e .getKey ());
748740 }
749741 }
@@ -1078,35 +1070,35 @@ public SelfT query(String sql) {
10781070 public SelfT query (ClickHouseParameterizedQuery query , String queryId ) {
10791071 checkSealed ();
10801072
1081- if (!ClickHouseChecker .nonNull (query , "query" ).equals (this .preparedQuery )) {
1082- this .preparedQuery = query ;
1083- this .sql = query .getOriginalQuery ();
1073+ if (!ClickHouseChecker .nonNull (query , PROP_QUERY ).equals (this .preparedQuery )) {
1074+ this .preparedQuery = changeProperty ( PROP_PREPARED_QUERY , this . preparedQuery , query ) ;
1075+ this .sql = changeProperty ( PROP_QUERY , this . sql , query .getOriginalQuery () );
10841076 resetCache ();
10851077 }
10861078
1087- this .queryId = queryId ;
1079+ this .queryId = changeProperty ( PROP_QUERY_ID , this . queryId , queryId ) ;
10881080
10891081 return (SelfT ) this ;
10901082 }
10911083
10921084 /**
10931085 * Sets query and optinally query id.
10941086 *
1095- * @param sql non-empty query
1087+ * @param query non-empty query
10961088 * @param queryId query id, null means no query id
10971089 * @return the request itself
10981090 */
10991091 @ SuppressWarnings ("unchecked" )
1100- public SelfT query (String sql , String queryId ) {
1092+ public SelfT query (String query , String queryId ) {
11011093 checkSealed ();
11021094
1103- if (!ClickHouseChecker .nonBlank (sql , "sql" ).equals (this .sql )) {
1104- this .sql = sql ;
1105- this .preparedQuery = null ;
1095+ if (!ClickHouseChecker .nonBlank (query , PROP_QUERY ).equals (this .sql )) {
1096+ this .sql = changeProperty ( PROP_QUERY , this . sql , query ) ;
1097+ this .preparedQuery = changeProperty ( PROP_PREPARED_QUERY , this . preparedQuery , null ) ;
11061098 resetCache ();
11071099 }
11081100
1109- this .queryId = queryId ;
1101+ this .queryId = changeProperty ( PROP_QUERY_ID , this . queryId , queryId ) ;
11101102
11111103 return (SelfT ) this ;
11121104 }
@@ -1121,12 +1113,9 @@ public SelfT query(String sql, String queryId) {
11211113 public SelfT clearSession () {
11221114 checkSealed ();
11231115
1116+ removeOption (ClickHouseClientOption .SESSION_ID );
11241117 removeOption (ClickHouseClientOption .SESSION_CHECK );
11251118 removeOption (ClickHouseClientOption .SESSION_TIMEOUT );
1126- if (this .sessionId != null ) {
1127- this .sessionId = null ;
1128- resetCache ();
1129- }
11301119
11311120 return (SelfT ) this ;
11321121 }
@@ -1178,12 +1167,9 @@ public SelfT session(String sessionId, Integer timeout) {
11781167 public SelfT session (String sessionId , Boolean check , Integer timeout ) {
11791168 checkSealed ();
11801169
1170+ option (ClickHouseClientOption .SESSION_ID , sessionId );
11811171 option (ClickHouseClientOption .SESSION_CHECK , check );
11821172 option (ClickHouseClientOption .SESSION_TIMEOUT , timeout );
1183- if (!Objects .equals (this .sessionId , sessionId )) {
1184- this .sessionId = sessionId ;
1185- resetCache ();
1186- }
11871173
11881174 return (SelfT ) this ;
11891175 }
@@ -1263,12 +1249,7 @@ public SelfT table(String table, String queryId) {
12631249 @ SuppressWarnings ("unchecked" )
12641250 public SelfT use (String database ) {
12651251 checkSealed ();
1266-
1267- if (database == null ) {
1268- removeOption (ClickHouseClientOption .DATABASE );
1269- } else {
1270- option (ClickHouseClientOption .DATABASE , database );
1271- }
1252+ option (ClickHouseClientOption .DATABASE , database );
12721253 return (SelfT ) this ;
12731254 }
12741255
@@ -1384,11 +1365,10 @@ public SelfT reset() {
13841365 }
13851366 this .namedParameters .clear ();
13861367
1387- this .input = null ;
1388- this .sql = null ;
1389- this .preparedQuery = null ;
1390- this .queryId = null ;
1391- this .sessionId = null ;
1368+ this .input = changeProperty (PROP_DATA , this .input , null );
1369+ this .sql = changeProperty (PROP_QUERY , this .sql , null );
1370+ this .preparedQuery = changeProperty (PROP_PREPARED_QUERY , this .preparedQuery , null );
1371+ this .queryId = changeProperty (PROP_QUERY_ID , this .queryId , null );
13921372
13931373 resetCache ();
13941374
@@ -1414,7 +1394,6 @@ public ClickHouseRequest<SelfT> seal() {
14141394
14151395 req .input = input ;
14161396 req .queryId = queryId ;
1417- req .sessionId = sessionId ;
14181397 req .sql = sql ;
14191398 req .preparedQuery = preparedQuery ;
14201399 }
0 commit comments