Skip to content

Commit b6ecad0

Browse files
committed
Rework rest request to convert records out of requests
1 parent 6ed9da6 commit b6ecad0

File tree

5 files changed

+23
-20
lines changed

5 files changed

+23
-20
lines changed

kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/converter/FitbitAvroConverter.java

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -33,8 +33,7 @@
3333
import java.util.stream.Collectors;
3434
import java.util.stream.Stream;
3535
import java.util.stream.StreamSupport;
36-
import okhttp3.Response;
37-
import okhttp3.ResponseBody;
36+
import okhttp3.Headers;
3837
import org.apache.avro.Schema.Field;
3938
import org.apache.avro.generic.IndexedRecord;
4039
import org.apache.kafka.connect.data.SchemaAndValue;
@@ -71,12 +70,11 @@ public FitbitAvroConverter(AvroData avroData) {
7170

7271
@Override
7372
public Collection<SourceRecord> convert(
74-
RestRequest restRequest, Response response) throws IOException {
75-
ResponseBody body = response.body();
76-
if (body == null) {
73+
RestRequest restRequest, Headers headers, byte[] data) throws IOException {
74+
if (data == null) {
7775
throw new IOException("Failed to read body");
7876
}
79-
JsonNode activities = JSON_READER.readTree(body.charStream());
77+
JsonNode activities = JSON_READER.readTree(data);
8078

8179
User user = ((FitbitRestRequest) restRequest).getUser();
8280
final SchemaAndValue key = user.getObservationKey(avroData);

kafka-connect-rest-source/src/main/java/org/radarbase/connect/rest/converter/BytesPayloadConverter.java

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import java.util.Collection;
2424
import java.util.Collections;
2525
import java.util.Map;
26+
import okhttp3.Headers;
2627
import okhttp3.Response;
2728
import okhttp3.ResponseBody;
2829
import org.apache.kafka.connect.data.Schema;
@@ -36,15 +37,13 @@ public class BytesPayloadConverter implements PayloadToSourceRecordConverter {
3637

3738
// Just bytes for incoming messages
3839
@Override
39-
public Collection<SourceRecord> convert(RestRequest request, Response response) throws IOException {
40+
public Collection<SourceRecord> convert(RestRequest request, Headers headers, byte[] data) {
4041
Map<String, Long> sourceOffset = Collections.singletonMap(
4142
TIMESTAMP_OFFSET_KEY, currentTimeMillis());
42-
ResponseBody body = response.body();
43-
byte[] result = body != null ? body.bytes() : null;
44-
String topic = topicSelector.getTopic(request, result);
43+
String topic = topicSelector.getTopic(request, data);
4544
return Collections.singleton(
4645
new SourceRecord(request.getPartition(), sourceOffset,
47-
topic, Schema.BYTES_SCHEMA, result));
46+
topic, Schema.BYTES_SCHEMA, data));
4847
}
4948

5049
@Override

kafka-connect-rest-source/src/main/java/org/radarbase/connect/rest/converter/PayloadToSourceRecordConverter.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222
import java.time.Instant;
2323
import java.time.temporal.TemporalAmount;
2424
import java.util.Collection;
25-
import okhttp3.Response;
25+
import okhttp3.Headers;
2626
import org.apache.kafka.connect.source.SourceRecord;
2727
import org.radarbase.connect.rest.config.RestSourceTool;
2828
import org.radarbase.connect.rest.request.RestRequest;
@@ -33,7 +33,7 @@ public interface PayloadToSourceRecordConverter extends RestSourceTool {
3333
TemporalAmount NEAR_FUTURE = Duration.ofDays(31L);
3434

3535
Collection<SourceRecord> convert(
36-
RestRequest request, Response response) throws IOException;
36+
RestRequest request, Headers headers, byte[] data) throws IOException;
3737

3838
static Instant nearFuture() {
3939
return Instant.now().plus(NEAR_FUTURE);

kafka-connect-rest-source/src/main/java/org/radarbase/connect/rest/converter/StringPayloadConverter.java

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,12 +19,11 @@
1919

2020
import static java.lang.System.currentTimeMillis;
2121

22-
import java.io.IOException;
22+
import java.nio.charset.StandardCharsets;
2323
import java.util.Collection;
2424
import java.util.Collections;
2525
import java.util.Map;
26-
import okhttp3.Response;
27-
import okhttp3.ResponseBody;
26+
import okhttp3.Headers;
2827
import org.apache.kafka.connect.data.Schema;
2928
import org.apache.kafka.connect.source.SourceRecord;
3029
import org.radarbase.connect.rest.RestSourceConnectorConfig;
@@ -35,10 +34,9 @@ public class StringPayloadConverter implements PayloadToSourceRecordConverter {
3534
private TopicSelector topicSelector;
3635

3736
@Override
38-
public Collection<SourceRecord> convert(RestRequest request, Response response) throws IOException {
37+
public Collection<SourceRecord> convert(RestRequest request, Headers headers, byte[] data) {
3938
Map<String, Long> sourceOffset = Collections.singletonMap(TIMESTAMP_OFFSET_KEY, currentTimeMillis());
40-
ResponseBody body = response.body();
41-
String result = body == null ? null : body.string();
39+
String result = data == null ? null : new String(data, StandardCharsets.UTF_8);
4240
String topic = topicSelector.getTopic(request, result);
4341
return Collections.singleton(
4442
new SourceRecord(request.getPartition(), sourceOffset, topic,

kafka-connect-rest-source/src/main/java/org/radarbase/connect/rest/request/RestRequest.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,11 @@
2222
import java.util.Map;
2323
import java.util.function.Predicate;
2424
import java.util.stream.Stream;
25+
import okhttp3.Headers;
2526
import okhttp3.OkHttpClient;
2627
import okhttp3.Request;
2728
import okhttp3.Response;
29+
import okhttp3.ResponseBody;
2830
import org.apache.kafka.connect.source.SourceRecord;
2931

3032
/**
@@ -82,18 +84,24 @@ public Stream<SourceRecord> handleRequest() throws IOException {
8284

8385
Collection<SourceRecord> records;
8486

87+
byte[] data;
88+
Headers headers;
8589
try (Response response = client.newCall(request).execute()) {
8690
if (!response.isSuccessful()) {
8791
route.requestFailed(this, response);
8892
return Stream.empty();
8993
}
9094

91-
records = route.converter().convert(this, response);
95+
headers = response.headers();
96+
ResponseBody body = response.body();
97+
data = body != null ? body.bytes() : null;
9298
} catch (IOException ex) {
9399
route.requestFailed(this, null);
94100
throw ex;
95101
}
96102

103+
records = route.converter().convert(this, headers, data);
104+
97105
if (records.isEmpty()) {
98106
route.requestEmpty(this);
99107
} else {

0 commit comments

Comments
 (0)