2727import java .util .Collection ;
2828import java .util .Collections ;
2929import java .util .HashMap ;
30- import java .util .Iterator ;
3130import java .util .Map ;
32- import java .util .Objects ;
3331import java .util .Optional ;
34- import java .util .OptionalDouble ;
35- import java .util .OptionalLong ;
3632import java .util .concurrent .TimeUnit ;
3733import java .util .stream .Collectors ;
3834import java .util .stream .Stream ;
3935import java .util .stream .StreamSupport ;
4036import okhttp3 .Response ;
4137import okhttp3 .ResponseBody ;
38+ import org .apache .avro .Schema .Field ;
4239import org .apache .avro .generic .IndexedRecord ;
4340import org .apache .kafka .connect .data .SchemaAndValue ;
4441import org .apache .kafka .connect .source .SourceRecord ;
@@ -86,7 +83,7 @@ public Collection<SourceRecord> convert(
8683 double timeReceived = System .currentTimeMillis () / 1000d ;
8784
8885 return processRecords ((FitbitRestRequest )restRequest , activities , timeReceived )
89- .filter (Objects :: nonNull )
86+ .filter (t -> validateRecord (( FitbitRestRequest ) restRequest , t ) )
9087 .map (t -> {
9188 SchemaAndValue avro = avroData .toConnectData (t .value .getSchema (), t .value );
9289 Map <String , ?> offset = Collections .singletonMap (
@@ -98,6 +95,22 @@ public Collection<SourceRecord> convert(
9895 .collect (Collectors .toList ());
9996 }
10097
98+ private boolean validateRecord (FitbitRestRequest request , TopicData record ) {
99+ if (record == null ) {
100+ return false ;
101+ }
102+ Instant endDate = request .getUser ().getEndDate ();
103+ if (endDate == null ) {
104+ return true ;
105+ }
106+ Field timeField = record .value .getSchema ().getField ("time" );
107+ if (timeField != null ) {
108+ long time = (long ) (((Double )record .value .get (timeField .pos ()) * 1000.0 ));
109+ return Instant .ofEpochMilli (time ).isBefore (endDate );
110+ }
111+ return true ;
112+ }
113+
101114 /** Process the JSON records generated by given request. */
102115 protected abstract Stream <TopicData > processRecords (
103116 FitbitRestRequest request ,
0 commit comments