Skip to content

Servient internal caching of ExposedThings works incorrectly with MqttProtocolServer #3

@Dnnd

Description

@Dnnd

Expected behavior

It's possible to interact with thing, which was exposed (ExposedThing::expose), destroyed (ExposedThing::destroy) and exposed again with the same id.

Observed behavior

It's impossible to use events, actions and properties of exposed ExposedThing if it's id is equal to the id of the previously destroyed Thing.

Example

public class Test {
    private static final String THING_ID = "thing";
    private static final String PROPERTY_NAME = "property-1";

    public static void exposeWriteDestroy(Wot wot,
                                          String value) throws ExecutionException, InterruptedException {
        Thing thing = new Thing.Builder()
                .addProperty(PROPERTY_NAME, new ThingProperty.Builder().setType("string").build())
                .setId(THING_ID)
                .build();
        ExposedThing exposed = wot.produce(thing);
        exposed.expose().get();
        exposed.getProperty(PROPERTY_NAME).write(value).get();
        exposed.destroy().get();
    }

    public static void main(String[] args) throws IOException, WotException, ExecutionException, InterruptedException {
        Config config = ConfigFactory.parseString(
                "wot.servient.client-factories = [\"city.sane.wot.binding.mqtt.MqttProtocolClientFactory\"], wot.servient.servers = [\"city.sane.wot.binding.mqtt.MqttProtocolServer\"]");
        config = config.withFallback(ConfigFactory.load());
        Wot wot = new DefaultWot(config);
        exposeWriteDestroy(wot, "first invocation");
        exposeWriteDestroy(wot, "second invocation");
    }
}

While the string "first invocation" was successfully transmitted to MQTT broker, the second string ("second invocation") wasn't. I believe, the cause is interaction between Servient's things map

private final Map<String, ExposedThing> things;
and MqttProtocolServer. Servient::destroy won't remove any ExposedThing from things map, so Servient::addThing method won't replace ExposedThing instance if it's was created earlier:
ExposedThing previous = things.putIfAbsent(exposedThing.getId(), exposedThing);

It leads to the situation, where ExposedThing instance which I created is different from one inside Servient and MqttProtocolServer internal maps. After ExposedThing::expose invocation these instances won't share the same PublisherSubject field inside states of exposed properties, so it's impossible to pass any data to property observer
property.observer()
.map(optional -> ContentManager.valueToContent(optional.orElse(null)))
.map(content -> new MqttMessage(content.getBody()))
.subscribe(
mqttMessage -> settingsClientPair.second().publish(topic, mqttMessage),
e -> log.warn("MqttServer cannot publish data for topic '{}': {}", topic, e.getMessage()),
() -> {
}
);
using instance of ExposedThing returned from Wot::produce method.
One may suggest, that problem can be mitigated by using instance of ExposedThing returned from ExposedThing::expose method, but these will lead to another problem. Consider the following code:

public class Test {
    private static final String THING_ID = "thing";
    private static final String PROPERTY_NAME = "property-1";
    private static ExposedThing exposedThing;

    public static void setExposedThing(ExposedThing thing) {
        Test.exposedThing = thing;
    }

    public static void exposeWriteDestroy(Wot wot,
                                          String value) throws ExecutionException, InterruptedException {
        Thing thing = new Thing.Builder()
                .addProperty(PROPERTY_NAME, new ThingProperty.Builder().setType("string").build())
                .setId(THING_ID)
                .build();
        ExposedThing exposed = wot.produce(thing);
        exposed.expose().whenComplete((t, ignore ) -> setExposedThing(t)).get();
        
        exposedThing.getProperty(PROPERTY_NAME).write(value).get();
        exposedThing.destroy().get();
    }

    public static void main(String[] args) throws IOException, WotException, ExecutionException, InterruptedException {
        Config config = ConfigFactory.parseString(
                "wot.servient.client-factories = [\"city.sane.wot.binding.mqtt.MqttProtocolClientFactory\"], wot.servient.servers = [\"city.sane.wot.binding.mqtt.MqttProtocolServer\"]");
        config = config.withFallback(ConfigFactory.load());
        Wot wot = new DefaultWot(config);
        exposeWriteDestroy(wot, "first invocation");
        exposeWriteDestroy(wot, "second invocation");
    }
}

In this example, the string "second invocation" will be sent to MQTT broker twice. It will happen because MqttProtocolServer won't dispose subscriptions, created in MqttProtocolServer::exposeProperties.

Metadata

Metadata

Assignees

No one assigned

    Labels

    bugSomething isn't working

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions