Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 6 additions & 5 deletions common/transport/mqtt/src/mqtt_base.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ class OnTheWireMessage {
enqueuedTimeSecondsSinceEpoch: number;
callback: (err?: Error, result?: any) => void;
constructor(callback: (err?: Error, result?: any) => void) {
this.enqueuedTimeSecondsSinceEpoch = Math.floor( Date.now() / 1000 );
this.enqueuedTimeSecondsSinceEpoch = Math.floor(Date.now() / 1000);
this.callback = callback;
}
}
Expand Down Expand Up @@ -283,7 +283,7 @@ export class MqttBase extends EventEmitter {
// disconnectClient can be set to true to cause messages to be properly dropped,
// and thus re-sent. Making this the default is under investigation.
debug('disconnecting mqtt client');
this._disconnectClient(false, () => {
this._disconnectClient(this._options.mqtt?.forceFlag ?? false, () => {
clearTimeout(disconnectTimeout);
if (!switched) {
debug('mqtt client disconnected - reconnecting');
Expand Down Expand Up @@ -470,6 +470,7 @@ export class MqttBase extends EventEmitter {
}

private _disconnectClient(forceDisconnect: boolean, callback: () => void): void {
debug('forceDisconnect is valued at: ' + forceDisconnect);
if (this._mqttClient) {
debug('removing all listeners');
this._mqttTrackedListeners.removeAllTrackedListeners();
Expand Down Expand Up @@ -517,9 +518,9 @@ export class MqttBase extends EventEmitter {
}
}

/**
* @private
*/
/**
* @private
*/
export interface MqttBaseTransportConfig {
sharedAccessSignature?: string | SharedAccessSignature;
clientId: string;
Expand Down
8 changes: 6 additions & 2 deletions device/core/src/interfaces.ts
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,10 @@ export interface MqttTransportOptions {
* Optional [Agent]{@link https://nodejs.org/api/https.html#https_class_https_agent} object to use with MQTT-WS connections
*/
webSocketAgent?: Agent;
/**
* Boolean flag indicating whether to force disconnection in high throughput scenarios to cause messages to be properly dropped.
*/
forceFlag?: boolean;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Is there a more descriptive name here?


}

Expand Down Expand Up @@ -144,12 +148,12 @@ export interface DeviceClientOptions extends X509 {
/**
* Optional object with options specific to the MQTT transport
*/
mqtt?: MqttTransportOptions;
mqtt?: MqttTransportOptions;

/**
* Optional object with options specific to the HTTP transport
*/
http?: HttpTransportOptions;
http?: HttpTransportOptions;

/**
* Optional object with options specific to the AMQP transport
Expand Down