Skip to content

Commit 6ff4508

Browse files
committed
[CSAPI] Implement new serialization for command results
1 parent c2c75ec commit 6ff4508

File tree

7 files changed

+228
-71
lines changed

7 files changed

+228
-71
lines changed

sensorhub-core/src/main/java/org/sensorhub/impl/system/SystemTransactionHandler.java

Lines changed: 24 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -364,10 +364,15 @@ public synchronized DataStreamTransactionHandler addOrUpdateDataStream(IDataStre
364364
log.debug("Added datastream {}#{} with valid time {}", sysUID, outputName, validTime);
365365
}
366366

367+
// compare properties of old and new datastreams
368+
var sameName = Objects.equals(oldDsInfo.getName(), dsInfo.getName());
369+
var sameDescription = Objects.equals(oldDsInfo.getDescription(), dsInfo.getDescription());
370+
var sameRecordStruct = DataComponentChecks.checkStructEquals(oldDsInfo.getRecordStructure(), dsInfo.getRecordStructure());
371+
var sameRecordEncoding = DataComponentChecks.checkEncodingEquals(oldDsInfo.getRecordEncoding(), dsInfo.getRecordEncoding());
372+
var recordStructCompatible = sameRecordStruct || DataComponentChecks.checkStructCompatible(oldDsInfo.getRecordStructure(), dsInfo.getRecordStructure());
373+
367374
// if observations were already recorded and structure has changed, create a new datastream
368-
if (hasObs &&
369-
(!DataComponentChecks.checkStructCompatible(oldDsInfo.getRecordStructure(), dsInfo.getRecordStructure()) ||
370-
!DataComponentChecks.checkEncodingEquals(oldDsInfo.getRecordEncoding(), dsInfo.getRecordEncoding())))
375+
if (hasObs && (!recordStructCompatible || !sameRecordEncoding))
371376
{
372377
// set validTime to current time
373378
dsInfo = DataStreamInfo.Builder.from(dsInfo)
@@ -380,10 +385,7 @@ public synchronized DataStreamTransactionHandler addOrUpdateDataStream(IDataStre
380385
}
381386

382387
// if something else has changed, update existing datastream
383-
else if (!DataComponentChecks.checkStructEquals(oldDsInfo.getRecordStructure(), dsInfo.getRecordStructure()) ||
384-
!DataComponentChecks.checkEncodingEquals(oldDsInfo.getRecordEncoding(), dsInfo.getRecordEncoding()) ||
385-
!Objects.equals(oldDsInfo.getName(), dsInfo.getName()) ||
386-
!Objects.equals(oldDsInfo.getDescription(), dsInfo.getDescription()))
388+
else if (!sameRecordStruct || !sameRecordEncoding || !sameName || !sameDescription)
387389
{
388390
var dsHandler = new DataStreamTransactionHandler(dsKey, oldDsInfo, rootHandler);
389391
dsHandler.update(dsInfo);
@@ -487,11 +489,21 @@ public synchronized CommandStreamTransactionHandler addOrUpdateCommandStream(ICo
487489
addedEvent = new CommandStreamAddedEvent(sysUID, commandName);
488490
log.debug("Added command stream {}#{} with valid time {}", sysUID, commandName, validTime);
489491
}
490-
492+
493+
// compare properties of old and new command streams
494+
var sameName = Objects.equals(oldCsInfo.getName(), csInfo.getName());
495+
var sameDescription = Objects.equals(oldCsInfo.getDescription(), csInfo.getDescription());
496+
497+
var sameParamStruct = DataComponentChecks.checkStructEquals(oldCsInfo.getRecordStructure(), csInfo.getRecordStructure());
498+
var sameParamEncoding = DataComponentChecks.checkEncodingEquals(oldCsInfo.getRecordEncoding(), csInfo.getRecordEncoding());
499+
var paramStructCompatible = sameParamStruct || DataComponentChecks.checkStructCompatible(oldCsInfo.getRecordStructure(), csInfo.getRecordStructure());
500+
501+
var sameResultStruct = DataComponentChecks.checkStructEqualsNullAllowed(oldCsInfo.getResultStructure(), csInfo.getResultStructure());
502+
var sameResultEncoding = DataComponentChecks.checkEncodingEqualsNullAllowed(oldCsInfo.getResultEncoding(), csInfo.getResultEncoding());
503+
var resultStructCompatible = sameResultStruct || DataComponentChecks.checkStructCompatibleNullAllowed(oldCsInfo.getResultStructure(), csInfo.getResultStructure());
504+
491505
// if observations were already recorded and structure has changed, create a new datastream
492-
if (hasCommands &&
493-
(!DataComponentChecks.checkStructCompatible(oldCsInfo.getRecordStructure(), csInfo.getRecordStructure()) ||
494-
!DataComponentChecks.checkEncodingEquals(oldCsInfo.getRecordEncoding(), csInfo.getRecordEncoding())))
506+
if (hasCommands && (!paramStructCompatible || !sameParamEncoding || !resultStructCompatible || !sameResultEncoding))
495507
{
496508
// set validTime to current time
497509
csInfo = CommandStreamInfo.Builder.from(csInfo)
@@ -504,10 +516,7 @@ public synchronized CommandStreamTransactionHandler addOrUpdateCommandStream(ICo
504516
}
505517

506518
// if something else has changed, update existing command stream
507-
else if (!DataComponentChecks.checkStructEquals(oldCsInfo.getRecordStructure(), csInfo.getRecordStructure()) ||
508-
!DataComponentChecks.checkEncodingEquals(oldCsInfo.getRecordEncoding(), csInfo.getRecordEncoding()) ||
509-
!Objects.equals(oldCsInfo.getName(), csInfo.getName()) ||
510-
!Objects.equals(oldCsInfo.getDescription(), csInfo.getDescription()))
519+
else if (!sameParamStruct || !sameParamEncoding || !sameResultStruct || !sameResultEncoding || !sameName || !sameDescription)
511520
{
512521
var csHandler = new CommandStreamTransactionHandler(csKey, oldCsInfo, rootHandler);
513522
csHandler.update(csInfo);

sensorhub-service-consys/src/main/java/org/sensorhub/impl/service/consys/client/ConSysApiClient.java

Lines changed: 101 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -50,8 +50,10 @@
5050
import java.util.stream.Stream;
5151
import java.util.stream.StreamSupport;
5252
import net.opengis.swe.v20.BinaryEncoding;
53+
import org.sensorhub.api.command.CommandStatus;
5354
import org.sensorhub.api.command.CommandStreamInfo;
5455
import org.sensorhub.api.command.ICommandData;
56+
import org.sensorhub.api.command.ICommandStatus;
5557
import org.sensorhub.api.command.ICommandStreamInfo;
5658
import org.sensorhub.api.common.BigId;
5759
import org.sensorhub.api.data.DataStreamInfo;
@@ -80,6 +82,12 @@
8082
import org.sensorhub.impl.service.consys.resource.ResourceLink;
8183
import org.sensorhub.impl.service.consys.system.SystemBindingGeoJson;
8284
import org.sensorhub.impl.service.consys.system.SystemBindingSmlJson;
85+
import org.sensorhub.impl.service.consys.task.CommandBindingJson;
86+
import org.sensorhub.impl.service.consys.task.CommandBindingSweCommon;
87+
import org.sensorhub.impl.service.consys.task.CommandHandler;
88+
import org.sensorhub.impl.service.consys.task.CommandResultBindingSweCommon;
89+
import org.sensorhub.impl.service.consys.task.CommandStatusBindingJson;
90+
import org.sensorhub.impl.service.consys.task.CommandStatusHandler.CommandStatusHandlerContextData;
8391
import org.sensorhub.impl.service.consys.task.CommandStreamBindingJson;
8492
import org.sensorhub.impl.service.consys.task.CommandStreamSchemaBindingJson;
8593
import org.sensorhub.utils.Lambdas;
@@ -101,8 +109,9 @@ public class ConSysApiClient
101109
static final String SYSTEMS_COLLECTION = "systems";
102110
static final String DEPLOYMENTS_COLLECTION = "deployments";
103111
static final String DATASTREAMS_COLLECTION = "datastreams";
104-
static final String CONTROLS_COLLECTION = "controlstreams";
105112
static final String OBSERVATIONS_COLLECTION = "observations";
113+
static final String CONTROLSTREAMS_COLLECTION = "controlstreams";
114+
static final String COMMANDS_COLLECTION = "commands";
106115
static final String SUBSYSTEMS_COLLECTION = "subsystems";
107116
static final String SF_COLLECTION = "samplingFeatures";
108117

@@ -842,7 +851,7 @@ public CompletableFuture<String> addControlStream(String systemId, ICommandStrea
842851
binding.serializeCreate(cmdstream);
843852

844853
return sendPostRequest(
845-
endpoint.resolve(SYSTEMS_COLLECTION + "/" + systemId + "/" + CONTROLS_COLLECTION),
854+
endpoint.resolve(SYSTEMS_COLLECTION + "/" + systemId + "/" + CONTROLSTREAMS_COLLECTION),
846855
ResourceFormat.JSON,
847856
buffer.toByteArray());
848857
}
@@ -885,7 +894,7 @@ protected void endJsonCollection(JsonWriter writer, Collection<ResourceLink> lin
885894
binding.endCollection(Collections.emptyList());
886895

887896
return sendBatchPostRequest(
888-
endpoint.resolve(SYSTEMS_COLLECTION + "/" + systemId + "/" + CONTROLS_COLLECTION),
897+
endpoint.resolve(SYSTEMS_COLLECTION + "/" + systemId + "/" + CONTROLSTREAMS_COLLECTION),
889898
ResourceFormat.JSON,
890899
buffer.toByteArray());
891900
}
@@ -898,7 +907,7 @@ protected void endJsonCollection(JsonWriter writer, Collection<ResourceLink> lin
898907

899908
public CompletableFuture<ICommandStreamInfo> getControlStreamById(String id, ResourceFormat format, boolean fetchSchema)
900909
{
901-
var cf1 = sendGetRequest(endpoint.resolve(CONTROLS_COLLECTION + "/" + id), format, body -> {
910+
var cf1 = sendGetRequest(endpoint.resolve(CONTROLSTREAMS_COLLECTION + "/" + id), format, body -> {
902911
try
903912
{
904913
var ctx = new RequestContext(body);
@@ -933,7 +942,7 @@ public CompletableFuture<ICommandStreamInfo> getControlStreamById(String id, Res
933942

934943
public CompletableFuture<ICommandStreamInfo> getControlStreamSchema(String id, ResourceFormat obsFormat, ResourceFormat format)
935944
{
936-
return sendGetRequest(endpoint.resolve(CONTROLS_COLLECTION + "/" + id + "/schema?obsFormat=" + obsFormat), format, body -> {
945+
return sendGetRequest(endpoint.resolve(CONTROLSTREAMS_COLLECTION + "/" + id + "/schema?obsFormat=" + obsFormat), format, body -> {
937946
try
938947
{
939948
var ctx = new RequestContext(body);
@@ -1088,9 +1097,55 @@ public Spliterator<IObsData> trySplit()
10881097
/* Commands */
10891098
/*----------*/
10901099

1091-
public CompletableFuture<String> sendCommand(String controlId, ICommandData cmd)
1100+
public CompletableFuture<ICommandStatus> sendCommand(String controlstreamId, ICommandStreamInfo cmdStream, ICommandData cmd)
10921101
{
1093-
return null;
1102+
try
1103+
{
1104+
var buffer = new ByteArrayOutputStream();
1105+
var ctx = new RequestContext(buffer);
1106+
ctx.setParent(null, controlstreamId, cmd.getCommandStreamID());
1107+
var contextData = new CommandHandler.CommandHandlerContextData();
1108+
contextData.csInfo = cmdStream;
1109+
ctx.setData(contextData);
1110+
1111+
if (cmdStream != null && cmdStream.getRecordEncoding() instanceof BinaryEncoding) {
1112+
ctx.setData(contextData);
1113+
ctx.setFormat(ResourceFormat.SWE_BINARY);
1114+
var binding = new CommandBindingSweCommon(ctx, new IdEncodersBase32(), false, null);
1115+
binding.serialize(null, cmd, false);
1116+
} else {
1117+
ctx.setFormat(ResourceFormat.JSON);
1118+
var binding = new CommandBindingJson(ctx, new IdEncodersBase32(), false, null);
1119+
binding.serialize(null, cmd, false);
1120+
}
1121+
1122+
return sendPostRequestAndReadResponse(
1123+
endpoint.resolve(CONTROLSTREAMS_COLLECTION + "/" + controlstreamId + "/" + COMMANDS_COLLECTION),
1124+
ctx.getFormat(),
1125+
buffer.toByteArray(),
1126+
responseBody -> {
1127+
try
1128+
{
1129+
var respCtx = new RequestContext(responseBody);
1130+
var respCtxData = new CommandStatusHandlerContextData();
1131+
respCtxData.csInfo = cmdStream;
1132+
respCtx.setData(respCtxData);
1133+
respCtx.setFormat(ResourceFormat.JSON);
1134+
var binding = new CommandStatusBindingJson(respCtx, new IdEncodersBase32(), true, null);
1135+
binding.startCollection();
1136+
return binding.deserialize();
1137+
}
1138+
catch (IOException e)
1139+
{
1140+
e.printStackTrace();
1141+
throw new CompletionException(e);
1142+
}
1143+
});
1144+
}
1145+
catch (IOException e)
1146+
{
1147+
throw new IllegalStateException("Error initializing binding", e);
1148+
}
10941149
}
10951150

10961151

@@ -1202,6 +1257,45 @@ protected CompletableFuture<String> sendPostRequest(URI collectionUri, ResourceF
12021257
}
12031258

12041259

1260+
protected <T> CompletableFuture<T> sendPostRequestAndReadResponse(URI collectionUri, ResourceFormat format, byte[] requestBody, Function<InputStream, T> responseBodyMapper)
1261+
{
1262+
//if (!isHttpClientAvailable)
1263+
// return sendPostRequestFallback(collectionUri, format, body);
1264+
1265+
var builder = HttpRequest.newBuilder()
1266+
.uri(collectionUri)
1267+
.POST(HttpRequest.BodyPublishers.ofByteArray(requestBody))
1268+
.header(HttpHeaders.ACCEPT, ResourceFormat.JSON.getMimeType())
1269+
.header(HttpHeaders.CONTENT_TYPE, format.getMimeType());
1270+
1271+
if (token != null)
1272+
builder.header(HttpHeaders.AUTHORIZATION, "Bearer " + token);
1273+
1274+
1275+
var req = builder.build();
1276+
BodyHandler<T> bodyHandler = resp -> {
1277+
BodySubscriber<byte[]> upstream = BodySubscribers.ofByteArray();
1278+
return BodySubscribers.mapping(upstream, body -> {
1279+
if (resp.statusCode() == 200) {
1280+
var is = new ByteArrayInputStream(body);
1281+
return responseBodyMapper.apply(is);
1282+
} else {
1283+
var error = new String(body);
1284+
throw new CompletionException("HTTP error " + resp.statusCode() + ": " + error, null);
1285+
}
1286+
});
1287+
};
1288+
1289+
return http.sendAsync(req, bodyHandler)
1290+
.thenApply(resp -> {
1291+
if (resp.statusCode() == 200)
1292+
return resp.body();
1293+
else
1294+
throw new CompletionException("HTTP error " + resp.statusCode(), null);
1295+
});
1296+
}
1297+
1298+
12051299
/**
12061300
* Fallback method for sending requests using HttpURLConnection.
12071301
* This is used when HttpClient is not available (e.g., on Android).

sensorhub-service-consys/src/main/java/org/sensorhub/impl/service/consys/task/CommandBindingSweCommon.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -53,12 +53,12 @@ public CommandBindingSweCommon(RequestContext ctx, IdEncoders idEncoders, boolea
5353
super(ctx, idEncoders);
5454
this.contextData = (CommandHandlerContextData)ctx.getData();
5555

56-
var dsInfo = contextData.dsInfo;
56+
var dsInfo = contextData.csInfo;
5757
if (forReading)
5858
{
5959
var is = ctx.getInputStream();
6060
paramsReader = getSweCommonParser(dsInfo, is, ctx.getFormat());
61-
timeStampIndexer = SWEHelper.getTimeStampIndexer(contextData.dsInfo.getRecordStructure());
61+
timeStampIndexer = SWEHelper.getTimeStampIndexer(contextData.csInfo.getRecordStructure());
6262

6363
var user = ctx.getSecurityHandler().getCurrentUser();
6464
this.userID = user != null ? user.getId() : "api";

sensorhub-service-consys/src/main/java/org/sensorhub/impl/service/consys/task/CommandHandler.java

Lines changed: 4 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -387,23 +387,10 @@ protected BigId addEntry(RequestContext ctx, ICommandData cmd) throws DataStoreE
387387
// serialize status info we received in response
388388
if (ctx.getOutputStream() != null)
389389
{
390-
if (status.getResult() != null)
391-
{
392-
// if there is a result, just write the result
393-
var resultHandler = (CommandResultHandler)subResources.get(CommandResultHandler.NAMES[0]);
394-
var resultBinding = resultHandler.getBinding(ctx, false);
395-
resultBinding.startCollection();
396-
resultBinding.serialize(null, status, false);
397-
resultBinding.endCollection(null);
398-
}
399-
else
400-
{
401-
// else write the complete status report
402-
var statusHandler = (CommandStatusHandler)subResources.get(CommandStatusHandler.NAMES[0]);
403-
ctx.setResponseFormat(ResourceFormat.JSON);
404-
var statusBinding = statusHandler.getBinding(ctx, false);
405-
statusBinding.serialize(null, status, false);
406-
}
390+
var statusHandler = (CommandStatusHandler)subResources.get(CommandStatusHandler.NAMES[0]);
391+
ctx.setResponseFormat(ResourceFormat.JSON);
392+
var statusBinding = statusHandler.getBinding(ctx, false);
393+
statusBinding.serialize(null, status, false);
407394
}
408395

409396
return status.getCommandID();

sensorhub-service-consys/src/main/java/org/sensorhub/impl/service/consys/task/CommandResultBindingSweCommon.java

Lines changed: 45 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -18,13 +18,16 @@
1818
import java.io.InputStream;
1919
import java.io.OutputStream;
2020
import java.util.Collection;
21+
import org.sensorhub.api.command.CommandResult;
22+
import org.sensorhub.api.command.CommandStatus;
23+
import org.sensorhub.api.command.ICommandResult;
2124
import org.sensorhub.api.command.ICommandStatus;
2225
import org.sensorhub.api.command.ICommandStreamInfo;
2326
import org.sensorhub.api.common.BigId;
2427
import org.sensorhub.api.common.IdEncoders;
2528
import org.sensorhub.api.database.IObsSystemDatabase;
29+
import org.sensorhub.impl.service.consys.ResourceParseException;
2630
import org.sensorhub.impl.service.consys.SWECommonUtils;
27-
import org.sensorhub.impl.service.consys.ServiceErrors;
2831
import org.sensorhub.impl.service.consys.resource.PropertyFilter;
2932
import org.sensorhub.impl.service.consys.resource.RequestContext;
3033
import org.sensorhub.impl.service.consys.resource.ResourceBinding;
@@ -33,6 +36,9 @@
3336
import org.sensorhub.impl.service.consys.task.CommandStatusHandler.CommandStatusHandlerContextData;
3437
import org.vast.cdm.common.DataStreamParser;
3538
import org.vast.cdm.common.DataStreamWriter;
39+
import org.vast.swe.fast.JsonDataParserGson;
40+
import org.vast.swe.fast.JsonDataWriterGson;
41+
import com.google.gson.stream.JsonWriter;
3642

3743

3844
public class CommandResultBindingSweCommon extends ResourceBinding<BigId, ICommandStatus>
@@ -75,22 +81,50 @@ else if (ctx.getFormat().equals(ResourceFormat.SWE_XML))
7581
}
7682

7783

84+
public CommandResultBindingSweCommon(RequestContext ctx, IdEncoders idEncoders, boolean forReading, IObsSystemDatabase db, JsonWriter writer) throws IOException
85+
{
86+
super(ctx, idEncoders);
87+
this.contextData = (CommandStatusHandlerContextData)ctx.getData();
88+
var csInfo = contextData.csInfo;
89+
90+
resultWriter = new JsonDataWriterGson(writer);
91+
resultWriter.setDataComponents(csInfo.getResultStructure());
92+
93+
ctx.setResponseContentType(ctx.getFormat().getMimeType());
94+
}
95+
96+
7897
@Override
7998
public ICommandStatus deserialize() throws IOException
8099
{
81-
throw ServiceErrors.notWritable();
100+
try
101+
{
102+
var rec = resultReader.parseNextBlock();
103+
if (rec == null)
104+
return null;
105+
106+
var result = CommandResult.withData(rec);
107+
return new CommandStatus.Builder()
108+
.withCommand(BigId.NONE)
109+
.withResult(result)
110+
.build();
111+
}
112+
catch (IOException e)
113+
{
114+
throw new ResourceParseException(e.getMessage());
115+
}
82116
}
83117

84118

85119
@Override
86120
public void serialize(BigId key, ICommandStatus status, boolean showLinks) throws IOException
87121
{
88122
// if embedded result
89-
var obsList = status.getResult().getObservations();
90-
if (obsList != null)
123+
var inlineRecords = status.getResult().getInlineRecords();
124+
if (inlineRecords != null)
91125
{
92-
for (var obs: obsList)
93-
resultWriter.write(obs.getResult());
126+
for (var rec: inlineRecords)
127+
resultWriter.write(rec);
94128
}
95129
}
96130

@@ -114,7 +148,11 @@ protected DataStreamWriter getSweCommonWriter(ICommandStreamInfo csInfo, OutputS
114148
@Override
115149
public void startCollection() throws IOException
116150
{
117-
resultWriter.startStream(true);
151+
if (resultReader != null && resultReader instanceof JsonDataParserGson) {
152+
((JsonDataParserGson)resultReader).setHasArrayWrapper();
153+
}
154+
else
155+
resultWriter.startStream(true);
118156
}
119157

120158

0 commit comments

Comments
 (0)