Skip to content
This repository was archived by the owner on Dec 28, 2025. It is now read-only.

Commit aeed71c

Browse files
[Fix][Connector-v2][Easysearch] Handle optional password in credentials and add clearScroll method (apache#10161)
1 parent dbad117 commit aeed71c

File tree

3 files changed

+75
-13
lines changed

3 files changed

+75
-13
lines changed

seatunnel-connectors-v2/connector-easysearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/easysearch/client/EasysearchClient.java

Lines changed: 35 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -158,10 +158,14 @@ private static RestClientBuilder getRestClientBuilder(
158158
restClientBuilder.setHttpClientConfigCallback(
159159
httpClientBuilder -> {
160160
if (username.isPresent()) {
161+
String passwordStr = null;
162+
if (password.isPresent()) {
163+
passwordStr = password.get();
164+
}
161165
CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
162166
credentialsProvider.setCredentials(
163167
AuthScope.ANY,
164-
new UsernamePasswordCredentials(username.get(), password.get()));
168+
new UsernamePasswordCredentials(username.get(), passwordStr));
165169
httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
166170
}
167171

@@ -289,6 +293,36 @@ public void close() {
289293
}
290294
}
291295

296+
public boolean clearScroll(String scrollId) {
297+
if (scrollId == null || scrollId.isEmpty()) {
298+
return false;
299+
}
300+
301+
String endpoint = "/_search/scroll";
302+
Request request = new Request("DELETE", endpoint);
303+
Map<String, String> param = new HashMap<>();
304+
param.put("scroll_id", scrollId);
305+
request.setJsonEntity(JsonUtils.toJsonString(param));
306+
307+
try {
308+
Response response = restClient.performRequest(request);
309+
if (response == null) {
310+
log.warn("DELETE {} response null when clearing scrollId {}", endpoint, scrollId);
311+
return false;
312+
}
313+
int statusCode = response.getStatusLine().getStatusCode();
314+
if (statusCode == HttpStatus.SC_OK) {
315+
return true;
316+
} else {
317+
log.warn("Failed to clear scrollId {}, status code={}", scrollId, statusCode);
318+
return false;
319+
}
320+
} catch (IOException e) {
321+
log.warn("Error clearing scrollId " + scrollId, e);
322+
return false;
323+
}
324+
}
325+
292326
/**
293327
* first time to request search documents by scroll call /${index}/_search?scroll=${scroll}
294328
*

seatunnel-connectors-v2/connector-easysearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/easysearch/source/EasysearchSourceReader.java

Lines changed: 26 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -74,19 +74,33 @@ public void pollNext(Collector<SeaTunnelRow> output) throws Exception {
7474
EasysearchSourceSplit split = splits.poll();
7575
if (split != null) {
7676
SourceIndexInfo sourceIndexInfo = split.getSourceIndexInfo();
77-
ScrollResult scrollResult =
78-
ezsClient.searchByScroll(
79-
sourceIndexInfo.getIndex(),
80-
sourceIndexInfo.getSource(),
81-
sourceIndexInfo.getQuery(),
82-
sourceIndexInfo.getScrollTime(),
83-
sourceIndexInfo.getScrollSize());
84-
outputFromScrollResult(scrollResult, sourceIndexInfo.getSource(), output);
85-
while (scrollResult.getDocs() != null && scrollResult.getDocs().size() > 0) {
86-
scrollResult =
87-
ezsClient.searchWithScrollId(
88-
scrollResult.getScrollId(), sourceIndexInfo.getScrollTime());
77+
String scrollId = null;
78+
try {
79+
ScrollResult scrollResult =
80+
ezsClient.searchByScroll(
81+
sourceIndexInfo.getIndex(),
82+
sourceIndexInfo.getSource(),
83+
sourceIndexInfo.getQuery(),
84+
sourceIndexInfo.getScrollTime(),
85+
sourceIndexInfo.getScrollSize());
86+
scrollId = scrollResult.getScrollId();
8987
outputFromScrollResult(scrollResult, sourceIndexInfo.getSource(), output);
88+
while (scrollResult.getDocs() != null && scrollResult.getDocs().size() > 0) {
89+
scrollResult =
90+
ezsClient.searchWithScrollId(
91+
scrollResult.getScrollId(),
92+
sourceIndexInfo.getScrollTime());
93+
scrollId = scrollResult.getScrollId();
94+
outputFromScrollResult(scrollResult, sourceIndexInfo.getSource(), output);
95+
}
96+
} finally {
97+
if (scrollId != null && !scrollId.isEmpty()) {
98+
try {
99+
ezsClient.clearScroll(scrollId);
100+
} catch (Exception e) {
101+
log.warn("Failed to clear Easysearch scrollId: " + scrollId, e);
102+
}
103+
}
90104
}
91105
} else if (noMoreSplit) {
92106
// signal to the source that we have reached the end of the data.

seatunnel-e2e/seatunnel-connector-v2-e2e/connector-easysearch-e2e/src/test/java/org/apache/seatunnel/e2e/connector/easysearch/EasysearchIT.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -209,6 +209,7 @@ private List<String> readSinkDataFromIndex(String indexName) throws InterruptedE
209209
query.put("range", range);
210210
ScrollResult scrollResult =
211211
easysearchClient.searchByScroll(indexName, source, query, "1m", 1000);
212+
String scrollId = scrollResult.getScrollId();
212213
scrollResult
213214
.getDocs()
214215
.forEach(
@@ -231,6 +232,12 @@ private List<String> readSinkDataFromIndex(String indexName) throws InterruptedE
231232
o -> Integer.valueOf(o.get("c_int").toString())))
232233
.map(JsonUtils::toJsonString)
233234
.collect(Collectors.toList());
235+
236+
if (scrollId != null && !scrollId.isEmpty()) {
237+
boolean cleared = easysearchClient.clearScroll(scrollId);
238+
Assertions.assertTrue(cleared);
239+
}
240+
234241
return docs;
235242
}
236243

@@ -344,6 +351,7 @@ private List<String> readSinkData() throws InterruptedException {
344351
query.put("range", range);
345352
ScrollResult scrollResult =
346353
easysearchClient.searchByScroll("st_index2", source, query, "1m", 1000);
354+
String scrollId = scrollResult.getScrollId();
347355
scrollResult
348356
.getDocs()
349357
.forEach(
@@ -366,6 +374,12 @@ private List<String> readSinkData() throws InterruptedException {
366374
o -> Integer.valueOf(o.get("c_int").toString())))
367375
.map(JsonUtils::toJsonString)
368376
.collect(Collectors.toList());
377+
378+
if (scrollId != null && !scrollId.isEmpty()) {
379+
boolean cleared = easysearchClient.clearScroll(scrollId);
380+
Assertions.assertTrue(cleared);
381+
}
382+
369383
return docs;
370384
}
371385

0 commit comments

Comments
 (0)