|
| 1 | +/** |
| 2 | + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. |
| 3 | + * SPDX-License-Identifier: Apache-2.0. |
| 4 | + */ |
| 5 | + |
| 6 | +package greengrass; |
| 7 | + |
| 8 | +import software.amazon.awssdk.crt.CrtResource; |
| 9 | +import software.amazon.awssdk.crt.CrtRuntimeException; |
| 10 | +import software.amazon.awssdk.crt.http.HttpProxyOptions; |
| 11 | +import software.amazon.awssdk.crt.io.*; |
| 12 | +import software.amazon.awssdk.crt.mqtt.*; |
| 13 | +import software.amazon.awssdk.iot.AwsIotMqttConnectionBuilder; |
| 14 | +import software.amazon.awssdk.iot.discovery.DiscoveryClient; |
| 15 | +import software.amazon.awssdk.iot.discovery.DiscoveryClientConfig; |
| 16 | +import software.amazon.awssdk.iot.discovery.model.ConnectivityInfo; |
| 17 | +import software.amazon.awssdk.iot.discovery.model.DiscoverResponse; |
| 18 | +import software.amazon.awssdk.iot.discovery.model.GGCore; |
| 19 | +import software.amazon.awssdk.iot.discovery.model.GGGroup; |
| 20 | + |
| 21 | +import java.io.File; |
| 22 | +import java.nio.charset.StandardCharsets; |
| 23 | +import java.util.*; |
| 24 | +import java.util.concurrent.CompletableFuture; |
| 25 | +import java.util.concurrent.ExecutionException; |
| 26 | +import java.util.concurrent.TimeUnit; |
| 27 | +import java.util.concurrent.TimeoutException; |
| 28 | + |
| 29 | +import static software.amazon.awssdk.iot.discovery.DiscoveryClient.TLS_EXT_ALPN; |
| 30 | + |
| 31 | +public class BasicDiscovery { |
| 32 | + |
| 33 | + // ------------------------- ARGUMENT PARSING ------------------------- |
| 34 | + static class Args { |
| 35 | + String certPath; |
| 36 | + String keyPath; |
| 37 | + String region; |
| 38 | + String thingName; |
| 39 | + Boolean printDiscoveryRespOnly = false; |
| 40 | + String mode; |
| 41 | + String proxyHost; |
| 42 | + String caPath; |
| 43 | + int proxyPort = 0; |
| 44 | + String topic = "test/topic"; |
| 45 | + } |
| 46 | + |
| 47 | + private static void printHelpAndExit(int code) { |
| 48 | + System.out.println("Basic Discovery Sample\n"); |
| 49 | + System.out.println("Required:"); |
| 50 | + System.out.println(" --cert <CERTIFICATE> Path to certificate file (PEM)"); |
| 51 | + System.out.println(" --key <PRIVATE_KEY> Path to private key file (PEM)"); |
| 52 | + System.out.println(" --region <REGION> The region to connect through"); |
| 53 | + System.out.println(" --thing_name <THING_NAME> The name assigned to your IoT Thing"); |
| 54 | + System.out.println("\nOptional:"); |
| 55 | + System.out.println(" --print_discover_resp_only <PRINT_DISC_RESPONSE> (optional, default='False')"); |
| 56 | + System.out.println(" --mode <MODE> The operation mode can be set to 'subscribe' 'publish' or 'both'(default)"); |
| 57 | + System.out.println(" --ca_file <CA_FILE> Path to optional CA bundle (PEM)"); |
| 58 | + System.out.println(" --proxy_host <PROXY_HOST> HTTP proxy host"); |
| 59 | + System.out.println(" --proxy_port <PROXY_PORT> HTTP proxy port"); |
| 60 | + System.out.println(" --topic <TOPIC> Topic to use (default: test/topic)"); |
| 61 | + System.exit(code); |
| 62 | + } |
| 63 | + |
| 64 | + private static Args parseArgs(String[] argv) { |
| 65 | + if (argv.length == 0 || Arrays.asList(argv).contains("--help")) { |
| 66 | + printHelpAndExit(0); |
| 67 | + } |
| 68 | + Args a = new Args(); |
| 69 | + for (int i = 0; i < argv.length; i++) { |
| 70 | + String k = argv[i]; |
| 71 | + String v = (i + 1 < argv.length) ? argv[i + 1] : null; |
| 72 | + |
| 73 | + switch (k) { |
| 74 | + case "--cert": a.certPath = v; i++; break; |
| 75 | + case "--key": a.keyPath = v; i++; break; |
| 76 | + case "--region": a.region = v; i++; break; |
| 77 | + case "--thing_name": a.thingName = v; i++; break; |
| 78 | + case "--print_discover_resp_only": a.printDiscoveryRespOnly = Boolean.valueOf(v); |
| 79 | + case "--mode": a.mode = v; i++; break; |
| 80 | + case "--proxy_host": a.proxyHost = v; i++; break; |
| 81 | + case "--proxy_port": a.proxyPort = Integer.parseInt(v); i++; break; |
| 82 | + case "--ca_file": a.caPath = v; i++; break; |
| 83 | + case "--topic": a.topic = v; i++; break; |
| 84 | + default: |
| 85 | + System.err.println("Unknown arg: " + k); |
| 86 | + printHelpAndExit(2); |
| 87 | + } |
| 88 | + } |
| 89 | + if (a.certPath == null || a.keyPath == null || a.region == null || a.thingName == null) { |
| 90 | + System.err.println("Missing required arguments."); |
| 91 | + printHelpAndExit(2); |
| 92 | + } |
| 93 | + return a; |
| 94 | + } |
| 95 | + // ------------------------- ARGUMENT PARSING END --------------------- |
| 96 | + |
| 97 | + static Args args; |
| 98 | + |
| 99 | + public static void main(String[] argv) { |
| 100 | + args = parseArgs(argv); |
| 101 | + |
| 102 | + try (final TlsContextOptions tlsCtxOptions = TlsContextOptions.createWithMtlsFromPath(args.certPath, args.keyPath)) { |
| 103 | + if (TlsContextOptions.isAlpnSupported()) { |
| 104 | + tlsCtxOptions.withAlpnList(TLS_EXT_ALPN); |
| 105 | + } |
| 106 | + if (args.caPath != null) { |
| 107 | + tlsCtxOptions.overrideDefaultTrustStoreFromPath(null, args.caPath); |
| 108 | + } |
| 109 | + HttpProxyOptions proxyOptions = null; |
| 110 | + if (args.proxyHost != null && args.proxyPort > 0) { |
| 111 | + proxyOptions = new HttpProxyOptions(); |
| 112 | + proxyOptions.setHost(args.proxyHost); |
| 113 | + proxyOptions.setPort(args.proxyPort); |
| 114 | + } |
| 115 | + |
| 116 | + try ( |
| 117 | + final SocketOptions socketOptions = new SocketOptions(); |
| 118 | + final DiscoveryClientConfig discoveryClientConfig = |
| 119 | + new DiscoveryClientConfig(tlsCtxOptions, socketOptions, args.region, 1, proxyOptions); |
| 120 | + final DiscoveryClient discoveryClient = new DiscoveryClient(discoveryClientConfig)) { |
| 121 | + |
| 122 | + DiscoverResponse response = discoveryClient.discover(args.thingName).get(60, TimeUnit.SECONDS); |
| 123 | + printGreengrassGroupList(response.getGGGroups(), ""); |
| 124 | + |
| 125 | + if (args.printDiscoveryRespOnly == false) { |
| 126 | + try (final MqttClientConnection connection = getClientFromDiscovery(discoveryClient)) { |
| 127 | + if ("subscribe".equals(args.mode) || "both".equals(args.mode)) { |
| 128 | + final CompletableFuture<Integer> subFuture = connection.subscribe(args.topic, QualityOfService.AT_MOST_ONCE, message -> { |
| 129 | + System.out.println(String.format("Message received on topic %s: %s", |
| 130 | + message.getTopic(), new String(message.getPayload(), StandardCharsets.UTF_8))); |
| 131 | + }); |
| 132 | + subFuture.get(); |
| 133 | + } |
| 134 | + |
| 135 | + final Scanner scanner = new Scanner(System.in); |
| 136 | + while (true) { |
| 137 | + String input = null; |
| 138 | + if ("publish".equals(args.mode) || "both".equals(args.mode)) { |
| 139 | + System.out.println("Enter the message you want to publish to topic " + args.topic + " and press Enter. " + |
| 140 | + "Type 'exit' or 'quit' to exit this program: "); |
| 141 | + input = scanner.nextLine(); |
| 142 | + } |
| 143 | + |
| 144 | + if ("exit".equals(input) || "quit".equals(input)) { |
| 145 | + System.out.println("Terminating..."); |
| 146 | + break; |
| 147 | + } |
| 148 | + |
| 149 | + if ("publish".equals(args.mode) || "both".equals(args.mode)) { |
| 150 | + final CompletableFuture<Integer> publishResult = connection.publish(new MqttMessage(args.topic, |
| 151 | + input.getBytes(StandardCharsets.UTF_8), QualityOfService.AT_MOST_ONCE, false)); |
| 152 | + Integer result = publishResult.get(); |
| 153 | + } |
| 154 | + } |
| 155 | + } |
| 156 | + } |
| 157 | + } |
| 158 | + } catch (CrtRuntimeException | InterruptedException | ExecutionException | TimeoutException ex) { |
| 159 | + System.out.println("Exception thrown: " + ex.toString()); |
| 160 | + ex.printStackTrace(); |
| 161 | + } |
| 162 | + CrtResource.waitForNoResources(); |
| 163 | + System.out.println("Complete!"); |
| 164 | + } |
| 165 | + |
| 166 | + private static void printGreengrassGroupList(List<GGGroup> groupList, String prefix) { |
| 167 | + for (int i = 0; i < groupList.size(); i++) { |
| 168 | + GGGroup group = groupList.get(i); |
| 169 | + System.out.println(prefix + "Group ID: " + group.getGGGroupId()); |
| 170 | + printGreengrassCoreList(group.getCores(), " "); |
| 171 | + } |
| 172 | + } |
| 173 | + |
| 174 | + private static void printGreengrassCoreList(List<GGCore> coreList, String prefix) { |
| 175 | + for (int i = 0; i < coreList.size(); i++) { |
| 176 | + GGCore core = coreList.get(i); |
| 177 | + System.out.println(prefix + "Thing ARN: " + core.getThingArn()); |
| 178 | + printGreengrassConnectivityList(core.getConnectivity(), prefix + " "); |
| 179 | + } |
| 180 | + } |
| 181 | + |
| 182 | + private static void printGreengrassConnectivityList(List<ConnectivityInfo> connectivityList, String prefix) { |
| 183 | + for (int i = 0; i < connectivityList.size(); i++) { |
| 184 | + ConnectivityInfo connectivityInfo = connectivityList.get(i); |
| 185 | + System.out.println(prefix + "Connectivity ID: " + connectivityInfo.getId()); |
| 186 | + System.out.println(prefix + "Connectivity Host Address: " + connectivityInfo.getHostAddress()); |
| 187 | + System.out.println(prefix + "Connectivity Port: " + connectivityInfo.getPortNumber()); |
| 188 | + } |
| 189 | + } |
| 190 | + |
| 191 | + private static MqttClientConnection getClientFromDiscovery(final DiscoveryClient discoveryClient |
| 192 | + ) throws ExecutionException, InterruptedException { |
| 193 | + final CompletableFuture<DiscoverResponse> futureResponse = discoveryClient.discover(args.thingName); |
| 194 | + final DiscoverResponse response = futureResponse.get(); |
| 195 | + |
| 196 | + if (response.getGGGroups() == null) { |
| 197 | + throw new RuntimeException("ThingName " + args.thingName + " does not have a Greengrass group/core configuration"); |
| 198 | + } |
| 199 | + final Optional<GGGroup> groupOpt = response.getGGGroups().stream().findFirst(); |
| 200 | + if (!groupOpt.isPresent()) { |
| 201 | + throw new RuntimeException("ThingName " + args.thingName + " does not have a Greengrass group/core configuration"); |
| 202 | + } |
| 203 | + |
| 204 | + final GGGroup group = groupOpt.get(); |
| 205 | + final GGCore core = group.getCores().stream().findFirst().get(); |
| 206 | + |
| 207 | + for (ConnectivityInfo connInfo : core.getConnectivity()) { |
| 208 | + final String dnsOrIp = connInfo.getHostAddress(); |
| 209 | + final Integer port = connInfo.getPortNumber(); |
| 210 | + |
| 211 | + System.out.printf("Connecting to group ID %s, with thing arn %s, using endpoint %s:%d%n", |
| 212 | + group.getGGGroupId(), core.getThingArn(), dnsOrIp, port); |
| 213 | + |
| 214 | + try (final AwsIotMqttConnectionBuilder connectionBuilder = AwsIotMqttConnectionBuilder.newMtlsBuilderFromPath(args.certPath, args.keyPath) |
| 215 | + .withClientId(args.thingName) |
| 216 | + .withPort(port) |
| 217 | + .withEndpoint(dnsOrIp) |
| 218 | + .withConnectionEventCallbacks(new MqttClientConnectionEvents() { |
| 219 | + @Override |
| 220 | + public void onConnectionInterrupted(int errorCode) { |
| 221 | + System.out.println("Connection interrupted: " + errorCode); |
| 222 | + } |
| 223 | + |
| 224 | + @Override |
| 225 | + public void onConnectionResumed(boolean sessionPresent) { |
| 226 | + System.out.println("Connection resumed!"); |
| 227 | + } |
| 228 | + })) { |
| 229 | + if (group.getCAs() != null) { |
| 230 | + connectionBuilder.withCertificateAuthority(group.getCAs().get(0)); |
| 231 | + } |
| 232 | + |
| 233 | + try (MqttClientConnection connection = connectionBuilder.build()) { |
| 234 | + if (connection.connect().get()) { |
| 235 | + System.out.println("Session resumed"); |
| 236 | + } else { |
| 237 | + System.out.println("Started a clean session"); |
| 238 | + } |
| 239 | + |
| 240 | + /* This lets the connection escape the try block without getting cleaned up */ |
| 241 | + connection.addRef(); |
| 242 | + return connection; |
| 243 | + } catch (Exception e) { |
| 244 | + System.out.println(String.format("Connection failed with exception %s", e.toString())); |
| 245 | + } |
| 246 | + } |
| 247 | + } |
| 248 | + |
| 249 | + throw new RuntimeException("ThingName " + args.thingName + " could not connect to the green grass core using any of the endpoint connectivity options"); |
| 250 | + } |
| 251 | +} |
0 commit comments