Skip to content

Commit 164c940

Browse files
committed
[Core] Enable setting deterministic valid start time for datastreams and
control streams
1 parent 579178a commit 164c940

File tree

4 files changed

+140
-75
lines changed

4 files changed

+140
-75
lines changed

sensorhub-core/src/main/java/org/sensorhub/api/command/IStreamingControlInterface.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414

1515
package org.sensorhub.api.command;
1616

17+
import java.time.Instant;
1718
import java.util.concurrent.CompletableFuture;
1819
import org.sensorhub.api.command.ICommandStatus.CommandStatusCode;
1920
import org.sensorhub.api.common.BigId;
@@ -81,6 +82,18 @@ public default DataEncoding getCommandEncoding()
8182
}
8283

8384

85+
/**
86+
* Gets the control stream valid time. If null is returned, the valid start time of the
87+
* parent system will be used, or if an older version of the same control stream already
88+
* exists, current time will be used instead.
89+
* @return the timestamp to use for the valid start time of the control stream
90+
*/
91+
public default Instant getValidStartTime()
92+
{
93+
return null;
94+
}
95+
96+
8497
/**
8598
* Validates the command parameters synchronously. This is called before
8699
* the command is submitted for execution (it is used to avoid persisting invalid

sensorhub-core/src/main/java/org/sensorhub/api/data/IStreamingDataInterface.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414

1515
package org.sensorhub.api.data;
1616

17+
import java.time.Instant;
1718
import org.sensorhub.api.event.IEventProducer;
1819
import net.opengis.swe.v20.DataBlock;
1920
import net.opengis.swe.v20.DataComponent;
@@ -105,4 +106,16 @@ public interface IStreamingDataInterface extends IEventProducer
105106
* @return sampling period in seconds or {@link Double#NaN} if unknown
106107
*/
107108
public double getAverageSamplingPeriod();
109+
110+
111+
/**
112+
* Gets the datastream valid time. If null is returned, the valid start time of the
113+
* parent system will be used, or if an older version of the same datastream already
114+
* exists with the parent system time, current time will be used instead.
115+
* @return the timestamp to use for the valid start time of the datastream
116+
*/
117+
public default Instant getValidStartTime()
118+
{
119+
return null;
120+
}
108121
}

sensorhub-core/src/main/java/org/sensorhub/impl/system/SystemDriverTransactionHandler.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -266,7 +266,8 @@ protected synchronized boolean doRegister(IStreamingDataInterface output) throws
266266
var newDsHandler = addOrUpdateDataStream(
267267
output.getName(),
268268
output.getRecordDescription(),
269-
output.getRecommendedEncoding());
269+
output.getRecommendedEncoding(),
270+
output.getValidStartTime());
270271

271272
// replace and cleanup old handler
272273
var oldDsHandler = dataStreamHandlers.get(output.getName());
@@ -331,14 +332,16 @@ protected synchronized boolean doRegister(IStreamingControlInterface controlInpu
331332
controlInput.getCommandDescription(),
332333
controlInput.getCommandEncoding(),
333334
((IStreamingControlInterfaceWithResult) controlInput).getResultDescription(),
334-
((IStreamingControlInterfaceWithResult) controlInput).getResultEncoding());
335+
((IStreamingControlInterfaceWithResult) controlInput).getResultEncoding(),
336+
controlInput.getValidStartTime());
335337
}
336338
else
337339
{
338340
newCsHandler = addOrUpdateCommandStream(
339341
controlInput.getName(),
340342
controlInput.getCommandDescription(),
341-
controlInput.getCommandEncoding());
343+
controlInput.getCommandEncoding(),
344+
controlInput.getValidStartTime());
342345
}
343346

344347
// replace and cleanup old handler and subscription

sensorhub-core/src/main/java/org/sensorhub/impl/system/SystemTransactionHandler.java

Lines changed: 108 additions & 72 deletions
Original file line numberDiff line numberDiff line change
@@ -294,6 +294,12 @@ public synchronized SystemTransactionHandler addOrUpdateMember(ISystemWithDesc s
294294

295295

296296
public DataStreamTransactionHandler addOrUpdateDataStream(String outputName, DataComponent dataStruct, DataEncoding dataEncoding) throws DataStoreException
297+
{
298+
return addOrUpdateDataStream(outputName, dataStruct, dataEncoding, null);
299+
}
300+
301+
302+
public DataStreamTransactionHandler addOrUpdateDataStream(String outputName, DataComponent dataStruct, DataEncoding dataEncoding, Instant validStartTime) throws DataStoreException
297303
{
298304
Asserts.checkNotNullOrEmpty(outputName, "outputName");
299305
Asserts.checkNotNull(dataStruct, DataComponent.class);
@@ -303,11 +309,18 @@ public DataStreamTransactionHandler addOrUpdateDataStream(String outputName, Dat
303309
var sysName = getSystemDescStore().get(sysKey).getName();
304310
var dsName = Strings.isNullOrEmpty(dataStruct.getLabel()) ? outputName : dataStruct.getLabel();
305311

312+
// compute valid time extent
313+
TimeExtent validTime = null;
314+
if (validStartTime != null) {
315+
validTime = TimeExtent.endNow(validStartTime);
316+
}
317+
306318
// create datastream info
307319
dataStruct.setName(outputName);
308320
var dsInfo = new DataStreamInfo.Builder()
309321
.withName(sysName + " - " + dsName)
310322
.withSystem(FeatureId.NULL_FEATURE)
323+
.withValidTime(validTime)
311324
.withRecordDescription(dataStruct)
312325
.withRecordEncoding(dataEncoding)
313326
.build();
@@ -363,41 +376,43 @@ public synchronized DataStreamTransactionHandler addOrUpdateDataStream(IDataStre
363376
addedEvent = new DataStreamAddedEvent(sysUID, outputName);
364377
log.debug("Added datastream {}#{} with valid time {}", sysUID, outputName, validTime);
365378
}
366-
367-
// compare properties of old and new datastreams
368-
var sameName = Objects.equals(oldDsInfo.getName(), dsInfo.getName());
369-
var sameDescription = Objects.equals(oldDsInfo.getDescription(), dsInfo.getDescription());
370-
var sameRecordStruct = DataComponentChecks.checkStructEquals(oldDsInfo.getRecordStructure(), dsInfo.getRecordStructure());
371-
var sameRecordEncoding = DataComponentChecks.checkEncodingEquals(oldDsInfo.getRecordEncoding(), dsInfo.getRecordEncoding());
372-
var recordStructCompatible = sameRecordStruct || DataComponentChecks.checkStructCompatible(oldDsInfo.getRecordStructure(), dsInfo.getRecordStructure());
373-
374-
// if observations were already recorded and structure has changed, create a new datastream
375-
if (hasObs && (!recordStructCompatible || !sameRecordEncoding))
376-
{
377-
// set validTime to current time
378-
dsInfo = DataStreamInfo.Builder.from(dsInfo)
379-
.withValidTime(TimeExtent.endNow(Instant.now()))
380-
.build();
381-
382-
dsKey = dataStreamStore.add(dsInfo);
383-
addedEvent = new DataStreamAddedEvent(sysUID, outputName);
384-
log.debug("Added datastream {}#{} with new data structure", sysUID, outputName);
385-
}
386-
387-
// if something else has changed, update existing datastream
388-
else if (!sameRecordStruct || !sameRecordEncoding || !sameName || !sameDescription)
389-
{
390-
var dsHandler = new DataStreamTransactionHandler(dsKey, oldDsInfo, rootHandler);
391-
dsHandler.update(dsInfo);
392-
dsInfo = dsHandler.getDataStreamInfo();
393-
log.debug("Updated datastream {}#{}", sysUID, outputName);
394-
}
395-
396-
// else don't update and return existing key
397379
else
398380
{
399-
dsInfo = oldDsInfo;
400-
log.debug("No changes to datastream {}#{}", sysUID, outputName);
381+
// compare properties of old and new datastreams
382+
var sameName = Objects.equals(oldDsInfo.getName(), dsInfo.getName());
383+
var sameDescription = Objects.equals(oldDsInfo.getDescription(), dsInfo.getDescription());
384+
var sameRecordStruct = DataComponentChecks.checkStructEquals(oldDsInfo.getRecordStructure(), dsInfo.getRecordStructure());
385+
var sameRecordEncoding = DataComponentChecks.checkEncodingEquals(oldDsInfo.getRecordEncoding(), dsInfo.getRecordEncoding());
386+
var recordStructCompatible = sameRecordStruct || DataComponentChecks.checkStructCompatible(oldDsInfo.getRecordStructure(), dsInfo.getRecordStructure());
387+
388+
// if observations were already recorded and structure has changed, create a new datastream
389+
if (hasObs && (!recordStructCompatible || !sameRecordEncoding))
390+
{
391+
// set validTime to current time
392+
dsInfo = DataStreamInfo.Builder.from(dsInfo)
393+
.withValidTime(TimeExtent.endNow(Instant.now()))
394+
.build();
395+
396+
dsKey = dataStreamStore.add(dsInfo);
397+
addedEvent = new DataStreamAddedEvent(sysUID, outputName);
398+
log.debug("Added datastream {}#{} with new data structure", sysUID, outputName);
399+
}
400+
401+
// if something else has changed, update existing datastream
402+
else if (!sameRecordStruct || !sameRecordEncoding || !sameName || !sameDescription)
403+
{
404+
var dsHandler = new DataStreamTransactionHandler(dsKey, oldDsInfo, rootHandler);
405+
dsHandler.update(dsInfo);
406+
dsInfo = dsHandler.getDataStreamInfo();
407+
log.debug("Updated datastream {}#{}", sysUID, outputName);
408+
}
409+
410+
// else don't update and return existing key
411+
else
412+
{
413+
dsInfo = oldDsInfo;
414+
log.debug("No changes to datastream {}#{}", sysUID, outputName);
415+
}
401416
}
402417
}
403418

@@ -411,11 +426,23 @@ else if (!sameRecordStruct || !sameRecordEncoding || !sameName || !sameDescripti
411426

412427
public CommandStreamTransactionHandler addOrUpdateCommandStream(String commandName, DataComponent dataStruct, DataEncoding dataEncoding) throws DataStoreException
413428
{
414-
return addOrUpdateCommandStream(commandName, dataStruct, dataEncoding, null, null);
429+
return addOrUpdateCommandStream(commandName, dataStruct, dataEncoding, null, null, null);
430+
}
431+
432+
433+
public CommandStreamTransactionHandler addOrUpdateCommandStream(String commandName, DataComponent dataStruct, DataEncoding dataEncoding, Instant validStartTime) throws DataStoreException
434+
{
435+
return addOrUpdateCommandStream(commandName, dataStruct, dataEncoding, null, null, validStartTime);
415436
}
416437

417438

418439
public CommandStreamTransactionHandler addOrUpdateCommandStream(String commandName, DataComponent dataStruct, DataEncoding dataEncoding, DataComponent resultStruct, DataEncoding resultEncoding) throws DataStoreException
440+
{
441+
return addOrUpdateCommandStream(commandName, dataStruct, dataEncoding, resultStruct, resultEncoding, null);
442+
}
443+
444+
445+
public CommandStreamTransactionHandler addOrUpdateCommandStream(String commandName, DataComponent dataStruct, DataEncoding dataEncoding, DataComponent resultStruct, DataEncoding resultEncoding, Instant validStartTime) throws DataStoreException
419446
{
420447
Asserts.checkNotNullOrEmpty(commandName, "commandName");
421448
Asserts.checkNotNull(dataStruct, DataComponent.class);
@@ -427,11 +454,18 @@ public CommandStreamTransactionHandler addOrUpdateCommandStream(String commandNa
427454
var fullName = sysName + " - " + csName;
428455
dataStruct.setName(commandName);
429456

457+
// compute valid time extent
458+
TimeExtent validTime = null;
459+
if (validStartTime != null) {
460+
validTime = TimeExtent.endNow(validStartTime);
461+
}
462+
430463
// create command stream info
431464
// with or without command result
432465
ICommandStreamInfo csInfo = new CommandStreamInfo.Builder()
433466
.withName(fullName)
434467
.withSystem(FeatureId.NULL_FEATURE)
468+
.withValidTime(validTime)
435469
.withRecordDescription(dataStruct)
436470
.withRecordEncoding(dataEncoding)
437471
.withResultDescription(resultStruct)
@@ -489,46 +523,48 @@ public synchronized CommandStreamTransactionHandler addOrUpdateCommandStream(ICo
489523
addedEvent = new CommandStreamAddedEvent(sysUID, commandName);
490524
log.debug("Added command stream {}#{} with valid time {}", sysUID, commandName, validTime);
491525
}
492-
493-
// compare properties of old and new command streams
494-
var sameName = Objects.equals(oldCsInfo.getName(), csInfo.getName());
495-
var sameDescription = Objects.equals(oldCsInfo.getDescription(), csInfo.getDescription());
496-
497-
var sameParamStruct = DataComponentChecks.checkStructEquals(oldCsInfo.getRecordStructure(), csInfo.getRecordStructure());
498-
var sameParamEncoding = DataComponentChecks.checkEncodingEquals(oldCsInfo.getRecordEncoding(), csInfo.getRecordEncoding());
499-
var paramStructCompatible = sameParamStruct || DataComponentChecks.checkStructCompatible(oldCsInfo.getRecordStructure(), csInfo.getRecordStructure());
500-
501-
var sameResultStruct = DataComponentChecks.checkStructEqualsNullAllowed(oldCsInfo.getResultStructure(), csInfo.getResultStructure());
502-
var sameResultEncoding = DataComponentChecks.checkEncodingEqualsNullAllowed(oldCsInfo.getResultEncoding(), csInfo.getResultEncoding());
503-
var resultStructCompatible = sameResultStruct || DataComponentChecks.checkStructCompatibleNullAllowed(oldCsInfo.getResultStructure(), csInfo.getResultStructure());
504-
505-
// if observations were already recorded and structure has changed, create a new datastream
506-
if (hasCommands && (!paramStructCompatible || !sameParamEncoding || !resultStructCompatible || !sameResultEncoding))
507-
{
508-
// set validTime to current time
509-
csInfo = CommandStreamInfo.Builder.from(csInfo)
510-
.withValidTime(TimeExtent.endNow(Instant.now()))
511-
.build();
512-
513-
csKey = commandStreamStore.add(csInfo);
514-
addedEvent = new CommandStreamAddedEvent(sysUID, commandName);
515-
log.debug("Added command stream {}#{} with new data structure", sysUID, commandName);
516-
}
517-
518-
// if something else has changed, update existing command stream
519-
else if (!sameParamStruct || !sameParamEncoding || !sameResultStruct || !sameResultEncoding || !sameName || !sameDescription)
520-
{
521-
var csHandler = new CommandStreamTransactionHandler(csKey, oldCsInfo, rootHandler);
522-
csHandler.update(csInfo);
523-
csInfo = csHandler.getCommandStreamInfo();
524-
log.debug("Updated command stream {}#{}", sysUID, commandName);
525-
}
526-
527-
// else don't update and return existing key
528526
else
529527
{
530-
csInfo = oldCsInfo;
531-
log.debug("No changes to command stream {}#{}", sysUID, commandName);
528+
// compare properties of old and new command streams
529+
var sameName = Objects.equals(oldCsInfo.getName(), csInfo.getName());
530+
var sameDescription = Objects.equals(oldCsInfo.getDescription(), csInfo.getDescription());
531+
532+
var sameParamStruct = DataComponentChecks.checkStructEquals(oldCsInfo.getRecordStructure(), csInfo.getRecordStructure());
533+
var sameParamEncoding = DataComponentChecks.checkEncodingEquals(oldCsInfo.getRecordEncoding(), csInfo.getRecordEncoding());
534+
var paramStructCompatible = sameParamStruct || DataComponentChecks.checkStructCompatible(oldCsInfo.getRecordStructure(), csInfo.getRecordStructure());
535+
536+
var sameResultStruct = DataComponentChecks.checkStructEqualsNullAllowed(oldCsInfo.getResultStructure(), csInfo.getResultStructure());
537+
var sameResultEncoding = DataComponentChecks.checkEncodingEqualsNullAllowed(oldCsInfo.getResultEncoding(), csInfo.getResultEncoding());
538+
var resultStructCompatible = sameResultStruct || DataComponentChecks.checkStructCompatibleNullAllowed(oldCsInfo.getResultStructure(), csInfo.getResultStructure());
539+
540+
// if observations were already recorded and structure has changed, create a new datastream
541+
if (hasCommands && (!paramStructCompatible || !sameParamEncoding || !resultStructCompatible || !sameResultEncoding))
542+
{
543+
// set validTime to current time
544+
csInfo = CommandStreamInfo.Builder.from(csInfo)
545+
.withValidTime(TimeExtent.endNow(Instant.now()))
546+
.build();
547+
548+
csKey = commandStreamStore.add(csInfo);
549+
addedEvent = new CommandStreamAddedEvent(sysUID, commandName);
550+
log.debug("Added command stream {}#{} with new data structure", sysUID, commandName);
551+
}
552+
553+
// if something else has changed, update existing command stream
554+
else if (!sameParamStruct || !sameParamEncoding || !sameResultStruct || !sameResultEncoding || !sameName || !sameDescription)
555+
{
556+
var csHandler = new CommandStreamTransactionHandler(csKey, oldCsInfo, rootHandler);
557+
csHandler.update(csInfo);
558+
csInfo = csHandler.getCommandStreamInfo();
559+
log.debug("Updated command stream {}#{}", sysUID, commandName);
560+
}
561+
562+
// else don't update and return existing key
563+
else
564+
{
565+
csInfo = oldCsInfo;
566+
log.debug("No changes to command stream {}#{}", sysUID, commandName);
567+
}
532568
}
533569
}
534570

0 commit comments

Comments
 (0)