Skip to content

Commit cecd042

Browse files
author
Iulius Hutuleac
committed
add an automatic date incrementer
1 parent 1146dd2 commit cecd042

File tree

3 files changed

+99
-4
lines changed

3 files changed

+99
-4
lines changed

kafka-connect-http/src/main/java/com/github/castorm/kafka/connect/http/HttpSourceConnectorConfig.java

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,10 @@ class HttpSourceConnectorConfig extends AbstractConfig {
5757
private static final String OFFSET_INITIAL = "http.offset.initial";
5858
private static final String NEXT_PAGE_OFFSET = "http.offset.nextpage";
5959
private static final String HAS_NEXT_PAGE = "http.offset.hasnextpage";
60+
public static final String AUTODATE_INITIAL_OFFSET = "http.offset.date_initial_date";
61+
public static final String AUTODATE_INCREMENT = "http.offset.date_increment";
62+
public static final String AUTODATE_BACKOFF = "http.offset.date_backoff";
63+
6064

6165
private final TimerThrottler throttler;
6266
private final HttpRequestFactory requestFactory;
@@ -67,6 +71,9 @@ class HttpSourceConnectorConfig extends AbstractConfig {
6771
private final Map<String, String> initialOffset;
6872
private String nextPageOffsetField;
6973
private String hasNextPageField;
74+
private String autoDateInitialOffset;
75+
private String autoDateIncrement;
76+
private String autoDateBackoff;
7077

7178
HttpSourceConnectorConfig(Map<String, ?> originals) {
7279
super(config(), originals);
@@ -80,6 +87,9 @@ class HttpSourceConnectorConfig extends AbstractConfig {
8087
initialOffset = breakDownMap(getString(OFFSET_INITIAL));
8188
nextPageOffsetField = getString(NEXT_PAGE_OFFSET);
8289
hasNextPageField = getString(HAS_NEXT_PAGE);
90+
autoDateInitialOffset = getString(AUTODATE_INITIAL_OFFSET);
91+
autoDateIncrement = getString(AUTODATE_INCREMENT);
92+
autoDateBackoff = getString(AUTODATE_BACKOFF);
8393
}
8494

8595
public static ConfigDef config() {
@@ -92,6 +102,9 @@ public static ConfigDef config() {
92102
.define(RECORD_FILTER_FACTORY, CLASS, OffsetRecordFilterFactory.class, LOW, "Record Filter Factory Class")
93103
.define(OFFSET_INITIAL, STRING, "", HIGH, "Starting offset")
94104
.define(NEXT_PAGE_OFFSET, STRING, "", HIGH, "Next Page offset")
95-
.define(HAS_NEXT_PAGE, STRING, "", HIGH, "Has Next Page");
105+
.define(HAS_NEXT_PAGE, STRING, "", HIGH, "Has Next Page")
106+
.define(AUTODATE_INITIAL_OFFSET, STRING, "", HIGH, "Automatic Date Initial Offset")
107+
.define(AUTODATE_INCREMENT, STRING, "", HIGH, "Automatic Date Increment")
108+
.define(AUTODATE_BACKOFF, STRING, "", HIGH, "Automatic Date Backoff");
96109
}
97110
}

kafka-connect-http/src/main/java/com/github/castorm/kafka/connect/http/HttpSourceTask.java

Lines changed: 51 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,11 @@
3434
import edu.emory.mathcs.backport.java.util.Collections;
3535
import lombok.Getter;
3636
import lombok.extern.slf4j.Slf4j;
37+
import java.time.Instant;
38+
import java.time.LocalDateTime;
39+
import java.time.ZoneId;
40+
import java.time.format.DateTimeFormatter;
41+
import java.time.format.DateTimeParseException;
3742
import org.apache.kafka.clients.producer.RecordMetadata;
3843
import org.apache.kafka.connect.errors.RetriableException;
3944
import org.apache.kafka.connect.source.SourceRecord;
@@ -42,6 +47,7 @@
4247
import java.io.IOException;
4348
import java.time.Instant;
4449
import java.util.ArrayList;
50+
import java.util.Date;
4551
import java.util.List;
4652
import java.util.Map;
4753
import java.util.function.Function;
@@ -55,6 +61,7 @@
5561
@Slf4j
5662
public class HttpSourceTask extends SourceTask {
5763

64+
public static final String AUTOTIMESTAMP = "AUTOTIMESTAMP";
5865
private final Function<Map<String, String>, HttpSourceConnectorConfig> configFactory;
5966

6067
private TimerThrottler throttler;
@@ -74,6 +81,10 @@ public class HttpSourceTask extends SourceTask {
7481
private String nextPageOffsetField;
7582
private String hasNextPageField;
7683

84+
private String autoDateInitialOffset;
85+
private String sautoDateIncrement;
86+
private String sautoDateBackoff;
87+
7788
@Getter
7889
private Offset offset;
7990

@@ -99,6 +110,10 @@ public void start(Map<String, String> settings) {
99110
offset = loadOffset(config.getInitialOffset());
100111
nextPageOffsetField = config.getNextPageOffsetField();
101112
hasNextPageField = config.getHasNextPageField();
113+
114+
autoDateInitialOffset = config.getAutoDateInitialOffset();
115+
sautoDateIncrement = config.getAutoDateIncrement();
116+
sautoDateBackoff = config.getAutoDateBackoff();
102117
}
103118

104119
private Offset loadOffset(Map<String, String> initialOffset) {
@@ -110,12 +125,34 @@ private Offset loadOffset(Map<String, String> initialOffset) {
110125
public List<SourceRecord> poll() throws InterruptedException {
111126

112127
throttler.throttle(offset.getTimestamp().orElseGet(Instant::now));
128+
List<SourceRecord> allRecords = new ArrayList<>();
129+
DateTimeFormatter formatter = DateTimeFormatter.ISO_DATE_TIME;
130+
long autoOffset = 0;
131+
long autoDateIncrement = 0;
132+
long autoDateBackoff = 0;
133+
134+
if( autoDateInitialOffset != null && !autoDateInitialOffset.isEmpty()) {
135+
try {
136+
LocalDateTime dateTime = LocalDateTime.parse(autoDateInitialOffset, formatter);
137+
Instant timestamp = dateTime.atZone(ZoneId.of("UTC")).toInstant();
138+
autoDateIncrement = Long.parseLong(sautoDateIncrement);
139+
autoDateBackoff = Long.parseLong(sautoDateBackoff);
140+
autoOffset = timestamp.toEpochMilli();
141+
} catch (Exception e) {
142+
e.printStackTrace();
143+
}
144+
145+
if (offset.toMap().containsKey(AUTOTIMESTAMP)) {
146+
autoOffset = (Long) offset.toMap().get(AUTOTIMESTAMP);
147+
}
148+
149+
offset.setValue(AUTOTIMESTAMP, autoOffset);
150+
}
113151
offset.setValue(nextPageOffsetField, "");
114152
offset.setValue(hasNextPageField, "");
115153
String hasNextPageFlag = "true";
116154
String nextPageValue = "";
117155

118-
List<SourceRecord> allRecords = new ArrayList<>();
119156
while(hasNextPageFlag.matches("true")) {
120157
HttpRequest request = requestFactory.createRequest(offset);
121158

@@ -139,17 +176,28 @@ public List<SourceRecord> poll() throws InterruptedException {
139176
} else {
140177
hasNextPageFlag = "";
141178
}
142-
143179
} else {
144180
hasNextPageFlag = "";
145181
}
146-
Thread.sleep(1000);
182+
Thread.sleep(300);
147183
}
148184

149185
List<SourceRecord> unseenRecords = recordSorter.sort(allRecords).stream()
150186
.filter(recordFilterFactory.create(offset))
151187
.collect(toList());
152188

189+
if(autoDateInitialOffset != null && !autoDateInitialOffset.isEmpty()) {
190+
autoOffset = autoOffset + autoDateIncrement - autoDateBackoff;
191+
if (autoOffset > new Date().getTime()) {
192+
autoOffset = new Date().getTime() - autoDateBackoff;
193+
}
194+
for(SourceRecord s: allRecords) {
195+
((Map<String,Long>)s.sourceOffset()).put(AUTOTIMESTAMP, Long.valueOf(autoOffset));
196+
}
197+
offset.setValue(AUTOTIMESTAMP, autoOffset);
198+
log.info("AutoOffset Patch {}", offset.toString());
199+
}
200+
153201
log.info("Request for offset {} yields {}/{} new records", offset.toMap(), unseenRecords.size(), allRecords.size());
154202

155203
confirmationWindow = new ConfirmationWindow<>(extractOffsets(unseenRecords));

kafka-connect-http/src/test/java/com/github/castorm/kafka/connect/http/HttpSourceTaskTest.java

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -144,6 +144,40 @@ void givenTaskInitializedWithoutRestoredOffsetButWithInitialOffset_whenStart_the
144144
assertThat(task.getOffset()).isEqualTo(Offset.of(offsetInitialMap));
145145
}
146146

147+
@Test
148+
void givenTaskAutotimer() throws IOException, InterruptedException {
149+
150+
Map<String, Object> offsetMap = new HashMap<>(ImmutableMap.of("AUTOTIMESTAMP", 1672531200000L));
151+
Map<String, Object> offsetInitialMap = new HashMap<>(ImmutableMap.of("AUTOTIMESTAMP", 1672617599000L));
152+
Offset offset = Offset.of(offsetMap);
153+
HttpRequest request = HttpRequest.builder().build();
154+
HttpResponse response = HttpResponse.builder().build();
155+
156+
givenTaskConfiguration();
157+
given(config.getAutoDateInitialOffset()).willReturn("2023-01-01T00:00:00Z");
158+
given(config.getAutoDateIncrement()).willReturn("86400000");
159+
given(config.getAutoDateBackoff()).willReturn("1000");
160+
161+
task.initialize(getContext(offsetMap));
162+
task.start(emptyMap());
163+
164+
given(requestFactory.createRequest(offset)).willReturn(request);
165+
given(client.execute(request)).willReturn(response);
166+
given(responseParser.parse(response)).willReturn(asList(record(offsetMap)));
167+
given(recordSorter.sort(asList(record(offsetMap))))
168+
.willReturn(asList(record(offsetMap(1)), record(offsetMap(2)), record(offsetMap(3))));
169+
given(recordFilterFactory.create(offset)).willReturn(__ -> true);
170+
171+
task.initialize(getContext(emptyMap()));
172+
173+
task.start(emptyMap());
174+
175+
task.poll();
176+
177+
System.out.println(task.getOffset());
178+
assertThat(task.getOffset()).isEqualTo(Offset.of(offsetInitialMap));
179+
}
180+
147181
@Test
148182
void givenTaskInitialized_whenStart_thenGetPollIntervalMillis() {
149183

0 commit comments

Comments
 (0)