Skip to content

Commit 6bafaf9

Browse files
committed
rolled in review comments
1 parent 029f216 commit 6bafaf9

File tree

3 files changed

+53
-12762
lines changed

3 files changed

+53
-12762
lines changed

javav2/example_code/firehose/src/main/java/com/example/firehose/scenario/FirehoseScenario.java

Lines changed: 46 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -37,11 +37,20 @@ public class FirehoseScenario {
3737
private static String deliveryStreamName;
3838

3939
public static void main(String[] args) {
40-
firehoseClient = FirehoseClient.builder().region(Region.US_EAST_1).build();
41-
cloudWatchClient = CloudWatchClient.builder().region(Region.US_EAST_1).build();
40+
final String usage = """
4241
43-
// Replace with your region and delivery stream name
44-
deliveryStreamName = "stream35";
42+
Usage:
43+
<deliveryStreamName> \s
44+
45+
Where:
46+
deliveryStreamName - The data stream name.\s
47+
""";
48+
49+
if (args.length != 1) {
50+
System.out.println(usage);
51+
return;
52+
}
53+
deliveryStreamName = args[0];
4554

4655
/*
4756
See the Readme in the scenario folder for information about the sample_records.json file.
@@ -56,7 +65,7 @@ public static void main(String[] args) {
5665
new TypeReference<>() {}
5766
);
5867

59-
// Process individual records
68+
// Process individual records.
6069
System.out.println("Processing individual records...");
6170
sampleData.subList(0, 100).forEach(record -> {
6271
try {
@@ -70,15 +79,42 @@ public static void main(String[] args) {
7079

7180
// Process batch records.
7281
System.out.println("Processing batch records...");
73-
putRecordBatch(sampleData.subList(100, 200), 50);
82+
putRecordBatch(sampleData.subList(100, sampleData.size()), 50);
7483
monitorMetrics();
7584
} catch (Exception e) {
7685
System.out.println("Error processing records: " + e.getMessage());
7786
} finally {
7887
closeClients();
7988
}
8089

81-
System.out.println("This concludes the AWS Firehose scenario...");
90+
System.out.println("This concludes the Amazon Firehose scenario...");
91+
}
92+
93+
/**
94+
* Retrieves a singleton instance of the FirehoseClient.
95+
*
96+
* <p>If the {@code firehoseClient} instance is null, it is created using the
97+
* {@code FirehoseClient.create()} method. Otherwise, the existing instance is returned.</p>
98+
*
99+
* @return the FirehoseClient instance
100+
*/
101+
private static FirehoseClient getFirehoseClient() {
102+
if (firehoseClient == null) {
103+
firehoseClient = FirehoseClient.create();
104+
}
105+
return firehoseClient;
106+
}
107+
108+
/**
109+
* Retrieves a singleton instance of the cloudWatchClient.
110+
*
111+
* @return the CloudWatchClient instance
112+
*/
113+
private static CloudWatchClient getCloudWatchClient() {
114+
if (cloudWatchClient == null) {
115+
cloudWatchClient = CloudWatchClient.create();
116+
}
117+
return cloudWatchClient;
82118
}
83119

84120
/**
@@ -102,7 +138,7 @@ public static void putRecord(Map<String, Object> record) {
102138
.record(firehoseRecord)
103139
.build();
104140

105-
firehoseClient.putRecord(putRecordRequest);
141+
getFirehoseClient().putRecord(putRecordRequest);
106142
System.out.println("Record sent successfully: " + jsonRecord);
107143
} catch (Exception e) {
108144
System.out.println("Failed to send record. Error: " + e.getMessage());
@@ -140,7 +176,7 @@ public static void putRecordBatch(List<Map<String, Object>> records, int batchSi
140176
.records(batchRecords)
141177
.build();
142178

143-
PutRecordBatchResponse response = firehoseClient.putRecordBatch(request);
179+
PutRecordBatchResponse response = getFirehoseClient().putRecordBatch(request);
144180

145181
if (response.failedPutCount() > 0) {
146182
System.out.println("Failed to send " + response.failedPutCount() + " records in batch of " + batchRecords.size());
@@ -198,7 +234,7 @@ private static void monitorMetric(String metricName, Instant startTime, Instant
198234
.statistics(Statistic.SUM)
199235
.build();
200236

201-
GetMetricStatisticsResponse response = cloudWatchClient.getMetricStatistics(request);
237+
GetMetricStatisticsResponse response = getCloudWatchClient().getMetricStatistics(request);
202238
double totalSum = response.datapoints().stream()
203239
.mapToDouble(Datapoint::sum)
204240
.sum();

0 commit comments

Comments
 (0)