2121import tech .ydb .core .StatusCode ;
2222import tech .ydb .query .QueryClient ;
2323import tech .ydb .query .QuerySession ;
24+ import tech .ydb .query .QueryStream ;
2425import tech .ydb .query .QueryTransaction ;
2526import tech .ydb .query .result .QueryInfo ;
2627import tech .ydb .query .result .QueryResultPart ;
@@ -70,8 +71,6 @@ public Entity(int id, String name, byte[] payload, boolean isValid) {
7071 @ ClassRule
7172 public final static GrpcTransportRule ydbTransport = new GrpcTransportRule ();
7273
73- private static QueryClient queryClient = null ;
74-
7574 @ BeforeClass
7675 public static void initSchema () {
7776 logger .info ("Prepare database..." );
@@ -98,95 +97,128 @@ public static void initSchema() {
9897 retryCtx .supplyStatus (session -> session .createTable (tablePath , tableDescription )).join ();
9998 retryCtx .supplyStatus (session -> session .createTable (table2Path , table2Description )).join ();
10099 logger .info ("Prepare database OK" );
101-
102- queryClient = QueryClient .newClient (ydbTransport ).build ();
103100 }
104101
105102 @ AfterClass
106103 public static void dropAll () {
107- try {
108- logger .info ("Clean database..." );
109- String tablePath = ydbTransport .getDatabase () + "/" + TEST_TABLE ;
110- String table2Path = ydbTransport .getDatabase () + "/" + TEST_DOUBLE_TABLE ;
111-
112- SimpleTableClient client = SimpleTableClient .newClient (GrpcTableRpc .useTransport (ydbTransport )).build ();
113- SessionRetryContext retryCtx = SessionRetryContext .create (client ).build ();
114- retryCtx .supplyStatus (session -> session .dropTable (tablePath )).join ().isSuccess ();
115- retryCtx .supplyStatus (session -> session .dropTable (table2Path )).join ().isSuccess ();
116- logger .info ("Clean database OK" );
117- } finally {
118- queryClient .close ();
119- }
104+ logger .info ("Clean database..." );
105+ String tablePath = ydbTransport .getDatabase () + "/" + TEST_TABLE ;
106+ String table2Path = ydbTransport .getDatabase () + "/" + TEST_DOUBLE_TABLE ;
107+
108+ SimpleTableClient client = SimpleTableClient .newClient (GrpcTableRpc .useTransport (ydbTransport )).build ();
109+ SessionRetryContext retryCtx = SessionRetryContext .create (client ).build ();
110+ retryCtx .supplyStatus (session -> session .dropTable (tablePath )).join ().isSuccess ();
111+ retryCtx .supplyStatus (session -> session .dropTable (table2Path )).join ().isSuccess ();
112+ logger .info ("Clean database OK" );
120113 }
121114
122115 @ Test
123116 public void testSimpleSelect () {
124- try (QuerySession session = queryClient .createSession (Duration .ofSeconds (5 )).join ().getValue ()) {
125- QueryReader reader = QueryReader .readFrom (
126- session .createQuery ("SELECT 2 + 3;" , TxMode .SERIALIZABLE_RW )
127- ).join ().getValue ();
117+ try (QueryClient client = QueryClient .newClient (ydbTransport ).build ()) {
118+ try (QuerySession session = client .createSession (Duration .ofSeconds (5 )).join ().getValue ()) {
119+ QueryReader reader = QueryReader .readFrom (
120+ session .createQuery ("SELECT 2 + 3;" , TxMode .SERIALIZABLE_RW )
121+ ).join ().getValue ();
128122
129123
130- Assert .assertEquals (1 , reader .getResultSetCount ());
131- ResultSetReader rs = reader .getResultSet (0 );
124+ Assert .assertEquals (1 , reader .getResultSetCount ());
125+ ResultSetReader rs = reader .getResultSet (0 );
132126
133- Assert .assertTrue (rs .next ());
134- Assert .assertEquals (1 , rs .getColumnCount ());
135- Assert .assertEquals ("column0" , rs .getColumnName (0 ));
136- Assert .assertEquals (5 , rs .getColumn (0 ).getInt32 ());
127+ Assert .assertTrue (rs .next ());
128+ Assert .assertEquals (1 , rs .getColumnCount ());
129+ Assert .assertEquals ("column0" , rs .getColumnName (0 ));
130+ Assert .assertEquals (5 , rs .getColumn (0 ).getInt32 ());
137131
138- Assert .assertFalse (rs .next ());
132+ Assert .assertFalse (rs .next ());
133+ }
139134 }
140135 }
141136
142137 @ Test
143138 @ Ignore
144139 public void testSimplePrepare () {
145- String query = ""
146- + "DECLARE $id AS Int32?;\n "
147- + "UPSERT INTO `" + TEST_TABLE + "` (id) "
148- + "VALUES ($id)" ;
149- try (QuerySession session = queryClient .createSession (Duration .ofSeconds (5 )).join ().getValue ()) {
150- ExecuteQuerySettings settings = ExecuteQuerySettings .newBuilder ()
151- .withExecMode (QueryExecMode .PARSE )
152- .build ();
140+ try (QueryClient client = QueryClient .newClient (ydbTransport ).build ()) {
141+ String query = ""
142+ + "DECLARE $id AS Int32?;\n "
143+ + "UPSERT INTO `" + TEST_TABLE + "` (id) "
144+ + "VALUES ($id)" ;
145+ try (QuerySession session = client .createSession (Duration .ofSeconds (5 )).join ().getValue ()) {
146+ ExecuteQuerySettings settings = ExecuteQuerySettings .newBuilder ()
147+ .withExecMode (QueryExecMode .PARSE )
148+ .build ();
153149
154- QueryReader reader = QueryReader .readFrom (
155- session .createQuery (query , TxMode .NONE , Params .empty (), settings )
156- ).join ().getValue ();
150+ QueryReader reader = QueryReader .readFrom (
151+ session .createQuery (query , TxMode .NONE , Params .empty (), settings )
152+ ).join ().getValue ();
157153
158154
159- Assert .assertEquals (1 , reader .getResultSetCount ());
160- ResultSetReader rs = reader .getResultSet (0 );
155+ Assert .assertEquals (1 , reader .getResultSetCount ());
156+ ResultSetReader rs = reader .getResultSet (0 );
161157
162- Assert .assertTrue (rs .next ());
163- Assert .assertEquals (1 , rs .getColumnCount ());
164- Assert .assertEquals ("column0" , rs .getColumnName (0 ));
165- Assert .assertEquals (5 , rs .getColumn (0 ).getInt32 ());
158+ Assert .assertTrue (rs .next ());
159+ Assert .assertEquals (1 , rs .getColumnCount ());
160+ Assert .assertEquals ("column0" , rs .getColumnName (0 ));
161+ Assert .assertEquals (5 , rs .getColumn (0 ).getInt32 ());
166162
167- Assert .assertFalse (rs .next ());
163+ Assert .assertFalse (rs .next ());
164+ }
168165 }
169166 }
170167
171168 @ Test
172169 public void testErrors () {
173- SessionImpl s1 = (SessionImpl )queryClient .createSession (Duration .ofSeconds (5 )).join ().getValue ();
174- String id = s1 .getId ();
175- s1 .close ();
176-
177- SessionImpl s2 = (SessionImpl )queryClient .createSession (Duration .ofSeconds (5 )).join ().getValue ();
178- Assert .assertEquals (id , s2 .getId ());
179- s2 .updateSessionState (Status .of (StatusCode .ABORTED ));
180- s2 .close ();
181-
182- SessionImpl s3 = (SessionImpl )queryClient .createSession (Duration .ofSeconds (5 )).join ().getValue ();
183- Assert .assertEquals (id , s3 .getId ());
184- s3 .updateSessionState (Status .of (StatusCode .BAD_SESSION ));
185- s3 .close ();
186-
187- SessionImpl s4 = (SessionImpl )queryClient .createSession (Duration .ofSeconds (5 )).join ().getValue ();
188- Assert .assertNotEquals (id , s4 .getId ());
189- s4 .close ();
170+ try (QueryClient client = QueryClient .newClient (ydbTransport ).build ()) {
171+ SessionImpl s1 = (SessionImpl )client .createSession (Duration .ofSeconds (5 )).join ().getValue ();
172+ String id = s1 .getId ();
173+ s1 .close ();
174+
175+ SessionImpl s2 = (SessionImpl )client .createSession (Duration .ofSeconds (5 )).join ().getValue ();
176+ Assert .assertEquals (id , s2 .getId ());
177+ s2 .updateSessionState (Status .of (StatusCode .ABORTED ));
178+ s2 .close ();
179+
180+ SessionImpl s3 = (SessionImpl )client .createSession (Duration .ofSeconds (5 )).join ().getValue ();
181+ Assert .assertEquals (id , s3 .getId ());
182+ s3 .updateSessionState (Status .of (StatusCode .BAD_SESSION ));
183+ s3 .close ();
184+
185+ SessionImpl s4 = (SessionImpl )client .createSession (Duration .ofSeconds (5 )).join ().getValue ();
186+ Assert .assertNotEquals (id , s4 .getId ());
187+ s4 .close ();
188+ }
189+ }
190+
191+ @ Test
192+ public void testCancelStream () {
193+ try (QueryClient client = QueryClient .newClient (ydbTransport ).build ()) {
194+ QuerySession s1 = client .createSession (Duration .ofSeconds (5 )).join ().getValue ();
195+ String id = s1 .getId ();
196+ s1 .close ();
197+
198+ try (QuerySession s2 = client .createSession (Duration .ofSeconds (5 )).join ().getValue ()) {
199+ Assert .assertEquals (id , s2 .getId ());
200+ s2 .createQuery ("SELECT 2 + 2;" , TxMode .SNAPSHOT_RO ).execute (this ::printQuerySetPart )
201+ .join ().getStatus ().expectSuccess ("cannot execute query" );
202+ }
203+
204+ try (QuerySession s3 = client .createSession (Duration .ofSeconds (5 )).join ().getValue ()) {
205+ Assert .assertEquals (id , s3 .getId ());
206+ final QueryStream query = s3 .createQuery ("SELECT 2 + 2;" , TxMode .SNAPSHOT_RO );
207+ final CompletableFuture <Void > stop = new CompletableFuture <>();
208+ CompletableFuture <Result <QueryInfo >> future = query .execute (part -> {
209+ stop .join ();
210+ printQuerySetPart (part );
211+ });
212+ query .cancel ();
213+ stop .complete (null );
214+ Result <QueryInfo > result = future .join ();
215+ Assert .assertEquals (StatusCode .CLIENT_CANCELLED , result .getStatus ().getCode ());
216+ }
217+
218+ try (QuerySession s4 = client .createSession (Duration .ofSeconds (5 )).join ().getValue ()) {
219+ Assert .assertNotEquals (id , s4 .getId ());
220+ }
221+ }
190222 }
191223
192224 @ Test
@@ -198,35 +230,37 @@ public void testSimpleCRUD() {
198230 entities .add (new Entity (3 , "dublicate" , BYTES_LEN5 , true ));
199231 entities .add (new Entity (5 , "entity 5" , BYTES_LEN2 , false ));
200232
201- for (Entity entity : entities ) {
202- String query = "UPSERT INTO `" + TEST_TABLE + "` (id, name, payload, is_valid) "
203- + "VALUES ($id, $name, $payload, $is_valid)" ;
204-
205- Params params = Params .of (
206- "$id" , PrimitiveValue .newInt32 (entity .id ),
207- "$name" , PrimitiveValue .newText (entity .name ),
208- "$payload" , PrimitiveValue .newBytes (entity .payload ),
209- "$is_valid" , PrimitiveValue .newBool (entity .isValid )
210- );
233+ try (QueryClient client = QueryClient .newClient (ydbTransport ).build ()) {
234+ for (Entity entity : entities ) {
235+ String query = "UPSERT INTO `" + TEST_TABLE + "` (id, name, payload, is_valid) "
236+ + "VALUES ($id, $name, $payload, $is_valid)" ;
237+
238+ Params params = Params .of (
239+ "$id" , PrimitiveValue .newInt32 (entity .id ),
240+ "$name" , PrimitiveValue .newText (entity .name ),
241+ "$payload" , PrimitiveValue .newBytes (entity .payload ),
242+ "$is_valid" , PrimitiveValue .newBool (entity .isValid )
243+ );
244+
245+ try (QuerySession session = client .createSession (SESSION_TIMEOUT ).join ().getValue ()) {
246+ session .createQuery (query , TxMode .SERIALIZABLE_RW , params )
247+ .execute (this ::printQuerySetPart )
248+ .join ().getStatus ().expectSuccess ();
249+ }
250+ }
211251
212- try (QuerySession session = queryClient .createSession (SESSION_TIMEOUT ).join ().getValue ()) {
213- session .createQuery (query , TxMode .SERIALIZABLE_RW , params )
252+ try (QuerySession session = client .createSession (Duration .ofSeconds (5 )).join ().getValue ()) {
253+ String query = "SELECT id, name, payload, is_valid FROM " + TEST_TABLE + " ORDER BY id;" ;
254+ session .createQuery (query , TxMode .SERIALIZABLE_RW )
214255 .execute (this ::printQuerySetPart )
215256 .join ().getStatus ().expectSuccess ();
216257 }
217- }
218-
219- try (QuerySession session = queryClient .createSession (Duration .ofSeconds (5 )).join ().getValue ()) {
220- String query = "SELECT id, name, payload, is_valid FROM " + TEST_TABLE + " ORDER BY id;" ;
221- session .createQuery (query , TxMode .SERIALIZABLE_RW )
222- .execute (this ::printQuerySetPart )
223- .join ().getStatus ().expectSuccess ();
224- }
225258
226- try (QuerySession session = queryClient .createSession (SESSION_TIMEOUT ).join ().getValue ()) {
227- session .createQuery ("DELETE FROM " + TEST_TABLE , TxMode .SERIALIZABLE_RW )
228- .execute (this ::printQuerySetPart )
229- .join ().getStatus ().expectSuccess ();
259+ try (QuerySession session = client .createSession (SESSION_TIMEOUT ).join ().getValue ()) {
260+ session .createQuery ("DELETE FROM " + TEST_TABLE , TxMode .SERIALIZABLE_RW )
261+ .execute (this ::printQuerySetPart )
262+ .join ().getStatus ().expectSuccess ();
263+ }
230264 }
231265 }
232266
@@ -238,7 +272,6 @@ public void printQuerySetPart(QueryResultPart part) {
238272 }
239273
240274 @ Test
241- @ Ignore
242275 public void updateMultipleTablesInOneTransaction () {
243276 try (QueryClient client = QueryClient .newClient (ydbTransport ).build ()) {
244277 try (QuerySession session = client .createSession (Duration .ofSeconds (5 )).join ().getValue ()) {
0 commit comments