Skip to content
This repository was archived by the owner on Dec 28, 2025. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -63,10 +63,11 @@ public ActivemqClient(ReadonlyConfig config) {
log.info("connection created");

} catch (Exception e) {
e.printStackTrace();
log.error("Error while creating AMQ client", e);
throw new ActivemqConnectorException(
ActivemqConnectorErrorCode.CREATE_ACTIVEMQ_CLIENT_FAILED,
"Error while create AMQ client ");
"Error while create AMQ client ",
e);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,10 +158,14 @@ private static RestClientBuilder getRestClientBuilder(
restClientBuilder.setHttpClientConfigCallback(
httpClientBuilder -> {
if (username.isPresent()) {
String passwordStr = null;
if (password.isPresent()) {
passwordStr = password.get();
}
CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
credentialsProvider.setCredentials(
AuthScope.ANY,
new UsernamePasswordCredentials(username.get(), password.get()));
new UsernamePasswordCredentials(username.get(), passwordStr));
httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
}

Expand Down Expand Up @@ -289,6 +293,36 @@ public void close() {
}
}

public boolean clearScroll(String scrollId) {
if (scrollId == null || scrollId.isEmpty()) {
return false;
}

String endpoint = "/_search/scroll";
Request request = new Request("DELETE", endpoint);
Map<String, String> param = new HashMap<>();
param.put("scroll_id", scrollId);
request.setJsonEntity(JsonUtils.toJsonString(param));

try {
Response response = restClient.performRequest(request);
if (response == null) {
log.warn("DELETE {} response null when clearing scrollId {}", endpoint, scrollId);
return false;
}
int statusCode = response.getStatusLine().getStatusCode();
if (statusCode == HttpStatus.SC_OK) {
return true;
} else {
log.warn("Failed to clear scrollId {}, status code={}", scrollId, statusCode);
return false;
}
} catch (IOException e) {
log.warn("Error clearing scrollId " + scrollId, e);
return false;
}
}

/**
* first time to request search documents by scroll call /${index}/_search?scroll=${scroll}
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,19 +74,33 @@ public void pollNext(Collector<SeaTunnelRow> output) throws Exception {
EasysearchSourceSplit split = splits.poll();
if (split != null) {
SourceIndexInfo sourceIndexInfo = split.getSourceIndexInfo();
ScrollResult scrollResult =
ezsClient.searchByScroll(
sourceIndexInfo.getIndex(),
sourceIndexInfo.getSource(),
sourceIndexInfo.getQuery(),
sourceIndexInfo.getScrollTime(),
sourceIndexInfo.getScrollSize());
outputFromScrollResult(scrollResult, sourceIndexInfo.getSource(), output);
while (scrollResult.getDocs() != null && scrollResult.getDocs().size() > 0) {
scrollResult =
ezsClient.searchWithScrollId(
scrollResult.getScrollId(), sourceIndexInfo.getScrollTime());
String scrollId = null;
try {
ScrollResult scrollResult =
ezsClient.searchByScroll(
sourceIndexInfo.getIndex(),
sourceIndexInfo.getSource(),
sourceIndexInfo.getQuery(),
sourceIndexInfo.getScrollTime(),
sourceIndexInfo.getScrollSize());
scrollId = scrollResult.getScrollId();
outputFromScrollResult(scrollResult, sourceIndexInfo.getSource(), output);
while (scrollResult.getDocs() != null && scrollResult.getDocs().size() > 0) {
scrollResult =
ezsClient.searchWithScrollId(
scrollResult.getScrollId(),
sourceIndexInfo.getScrollTime());
scrollId = scrollResult.getScrollId();
outputFromScrollResult(scrollResult, sourceIndexInfo.getSource(), output);
}
} finally {
if (scrollId != null && !scrollId.isEmpty()) {
try {
ezsClient.clearScroll(scrollId);
} catch (Exception e) {
log.warn("Failed to clear Easysearch scrollId: " + scrollId, e);
}
}
}
} else if (noMoreSplit) {
// signal to the source that we have reached the end of the data.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,8 +98,7 @@ public void onClosing(@NotNull WebSocket webSocket, int code, @NotNull String re
@Override
public void onFailure(
@NotNull WebSocket webSocket, @NotNull Throwable t, @Nullable Response response) {
log.error("WebSocket connection failed: " + t.getMessage());
t.printStackTrace();
log.error("WebSocket connection failed", t);
scheduleReconnect();
}

Expand All @@ -108,7 +107,8 @@ public void onMessage(@NotNull WebSocket webSocket, @NotNull String text) {
try {
buffer.put(text);
} catch (InterruptedException e) {
e.printStackTrace();
log.error("Failed to put message into buffer", e);
Thread.currentThread().interrupt();
}
}

Expand Down Expand Up @@ -139,7 +139,8 @@ private void scheduleReconnect() {
Thread.sleep(RETRY_DELAY_MS);
connect();
} catch (InterruptedException e) {
e.printStackTrace();
log.error("Reconnection attempt interrupted", e);
Thread.currentThread().interrupt();
}
})
.start();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,7 @@ public void write(SeaTunnelRow element) throws IOException {
try {
this.client.PutLogs(plr);
} catch (Throwable e) {
log.error("write logs failed", e);
e.printStackTrace();
log.error("Failed to write logs to SLS", e);
throw new IOException(e);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,7 @@ private List<String> readSinkDataFromIndex(String indexName) throws InterruptedE
query.put("range", range);
ScrollResult scrollResult =
easysearchClient.searchByScroll(indexName, source, query, "1m", 1000);
String scrollId = scrollResult.getScrollId();
scrollResult
.getDocs()
.forEach(
Expand All @@ -231,6 +232,12 @@ private List<String> readSinkDataFromIndex(String indexName) throws InterruptedE
o -> Integer.valueOf(o.get("c_int").toString())))
.map(JsonUtils::toJsonString)
.collect(Collectors.toList());

if (scrollId != null && !scrollId.isEmpty()) {
boolean cleared = easysearchClient.clearScroll(scrollId);
Assertions.assertTrue(cleared);
}

return docs;
}

Expand Down Expand Up @@ -344,6 +351,7 @@ private List<String> readSinkData() throws InterruptedException {
query.put("range", range);
ScrollResult scrollResult =
easysearchClient.searchByScroll("st_index2", source, query, "1m", 1000);
String scrollId = scrollResult.getScrollId();
scrollResult
.getDocs()
.forEach(
Expand All @@ -366,6 +374,12 @@ private List<String> readSinkData() throws InterruptedException {
o -> Integer.valueOf(o.get("c_int").toString())))
.map(JsonUtils::toJsonString)
.collect(Collectors.toList());

if (scrollId != null && !scrollId.isEmpty()) {
boolean cleared = easysearchClient.clearScroll(scrollId);
Assertions.assertTrue(cleared);
}

return docs;
}

Expand Down
Loading