diff --git a/common/transport/mqtt/src/mqtt_base.ts b/common/transport/mqtt/src/mqtt_base.ts index 39b1379d0..643910cf8 100644 --- a/common/transport/mqtt/src/mqtt_base.ts +++ b/common/transport/mqtt/src/mqtt_base.ts @@ -16,7 +16,7 @@ class OnTheWireMessage { enqueuedTimeSecondsSinceEpoch: number; callback: (err?: Error, result?: any) => void; constructor(callback: (err?: Error, result?: any) => void) { - this.enqueuedTimeSecondsSinceEpoch = Math.floor( Date.now() / 1000 ); + this.enqueuedTimeSecondsSinceEpoch = Math.floor(Date.now() / 1000); this.callback = callback; } } @@ -283,7 +283,7 @@ export class MqttBase extends EventEmitter { // disconnectClient can be set to true to cause messages to be properly dropped, // and thus re-sent. Making this the default is under investigation. debug('disconnecting mqtt client'); - this._disconnectClient(false, () => { + this._disconnectClient(this._options.mqtt?.forceDisconnect ?? false, () => { clearTimeout(disconnectTimeout); if (!switched) { debug('mqtt client disconnected - reconnecting'); @@ -470,6 +470,7 @@ export class MqttBase extends EventEmitter { } private _disconnectClient(forceDisconnect: boolean, callback: () => void): void { + debug('forceDisconnect is valued at: ' + forceDisconnect); if (this._mqttClient) { debug('removing all listeners'); this._mqttTrackedListeners.removeAllTrackedListeners(); @@ -517,9 +518,9 @@ export class MqttBase extends EventEmitter { } } - /** - * @private - */ +/** + * @private + */ export interface MqttBaseTransportConfig { sharedAccessSignature?: string | SharedAccessSignature; clientId: string; diff --git a/device/core/src/interfaces.ts b/device/core/src/interfaces.ts index eb2308116..fa391d225 100644 --- a/device/core/src/interfaces.ts +++ b/device/core/src/interfaces.ts @@ -85,6 +85,10 @@ export interface MqttTransportOptions { * Optional [Agent]{@link https://nodejs.org/api/https.html#https_class_https_agent} object to use with MQTT-WS connections */ webSocketAgent?: Agent; + /** + * Boolean flag indicating whether to force disconnection in high throughput scenarios to cause messages to be properly dropped. + */ + forceDisconnect?: boolean; } @@ -144,12 +148,12 @@ export interface DeviceClientOptions extends X509 { /** * Optional object with options specific to the MQTT transport */ - mqtt?: MqttTransportOptions; + mqtt?: MqttTransportOptions; /** * Optional object with options specific to the HTTP transport */ - http?: HttpTransportOptions; + http?: HttpTransportOptions; /** * Optional object with options specific to the AMQP transport