File tree Expand file tree Collapse file tree 2 files changed +22
-11
lines changed
kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/user Expand file tree Collapse file tree 2 files changed +22
-11
lines changed Original file line number Diff line number Diff line change @@ -96,13 +96,10 @@ public Stream<? extends User> stream() throws IOException {
9696 Request request = requestFor ("users" + "?source-type=FitBit" ).build ();
9797 this .timedCachedUsers =
9898 this .<Users >makeRequest (request , USER_LIST_READER ).getUsers ().stream ()
99- .filter (u -> containedUsers .isEmpty () || containedUsers .contains (u .getId ()))
100- .filter ( // Filter for when a partial update is ongoing
101- (User u ) ->
102- u .getEndDate () != null
103- && u .getStartDate () != null
104- && u .getProjectId () != null
105- && u .getUserId () != null )
99+ .filter (
100+ u ->
101+ (containedUsers .isEmpty () || containedUsers .contains (u .getId ()))
102+ && u .isComplete ())
106103 .collect (Collectors .toSet ());
107104
108105 return this .timedCachedUsers .stream ();
Original file line number Diff line number Diff line change 2323import org .radarcns .kafka .ObservationKey ;
2424
2525public interface User {
26+ static SchemaAndValue computeObservationKey (AvroData avroData , User user ) {
27+ return avroData .toConnectData (
28+ ObservationKey .getClassSchema (),
29+ new ObservationKey (user .getProjectId (), user .getUserId (), user .getSourceId ()));
30+ }
31+
2632 String getId ();
33+
2734 String getExternalUserId ();
35+
2836 String getProjectId ();
37+
2938 String getUserId ();
39+
3040 Instant getStartDate ();
41+
3142 Instant getEndDate ();
43+
3244 String getSourceId ();
45+
3346 SchemaAndValue getObservationKey (AvroData avroData );
3447
35- static SchemaAndValue computeObservationKey (AvroData avroData , User user ) {
36- return avroData .toConnectData (
37- ObservationKey .getClassSchema (),
38- new ObservationKey (user .getProjectId (), user .getUserId (), user .getSourceId ()));
48+ default Boolean isComplete () {
49+ return getEndDate () != null
50+ && getStartDate () != null
51+ && getProjectId () != null
52+ && getUserId () != null ;
3953 }
4054}
You can’t perform that action at this time.
0 commit comments