Skip to content

Commit 0bdcf8c

Browse files
author
Iulius Hutuleac
committed
fixes
1 parent cecd042 commit 0bdcf8c

File tree

7 files changed

+19
-11
lines changed

7 files changed

+19
-11
lines changed

.vscode/settings.json

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
{
2+
"java.configuration.updateBuildConfiguration": "automatic"
3+
}

kafka-connect-http-infra/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
<parent>
44
<groupId>com.github.castorm</groupId>
55
<artifactId>kafka-connect-http-parent</artifactId>
6-
<version>0.8.14</version>
6+
<version>0.8.15</version>
77
<relativePath>../pom.xml</relativePath>
88
</parent>
99

kafka-connect-http-test/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
<parent>
44
<artifactId>kafka-connect-http-parent</artifactId>
55
<groupId>com.github.castorm</groupId>
6-
<version>0.8.14</version>
6+
<version>0.8.15</version>
77
<relativePath>../pom.xml</relativePath>
88
</parent>
99
<modelVersion>4.0.0</modelVersion>

kafka-connect-http/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
<parent>
44
<artifactId>kafka-connect-http-parent</artifactId>
55
<groupId>com.github.castorm</groupId>
6-
<version>0.8.14</version>
6+
<version>0.8.15</version>
77
<relativePath>../pom.xml</relativePath>
88
</parent>
99
<modelVersion>4.0.0</modelVersion>

kafka-connect-http/src/main/java/com/github/castorm/kafka/connect/http/HttpSourceTask.java

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -130,6 +130,7 @@ public List<SourceRecord> poll() throws InterruptedException {
130130
long autoOffset = 0;
131131
long autoDateIncrement = 0;
132132
long autoDateBackoff = 0;
133+
boolean pagingEnabled = nextPageOffsetField != null && !nextPageOffsetField.isEmpty() && hasNextPageField != null && !hasNextPageField.isEmpty();
133134

134135
if( autoDateInitialOffset != null && !autoDateInitialOffset.isEmpty()) {
135136
try {
@@ -148,11 +149,15 @@ public List<SourceRecord> poll() throws InterruptedException {
148149

149150
offset.setValue(AUTOTIMESTAMP, autoOffset);
150151
}
151-
offset.setValue(nextPageOffsetField, "");
152-
offset.setValue(hasNextPageField, "");
152+
153153
String hasNextPageFlag = "true";
154154
String nextPageValue = "";
155155

156+
if(pagingEnabled){
157+
offset.setValue(nextPageOffsetField, "");
158+
offset.setValue(hasNextPageField, "");
159+
}
160+
156161
while(hasNextPageFlag.matches("true")) {
157162
HttpRequest request = requestFactory.createRequest(offset);
158163

@@ -166,11 +171,9 @@ public List<SourceRecord> poll() throws InterruptedException {
166171

167172
if(!records.isEmpty()) {
168173
allRecords.addAll(records);
169-
nextPageValue = (String) records.get(0).sourceOffset().get(nextPageOffsetField);
170-
hasNextPageFlag = (String) records.get(0).sourceOffset().get(hasNextPageField);
171-
172-
if(hasNextPageFlag != null && hasNextPageFlag.matches("true") && !nextPageValue.trim().isEmpty()) {
173-
log.info("Request for next page {}", nextPageValue);
174+
if(pagingEnabled){
175+
nextPageValue = (String) records.get(0).sourceOffset().get(nextPageOffsetField);
176+
hasNextPageFlag = (String) records.get(0).sourceOffset().get(hasNextPageField);
174177
offset.setValue(nextPageOffsetField, nextPageValue);
175178
offset.setValue(hasNextPageField, hasNextPageFlag);
176179
} else {

kafka-connect-http/src/main/java/com/github/castorm/kafka/connect/http/client/okhttp/OkHttpClient.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,8 @@ public void configure(Map<String, ?> configs) {
8989
.connectTimeout(config.getConnectionTimeoutMillis(), MILLISECONDS)
9090
.readTimeout(config.getReadTimeoutMillis(), MILLISECONDS)
9191
.retryOnConnectionFailure(true)
92+
.followRedirects(false)
93+
.followSslRedirects(false)
9294
.addInterceptor(createLoggingInterceptor())
9395
.addInterceptor(chain -> chain.proceed(authorize(chain.request())))
9496
.authenticator((route, response) -> authorize(response.request()))

pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55

66
<groupId>com.github.castorm</groupId>
77
<artifactId>kafka-connect-http-parent</artifactId>
8-
<version>0.8.14</version>
8+
<version>0.8.15</version>
99
<packaging>pom</packaging>
1010

1111
<name>Kafka Connect HTTP Parent</name>

0 commit comments

Comments
 (0)