Skip to content

Commit 6eff1c6

Browse files
committed
[CS API] Fixes in command status serialization + better error handling
1 parent 6ff4508 commit 6eff1c6

File tree

7 files changed

+140
-41
lines changed

7 files changed

+140
-41
lines changed

sensorhub-core/src/main/java/org/sensorhub/api/command/CommandResult.java

Lines changed: 43 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import org.sensorhub.utils.ObjectUtils;
2222
import org.vast.ogc.xlink.IXlinkReference;
2323
import org.vast.util.Asserts;
24+
import com.google.common.collect.ImmutableList;
2425
import net.opengis.swe.v20.DataBlock;
2526

2627

@@ -48,67 +49,92 @@ protected CommandResult()
4849

4950
/**
5051
* Add an entire datastream to the command result
51-
* @param dataStreamID The ID of the datastream that contains the result
52+
* @param dataStreamID The internal ID of the datastream that contains the result
5253
* @return The result object
5354
*/
5455
public static ICommandResult withDatastream(BigId dataStreamID)
5556
{
5657
Asserts.checkNotNull(dataStreamID, BigId.class);
5758

5859
var res = new CommandResult();
59-
if (res.dsIDs == null)
60-
res.dsIDs = new ArrayList<>();
61-
res.dsIDs.add(dataStreamID);
60+
res.dsIDs = ImmutableList.of(dataStreamID);
6261
return res;
6362
}
6463

6564

6665
/**
67-
* Add an observation to the command result
68-
* @param obsID The internal ID of an observation to add to the result
66+
* Create a command result with multiple datastream references
67+
* @param dataStreamIDs The internal IDs of the datastreams
68+
* @return The result object
69+
*/
70+
public static ICommandResult withDatastreams(Collection<BigId> dataStreamIDs)
71+
{
72+
Asserts.checkNotNull(dataStreamIDs, Collection.class);
73+
74+
var res = new CommandResult();
75+
res.dsIDs = new ArrayList<>();
76+
res.dsIDs.addAll(dataStreamIDs);
77+
return res;
78+
}
79+
80+
81+
/**
82+
* Create a command result with a single observation
83+
* @param obsID The internal ID of the observation
6984
* @return The result object
7085
*/
7186
public static ICommandResult withObservation(BigId obsID)
7287
{
7388
Asserts.checkNotNull(obsID, BigId.class);
7489

7590
var res = new CommandResult();
76-
if (res.obsIDs == null)
77-
res.obsIDs = new ArrayList<>();
78-
res.obsIDs.add(obsID);
91+
res.obsIDs = ImmutableList.of(obsID);
92+
return res;
93+
}
94+
95+
96+
/**
97+
* Create a command result with multiple observation references
98+
* @param obsIDs The internal IDs of the observations
99+
* @return The result object
100+
*/
101+
public static ICommandResult withObservations(Collection<BigId> obsIDs)
102+
{
103+
Asserts.checkNotNull(obsIDs, Collection.class);
104+
105+
var res = new CommandResult();
106+
res.obsIDs = new ArrayList<>();
107+
res.obsIDs.addAll(obsIDs);
79108
return res;
80109
}
81110

82111

83112
/**
84-
* Add data to the inline command result
85-
* @param data The data record to be added
113+
* Create a command result with single record
114+
* @param data The data record to add
86115
* @return The result object
87116
*/
88117
public static ICommandResult withData(DataBlock data)
89118
{
90119
Asserts.checkNotNull(data, DataBlock.class);
91120

92121
var res = new CommandResult();
93-
if (res.inlineRecords == null)
94-
res.inlineRecords = new ArrayList<>();
95-
res.inlineRecords.add(data);
122+
res.inlineRecords = ImmutableList.of(data);
96123
return res;
97124
}
98125

99126

100127
/**
101128
* Add multiple data records to the inline command result
102-
* @param records List of data records to be added
129+
* @param records The list of data records to add
103130
* @return The result object
104131
*/
105132
public static ICommandResult withData(Collection<DataBlock> records)
106133
{
107134
Asserts.checkNotNull(records, Collection.class);
108135

109136
var res = new CommandResult();
110-
if (res.inlineRecords == null)
111-
res.inlineRecords = new ArrayList<>();
137+
res.inlineRecords = new ArrayList<>();
112138
res.inlineRecords.addAll(records);
113139
return res;
114140
}

sensorhub-core/src/main/java/org/sensorhub/impl/system/CommandStreamTransactionHandler.java

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -340,14 +340,16 @@ public void submitCommand(long correlationID, ICommandData cmd, Subscriber<IComm
340340
if (subscriber != null)
341341
connectStatusReceiver(correlationID, subscriber);
342342

343-
// reject command if no command receiver is listening
343+
// if this is connected to a local driver, reject command if no command receiver is listening
344344
var cmdPublisher = getCommandDataEventPublisher();
345-
if (cmdPublisher.getNumberOfSubscribers() == 0)
346-
{
347-
publishStatusEvent(
348-
correlationID,
349-
CommandStatus.rejected(BigId.NONE, "Receiving system is disabled"));
350-
return;
345+
if (this.rootHandler instanceof SystemRegistryTransactionHandler) {
346+
if (cmdPublisher.getNumberOfSubscribers() == 0)
347+
{
348+
publishStatusEvent(
349+
correlationID,
350+
CommandStatus.rejected(BigId.NONE, "Receiving system is disabled"));
351+
return;
352+
}
351353
}
352354

353355
// send command to bus

sensorhub-core/src/main/java/org/sensorhub/impl/system/SystemDriverTransactionHandler.java

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import java.util.concurrent.Flow.Subscriber;
2323
import java.util.concurrent.Flow.Subscription;
2424
import org.sensorhub.api.command.CommandEvent;
25+
import org.sensorhub.api.command.CommandStatus;
2526
import org.sensorhub.api.command.ICommandReceiver;
2627
import org.sensorhub.api.command.IStreamingControlInterface;
2728
import org.sensorhub.api.command.IStreamingControlInterfaceWithResult;
@@ -364,6 +365,7 @@ protected void connectControlInput(IStreamingControlInterface controlInput, Comm
364365
{
365366
csHandler.connectCommandReceiver(new Subscriber<CommandEvent>() {
366367
Subscription sub;
368+
static final String ERROR_MESSAGE = "Error sending command to {}. Canceling command receiver subscription";
367369

368370
@Override
369371
public void onSubscribe(Subscription sub)
@@ -383,20 +385,27 @@ public void onNext(CommandEvent event)
383385
.thenAccept(status -> {
384386
csHandler.sendStatus(event.getCorrelationID(), status);
385387
sub.request(1);
388+
})
389+
.exceptionally(e -> {
390+
DefaultSystemRegistry.log.error(ERROR_MESSAGE, csHandler.csInfo.getFullName(), e);
391+
sub.cancel();
392+
csHandler.sendStatus(event.getCorrelationID(), CommandStatus.failed(event.getCommand().getID(), "Internal error processing command"));
393+
return null; // return type is Void
386394
});
387395
}
388396
catch (Exception e)
389397
{
390-
onError(e);
391-
sub.request(1);
398+
DefaultSystemRegistry.log.error(ERROR_MESSAGE, csHandler.csInfo.getFullName(), e);
399+
sub.cancel();
392400
}
393401
});
394402
}
395403

396404
@Override
397405
public void onError(Throwable e)
398406
{
399-
DefaultSystemRegistry.log.error("Error dispatching commands to {} / {}", driver.getName(), controlInput.getName(), e);
407+
DefaultSystemRegistry.log.error("Error dispatching commands to {}. "
408+
+ "No more commands will be processed.", csHandler.csInfo.getFullName(), e);
400409
}
401410

402411
@Override

sensorhub-service-consys/src/main/java/org/sensorhub/impl/service/consys/HandlerContext.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
import org.sensorhub.impl.service.consys.property.PropertyStoreWrapper;
4343
import org.sensorhub.impl.service.consys.system.SystemStoreWrapper;
4444
import org.sensorhub.impl.service.consys.task.CommandStoreWrapper;
45+
import org.sensorhub.impl.service.consys.task.CommandStreamIdEncoder;
4546
import org.sensorhub.impl.service.consys.task.CommandStreamStoreWrapper;
4647
import org.vast.util.Asserts;
4748

@@ -308,7 +309,10 @@ public IdEncoder getObsIdEncoder()
308309

309310
public IdEncoder getCommandStreamIdEncoder()
310311
{
311-
return idEncoders.getCommandStreamIdEncoder();
312+
return new CommandStreamIdEncoder(
313+
idEncoders.getCommandStreamIdEncoder(),
314+
curieResolver,
315+
commandStreamStore);
312316
}
313317

314318

sensorhub-service-consys/src/main/java/org/sensorhub/impl/service/consys/client/ConSysApiClient.java

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,9 @@
9898
import org.vast.util.BaseBuilder;
9999
import com.google.common.base.Strings;
100100
import com.google.common.net.HttpHeaders;
101+
import com.google.gson.JsonObject;
102+
import com.google.gson.JsonParser;
103+
import com.google.gson.JsonSyntaxException;
101104
import com.google.gson.stream.JsonReader;
102105
import com.google.gson.stream.JsonWriter;
103106

@@ -1132,7 +1135,6 @@ public CompletableFuture<ICommandStatus> sendCommand(String controlstreamId, ICo
11321135
respCtx.setData(respCtxData);
11331136
respCtx.setFormat(ResourceFormat.JSON);
11341137
var binding = new CommandStatusBindingJson(respCtx, new IdEncodersBase32(), true, null);
1135-
binding.startCollection();
11361138
return binding.deserialize();
11371139
}
11381140
catch (IOException e)
@@ -1280,8 +1282,13 @@ protected <T> CompletableFuture<T> sendPostRequestAndReadResponse(URI collection
12801282
var is = new ByteArrayInputStream(body);
12811283
return responseBodyMapper.apply(is);
12821284
} else {
1283-
var error = new String(body);
1284-
throw new CompletionException("HTTP error " + resp.statusCode() + ": " + error, null);
1285+
var bodyStr = new String(body);
1286+
try {
1287+
var jsonError = (JsonObject)JsonParser.parseString(bodyStr);
1288+
throw new CompletionException(jsonError.get("message").getAsString(), null);
1289+
} catch (JsonSyntaxException e) {
1290+
throw new CompletionException("HTTP error " + resp.statusCode() + ": " + bodyStr, null);
1291+
}
12851292
}
12861293
});
12871294
};

sensorhub-service-consys/src/main/java/org/sensorhub/impl/service/consys/task/CommandStatusBindingJson.java

Lines changed: 49 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,11 @@
1717
import java.io.IOException;
1818
import java.time.OffsetDateTime;
1919
import java.time.format.DateTimeParseException;
20+
import java.util.ArrayList;
2021
import java.util.Collection;
22+
import org.sensorhub.api.command.CommandResult;
2123
import org.sensorhub.api.command.CommandStatus;
24+
import org.sensorhub.api.command.ICommandResult;
2225
import org.sensorhub.api.command.ICommandStatus;
2326
import org.sensorhub.api.command.ICommandStatus.CommandStatusCode;
2427
import org.sensorhub.api.common.BigId;
@@ -31,12 +34,14 @@
3134
import org.sensorhub.impl.service.consys.task.CommandStatusHandler.CommandStatusHandlerContextData;
3235
import org.vast.cdm.common.DataStreamParser;
3336
import org.vast.cdm.common.DataStreamWriter;
37+
import org.vast.swe.fast.JsonDataParserGson;
3438
import org.vast.swe.fast.JsonDataWriterGson;
3539
import org.vast.util.ReaderException;
3640
import org.vast.util.TimeExtent;
3741
import com.google.gson.stream.JsonReader;
3842
import com.google.gson.stream.JsonToken;
3943
import com.google.gson.stream.JsonWriter;
44+
import net.opengis.swe.v20.DataBlock;
4045

4146

4247
public class CommandStatusBindingJson extends ResourceBindingJson<BigId, ICommandStatus>
@@ -95,7 +100,7 @@ else if ("statusCode".equals(propName))
95100
var code = CommandStatusCode.valueOf(reader.nextString());
96101
status.withStatusCode(code);
97102
}
98-
else if ("progress".equals(propName))
103+
else if ("percentCompletion".equals(propName))
99104
{
100105
var percent = reader.nextInt();
101106
status.withProgress(percent);
@@ -105,6 +110,47 @@ else if ("message".equals(propName))
105110
var msg = reader.nextString().trim();
106111
status.withMessage(msg);
107112
}
113+
else if ("results".equals(propName))
114+
{
115+
var dsIdList = new ArrayList<BigId>();
116+
var obsIdList = new ArrayList<BigId>();
117+
var recList = new ArrayList<DataBlock>();
118+
119+
reader.beginArray();
120+
while (reader.hasNext())
121+
{
122+
reader.beginObject();
123+
124+
while (reader.hasNext())
125+
{
126+
propName = reader.nextName();
127+
128+
if (propName.equals("data")) {
129+
var resultReader = new JsonDataParserGson(reader);
130+
resultReader.setDataComponents(contextData.csInfo.getResultStructure());
131+
recList.add(resultReader.parseNextBlock());
132+
}
133+
else
134+
reader.skipValue();
135+
136+
// TODO add support for observation and datastream references
137+
}
138+
139+
reader.endObject();
140+
}
141+
reader.endArray();
142+
143+
ICommandResult result = null;
144+
if (!recList.isEmpty())
145+
result = CommandResult.withData(recList);
146+
else if (!obsIdList.isEmpty())
147+
result = CommandResult.withObservations(obsIdList);
148+
else if (!dsIdList.isEmpty())
149+
result = CommandResult.withDatastreams(dsIdList);
150+
151+
if (result != null)
152+
status.withResult(result);
153+
}
108154
else
109155
reader.skipValue();
110156
}
@@ -200,6 +246,7 @@ else if (status.getResult().getInlineRecords() != null)
200246
var resultWriter = new JsonDataWriterGson(writer);
201247
resultWriter.setDataComponents(contextData.csInfo.getResultStructure());
202248

249+
writer.setSerializeNulls(true);
203250
for (var rec: result.getInlineRecords())
204251
{
205252
writer.beginObject();
@@ -208,14 +255,12 @@ else if (status.getResult().getInlineRecords() != null)
208255
resultWriter.write(rec);
209256
writer.endObject();
210257
}
258+
writer.setSerializeNulls(false);
211259
}
212260

213261
writer.endArray();
214262
}
215263

216-
if (status.getMessage() != null)
217-
writer.name("message").value(status.getMessage());
218-
219264
writer.endObject();
220265
writer.flush();
221266
}

sensorhub-service-consys/src/main/java/org/sensorhub/impl/service/consys/task/CommandStatusHandler.java

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -89,21 +89,27 @@ protected ResourceBinding<BigId, ICommandStatus> getBinding(RequestContext ctx,
8989
ctx.setData(contextData);
9090

9191
// try to fetch command stream since it's needed to configure binding
92-
var dsID = ctx.getParentID();
93-
if (dsID != null)
92+
Asserts.checkState(ctx.getParentID() != null);
93+
BigId csID = null;
94+
if (ctx.getParentRef().type instanceof CommandHandler)
9495
{
95-
var parentType = ctx.getParentRef().type;
96-
if (parentType instanceof CommandStreamHandler)
97-
contextData.csInfo = db.getCommandStreamStore().get(new CommandStreamKey(dsID));
96+
var cmd = db.getCommandStore().get(ctx.getParentID());
97+
csID = cmd.getCommandStreamID();
9898
}
99+
else if (ctx.getParentRef().type instanceof CommandStreamHandler)
100+
csID = ctx.getParentID();
101+
102+
Asserts.checkNotNull(csID, BigId.class);
103+
contextData.csInfo = db.getCommandStreamStore().get(new CommandStreamKey(csID));
104+
Asserts.checkNotNull(contextData.csInfo, ICommandStreamInfo.class);
99105

100106
if (forReading)
101107
{
102108
// when ingesting status, command stream needs to be known at this stage
103109
Asserts.checkNotNull(contextData.csInfo, ICommandStreamInfo.class);
104110

105111
// create transaction handler here so it can be reused multiple times
106-
contextData.streamID = dsID;
112+
contextData.streamID = csID;
107113
contextData.csHandler = transactionHandler.getCommandStreamHandler(contextData.streamID);
108114
if (contextData.csHandler == null)
109115
throw ServiceErrors.notWritable();

0 commit comments

Comments
 (0)