File tree Expand file tree Collapse file tree 1 file changed +24
-9
lines changed Expand file tree Collapse file tree 1 file changed +24
-9
lines changed Original file line number Diff line number Diff line change @@ -97,16 +97,31 @@ impl v2::HostConnection for OutboundMqtt {
97
97
. await
98
98
. map_err ( other_error) ?;
99
99
100
- // Poll EventLoop once to send the message to MQTT broker or capture/throw error
100
+ // Poll event loop until outgoing publish event is iterated over to send the message to MQTT broker or capture/throw error.
101
101
// We may revisit this later to manage long running connections and their issues in the connection pool.
102
- eventloop
103
- . poll ( )
104
- . await
105
- . map_err ( |err : rumqttc:: ConnectionError | {
106
- v2:: Error :: ConnectionFailed ( err. to_string ( ) )
107
- } ) ?;
108
-
109
- Ok ( ( ) )
102
+ loop {
103
+ let event = eventloop
104
+ . poll ( )
105
+ . await
106
+ . map_err ( |err| v2:: Error :: ConnectionFailed ( err. to_string ( ) ) ) ?;
107
+
108
+ match event {
109
+ rumqttc:: Event :: Outgoing ( outgoing_event) => {
110
+ match outgoing_event {
111
+ rumqttc:: Outgoing :: Publish ( _) => {
112
+ return Ok ( ( ) ) ;
113
+ }
114
+ _ => {
115
+ // We don't care about other outgoing event types in this loop check.
116
+ continue ;
117
+ }
118
+ }
119
+ }
120
+ rumqttc:: Event :: Incoming ( _) => {
121
+ // We don't care about incoming event types in this loop check.
122
+ }
123
+ }
124
+ }
110
125
}
111
126
. await )
112
127
}
You can’t perform that action at this time.
0 commit comments