Skip to content

Commit 1146dd2

Browse files
author
Iulius Hutuleac
committed
add token caching and token headers
1 parent 18c628a commit 1146dd2

File tree

10 files changed

+157
-31
lines changed

10 files changed

+157
-31
lines changed

kafka-connect-http/src/main/java/com/github/castorm/kafka/connect/http/HttpSourceConnectorConfig.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@ class HttpSourceConnectorConfig extends AbstractConfig {
5656
private static final String RECORD_FILTER_FACTORY = "http.record.filter.factory";
5757
private static final String OFFSET_INITIAL = "http.offset.initial";
5858
private static final String NEXT_PAGE_OFFSET = "http.offset.nextpage";
59+
private static final String HAS_NEXT_PAGE = "http.offset.hasnextpage";
5960

6061
private final TimerThrottler throttler;
6162
private final HttpRequestFactory requestFactory;
@@ -64,7 +65,8 @@ class HttpSourceConnectorConfig extends AbstractConfig {
6465
private final SourceRecordFilterFactory recordFilterFactory;
6566
private final SourceRecordSorter recordSorter;
6667
private final Map<String, String> initialOffset;
67-
private String nextPageOffset;
68+
private String nextPageOffsetField;
69+
private String hasNextPageField;
6870

6971
HttpSourceConnectorConfig(Map<String, ?> originals) {
7072
super(config(), originals);
@@ -76,7 +78,8 @@ class HttpSourceConnectorConfig extends AbstractConfig {
7678
recordSorter = getConfiguredInstance(RECORD_SORTER, SourceRecordSorter.class);
7779
recordFilterFactory = getConfiguredInstance(RECORD_FILTER_FACTORY, SourceRecordFilterFactory.class);
7880
initialOffset = breakDownMap(getString(OFFSET_INITIAL));
79-
nextPageOffset = getString(NEXT_PAGE_OFFSET);
81+
nextPageOffsetField = getString(NEXT_PAGE_OFFSET);
82+
hasNextPageField = getString(HAS_NEXT_PAGE);
8083
}
8184

8285
public static ConfigDef config() {
@@ -88,6 +91,7 @@ public static ConfigDef config() {
8891
.define(RECORD_SORTER, CLASS, OrderDirectionSourceRecordSorter.class, LOW, "Record Sorter Class")
8992
.define(RECORD_FILTER_FACTORY, CLASS, OffsetRecordFilterFactory.class, LOW, "Record Filter Factory Class")
9093
.define(OFFSET_INITIAL, STRING, "", HIGH, "Starting offset")
91-
.define(NEXT_PAGE_OFFSET, STRING, "", HIGH, "Next Page offset");
94+
.define(NEXT_PAGE_OFFSET, STRING, "", HIGH, "Next Page offset")
95+
.define(HAS_NEXT_PAGE, STRING, "", HIGH, "Has Next Page");
9296
}
9397
}

kafka-connect-http/src/main/java/com/github/castorm/kafka/connect/http/HttpSourceTask.java

Lines changed: 21 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -42,14 +42,10 @@
4242
import java.io.IOException;
4343
import java.time.Instant;
4444
import java.util.ArrayList;
45-
import java.util.HashMap;
4645
import java.util.List;
4746
import java.util.Map;
4847
import java.util.function.Function;
4948

50-
import com.github.castorm.kafka.connect.common.ConfigUtils;
51-
52-
import static com.github.castorm.kafka.connect.common.ConfigUtils.breakDownMap;
5349
import static com.github.castorm.kafka.connect.common.VersionUtils.getVersion;
5450
import static java.util.Collections.emptyList;
5551
import static java.util.Collections.emptyMap;
@@ -75,7 +71,8 @@ public class HttpSourceTask extends SourceTask {
7571

7672
private ConfirmationWindow<Map<String, ?>> confirmationWindow = new ConfirmationWindow<>(emptyList());
7773

78-
private String nextPageOffset;
74+
private String nextPageOffsetField;
75+
private String hasNextPageField;
7976

8077
@Getter
8178
private Offset offset;
@@ -100,6 +97,8 @@ public void start(Map<String, String> settings) {
10097
recordSorter = config.getRecordSorter();
10198
recordFilterFactory = config.getRecordFilterFactory();
10299
offset = loadOffset(config.getInitialOffset());
100+
nextPageOffsetField = config.getNextPageOffsetField();
101+
hasNextPageField = config.getHasNextPageField();
103102
}
104103

105104
private Offset loadOffset(Map<String, String> initialOffset) {
@@ -111,34 +110,40 @@ private Offset loadOffset(Map<String, String> initialOffset) {
111110
public List<SourceRecord> poll() throws InterruptedException {
112111

113112
throttler.throttle(offset.getTimestamp().orElseGet(Instant::now));
114-
offset.setValue(nextPageOffset, "");
115-
116-
boolean hasNextPage = true;
113+
offset.setValue(nextPageOffsetField, "");
114+
offset.setValue(hasNextPageField, "");
115+
String hasNextPageFlag = "true";
116+
String nextPageValue = "";
117117

118118
List<SourceRecord> allRecords = new ArrayList<>();
119-
while(hasNextPage) {
119+
while(hasNextPageFlag.matches("true")) {
120120
HttpRequest request = requestFactory.createRequest(offset);
121121

122+
log.info("Request for offset {}", offset.toString());
122123
log.info("Request for page {}", request.toString());
124+
log.info("Request for initial page {} hasNextPageFlag {}", nextPageValue, hasNextPageFlag);
123125

124126
HttpResponse response = execute(request);
125127

126128
List<SourceRecord> records = responseParser.parse(response);
127129

128130
if(!records.isEmpty()) {
129131
allRecords.addAll(records);
130-
String nextPage = (String) records.get(0).sourceOffset().get(nextPageOffset);
131-
if(nextPage != null && !nextPage.trim().isEmpty()) {
132-
log.info("Request for next page {}", nextPage);
133-
offset.setValue(nextPageOffset, nextPage);
132+
nextPageValue = (String) records.get(0).sourceOffset().get(nextPageOffsetField);
133+
hasNextPageFlag = (String) records.get(0).sourceOffset().get(hasNextPageField);
134+
135+
if(hasNextPageFlag != null && hasNextPageFlag.matches("true") && !nextPageValue.trim().isEmpty()) {
136+
log.info("Request for next page {}", nextPageValue);
137+
offset.setValue(nextPageOffsetField, nextPageValue);
138+
offset.setValue(hasNextPageField, hasNextPageFlag);
134139
} else {
135-
hasNextPage = false;
140+
hasNextPageFlag = "";
136141
}
137142

138143
} else {
139-
hasNextPage = false;
144+
hasNextPageFlag = "";
140145
}
141-
Thread.sleep(200);
146+
Thread.sleep(1000);
142147
}
143148

144149
List<SourceRecord> unseenRecords = recordSorter.sort(allRecords).stream()
Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
package com.github.castorm.kafka.connect.http.auth;

/*-
 * #%L
 * Kafka Connect HTTP
 * %%
 * Copyright (C) 2020 CastorM
 * %%
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * #L%
 */

import com.github.castorm.kafka.connect.http.auth.spi.HttpAuthenticator;

import java.util.Map;
import java.util.Optional;
import java.util.function.Function;

import static org.apache.commons.lang.StringUtils.isEmpty;

/**
 * {@link HttpAuthenticator} backed by a statically configured bearer token
 * ({@code http.auth.bearer}, see {@link BearerAuthenticatorConfig}).
 *
 * <p>{@link #configure(Map)} must be invoked before the header is meaningful;
 * until then {@link #getAuthorizationHeader()} returns {@link Optional#empty()}.
 */
public class BearerAuthenticator implements HttpAuthenticator {

    private final Function<Map<String, ?>, BearerAuthenticatorConfig> configFactory;

    // Initialized to empty so getAuthorizationHeader() cannot NPE when called
    // before configure() (original field was package-private and uninitialized).
    private Optional<String> header = Optional.empty();

    public BearerAuthenticator() {
        this(BearerAuthenticatorConfig::new);
    }

    /**
     * @param configFactory factory mapping raw connector settings to a
     *                      {@link BearerAuthenticatorConfig}; injectable for testing
     */
    public BearerAuthenticator(Function<Map<String, ?>, BearerAuthenticatorConfig> configFactory) {
        this.configFactory = configFactory;
    }

    @Override
    public void configure(Map<String, ?> configs) {
        BearerAuthenticatorConfig config = configFactory.apply(configs);
        String bearer = config.getBearer();
        // NOTE(review): the configured value is used verbatim; if the SPI expects a
        // full Authorization header value it may need a "Bearer " prefix — confirm
        // against the HttpAuthenticator contract / consumers of this header.
        header = isEmpty(bearer) ? Optional.empty() : Optional.of(bearer);
    }

    @Override
    public Optional<String> getAuthorizationHeader() {
        return header;
    }
}
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
package com.github.castorm.kafka.connect.http.auth;

/*-
 * #%L
 * kafka-connect-http
 * %%
 * Copyright (C) 2020 CastorM
 * %%
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * #L%
 */

import lombok.Getter;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.types.Password;

import java.util.Map;

import static org.apache.kafka.common.config.ConfigDef.Importance.HIGH;
import static org.apache.kafka.common.config.ConfigDef.Type.PASSWORD;

/**
 * Configuration for {@link BearerAuthenticator}: exposes the bearer token
 * configured under {@code http.auth.bearer}.
 */
@Getter
public class BearerAuthenticatorConfig extends AbstractConfig {

    private static final String BEARER = "http.auth.bearer";

    // Raw token value; kept as String so getBearer() stays backward-compatible.
    private final String bearer;

    BearerAuthenticatorConfig(Map<String, ?> originals) {
        super(config(), originals);
        // Read via getPassword so the secret is masked when Kafka logs the config
        // (original declared the token as STRING, leaking it into log output).
        Password token = getPassword(BEARER);
        bearer = token == null ? "" : token.value();
    }

    public static ConfigDef config() {
        return new ConfigDef()
                // PASSWORD type: value is redacted in toString()/logged configuration.
                .define(BEARER, PASSWORD, "", HIGH, "Bearer token");
    }
}

kafka-connect-http/src/main/java/com/github/castorm/kafka/connect/http/auth/ConfigurableHttpAuthenticatorConfig.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,10 @@ private HttpAuthenticator getAuthenticator(Map<String, ?> originals) {
4949
BasicHttpAuthenticator auth = new BasicHttpAuthenticator();
5050
auth.configure(originals);
5151
return auth;
52+
case BEARER:
53+
BearerAuthenticator bauth = new BearerAuthenticator();
54+
bauth.configure(originals);
55+
return bauth;
5256
case TOKEN_ENDPOINT:
5357
TokenEndpointAuthenticator tokenEndpointAuth = new TokenEndpointAuthenticator();
5458
tokenEndpointAuth.configure(originals);

kafka-connect-http/src/main/java/com/github/castorm/kafka/connect/http/auth/spi/HttpAuthenticationType.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,5 +21,5 @@
2121
*/
2222

2323
public enum HttpAuthenticationType {
24-
NONE, BASIC, TOKEN_ENDPOINT
24+
NONE, BASIC, TOKEN_ENDPOINT, BEARER
2525
}

kafka-connect-http/src/main/java/com/github/castorm/kafka/connect/http/client/okhttp/OkHttpClient.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@
6767
import static okhttp3.logging.HttpLoggingInterceptor.Level.BASIC;
6868
import static okhttp3.logging.HttpLoggingInterceptor.Level.BODY;
6969
import static okhttp3.logging.HttpLoggingInterceptor.Level.NONE;
70+
import static okhttp3.logging.HttpLoggingInterceptor.Level.HEADERS;
7071
import static org.apache.commons.lang.StringUtils.isEmpty;
7172

7273
@Slf4j
@@ -109,9 +110,9 @@ private static HttpLoggingInterceptor createLoggingInterceptor() {
109110
if (log.isTraceEnabled()) {
110111
return new HttpLoggingInterceptor(log::trace).setLevel(BODY);
111112
} else if (log.isDebugEnabled()) {
112-
return new HttpLoggingInterceptor(log::debug).setLevel(BASIC);
113+
return new HttpLoggingInterceptor(log::debug).setLevel(HEADERS);
113114
} else {
114-
return new HttpLoggingInterceptor(log::info).setLevel(NONE);
115+
return new HttpLoggingInterceptor(log::info).setLevel(BASIC);
115116
}
116117
}
117118

kafka-connect-http/src/main/java/com/github/castorm/kafka/connect/http/model/Offset.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,13 @@
3434
@EqualsAndHashCode
3535
public class Offset {
3636

37+
@Override
38+
public String toString() {
39+
return "Offset{" +
40+
"properties=" + properties +
41+
'}';
42+
}
43+
3744
private static final String KEY_KEY = "key";
3845

3946
private static final String TIMESTAMP_KEY = "timestamp";
@@ -79,4 +86,6 @@ public void setValue(String key, Object value) {
7986
public Optional<Instant> getTimestamp() {
8087
return ofNullable((String) properties.get(TIMESTAMP_KEY)).map(Instant::parse);
8188
}
89+
90+
8291
}

kafka-connect-http/src/test/java/com/github/castorm/kafka/connect/http/response/StatusCodeHttpResponsePolicyTest.java

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -62,14 +62,6 @@ void givenCodeProcess_whenResolve_thenProcess() {
6262
assertThat(policy.resolve(response.withCode(code))).isEqualTo(PROCESS);
6363
}
6464

65-
@Test
66-
void givenCodeSkip_whenResolve_thenSkip() {
67-
68-
given(config.getSkipCodes()).willReturn(Stream.of(code).collect(toSet()));
69-
policy.configure(emptyMap());
70-
71-
assertThat(policy.resolve(response.withCode(code))).isEqualTo(SKIP);
72-
}
7365

7466
@Test
7567
void givenCodeNoProcessNorSkip_whenResolve_thenFail() {

pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@
7777
<logback.version>1.2.10</logback.version>
7878
<lombok.version>1.18.22</lombok.version>
7979
<kafka.version>3.6.1</kafka.version>
80-
<okhttp.version>4.9.3</okhttp.version>
80+
<okhttp.version>4.12.0</okhttp.version>
8181
<jackson.version>2.13.1</jackson.version>
8282
<freemarker.version>2.3.31</freemarker.version>
8383
<natty.version>0.13</natty.version>

0 commit comments

Comments
 (0)