Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
/*
* Copyright (c) 2010-2025 Progress Software Corporation and/or its subsidiaries or affiliates. All Rights Reserved.
* Copyright (c) 2010-2026 Progress Software Corporation and/or its subsidiaries or affiliates. All Rights Reserved.
*/
package com.marklogic.client.datamovement.impl;

import com.marklogic.client.DatabaseClient;
import com.marklogic.client.DatabaseClientBuilder;
import com.marklogic.client.DatabaseClientFactory;
import com.marklogic.client.datamovement.*;
import com.marklogic.client.impl.DatabaseClientImpl;
import com.marklogic.client.io.marker.ContentHandle;
Expand Down Expand Up @@ -123,10 +122,8 @@ private QueryBatcher newQueryBatcherImpl(SearchQueryDefinition query) {
QueryBatcherImpl queryBatcher = null;
// preprocess the query if the effective version is at least 10.0-5
if (Long.compareUnsigned(getServerVersion(), Long.parseUnsignedLong("10000500")) >= 0) {
DataMovementServices.QueryConfig queryConfig = service.initConfig("POST", query);
queryBatcher = new QueryBatcherImpl(query, this, queryConfig.forestConfig,
queryConfig.serializedCtsQuery, queryConfig.filtered,
queryConfig.maxDocToUriBatchRatio, queryConfig.defaultDocBatchSize, queryConfig.maxUriBatchSize);
QueryConfig queryConfig = service.initConfig("POST", query);
queryBatcher = new QueryBatcherImpl(query, this, queryConfig);
} else {
queryBatcher = new QueryBatcherImpl(query, this, getForestConfig());
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2010-2025 Progress Software Corporation and/or its subsidiaries or affiliates. All Rights Reserved.
* Copyright (c) 2010-2026 Progress Software Corporation and/or its subsidiaries or affiliates. All Rights Reserved.
*/
package com.marklogic.client.datamovement.impl;

Expand All @@ -23,7 +23,8 @@
import java.util.List;

public class DataMovementServices {
private static Logger logger = LoggerFactory.getLogger(DataMovementServices.class);

private static final Logger logger = LoggerFactory.getLogger(DataMovementServices.class);

private DatabaseClient client;

Expand All @@ -36,59 +37,58 @@ public DataMovementServices setClient(DatabaseClient client) {
return this;
}

/**
 * Pre-refactor version of initConfig (retained in this diff view): queries the
 * server's internal forest-info endpoint and populates a mutable QueryConfig
 * holder with the serialized cts query, the "filtered" flag, sizing hints, and
 * the forest configuration.
 *
 * @param method HTTP method for the forest-info call (e.g. "POST")
 * @param qdef   the query to preprocess; must not be null
 * @return a populated QueryConfig; sizing hints are -1 when not reported
 */
QueryConfig initConfig(String method, SearchQueryDefinition qdef) {
logger.debug("initializing forest configuration with query");
if (qdef == null) throw new IllegalArgumentException("null query definition");

// Single round trip: the forest-info response carries both the forest listing
// and the optional query/sizing metadata parsed below.
JsonNode result = ((DatabaseClientImpl) this.client).getServices()
.forestInfo(null, method, new RequestParameters(), qdef, new JacksonHandle())
.get();
// System.out.println(result.toPrettyString());

QueryConfig queryConfig = new QueryConfig();

try {
ObjectMapper mapper = new ObjectMapper();
// Only a well-formed {"ctsquery": ...} object is usable as a raw cts query.
JsonNode queryResult = result.get("query");
if (queryResult != null && queryResult.isObject() && queryResult.has("ctsquery")) {
queryConfig.serializedCtsQuery = mapper.writeValueAsString(queryResult);
logger.debug("initialized query to: {}", queryConfig.serializedCtsQuery);
}
// "filtered" is three-valued: queryConfig.filtered stays null when the server
// does not report a boolean, so downstream code can distinguish "unknown".
JsonNode filteredResult = result.get("filtered");
if (filteredResult != null && filteredResult.isBoolean()) {
queryConfig.filtered = filteredResult.asBoolean();
logger.debug("initialized filtering to: {}", queryConfig.filtered.toString());
}
// Each sizing hint below defaults to -1 when absent or not an int.
JsonNode maxDocToUriBatchRatio = result.get("maxDocToUriBatchRatio");
if (maxDocToUriBatchRatio != null && maxDocToUriBatchRatio.isInt()) {
queryConfig.maxDocToUriBatchRatio = maxDocToUriBatchRatio.asInt();
logger.debug("initialized maxDocToUriBatchRatio to : {}", queryConfig.maxDocToUriBatchRatio);
} else {
queryConfig.maxDocToUriBatchRatio = -1;
}
JsonNode defaultDocBatchSize = result.get("defaultDocBatchSize");
if (defaultDocBatchSize != null && defaultDocBatchSize.isInt()) {
queryConfig.defaultDocBatchSize = defaultDocBatchSize.asInt();
logger.debug("initialized defaultDocBatchSize to : {}", queryConfig.defaultDocBatchSize);
} else {
queryConfig.defaultDocBatchSize = -1;
}
JsonNode maxUriBatchSize = result.get("maxUriBatchSize");
if (maxUriBatchSize != null && maxUriBatchSize.isInt()) {
queryConfig.maxUriBatchSize = maxUriBatchSize.asInt();
logger.debug("initialized maxUriBatchSize to : {}", queryConfig.maxUriBatchSize);
} else {
queryConfig.maxUriBatchSize = -1;
}

// Best-effort: a serialization failure is logged, not propagated, so the
// batcher can still fall back to client-side query handling.
} catch (JsonProcessingException e) {
logger.error("failed to initialize query", e);
}

// Older responses return the forest array at the top level rather than under "forests".
queryConfig.forestConfig = makeForestConfig(result.has("forests") ? result.get("forests") : result);

return queryConfig;
}
/**
 * Initializes a {@link QueryConfig} for the given query by calling the server's
 * internal forest-info endpoint. The response may carry a pre-serialized cts
 * query, a "filtered" flag, and batch-sizing hints alongside the forest listing.
 *
 * @param method HTTP method for the forest-info call (e.g. "POST")
 * @param qdef   the query to preprocess; must not be null
 * @return a populated QueryConfig; sizing hints default to -1 when the server omits them
 * @throws IllegalArgumentException if qdef is null
 */
QueryConfig initConfig(String method, SearchQueryDefinition qdef) {
	logger.debug("initializing forest configuration with query");
	if (qdef == null) throw new IllegalArgumentException("null query definition");

	// Single round trip: the forest-info response carries both the forest listing
	// and the optional query/sizing metadata parsed below.
	JsonNode result = ((DatabaseClientImpl) this.client).getServices()
		.forestInfo(null, method, new RequestParameters(), qdef, new JacksonHandle())
		.get();

	JsonNode queryResult = result.get("query");

	String serializedCtsQuery = null;
	// Only a well-formed {"ctsquery": ...} object is usable as a raw cts query.
	if (queryResult != null && queryResult.isObject() && queryResult.has("ctsquery")) {
		try {
			// NOTE(review): ObjectMapper is created per call; consider caching as a
			// static final field if this method turns out to be hot.
			serializedCtsQuery = new ObjectMapper().writeValueAsString(queryResult);
			logger.debug("initialized query to: {}", serializedCtsQuery);
		} catch (JsonProcessingException e) {
			// Best-effort: fall back to client-side query handling rather than failing the job.
			logger.warn("Unable to serialize query result while initializing QueryBatcher; cause: {}", e.getMessage());
		}
	}

	// "filtered" is three-valued: null means the server did not report a boolean.
	JsonNode filteredResult = result.get("filtered");
	Boolean filtered = null;
	if (filteredResult != null && filteredResult.isBoolean()) {
		filtered = filteredResult.asBoolean();
		logger.debug("initialized filtering to: {}", filtered);
	}

	// Sizing hints are optional; -1 signals "not reported by the server".
	int maxDocToUriBatchRatio = intFieldOrDefault(result, "maxDocToUriBatchRatio", -1);
	int defaultDocBatchSize = intFieldOrDefault(result, "defaultDocBatchSize", -1);
	int maxUriBatchSize = intFieldOrDefault(result, "maxUriBatchSize", -1);

	// Older responses return the forest array at the top level rather than under "forests".
	ForestConfiguration forestConfig = makeForestConfig(result.has("forests") ? result.get("forests") : result);
	return new QueryConfig(serializedCtsQuery, forestConfig, filtered,
		maxDocToUriBatchRatio, defaultDocBatchSize, maxUriBatchSize);
}

/**
 * Returns the int value of {@code fieldName} in {@code node}, or {@code defaultValue}
 * when the field is absent or not an int node.
 */
private static int intFieldOrDefault(JsonNode node, String fieldName, int defaultValue) {
	JsonNode field = node.get(fieldName);
	if (field != null && field.isInt()) {
		logger.debug("initialized {} to : {}", fieldName, field.asInt());
		return field.asInt();
	}
	return defaultValue;
}

ForestConfigurationImpl readForestConfig() {
logger.debug("initializing forest configuration");
Expand Down Expand Up @@ -183,12 +183,4 @@ private String generateJobId() {
return UUID.randomUUID().toString();
}

// Pre-refactor mutable value holder populated by initConfig(); superseded in this
// change by a top-level QueryConfig record in the same package.
static class QueryConfig {
// JSON-serialized cts query from the server, or null when unavailable.
String serializedCtsQuery;
// Forests the batcher should target.
ForestConfiguration forestConfig;
// Three-valued: null when the server did not report a "filtered" flag.
Boolean filtered;
// Sizing hints; initConfig() sets these to -1 when the server omits them.
int maxDocToUriBatchRatio;
int defaultDocBatchSize;
int maxUriBatchSize;
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2010-2025 Progress Software Corporation and/or its subsidiaries or affiliates. All Rights Reserved.
* Copyright (c) 2010-2026 Progress Software Corporation and/or its subsidiaries or affiliates. All Rights Reserved.
*/
package com.marklogic.client.datamovement.impl;

Expand Down Expand Up @@ -42,16 +42,17 @@
* startIterating, withForestConfig, and retry.
*/
public class QueryBatcherImpl extends BatcherImpl implements QueryBatcher {
private static Logger logger = LoggerFactory.getLogger(QueryBatcherImpl.class);
private static final Logger logger = LoggerFactory.getLogger(QueryBatcherImpl.class);
private String queryMethod;
private SearchQueryDefinition query;
private SearchQueryDefinition originalQuery;
private Boolean filtered;
private Iterator<String> iterator;
private boolean threadCountSet = false;
private List<QueryBatchListener> urisReadyListeners = new ArrayList<>();
private List<QueryFailureListener> failureListeners = new ArrayList<>();
private List<QueryBatcherListener> jobCompletionListeners = new ArrayList<>();

private final List<QueryBatchListener> urisReadyListeners = new ArrayList<>();
private final List<QueryFailureListener> failureListeners = new ArrayList<>();
private final List<QueryBatcherListener> jobCompletionListeners = new ArrayList<>();

private QueryThreadPoolExecutor threadPool;
private boolean consistentSnapshot = false;
private final AtomicLong batchNumber = new AtomicLong(0);
Expand All @@ -61,51 +62,51 @@ public class QueryBatcherImpl extends BatcherImpl implements QueryBatcher {
private Map<Forest,AtomicLong> forestResults = new HashMap<>();
private Map<Forest,AtomicBoolean> forestIsDone = new HashMap<>();
private Map<Forest, AtomicInteger> retryForestMap = new HashMap<>();
private AtomicBoolean runJobCompletionListeners = new AtomicBoolean(false);

private final AtomicBoolean runJobCompletionListeners = new AtomicBoolean(false);
private final Object lock = new Object();
private final Map<Forest,List<QueryTask>> blackListedTasks = new HashMap<>();

private boolean isSingleThreaded = false;

private long maxUris = Long.MAX_VALUE;
private long maxBatches = Long.MAX_VALUE;
private int maxDocToUriBatchRatio;
private int docToUriBatchRatio;
private int defaultDocBatchSize;
private int maxUriBatchSize;

/**
 * Pre-refactor constructor (retained in this diff view): wires the batcher from
 * individually-passed server-derived values. When a serialized cts query is
 * available and the original query supports JSON serialization, the batcher
 * POSTs the raw cts query; otherwise it falls back to initQuery().
 */
QueryBatcherImpl(
SearchQueryDefinition originalQuery, DataMovementManager moveMgr, ForestConfiguration forestConfig,
String serializedCtsQuery, Boolean filtered, int maxDocToUriBatchRatio, int defaultDocBatchSize, int maxUriBatchSize
) {
this(moveMgr, forestConfig, maxDocToUriBatchRatio, defaultDocBatchSize, maxUriBatchSize);
// TODO: skip conversion in DataMovementManagerImpl.newQueryBatcherImpl() unless canSerializeQueryAsJSON()
if (serializedCtsQuery != null && serializedCtsQuery.length() > 0 &&
originalQuery instanceof AbstractSearchQueryDefinition &&
((AbstractSearchQueryDefinition) originalQuery).canSerializeQueryAsJSON()) {
QueryManagerImpl queryMgr = (QueryManagerImpl) getPrimaryClient().newQueryManager();
this.queryMethod = "POST";
this.query = queryMgr.newRawCtsQueryDefinition(new StringHandle(serializedCtsQuery).withFormat(Format.JSON));
this.originalQuery = originalQuery;
// Null-guard keeps the field's default when the server reported no "filtered" flag.
if (filtered != null) {
this.filtered = filtered;
}
} else {
initQuery(originalQuery);
}
}
/**
 * Constructs a batcher from a server-derived {@link QueryConfig}. When the server
 * supplied a serialized cts query and the original query supports JSON
 * serialization, the batcher POSTs the raw cts query; otherwise it falls back to
 * {@code initQuery(originalQuery)}.
 *
 * @param originalQuery the user-provided query definition
 * @param moveMgr       the owning DataMovementManager
 * @param queryConfig   server-derived configuration from DataMovementServices.initConfig
 */
QueryBatcherImpl(SearchQueryDefinition originalQuery, DataMovementManager moveMgr, QueryConfig queryConfig) {
	this(moveMgr, queryConfig);

	final String serializedCtsQuery = queryConfig.serializedCtsQuery();
	if (serializedCtsQuery != null && !serializedCtsQuery.isEmpty() &&
		originalQuery instanceof AbstractSearchQueryDefinition &&
		((AbstractSearchQueryDefinition) originalQuery).canSerializeQueryAsJSON()) {
		QueryManagerImpl queryMgr = (QueryManagerImpl) getPrimaryClient().newQueryManager();
		this.queryMethod = "POST";
		this.query = queryMgr.newRawCtsQueryDefinition(new StringHandle(serializedCtsQuery).withFormat(Format.JSON));
		// NOTE(review): the refactor dropped this assignment that the previous
		// constructor performed; restored here so the originalQuery field is not
		// left null on this path — confirm against other readers of the field.
		this.originalQuery = originalQuery;
		// Preserve the field's existing default when the server reported no
		// "filtered" flag (queryConfig.filtered() may be null).
		if (queryConfig.filtered() != null) {
			this.filtered = queryConfig.filtered();
		}
	} else {
		initQuery(originalQuery);
	}
}

/**
 * Constructs a batcher for the given query and forest configuration without
 * server-side query preprocessing.
 */
public QueryBatcherImpl(SearchQueryDefinition query, DataMovementManager moveMgr, ForestConfiguration forestConfig) {
this(moveMgr, forestConfig);
initQuery(query);
}

/**
 * Constructs a batcher that consumes URIs from a caller-supplied iterator
 * instead of running a query against the server.
 */
public QueryBatcherImpl(Iterator<String> iterator, DataMovementManager moveMgr, ForestConfiguration forestConfig) {
this(moveMgr, forestConfig);
this.iterator = iterator;
}
private QueryBatcherImpl(DataMovementManager moveMgr, ForestConfiguration forestConfig,
int maxDocToUriBatchRatio, int defaultDocBatchSize, int maxUriBatchSize) {
this(moveMgr, forestConfig);
this.maxDocToUriBatchRatio = maxDocToUriBatchRatio;
this.defaultDocBatchSize = defaultDocBatchSize;
this.maxUriBatchSize = maxUriBatchSize;

// Shared initializer for QueryConfig-based construction: copies the server-derived
// sizing hints and seeds the batch size from the server's default.
private QueryBatcherImpl(DataMovementManager moveMgr, QueryConfig queryConfig) {
this(moveMgr, queryConfig.forestConfig());
this.maxDocToUriBatchRatio = queryConfig.maxDocToUriBatchRatio();
this.defaultDocBatchSize = queryConfig.defaultDocBatchSize();
this.maxUriBatchSize = queryConfig.maxUriBatchSize();
// NOTE(review): defaultDocBatchSize is -1 when the server omits the hint —
// confirm withBatchSize() tolerates a non-positive value on that path.
withBatchSize(defaultDocBatchSize);
}
private QueryBatcherImpl(DataMovementManager moveMgr, ForestConfiguration forestConfig) {
Expand Down Expand Up @@ -187,7 +188,7 @@ public void retryWithFailureListeners(QueryEvent queryEvent) {
}

private void retry(QueryEvent queryEvent, boolean callFailListeners) {
if ( isStopped() == true ) {
if ( isStopped()) {
logger.warn("Job is now stopped, aborting the retry");
return;
}
Expand Down Expand Up @@ -449,7 +450,7 @@ public synchronized void start(JobTicket ticket) {

private synchronized void initialize() {
Forest[] forests = getForestConfig().listForests();
if ( threadCountSet == false ) {
if ( !threadCountSet ) {
if ( query != null ) {
logger.warn("threadCount not set--defaulting to number of forests ({})", forests.length);
withThreadCount(forests.length * docToUriBatchRatio);
Expand Down Expand Up @@ -529,7 +530,7 @@ public synchronized QueryBatcher withForestConfig(ForestConfiguration forestConf
List<DatabaseClient> newClientList = clients(hostNames);
clientList.set(newClientList);
boolean started = (threadPool != null);
if ( started == true && oldForests.size() > 0 ) calculateDeltas(oldForests, forests);
if ( started && !oldForests.isEmpty() ) calculateDeltas(oldForests, forests);
return this;
}

Expand All @@ -550,7 +551,7 @@ private synchronized void calculateDeltas(Set<Forest> oldForests, Forest[] fores
// this forest is not black-listed
blackListedForests.remove(forest);
}
if ( blackListedForests.size() > 0 ) {
if ( !blackListedForests.isEmpty() ) {
DataMovementManagerImpl moveMgrImpl = getMoveMgr();
String primaryHost = moveMgrImpl.getPrimaryClient().getHost();
if ( getHostNames(blackListedForests).contains(primaryHost) ) {
Expand All @@ -562,7 +563,7 @@ private synchronized void calculateDeltas(Set<Forest> oldForests, Forest[] fores
}

private synchronized void cleanupExistingTasks(Set<Forest> addedForests, Set<Forest> restartedForests, Set<Forest> blackListedForests) {
if ( blackListedForests.size() > 0 ) {
if ( !blackListedForests.isEmpty() ) {
logger.warn("removing jobs related to hosts [{}] from the queue", getHostNames(blackListedForests));
// since some forests have been removed, let's remove from the queue any jobs that were targeting that forest
List<Runnable> tasks = new ArrayList<>();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
/*
* Copyright (c) 2010-2026 Progress Software Corporation and/or its subsidiaries or affiliates. All Rights Reserved.
*/
package com.marklogic.client.datamovement.impl;

import com.marklogic.client.datamovement.ForestConfiguration;

/**
 * Server-derived configuration used to construct a QueryBatcher.
 *
 * @param serializedCtsQuery    JSON-serialized cts query, or null when unavailable
 * @param forestConfig          forests the batcher should target
 * @param filtered              server-reported "filtered" flag; null when not reported
 * @param maxDocToUriBatchRatio sizing hint; -1 when not reported by the server
 * @param defaultDocBatchSize   sizing hint; -1 when not reported by the server
 * @param maxUriBatchSize       sizing hint; -1 when not reported by the server
 */
record QueryConfig(
String serializedCtsQuery,
ForestConfiguration forestConfig,
Boolean filtered,
int maxDocToUriBatchRatio,
int defaultDocBatchSize,
int maxUriBatchSize
) {
}