Skip to content

Comments

feat: setting retain flags and QoS levels for outgoing MQTT messages#3994

Open
didier-wenzek wants to merge 5 commits intothin-edge:mainfrom
didier-wenzek:feat/protocol-specific-flow-properties
Open

feat: setting retain flags and QoS levels for outgoing MQTT messages#3994
didier-wenzek wants to merge 5 commits intothin-edge:mainfrom
didier-wenzek:feat/protocol-specific-flow-properties

Conversation

@didier-wenzek
Copy link
Contributor

@didier-wenzek didier-wenzek commented Feb 16, 2026

Proposed changes

  • setting retain flags and QoS levels for outgoing MQTT messages
  • getting retain flags and QoS levels for incoming MQTT messages
  • getting command line for polled and streamed incoming messages

Types of changes

  • Bugfix (non-breaking change which fixes an issue)
  • New feature (non-breaking change which adds functionality)
  • Improvement (general improvements like code refactoring that doesn't explicitly fix a bug or add any new functionality)
  • Documentation Update (if none of the other choices apply)
  • Breaking change (fix or feature that would cause existing functionality to not work as expected)

Paste Link to the issue

See #3984

Checklist

  • I have read the CONTRIBUTING doc
  • I have signed the CLA (in all commits with git commit -s. You can activate automatic signing by running just prepare-dev once)
  • I ran just format as mentioned in CODING_GUIDELINES
  • I used just check as mentioned in CODING_GUIDELINES
  • I have added tests that prove my fix is effective or that my feature works
  • I have added necessary documentation (if appropriate)

Further comments

@codecov
Copy link

codecov bot commented Feb 16, 2026

Codecov Report

❌ Patch coverage is 95.65217% with 4 lines in your changes missing coverage. Please review.

Files with missing lines Patch % Lines
crates/extensions/tedge_flows/src/js_value.rs 87.09% 2 Missing and 2 partials ⚠️

📢 Thoughts on this report? Let us know!

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.

@github-actions
Copy link
Contributor

github-actions bot commented Feb 16, 2026

Robot Results

✅ Passed ❌ Failed ⏭️ Skipped Total Pass % ⏱️ Duration
840 0 3 840 100 2h37m28.244372s

Copy link
Contributor

@albinsuresh albinsuresh left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The proposed approach looks good to me. Happy to approve once it is finalized.

("payload", payload),
("raw_payload", raw_payload),
("time", JsonValue::option(value.timestamp)),
("transport", JsonValue::option(transport)),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Optional:

Suggested change
("transport", JsonValue::option(transport)),
("source", JsonValue::option(source)),

As it is more about the source of the message: mqtt or file.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As it is more about the source of the message: mqtt or file.

For an input message, this is the source, yes. But a transformed message, this is the target.

That said I'm not fully happy with the naming. An option is to simply remove this intermediate level that is not easy to name. By the way, this is what is proposed in the issue:

export function onMessage(message, context) {
    return [{
        topic: "foo",
        payload: JSON.stringify({
            text: "bar"
        }),
        qos: 1,
        retain: true,
    }];
}

Copy link
Contributor

@albinsuresh albinsuresh Feb 17, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I also agree that moving the qos and retain to an MQTT specific key feels like the right thing to avoid polluting the root namespace, since we support non-mqtt sources/sinks as well. Yes, topic being in the message is contentious in that sense, but I guess we can't go back on that one now and we have already defined what a topic means for file sources as well.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, topic being in the message is contentious in that sense

I don't think so. This is a concept shared by all messaging systems. QoS and retained messages is not.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can understand we don't want to add too many things to the root level. Especially, QoS and retained are only related to mqtt input. However, then I would have one nested layer. It would look as below.

export function onMessage(message, context) {
    return [{
        topic: "foo",
        payload: JSON.stringify({
            text: "bar"
        }),
        mqtt: { 
          qos: 1,
          retain: true,
       }
    }];
}

The current approach (two nested layers) is a bit too nested for me.

export function onMessage(message, context) {
    return [{
        ...message, 
        transport: { mqtt: { qos: 2, retain: true } }
    }];
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, topic being in the message is contentious in that sense

I don't think so. This is a concept shared by all messaging systems. QoS and retained messages is not.

I was referring to file sources where the path is conflated into topic.

Copy link
Contributor Author

@didier-wenzek didier-wenzek Feb 18, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was referring to file sources where the path is conflated into topic.

Yeah. This is one of the point I wish to improve with this PR.

The path or command of a polling source is not fully conflated into topic. The flow definition can provide a topic name for its source. And the path or command is only used as a default.

To fix that, the proposal is to provide both, topic and source info. For a line received from a polling command:

{
    "topic": "logical name of the source",               # default to the command
    "payload": "line read from the source",
    "process": {
         "command": "/some/polling/command"
    }
}

Copy link
Contributor Author

@didier-wenzek didier-wenzek Feb 18, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Rina's suggestion has been applied: 95928aa

export function onMessage(message, context) {
    return [{
        topic: "foo",
        payload: JSON.stringify({
            text: "bar"
        }),
        mqtt: { 
          qos: 1,
          retain: true,
       }
    }];
}

Copy link
Member

@Bravo555 Bravo555 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks all good, also happy to approve once finalized.

.map(|v| v.to_owned().into_value::<Transport>())
.transpose()
else {
return Err(anyhow::anyhow!("Unexpected transport data").into());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

question: The error itself doesn't include the context for which message caused the problem, but I'm not sure where it's printed, is the message that caused the problem reported there? Otherwise it may be tricky to debug.

Copy link
Contributor Author

@didier-wenzek didier-wenzek Feb 18, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed: 95928aa

@reubenmiller
Copy link
Contributor

@didier-wenzek For some reason I can't get a flow to publish a retain message...Am I doing something wrong? Below is the example I was trying to use to verify it (referencing the latest comment with the expected format)...you should be able to copy/paste the code below to recreate the scenario:

# create a flow which just appends "/bar" to the incoming topic and adds the retained flag
mkdir -p /etc/tedge/mappers/c8y/flows/custom
cat <<'EOT' > /etc/tedge/mappers/c8y/flows/custom/main.js
export function onMessage(message, context) {
    return [{
        topic: message.topic + "/bar",
        payload: JSON.stringify({
            ...JSON.parse(message.payload),
            time: message.time,
            seen: true,
        }),
        mqtt: {
            retain: true,
        },
    }]
}
EOT

cat <<'EOT' > /etc/tedge/mappers/c8y/flows/custom/flow.toml
input.mqtt.topics = ["foo"]
[[steps]]
script = "/etc/tedge/mappers/c8y/flows/custom/main.js"
EOT

# publish a message to the topic
tedge mqtt pub 'foo' '{}'
sleep 1
# check for the retained message
tedge mqtt sub foo/bar --retained-only -C 1 -W 5

@didier-wenzek
Copy link
Contributor Author

        mqtt: {
            retain: true,
        },

Nothing wrong your side. The code was not excepting all the attributes to be set. Fixed by: ec4956f

Copy link
Contributor

@reubenmiller reubenmiller left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Approved. It works nicely.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

5 participants