Skip to content

Commit 1d20303

Browse files
committed
map async response to jsoniter Any object && adding executeAsyncWait.
1 parent 6ae4ca0 commit 1d20303

File tree

3 files changed

+75
-29
lines changed

3 files changed

+75
-29
lines changed

rai-sdk-examples/src/main/java/com/relationalai/examples/ExecuteAsync.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import com.relationalai.Client;
2323
import com.relationalai.Config;
2424
import com.relationalai.HttpError;
25+
import com.relationalai.Json;
2526

2627
public class ExecuteAsync implements Runnable {
2728
boolean readonly;
@@ -65,6 +66,6 @@ public void run(String[] args) throws HttpError, InterruptedException, IOExcepti
6566
if (source == null)
6667
return; // nothing to execute
6768
var rsp = client.executeAsync(database, engine, source, readonly);
68-
System.out.println(rsp);
69+
Json.print(rsp);
6970
}
7071
}

rai-sdk/src/main/java/com/relationalai/Client.java

Lines changed: 60 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package com.relationalai;
1818

1919
import com.fasterxml.jackson.databind.ObjectMapper;
20+
import com.jsoniter.any.Any;
2021
import org.apache.arrow.memory.RootAllocator;
2122
import org.apache.arrow.vector.VectorSchemaRoot;
2223
import org.apache.arrow.vector.ipc.ArrowStreamReader;
@@ -335,10 +336,10 @@ private String parseArrowResponse(ByteArrayOutputStream out) throws IOException
335336
result.put(f.getName(), String.valueOf(readBatch.getVector(f)));
336337
}
337338
}
339+
// serialize map to json string
338340
return new ObjectMapper().writeValueAsString(result);
339341
}
340342

341-
342343
static void printRequest(HttpRequest request) {
343344
System.out.printf("%s %s\n", request.method(), request.uri());
344345
for (Map.Entry<String, List<String>> entry : request.headers().map().entrySet()) {
@@ -712,52 +713,90 @@ public TransactionResult execute(
712713
return Json.deserialize(rsp, TransactionResult.class);
713714
}
714715

715-
// TODO: map string to result object
716-
public String executeAsync(
716+
public HashMap<String, Any> executeAsyncWait(
717+
String database, String engine, String source, boolean readonly) throws HttpError, IOException, InterruptedException {
718+
return executeAsyncWait(database, engine, source, readonly, null);
719+
}
720+
721+
public HashMap<String, Any> executeAsyncWait(
722+
String database, String engine,
723+
String source, boolean readonly,
724+
Map<String, String> inputs) throws HttpError, IOException, InterruptedException {
725+
String transactionId = null;
726+
var output = new HashMap<String, Any>();
727+
728+
var rsp = executeAsync(database, engine, source, readonly, inputs);
729+
730+
try {
731+
transactionId = rsp.asMap().get("id").toString();
732+
} catch (ClassCastException e) {
733+
transactionId = rsp.get(0).asMap().get("id").toString();
734+
}
735+
736+
var state = getTransaction(transactionId)
737+
.asMap()
738+
.get("transaction")
739+
.asMap()
740+
.get("state")
741+
.toString();
742+
743+
while (!"COMPLETED".equals(state)){
744+
Thread.sleep(2000);
745+
746+
state = getTransaction(transactionId)
747+
.asMap()
748+
.get("transaction")
749+
.asMap()
750+
.get("state")
751+
.toString();
752+
}
753+
754+
output.put("results", getTransactionResults(transactionId));
755+
output.put("metadata", getTransactionMetadata(transactionId));
756+
output.put("problems", getTransactionProblems(transactionId));
757+
758+
return output;
759+
}
760+
761+
public Any executeAsync(
717762
String database, String engine, String source, boolean readonly) throws HttpError, IOException, InterruptedException {
718763
return executeAsync(database, engine, source, readonly, null);
719764
}
720765

721-
// TODO: map string to result object
722-
public String executeAsync(
766+
public Any executeAsync(
723767
String database, String engine,
724768
String source, boolean readonly,
725769
Map<String, String> inputs) throws HttpError, IOException, InterruptedException {
726770
var tx = new TransactionAsync(database, engine, source, readonly);
727771
var action = DbAction.makeQueryAction(source, inputs);
728772
var body = tx.payload(action);
729773
var rsp = post(PATH_TRANSACTIONS, tx.queryParams(), body);
730-
return rsp;
774+
return Json.deserialize(rsp);
731775
}
732776

733-
// TODO: map string to result object
734-
public String getTransaction(String id) throws HttpError, IOException, InterruptedException {
777+
public Any getTransaction(String id) throws HttpError, IOException, InterruptedException {
735778
var rsp = get(String.format("%s/%s", PATH_TRANSACTIONS, id));
736-
return rsp;
779+
return Json.deserialize(rsp);
737780
}
738781

739-
// TODO: map string to result object
740-
public String getTransactions() throws HttpError, IOException, InterruptedException {
782+
public Any getTransactions() throws HttpError, IOException, InterruptedException {
741783
var rsp = get(PATH_TRANSACTIONS);
742-
return rsp;
784+
return Json.deserialize(rsp);
743785
}
744786

745-
// TODO: map string to result object
746-
public String getTransactionResults(String id) throws HttpError, IOException, InterruptedException {
787+
public Any getTransactionResults(String id) throws HttpError, IOException, InterruptedException {
747788
var rsp = get(String.format("%s/%s/results", PATH_TRANSACTIONS, id));
748-
return rsp;
789+
return Json.deserialize(rsp);
749790
}
750791

751-
// TODO: map string to result object
752-
public String getTransactionMetadata(String id) throws HttpError, IOException, InterruptedException {
792+
public Any getTransactionMetadata(String id) throws HttpError, IOException, InterruptedException {
753793
var rsp = get(String.format("%s/%s/metadata", PATH_TRANSACTIONS, id));
754-
return rsp;
794+
return Json.deserialize(rsp);
755795
}
756796

757-
// TODO: map string to result object
758-
public String getTransactionProblems(String id) throws HttpError, IOException, InterruptedException {
797+
public Any getTransactionProblems(String id) throws HttpError, IOException, InterruptedException {
759798
var rsp = get(String.format("%s/%s/problems", PATH_TRANSACTIONS, id));
760-
return rsp;
799+
return Json.deserialize(rsp);
761800
}
762801

763802
// EDBs

rai-sdk/src/test/java/com/relationalai/ExecuteAsyncTest.java

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -34,13 +34,19 @@ void testExecuteAsync() throws HttpError, InterruptedException, IOException {
3434

3535
var query = "x, x^2, x^3, x^4 from x in {1; 2; 3; 4; 5}";
3636

37-
var rsp = client.executeAsync(databaseName, engineName, query, true);
38-
// TODO: add more tests
39-
// - check transaction completion
40-
// - check transaction results
41-
// - check transaction metadata
42-
// - check transaction problems
43-
assertNotNull(rsp);
37+
var rsp = client.executeAsyncWait(databaseName, engineName, query, true);
38+
assertEquals(
39+
rsp.get("results").toString(),
40+
"[{\"v1\":\"[1, 2, 3, 4, 5]\",\"v2\":\"[1, 4, 9, 16, 25]\",\"v3\":\"[1, 8, 27, 64, 125]\",\"v4\":\"[1, 16, 81, 256, 625]\"}]"
41+
);
42+
assertEquals(
43+
rsp.get("metadata").toString(),
44+
"[{\"relationId\":\"/:output/Int64/Int64/Int64/Int64\",\"types\":[\":output\",\"Int64\",\"Int64\",\"Int64\",\"Int64\"]}]"
45+
);
46+
assertEquals(
47+
rsp.get("problems").toString(),
48+
"[]"
49+
);
4450
}
4551

4652
@AfterAll

0 commit comments

Comments
 (0)