diff --git a/marklogic-client-api/src/main/java/com/marklogic/client/datamovement/impl/DataMovementManagerImpl.java b/marklogic-client-api/src/main/java/com/marklogic/client/datamovement/impl/DataMovementManagerImpl.java index d31a46fbe..7b2362c8a 100644 --- a/marklogic-client-api/src/main/java/com/marklogic/client/datamovement/impl/DataMovementManagerImpl.java +++ b/marklogic-client-api/src/main/java/com/marklogic/client/datamovement/impl/DataMovementManagerImpl.java @@ -1,11 +1,10 @@ /* - * Copyright (c) 2010-2025 Progress Software Corporation and/or its subsidiaries or affiliates. All Rights Reserved. + * Copyright (c) 2010-2026 Progress Software Corporation and/or its subsidiaries or affiliates. All Rights Reserved. */ package com.marklogic.client.datamovement.impl; import com.marklogic.client.DatabaseClient; import com.marklogic.client.DatabaseClientBuilder; -import com.marklogic.client.DatabaseClientFactory; import com.marklogic.client.datamovement.*; import com.marklogic.client.impl.DatabaseClientImpl; import com.marklogic.client.io.marker.ContentHandle; @@ -123,10 +122,8 @@ private QueryBatcher newQueryBatcherImpl(SearchQueryDefinition query) { QueryBatcherImpl queryBatcher = null; // preprocess the query if the effective version is at least 10.0-5 if (Long.compareUnsigned(getServerVersion(), Long.parseUnsignedLong("10000500")) >= 0) { - DataMovementServices.QueryConfig queryConfig = service.initConfig("POST", query); - queryBatcher = new QueryBatcherImpl(query, this, queryConfig.forestConfig, - queryConfig.serializedCtsQuery, queryConfig.filtered, - queryConfig.maxDocToUriBatchRatio, queryConfig.defaultDocBatchSize, queryConfig.maxUriBatchSize); + QueryConfig queryConfig = service.initConfig("POST", query); + queryBatcher = new QueryBatcherImpl(query, this, queryConfig); } else { queryBatcher = new QueryBatcherImpl(query, this, getForestConfig()); } diff --git a/marklogic-client-api/src/main/java/com/marklogic/client/datamovement/impl/DataMovementServices.java b/marklogic-client-api/src/main/java/com/marklogic/client/datamovement/impl/DataMovementServices.java index 8699c81e8..eb58c15e2 100644 --- a/marklogic-client-api/src/main/java/com/marklogic/client/datamovement/impl/DataMovementServices.java +++ b/marklogic-client-api/src/main/java/com/marklogic/client/datamovement/impl/DataMovementServices.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2010-2025 Progress Software Corporation and/or its subsidiaries or affiliates. All Rights Reserved. + * Copyright (c) 2010-2026 Progress Software Corporation and/or its subsidiaries or affiliates. All Rights Reserved. */ package com.marklogic.client.datamovement.impl; @@ -23,7 +23,8 @@ import java.util.List; public class DataMovementServices { - private static Logger logger = LoggerFactory.getLogger(DataMovementServices.class); + + private static final Logger logger = LoggerFactory.getLogger(DataMovementServices.class); private DatabaseClient client; @@ -36,59 +37,58 @@ public DataMovementServices setClient(DatabaseClient client) { return this; } - QueryConfig initConfig(String method, SearchQueryDefinition qdef) { - logger.debug("initializing forest configuration with query"); - if (qdef == null) throw new IllegalArgumentException("null query definition"); - - JsonNode result = ((DatabaseClientImpl) this.client).getServices() - .forestInfo(null, method, new RequestParameters(), qdef, new JacksonHandle()) - .get(); - // System.out.println(result.toPrettyString()); - - QueryConfig queryConfig = new QueryConfig(); - - try { - ObjectMapper mapper = new ObjectMapper(); - JsonNode queryResult = result.get("query"); - if (queryResult != null && queryResult.isObject() && queryResult.has("ctsquery")) { - queryConfig.serializedCtsQuery = mapper.writeValueAsString(queryResult); - logger.debug("initialized query to: {}", queryConfig.serializedCtsQuery); - } - JsonNode filteredResult = result.get("filtered"); - if (filteredResult != null && filteredResult.isBoolean()) { - queryConfig.filtered = filteredResult.asBoolean(); - logger.debug("initialized filtering to: {}", queryConfig.filtered.toString()); - } - JsonNode maxDocToUriBatchRatio = result.get("maxDocToUriBatchRatio"); - if (maxDocToUriBatchRatio != null && maxDocToUriBatchRatio.isInt()) { - queryConfig.maxDocToUriBatchRatio = maxDocToUriBatchRatio.asInt(); - logger.debug("initialized maxDocToUriBatchRatio to : {}", queryConfig.maxDocToUriBatchRatio); - } else { - queryConfig.maxDocToUriBatchRatio = -1; - } - JsonNode defaultDocBatchSize = result.get("defaultDocBatchSize"); - if (defaultDocBatchSize != null && defaultDocBatchSize.isInt()) { - queryConfig.defaultDocBatchSize = defaultDocBatchSize.asInt(); - logger.debug("initialized defaultDocBatchSize to : {}", queryConfig.defaultDocBatchSize); - } else { - queryConfig.defaultDocBatchSize = -1; - } - JsonNode maxUriBatchSize = result.get("maxUriBatchSize"); - if (maxUriBatchSize != null && maxUriBatchSize.isInt()) { - queryConfig.maxUriBatchSize = maxUriBatchSize.asInt(); - logger.debug("initialized maxUriBatchSize to : {}", queryConfig.maxUriBatchSize); - } else { - queryConfig.maxUriBatchSize = -1; - } - - } catch (JsonProcessingException e) { - logger.error("failed to initialize query", e); - } - - queryConfig.forestConfig = makeForestConfig(result.has("forests") ? result.get("forests") : result); - - return queryConfig; - } + QueryConfig initConfig(String method, SearchQueryDefinition qdef) { + logger.debug("initializing forest configuration with query"); + if (qdef == null) throw new IllegalArgumentException("null query definition"); + + JsonNode result = ((DatabaseClientImpl) this.client).getServices() + .forestInfo(null, method, new RequestParameters(), qdef, new JacksonHandle()) + .get(); + + JsonNode queryResult = result.get("query"); + + String serializedCtsQuery = null; + if (queryResult != null && queryResult.isObject() && queryResult.has("ctsquery")) { + try { + serializedCtsQuery = new ObjectMapper().writeValueAsString(queryResult); + logger.debug("initialized query to: {}", serializedCtsQuery); + } catch (JsonProcessingException e) { + logger.warn("Unable to serialize query result while initializing QueryBatcher; cause: {}", e.getMessage()); + } + } + + JsonNode filteredResult = result.get("filtered"); + Boolean filtered = null; + if (filteredResult != null && filteredResult.isBoolean()) { + filtered = filteredResult.asBoolean(); + logger.debug("initialized filtering to: {}", filtered); + } + + JsonNode maxDocToUriBatchRatioNode = result.get("maxDocToUriBatchRatio"); + int maxDocToUriBatchRatio = -1; + if (maxDocToUriBatchRatioNode != null && maxDocToUriBatchRatioNode.isInt()) { + maxDocToUriBatchRatio = maxDocToUriBatchRatioNode.asInt(); + logger.debug("initialized maxDocToUriBatchRatio to : {}", maxDocToUriBatchRatio); + } + + JsonNode defaultDocBatchSizeNode = result.get("defaultDocBatchSize"); + int defaultDocBatchSize = -1; + if (defaultDocBatchSizeNode != null && defaultDocBatchSizeNode.isInt()) { + defaultDocBatchSize = defaultDocBatchSizeNode.asInt(); + logger.debug("initialized defaultDocBatchSize to : {}", defaultDocBatchSize); + } + + JsonNode maxUriBatchSizeNode = result.get("maxUriBatchSize"); + int maxUriBatchSize = -1; + if (maxUriBatchSizeNode != null && maxUriBatchSizeNode.isInt()) { + maxUriBatchSize = maxUriBatchSizeNode.asInt(); + logger.debug("initialized maxUriBatchSize to : {}", maxUriBatchSize); + } + + ForestConfiguration forestConfig = makeForestConfig(result.has("forests") ? result.get("forests") : result); + return new QueryConfig(serializedCtsQuery, forestConfig, filtered, + maxDocToUriBatchRatio, defaultDocBatchSize, maxUriBatchSize); + } ForestConfigurationImpl readForestConfig() { logger.debug("initializing forest configuration"); @@ -183,12 +183,4 @@ private String generateJobId() { return UUID.randomUUID().toString(); } - static class QueryConfig { - String serializedCtsQuery; - ForestConfiguration forestConfig; - Boolean filtered; - int maxDocToUriBatchRatio; - int defaultDocBatchSize; - int maxUriBatchSize; - } } diff --git a/marklogic-client-api/src/main/java/com/marklogic/client/datamovement/impl/QueryBatcherImpl.java b/marklogic-client-api/src/main/java/com/marklogic/client/datamovement/impl/QueryBatcherImpl.java index f9581867a..031e8b3df 100644 --- a/marklogic-client-api/src/main/java/com/marklogic/client/datamovement/impl/QueryBatcherImpl.java +++ b/marklogic-client-api/src/main/java/com/marklogic/client/datamovement/impl/QueryBatcherImpl.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2010-2025 Progress Software Corporation and/or its subsidiaries or affiliates. All Rights Reserved. + * Copyright (c) 2010-2026 Progress Software Corporation and/or its subsidiaries or affiliates. All Rights Reserved. */ package com.marklogic.client.datamovement.impl; @@ -42,16 +42,17 @@ * startIterating, withForestConfig, and retry. */ public class QueryBatcherImpl extends BatcherImpl implements QueryBatcher { - private static Logger logger = LoggerFactory.getLogger(QueryBatcherImpl.class); + private static final Logger logger = LoggerFactory.getLogger(QueryBatcherImpl.class); private String queryMethod; private SearchQueryDefinition query; - private SearchQueryDefinition originalQuery; private Boolean filtered; private Iterator iterator; private boolean threadCountSet = false; - private List urisReadyListeners = new ArrayList<>(); - private List failureListeners = new ArrayList<>(); - private List jobCompletionListeners = new ArrayList<>(); + + private final List urisReadyListeners = new ArrayList<>(); + private final List failureListeners = new ArrayList<>(); + private final List jobCompletionListeners = new ArrayList<>(); + private QueryThreadPoolExecutor threadPool; private boolean consistentSnapshot = false; private final AtomicLong batchNumber = new AtomicLong(0); @@ -61,10 +62,13 @@ public class QueryBatcherImpl extends BatcherImpl implements QueryBatcher { private Map forestResults = new HashMap<>(); private Map forestIsDone = new HashMap<>(); private Map retryForestMap = new HashMap<>(); - private AtomicBoolean runJobCompletionListeners = new AtomicBoolean(false); + + private final AtomicBoolean runJobCompletionListeners = new AtomicBoolean(false); private final Object lock = new Object(); private final Map> blackListedTasks = new HashMap<>(); + private boolean isSingleThreaded = false; + private long maxUris = Long.MAX_VALUE; private long maxBatches = Long.MAX_VALUE; private int maxDocToUriBatchRatio; @@ -72,40 +76,37 @@ public class QueryBatcherImpl extends BatcherImpl implements QueryBatcher { private int defaultDocBatchSize; private int maxUriBatchSize; - QueryBatcherImpl( - SearchQueryDefinition originalQuery, DataMovementManager moveMgr, ForestConfiguration forestConfig, - String serializedCtsQuery, Boolean filtered, int maxDocToUriBatchRatio, int defaultDocBatchSize, int maxUriBatchSize - ) { - this(moveMgr, forestConfig, maxDocToUriBatchRatio, defaultDocBatchSize, maxUriBatchSize); - // TODO: skip conversion in DataMovementManagerImpl.newQueryBatcherImpl() unless canSerializeQueryAsJSON() - if (serializedCtsQuery != null && serializedCtsQuery.length() > 0 && - originalQuery instanceof AbstractSearchQueryDefinition && - ((AbstractSearchQueryDefinition) originalQuery).canSerializeQueryAsJSON()) { - QueryManagerImpl queryMgr = (QueryManagerImpl) getPrimaryClient().newQueryManager(); - this.queryMethod = "POST"; - this.query = queryMgr.newRawCtsQueryDefinition(new StringHandle(serializedCtsQuery).withFormat(Format.JSON)); - this.originalQuery = originalQuery; - if (filtered != null) { - this.filtered = filtered; - } - } else { - initQuery(originalQuery); - } - } + QueryBatcherImpl(SearchQueryDefinition originalQuery, DataMovementManager moveMgr, QueryConfig queryConfig) { + this(moveMgr, queryConfig); + + final String serializedCtsQuery = queryConfig.serializedCtsQuery(); + if (serializedCtsQuery != null && !serializedCtsQuery.isEmpty() && + originalQuery instanceof AbstractSearchQueryDefinition && + ((AbstractSearchQueryDefinition) originalQuery).canSerializeQueryAsJSON()) { + QueryManagerImpl queryMgr = (QueryManagerImpl) getPrimaryClient().newQueryManager(); + this.queryMethod = "POST"; + this.query = queryMgr.newRawCtsQueryDefinition(new StringHandle(serializedCtsQuery).withFormat(Format.JSON)); + this.filtered = queryConfig.filtered(); + } else { + initQuery(originalQuery); + } + } + public QueryBatcherImpl(SearchQueryDefinition query, DataMovementManager moveMgr, ForestConfiguration forestConfig) { this(moveMgr, forestConfig); initQuery(query); } + public QueryBatcherImpl(Iterator iterator, DataMovementManager moveMgr, ForestConfiguration forestConfig) { this(moveMgr, forestConfig); this.iterator = iterator; } - private QueryBatcherImpl(DataMovementManager moveMgr, ForestConfiguration forestConfig, - int maxDocToUriBatchRatio, int defaultDocBatchSize, int maxUriBatchSize) { - this(moveMgr, forestConfig); - this.maxDocToUriBatchRatio = maxDocToUriBatchRatio; - this.defaultDocBatchSize = defaultDocBatchSize; - this.maxUriBatchSize = maxUriBatchSize; + + private QueryBatcherImpl(DataMovementManager moveMgr, QueryConfig queryConfig) { + this(moveMgr, queryConfig.forestConfig()); + this.maxDocToUriBatchRatio = queryConfig.maxDocToUriBatchRatio(); + this.defaultDocBatchSize = queryConfig.defaultDocBatchSize(); + this.maxUriBatchSize = queryConfig.maxUriBatchSize(); withBatchSize(defaultDocBatchSize); } private QueryBatcherImpl(DataMovementManager moveMgr, ForestConfiguration forestConfig) { @@ -187,7 +188,7 @@ public void retryWithFailureListeners(QueryEvent queryEvent) { } private void retry(QueryEvent queryEvent, boolean callFailListeners) { - if ( isStopped() == true ) { + if ( isStopped()) { logger.warn("Job is now stopped, aborting the retry"); return; } @@ -449,7 +450,7 @@ public synchronized void start(JobTicket ticket) { private synchronized void initialize() { Forest[] forests = getForestConfig().listForests(); - if ( threadCountSet == false ) { + if ( !threadCountSet ) { if ( query != null ) { logger.warn("threadCount not set--defaulting to number of forests ({})", forests.length); withThreadCount(forests.length * docToUriBatchRatio); @@ -529,7 +530,7 @@ public synchronized QueryBatcher withForestConfig(ForestConfiguration forestConf List newClientList = clients(hostNames); clientList.set(newClientList); boolean started = (threadPool != null); - if ( started == true && oldForests.size() > 0 ) calculateDeltas(oldForests, forests); + if ( started && !oldForests.isEmpty() ) calculateDeltas(oldForests, forests); return this; } @@ -550,7 +551,7 @@ private synchronized void calculateDeltas(Set oldForests, Forest[] fores // this forest is not black-listed blackListedForests.remove(forest); } - if ( blackListedForests.size() > 0 ) { + if ( !blackListedForests.isEmpty() ) { DataMovementManagerImpl moveMgrImpl = getMoveMgr(); String primaryHost = moveMgrImpl.getPrimaryClient().getHost(); if ( getHostNames(blackListedForests).contains(primaryHost) ) { @@ -562,7 +563,7 @@ private synchronized void calculateDeltas(Set oldForests, Forest[] fores } private synchronized void cleanupExistingTasks(Set addedForests, Set restartedForests, Set blackListedForests) { - if ( blackListedForests.size() > 0 ) { + if ( !blackListedForests.isEmpty() ) { logger.warn("removing jobs related to hosts [{}] from the queue", getHostNames(blackListedForests)); // since some forests have been removed, let's remove from the queue any jobs that were targeting that forest List tasks = new ArrayList<>(); diff --git a/marklogic-client-api/src/main/java/com/marklogic/client/datamovement/impl/QueryConfig.java b/marklogic-client-api/src/main/java/com/marklogic/client/datamovement/impl/QueryConfig.java new file mode 100644 index 000000000..f46883622 --- /dev/null +++ b/marklogic-client-api/src/main/java/com/marklogic/client/datamovement/impl/QueryConfig.java @@ -0,0 +1,16 @@ +/* + * Copyright (c) 2010-2026 Progress Software Corporation and/or its subsidiaries or affiliates. All Rights Reserved. + */ +package com.marklogic.client.datamovement.impl; + +import com.marklogic.client.datamovement.ForestConfiguration; + +record QueryConfig( + String serializedCtsQuery, + ForestConfiguration forestConfig, + Boolean filtered, + int maxDocToUriBatchRatio, + int defaultDocBatchSize, + int maxUriBatchSize +) { +}