|
21 | 21 | import io.confluent.connect.avro.AvroData; |
22 | 22 |
|
23 | 23 | import java.util.stream.Stream; |
| 24 | +import java.util.stream.StreamSupport; |
24 | 25 |
|
25 | 26 | import org.radarbase.connect.rest.RestSourceConnectorConfig; |
26 | 27 | import org.radarbase.connect.rest.fitbit.FitbitRestSourceConnectorConfig; |
@@ -50,36 +51,41 @@ public void initialize(RestSourceConnectorConfig config) { |
50 | 51 | @Override |
51 | 52 | protected Stream<TopicData> processRecords( |
52 | 53 | FitbitRestRequest request, JsonNode root, double timeReceived) { |
53 | | - JsonNode resting = root.get("activities-heart"); |
54 | | - if (resting == null || !resting.isObject()) { |
| 54 | + JsonNode activitiesHeart = root.get("activities-heart"); |
| 55 | + if (activitiesHeart == null || !activitiesHeart.isArray() || activitiesHeart.size() == 0) { |
55 | 56 | logger.info("No resting heart rate available from {} on the specified date", request.getRequest().url()); |
56 | 57 | return Stream.empty(); |
57 | 58 | } |
58 | 59 |
|
59 | | - JsonNode dateTimeNode = resting.get("dateTime"); |
60 | | - if (dateTimeNode == null) { |
61 | | - logger.warn("Failed to get resting heart rate from {}, {} : the 'dateTime' node is missing.", request.getRequest().url(), root); |
62 | | - return Stream.empty(); |
63 | | - } |
64 | | - String date = dateTimeNode.asText(); |
| 60 | + return StreamSupport.stream(activitiesHeart.spliterator(), false) |
| 61 | + .filter(entry -> entry != null && entry.isObject()) |
| 62 | + .map(entry -> { |
| 63 | + JsonNode dateTimeNode = entry.get("dateTime"); |
| 64 | + if (dateTimeNode == null) { |
| 65 | + logger.warn("Failed to get resting heart rate from {}, {} : the 'dateTime' node is missing.", request.getRequest().url(), root); |
| 66 | + return null; |
| 67 | + } |
| 68 | + String date = dateTimeNode.asText(); |
65 | 69 |
|
66 | | - JsonNode value = resting.get("value"); |
67 | | - if (value == null || !value.isObject()) { |
68 | | - logger.warn("Failed to get resting heart rate from {}, {} : the 'value' node is missing.", request.getRequest().url(), root); |
69 | | - return Stream.empty(); |
70 | | - } |
| 70 | + JsonNode value = entry.get("value"); |
| 71 | + if (value == null || !value.isObject()) { |
| 72 | + logger.warn("Failed to get resting heart rate from {}, {} : the 'value' node is missing.", request.getRequest().url(), root); |
| 73 | + return null; |
| 74 | + } |
71 | 75 |
|
72 | | - JsonNode restingHeartRateNode = value.get("restingHeartRate"); |
73 | | - if (restingHeartRateNode == null) { |
74 | | - logger.warn("Failed to get resting heart rate from {}, {} : the 'restingHeartRate' node is missing.", request.getRequest().url(), root); |
75 | | - return Stream.empty(); |
76 | | - } |
77 | | - int restingHeartRate = restingHeartRateNode.asInt(); |
| 76 | + JsonNode restingHeartRateNode = value.get("restingHeartRate"); |
| 77 | + if (restingHeartRateNode == null) { |
| 78 | + logger.warn("Failed to get resting heart rate from {}, {} : the 'restingHeartRate' node is missing.", request.getRequest().url(), root); |
| 79 | + return null; |
| 80 | + } |
| 81 | + int restingHeartRate = restingHeartRateNode.asInt(); |
78 | 82 |
|
79 | | - FitbitRestingHeartRate fitbitRestingHeartRate |
80 | | - = new FitbitRestingHeartRate(date, timeReceived, restingHeartRate); |
| 83 | + FitbitRestingHeartRate fitbitRestingHeartRate |
| 84 | + = new FitbitRestingHeartRate(date, timeReceived, restingHeartRate); |
81 | 85 |
|
82 | | - return Stream.of(new TopicData(request.getDateRange().start().toInstant(), |
83 | | - restingHeartRateTopic, fitbitRestingHeartRate)); |
| 86 | + return new TopicData(request.getDateRange().start().toInstant(), |
| 87 | + restingHeartRateTopic, fitbitRestingHeartRate); |
| 88 | + }) |
| 89 | + .filter(java.util.Objects::nonNull); |
84 | 90 | } |
85 | 91 | } |
0 commit comments