-
Notifications
You must be signed in to change notification settings - Fork 42
Expand file tree
/
Copy pathMqttConnectionConfig.java
More file actions
598 lines (531 loc) · 18.8 KB
/
MqttConnectionConfig.java
File metadata and controls
598 lines (531 loc) · 18.8 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
/**
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0.
*/
package software.amazon.awssdk.crt.mqtt;
import java.util.function.Consumer;
import software.amazon.awssdk.crt.CrtResource;
import software.amazon.awssdk.crt.http.HttpProxyOptions;
import software.amazon.awssdk.crt.io.ClientTlsContext;
import software.amazon.awssdk.crt.io.SocketOptions;
import software.amazon.awssdk.crt.mqtt5.Mqtt5Client;
/**
 * Encapsulates all per-mqtt-connection configuration.
 *
 * This is a plain mutable holder: every setter simply stores its argument and every
 * getter returns the stored value, with two exceptions documented on the members
 * themselves (the deprecated will QoS/retain overrides and the deprecated
 * millisecond-based keep-alive accessors).
 */
public final class MqttConnectionConfig extends CrtResource {
    /* connection */
    private String endpoint;
    private int port;
    private SocketOptions socketOptions;

    /* mqtt */
    private MqttClient mqttClient;
    private Mqtt5Client mqtt5Client;
    private String clientId;
    private String username;
    private String password;
    private MqttClientConnectionEvents connectionCallbacks;
    private int keepAliveSecs = 0;
    private int pingTimeoutMs = 0;
    private long minReconnectTimeoutSecs = 0L;
    private long maxReconnectTimeoutSecs = 0L;
    private int protocolOperationTimeoutMs = 0;
    private boolean cleanSession = true;

    /* will */
    private MqttMessage willMessage;
    /* null means "not overridden"; see getWillMessage() */
    private QualityOfService deprecatedWillQos;
    /* null means "not overridden"; see getWillMessage() */
    private Boolean deprecatedWillRetain;

    /* websockets */
    private boolean useWebsockets = false;
    private HttpProxyOptions proxyOptions;
    private Consumer<WebsocketHandshakeTransformArgs> websocketHandshakeTransform;

    /* metrics */
    private boolean metricsEnabled = true;

    /**
     * Creates a config with all settings at their defaults.
     */
    public MqttConnectionConfig() {}

    /**
     * Required override method that must begin the release process of the acquired native handle.
     * The constructor of this class does not acquire anything, so this is a no-op.
     */
    @Override
    protected void releaseNativeHandle() {}

    /**
     * Override that determines whether a resource releases its dependencies at the same time the native handle is released or if it waits.
     * Resources with asynchronous shutdown processes should override this with false, and establish a callback from native code that
     * invokes releaseReferences() when the asynchronous shutdown process has completed. See HttpClientConnectionManager for an example.
     *
     * @return always true for this class
     */
    @Override
    protected boolean canReleaseReferencesImmediately() {
        return true;
    }

    /**
     * Configures the connection-related callbacks for a connection
     *
     * @param connectionCallbacks connection event callbacks to use
     */
    public void setConnectionCallbacks(MqttClientConnectionEvents connectionCallbacks) {
        this.connectionCallbacks = connectionCallbacks;
    }

    /**
     * Queries the connection-related callbacks for a connection
     *
     * @return the connection event callbacks to use
     */
    public MqttClientConnectionEvents getConnectionCallbacks() {
        return connectionCallbacks;
    }

    /**
     * Configures the client_id to use with a connection
     *
     * @param clientId The client id for a connection. Needs to be unique across
     *                  all devices/clients.
     */
    public void setClientId(String clientId) {
        this.clientId = clientId;
    }

    /**
     * Queries the client_id being used by a connection
     *
     * @return The client id for a connection.
     */
    public String getClientId() {
        return clientId;
    }

    /**
     * Configures the IoT endpoint for a connection
     *
     * @param endpoint The IoT endpoint to connect to
     */
    public void setEndpoint(String endpoint) {
        this.endpoint = endpoint;
    }

    /**
     * Queries the IoT endpoint used by a connection
     *
     * @return The IoT endpoint used by a connection
     */
    public String getEndpoint() {
        return endpoint;
    }

    /**
     * Configures the port to connect to.
     *
     * @param port The port to connect to. Usually 8883 for MQTT, or 443 for websockets
     */
    public void setPort(int port) {
        this.port = port;
    }

    /**
     * Queries the port to connect to.
     *
     * @return The port to connect to
     */
    public int getPort() {
        return port;
    }

    /**
     * Configures the common settings to use for a connection's socket
     *
     * @param socketOptions The socket settings
     */
    public void setSocketOptions(SocketOptions socketOptions) {
        // Hand the old and new values to the base class's reference bookkeeping before overwriting the field.
        swapReferenceTo(this.socketOptions, socketOptions);
        this.socketOptions = socketOptions;
    }

    /**
     * Queries the common settings to use for a connection's socket
     *
     * @return The socket settings
     */
    public SocketOptions getSocketOptions() {
        return socketOptions;
    }

    /**
     * Configures whether or not the service should try to resume prior subscriptions, if it has any
     *
     * @param cleanSession true if the session should drop prior subscriptions when
     *                     a connection is established, false to resume the session
     */
    public void setCleanSession(boolean cleanSession) {
        this.cleanSession = cleanSession;
    }

    /**
     * Queries whether or not the service should try to resume prior subscriptions, if it has any
     *
     * @return true if the session should drop prior subscriptions when
     *         a connection is established, false to resume the session
     */
    public boolean getCleanSession() {
        return cleanSession;
    }

    /**
     * Configures MQTT keep-alive via PING messages. Note that this is not TCP
     * keepalive. The value is stored in whole seconds, so any sub-second remainder
     * is truncated.
     *
     * @deprecated Please use setKeepAliveSecs instead.
     *
     * @param keepAliveMs How often in milliseconds to send an MQTT PING message to the
     *                    service to keep a connection alive
     */
    @Deprecated
    public void setKeepAliveMs(int keepAliveMs) {
        this.keepAliveSecs = keepAliveMs / 1000;
    }

    /**
     * Queries the MQTT keep-alive via PING messages. The stored value is in seconds;
     * it is converted to milliseconds here and saturated to the int range instead of
     * silently wrapping around when the product does not fit in an int.
     *
     * @deprecated Please use getKeepAliveSecs instead.
     *
     * @return How often in milliseconds to send an MQTT PING message to the
     *         service to keep a connection alive
     */
    @Deprecated
    public int getKeepAliveMs() {
        // Multiply in long so large second values cannot overflow, then clamp to int.
        long keepAliveMs = keepAliveSecs * 1000L;
        return (int) Math.max(Integer.MIN_VALUE, Math.min(Integer.MAX_VALUE, keepAliveMs));
    }

    /**
     * Configures MQTT keep-alive via PING messages. Note that this is not TCP
     * keepalive. Note: AWS IoT Core only allows 30-1200 Secs. Anything larger than
     * 65535 will be capped.
     *
     * @param keepAliveSecs How often in seconds to send an MQTT PING message to the
     *                      service to keep a connection alive
     */
    public void setKeepAliveSecs(int keepAliveSecs) {
        this.keepAliveSecs = keepAliveSecs;
    }

    /**
     * Queries the MQTT keep-alive via PING messages.
     *
     * @return How often in seconds to send an MQTT PING message to the
     *         service to keep a connection alive
     */
    public int getKeepAliveSecs() {
        return keepAliveSecs;
    }

    /**
     * Configures ping timeout value. If a response is not received within this
     * interval, the connection will be reestablished.
     *
     * @param pingTimeoutMs How long to wait for a ping response (in milliseconds) before resetting the connection
     */
    public void setPingTimeoutMs(int pingTimeoutMs) {
        this.pingTimeoutMs = pingTimeoutMs;
    }

    /**
     * Queries ping timeout value. If a response is not received within this
     * interval, the connection will be reestablished.
     *
     * @return How long to wait for a ping response (in milliseconds) before resetting the connection
     */
    public int getPingTimeoutMs() {
        return pingTimeoutMs;
    }

    /**
     * Configures the minimum and maximum reconnect timeouts.
     *
     * The time between reconnect attempts will start at min and multiply by 2 until max is reached.
     * Default value for min is 1, for max 128. Setting either one to zero will use the default setting.
     *
     * @param minTimeoutSecs The timeout to start with
     * @param maxTimeoutSecs The highest allowable wait time between reconnect attempts
     */
    public void setReconnectTimeoutSecs(long minTimeoutSecs, long maxTimeoutSecs) {
        this.minReconnectTimeoutSecs = minTimeoutSecs;
        this.maxReconnectTimeoutSecs = maxTimeoutSecs;
    }

    /**
     * Return the minimum reconnect timeout.
     *
     * @return The timeout to start with
     */
    public long getMinReconnectTimeoutSecs() {
        return minReconnectTimeoutSecs;
    }

    /**
     * Return the maximum reconnect timeout.
     *
     * @return The highest allowable wait time between reconnect attempts
     */
    public long getMaxReconnectTimeoutSecs() {
        return maxReconnectTimeoutSecs;
    }

    /**
     * Configures the timeout value for requests that require a response on a healthy connection.
     * If a response is not received within this interval, the request will fail as server not receiving it.
     * Applied to publish (QoS>0) and unsubscribe
     *
     * @param protocolOperationTimeoutMs How long to wait for a request response (in milliseconds) before failing
     */
    public void setProtocolOperationTimeoutMs(int protocolOperationTimeoutMs) {
        this.protocolOperationTimeoutMs = protocolOperationTimeoutMs;
    }

    /**
     * Queries the timeout value for requests that require a response on a healthy connection.
     * If a response is not received within this interval, the request will fail as server not receiving it.
     * Applied to publish (QoS>0) and unsubscribe
     *
     * @return How long to wait for a request response (in milliseconds) before failing
     */
    public int getProtocolOperationTimeoutMs() {
        return protocolOperationTimeoutMs;
    }

    /**
     * Configures the mqtt client to use for a connection
     *
     * @param mqttClient the mqtt client to use
     */
    public void setMqttClient(MqttClient mqttClient) {
        // Hand the old and new values to the base class's reference bookkeeping before overwriting the field.
        swapReferenceTo(this.mqttClient, mqttClient);
        this.mqttClient = mqttClient;
    }

    /**
     * Queries the mqtt client to use for a connection
     *
     * @return the mqtt client to use
     */
    public MqttClient getMqttClient() {
        return mqttClient;
    }

    /**
     * Configures the mqtt5 client to use for a connection
     *
     * @param mqtt5Client the mqtt5 client to use
     */
    public void setMqtt5Client(Mqtt5Client mqtt5Client) {
        // Hand the old and new values to the base class's reference bookkeeping before overwriting the field.
        swapReferenceTo(this.mqtt5Client, mqtt5Client);
        this.mqtt5Client = mqtt5Client;
    }

    /**
     * Queries the mqtt5 client to use for a connection
     *
     * @return the mqtt5 client to use
     */
    public Mqtt5Client getMqtt5Client() {
        return mqtt5Client;
    }

    /**
     * Sets the login credentials for a connection. Equivalent to calling
     * setUsername and setPassword with the same values.
     *
     * @param user Login username
     * @param pass Login password
     * @throws MqttException declared for backward compatibility; this implementation never throws it
     */
    public void setLogin(String user, String pass) throws MqttException {
        this.username = user;
        this.password = pass;
    }

    /**
     * Configures the username to use as part of the CONNECT attempt
     *
     * @param username username to use for the mqtt connect operation
     */
    public void setUsername(String username) {
        this.username = username;
    }

    /**
     * Queries the username to use as part of the CONNECT attempt
     *
     * @return username to use for the mqtt connect operation
     */
    public String getUsername() {
        return username;
    }

    /**
     * Configures the password to use as part of the CONNECT attempt
     *
     * @param password password to use for the mqtt connect operation
     */
    public void setPassword(String password) {
        this.password = password;
    }

    /**
     * Queries the password to use as part of the CONNECT attempt
     *
     * @return password to use for the mqtt connect operation
     */
    public String getPassword() {
        return password;
    }

    /**
     * Configures the last will and testament message to be delivered to a topic when a connection disconnects
     *
     * @param willMessage the message to publish as the will
     */
    public void setWillMessage(MqttMessage willMessage) {
        this.willMessage = willMessage;
    }

    /**
     * Queries the last will and testament message to be delivered to a topic when a connection disconnects.
     *
     * If a QoS and/or retain value was supplied through the deprecated setWillQos / setWillRetain
     * methods, a new message is returned that carries the configured topic and payload with those
     * overrides applied; otherwise the configured message itself is returned.
     *
     * @return the message to publish as the will, or null if none has been configured
     */
    public MqttMessage getWillMessage() {
        boolean hasOverride = deprecatedWillQos != null || deprecatedWillRetain != null;
        if (willMessage == null || !hasOverride) {
            return willMessage;
        }

        QualityOfService qos = deprecatedWillQos == null ? willMessage.getQos() : deprecatedWillQos;
        boolean retain = deprecatedWillRetain == null ? willMessage.getRetain() : deprecatedWillRetain;
        return new MqttMessage(willMessage.getTopic(), willMessage.getPayload(), qos, retain);
    }

    /**
     * Overrides the QoS of the will message returned by getWillMessage.
     *
     * @deprecated Set QoS directly on the will's {@link MqttMessage}.
     * @param qos Quality of Service
     */
    @Deprecated
    public void setWillQos(QualityOfService qos) {
        this.deprecatedWillQos = qos;
    }

    /**
     * Queries the effective will QoS: the deprecated override if one was set,
     * otherwise the QoS of the configured will message.
     *
     * @deprecated Query QoS directly from the will's {@link MqttMessage}.
     * @return Quality of Service, or null if neither an override nor a will message is set
     */
    @Deprecated
    public QualityOfService getWillQos() {
        if (deprecatedWillQos != null) {
            return deprecatedWillQos;
        }
        if (willMessage != null) {
            return willMessage.getQos();
        }
        return null;
    }

    /**
     * Overrides the retain flag of the will message returned by getWillMessage.
     *
     * @deprecated Set retain directly on the will's {@link MqttMessage}.
     * @param retain whether the will should be sent with retain property set
     */
    @Deprecated
    public void setWillRetain(boolean retain) {
        this.deprecatedWillRetain = retain;
    }

    /**
     * Queries the effective will retain flag: the deprecated override if one was set,
     * otherwise the retain flag of the configured will message.
     *
     * @deprecated Query retain directly from the will's {@link MqttMessage}.
     * @return whether the will will be sent with retain property set; false if neither
     *         an override nor a will message is set
     */
    @Deprecated
    public boolean getWillRetain() {
        if (deprecatedWillRetain != null) {
            return deprecatedWillRetain;
        }
        if (willMessage != null) {
            return willMessage.getRetain();
        }
        return false;
    }

    /**
     * Configures whether or not to use websockets for the mqtt connection
     *
     * @param useWebsockets whether or not to use websockets
     */
    public void setUseWebsockets(boolean useWebsockets) {
        this.useWebsockets = useWebsockets;
    }

    /**
     * Queries whether or not to use websockets for the mqtt connection
     *
     * @return whether or not to use websockets
     */
    public boolean getUseWebsockets() {
        return useWebsockets;
    }

    /**
     * Configures proxy options for a websocket-based mqtt connection.
     * Stores into the same field as setHttpProxyOptions.
     *
     * @deprecated use setHttpProxyOptions instead
     *
     * @param proxyOptions proxy options to use for the base http connection
     */
    @Deprecated
    public void setWebsocketProxyOptions(HttpProxyOptions proxyOptions) {
        this.proxyOptions = proxyOptions;
    }

    /**
     * Queries proxy options for a websocket-based mqtt connection.
     * Reads the same field as getHttpProxyOptions.
     *
     * @deprecated use getHttpProxyOptions instead
     *
     * @return proxy options to use for the base http connection
     */
    @Deprecated
    public HttpProxyOptions getWebsocketProxyOptions() {
        return proxyOptions;
    }

    /**
     * Configures proxy options for the mqtt connection
     *
     * @param proxyOptions proxy options to use for the connection
     */
    public void setHttpProxyOptions(HttpProxyOptions proxyOptions) {
        this.proxyOptions = proxyOptions;
    }

    /**
     * Queries proxy options for an mqtt connection
     *
     * @return proxy options to use for the connection
     */
    public HttpProxyOptions getHttpProxyOptions() {
        return proxyOptions;
    }

    /**
     * Set a transform operation to use on each websocket handshake http request.
     * The transform may modify the http request before it is sent to the server.
     * The transform MUST call handshakeTransform.complete() or handshakeTransform.completeExceptionally()
     * when the transform is complete, failure to do so will stall the mqtt connection indefinitely.
     * The transform operation may be asynchronous.
     *
     * The default websocket handshake http request uses path "/mqtt".
     * All required headers for a websocket handshake are present,
     * plus the optional header "Sec-WebSocket-Protocol: mqtt".
     *
     * This is only applicable to websocket-based mqtt connections.
     *
     * @param handshakeTransform http request handshake transform
     */
    public void setWebsocketHandshakeTransform(Consumer<WebsocketHandshakeTransformArgs> handshakeTransform) {
        this.websocketHandshakeTransform = handshakeTransform;
    }

    /**
     * Queries the handshake http request transform to use when upgrading the connection
     *
     * @return http request handshake transform
     */
    public Consumer<WebsocketHandshakeTransformArgs> getWebsocketHandshakeTransform() {
        return websocketHandshakeTransform;
    }

    /**
     * Enables or disables IoT Device SDK metrics collection. The metrics includes SDK name, version, and platform.
     *
     * @param enabled true to enable metrics, false to disable
     */
    public void setMetricsEnabled(boolean enabled) {
        this.metricsEnabled = enabled;
    }

    /**
     * Queries whether IoT Device SDK metrics collection is enabled
     *
     * @return true if metrics are enabled, false if disabled
     */
    public boolean getMetricsEnabled() {
        return metricsEnabled;
    }

    /**
     * Creates a (shallow) clone of this config object.
     *
     * Every setting is copied. The will message and the deprecated will QoS/retain
     * overrides are copied field-for-field, so the overrides are preserved even when
     * no will message has been configured yet.
     *
     * @return shallow clone of this config object
     */
    @Override
    public MqttConnectionConfig clone() {
        try (MqttConnectionConfig clone = new MqttConnectionConfig()) {
            clone.setEndpoint(getEndpoint());
            clone.setPort(getPort());
            clone.setSocketOptions(getSocketOptions());

            clone.setMqttClient(getMqttClient());
            clone.setMqtt5Client(getMqtt5Client());
            clone.setClientId(getClientId());
            clone.setUsername(getUsername());
            clone.setPassword(getPassword());
            clone.setConnectionCallbacks(getConnectionCallbacks());
            clone.setKeepAliveSecs(getKeepAliveSecs());
            clone.setPingTimeoutMs(getPingTimeoutMs());
            clone.setProtocolOperationTimeoutMs(getProtocolOperationTimeoutMs());
            clone.setCleanSession(getCleanSession());

            // Copy the raw will state rather than the merged getWillMessage() result:
            // the merged form drops the overrides whenever willMessage is still null.
            clone.willMessage = this.willMessage;
            clone.deprecatedWillQos = this.deprecatedWillQos;
            clone.deprecatedWillRetain = this.deprecatedWillRetain;

            clone.setUseWebsockets(getUseWebsockets());
            clone.setHttpProxyOptions(getHttpProxyOptions());
            clone.setWebsocketHandshakeTransform(getWebsocketHandshakeTransform());

            clone.setReconnectTimeoutSecs(getMinReconnectTimeoutSecs(), getMaxReconnectTimeoutSecs());

            clone.setMetricsEnabled(getMetricsEnabled());

            // success, bump up the ref count so we can escape the try-with-resources block
            clone.addRef();
            return clone;
        }
    }
}