Skip to content

Commit 4624a20

Browse files
author
rmqtt-rs
committed
Implement receiving and sending QoS 2 messages
1 parent 36d50c3 commit 4624a20

File tree

9 files changed

+292
-85
lines changed

9 files changed

+292
-85
lines changed

src/server.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -165,16 +165,16 @@ where
165165
> + 'static,
166166
P: ServiceFactory<
167167
Config = v5::Session<St>,
168-
Request = v5::Publish,
169-
Response = v5::PublishAck,
168+
Request = v5::PublishMessage,
169+
Response = v5::PublishResult,
170170
> + 'static,
171171
P::Error: fmt::Debug,
172172
C::Error: From<Cn::Error>
173173
+ From<Cn::InitError>
174174
+ From<P::Error>
175175
+ From<P::InitError>
176176
+ fmt::Debug,
177-
v5::PublishAck: TryFrom<P::Error, Error = C::Error>,
177+
v5::PublishResult: TryFrom<P::Error, Error = C::Error>,
178178
{
179179
MqttServer {
180180
v3: self.v3,

src/topic.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -489,4 +489,3 @@ mod tests {
489489
assert!(Topic::from_str("$SYS/monitor/+").unwrap().matches_str("$SYS/monitor/Clients"));
490490
}
491491
}
492-

src/v5/client/dispatcher.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -161,7 +161,7 @@ where
161161

162162
// check for duplicated packet id
163163
if !inner.inflight.insert(pid) {
164-
self.inner.sink.send(codec::Packet::PublishAck(
164+
let _ = self.inner.sink.send(codec::Packet::PublishAck(
165165
codec::PublishAck {
166166
packet_id: pid,
167167
reason_code: codec::PublishAckReason::PacketIdentifierInUse,
@@ -453,7 +453,7 @@ where
453453

454454
if self.error {
455455
if let Some(pkt) = result.packet {
456-
self.inner.sink.send(pkt)
456+
let _ = self.inner.sink.send(pkt);
457457
}
458458
if result.disconnect {
459459
self.inner.sink.drop_sink();

src/v5/default.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ use ntex::service::{Service, ServiceFactory};
44
use ntex::util::Ready;
55

66
use super::control::{ControlMessage, ControlResult};
7-
use super::publish::{Publish, PublishAck};
7+
use super::publish::{PublishMessage, PublishResult};
88
use super::Session;
99

1010
/// Default publish service
@@ -20,8 +20,8 @@ impl<St, Err> Default for DefaultPublishService<St, Err> {
2020

2121
impl<St, Err> ServiceFactory for DefaultPublishService<St, Err> {
2222
type Config = Session<St>;
23-
type Request = Publish;
24-
type Response = PublishAck;
23+
type Request = PublishMessage;
24+
type Response = PublishResult;
2525
type Error = Err;
2626
type Service = DefaultPublishService<St, Err>;
2727
type InitError = Err;
@@ -33,16 +33,16 @@ impl<St, Err> ServiceFactory for DefaultPublishService<St, Err> {
3333
}
3434

3535
impl<St, Err> Service for DefaultPublishService<St, Err> {
36-
type Request = Publish;
37-
type Response = PublishAck;
36+
type Request = PublishMessage;
37+
type Response = PublishResult;
3838
type Error = Err;
3939
type Future = Ready<Self::Response, Self::Error>;
4040

4141
fn poll_ready(&self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
4242
Poll::Ready(Ok(()))
4343
}
4444

45-
fn call(&self, req: Publish) -> Self::Future {
45+
fn call(&self, req: PublishMessage) -> Self::Future {
4646
log::warn!("Publish service is disabled");
4747
Ready::Ok(req.ack())
4848
}

0 commit comments

Comments
 (0)