11/*
2- * Copyright (c) 2010-2025 Progress Software Corporation and/or its subsidiaries or affiliates. All Rights Reserved.
2+ * Copyright (c) 2010-2026 Progress Software Corporation and/or its subsidiaries or affiliates. All Rights Reserved.
33 */
44package com .marklogic .client .datamovement .impl ;
55
4242 * startIterating, withForestConfig, and retry.
4343 */
4444public class QueryBatcherImpl extends BatcherImpl implements QueryBatcher {
45- private static Logger logger = LoggerFactory .getLogger (QueryBatcherImpl .class );
45+ private static final Logger logger = LoggerFactory .getLogger (QueryBatcherImpl .class );
4646 private String queryMethod ;
4747 private SearchQueryDefinition query ;
48- private SearchQueryDefinition originalQuery ;
4948 private Boolean filtered ;
5049 private Iterator <String > iterator ;
5150 private boolean threadCountSet = false ;
52- private List <QueryBatchListener > urisReadyListeners = new ArrayList <>();
53- private List <QueryFailureListener > failureListeners = new ArrayList <>();
54- private List <QueryBatcherListener > jobCompletionListeners = new ArrayList <>();
51+
52+ private final List <QueryBatchListener > urisReadyListeners = new ArrayList <>();
53+ private final List <QueryFailureListener > failureListeners = new ArrayList <>();
54+ private final List <QueryBatcherListener > jobCompletionListeners = new ArrayList <>();
55+
5556 private QueryThreadPoolExecutor threadPool ;
5657 private boolean consistentSnapshot = false ;
5758 private final AtomicLong batchNumber = new AtomicLong (0 );
@@ -61,51 +62,51 @@ public class QueryBatcherImpl extends BatcherImpl implements QueryBatcher {
6162 private Map <Forest ,AtomicLong > forestResults = new HashMap <>();
6263 private Map <Forest ,AtomicBoolean > forestIsDone = new HashMap <>();
6364 private Map <Forest , AtomicInteger > retryForestMap = new HashMap <>();
64- private AtomicBoolean runJobCompletionListeners = new AtomicBoolean (false );
65+
66+ private final AtomicBoolean runJobCompletionListeners = new AtomicBoolean (false );
6567 private final Object lock = new Object ();
6668 private final Map <Forest ,List <QueryTask >> blackListedTasks = new HashMap <>();
69+
6770 private boolean isSingleThreaded = false ;
71+
6872 private long maxUris = Long .MAX_VALUE ;
6973 private long maxBatches = Long .MAX_VALUE ;
7074 private int maxDocToUriBatchRatio ;
7175 private int docToUriBatchRatio ;
7276 private int defaultDocBatchSize ;
7377 private int maxUriBatchSize ;
7478
75- QueryBatcherImpl (
76- SearchQueryDefinition originalQuery , DataMovementManager moveMgr , ForestConfiguration forestConfig ,
77- String serializedCtsQuery , Boolean filtered , int maxDocToUriBatchRatio , int defaultDocBatchSize , int maxUriBatchSize
78- ) {
79- this (moveMgr , forestConfig , maxDocToUriBatchRatio , defaultDocBatchSize , maxUriBatchSize );
80- // TODO: skip conversion in DataMovementManagerImpl.newQueryBatcherImpl() unless canSerializeQueryAsJSON()
81- if (serializedCtsQuery != null && serializedCtsQuery .length () > 0 &&
82- originalQuery instanceof AbstractSearchQueryDefinition &&
83- ((AbstractSearchQueryDefinition ) originalQuery ).canSerializeQueryAsJSON ()) {
84- QueryManagerImpl queryMgr = (QueryManagerImpl ) getPrimaryClient ().newQueryManager ();
85- this .queryMethod = "POST" ;
86- this .query = queryMgr .newRawCtsQueryDefinition (new StringHandle (serializedCtsQuery ).withFormat (Format .JSON ));
87- this .originalQuery = originalQuery ;
88- if (filtered != null ) {
89- this .filtered = filtered ;
90- }
91- } else {
92- initQuery (originalQuery );
93- }
94- }
79+ QueryBatcherImpl (SearchQueryDefinition originalQuery , DataMovementManager moveMgr , QueryConfig queryConfig ) {
80+ this (moveMgr , queryConfig );
81+
82+ final String serializedCtsQuery = queryConfig .serializedCtsQuery ();
83+ if (serializedCtsQuery != null && !serializedCtsQuery .isEmpty () &&
84+ originalQuery instanceof AbstractSearchQueryDefinition &&
85+ ((AbstractSearchQueryDefinition ) originalQuery ).canSerializeQueryAsJSON ()) {
86+ QueryManagerImpl queryMgr = (QueryManagerImpl ) getPrimaryClient ().newQueryManager ();
87+ this .queryMethod = "POST" ;
88+ this .query = queryMgr .newRawCtsQueryDefinition (new StringHandle (serializedCtsQuery ).withFormat (Format .JSON ));
89+ this .filtered = queryConfig .filtered ();
90+ } else {
91+ initQuery (originalQuery );
92+ }
93+ }
94+
9595 public QueryBatcherImpl (SearchQueryDefinition query , DataMovementManager moveMgr , ForestConfiguration forestConfig ) {
9696 this (moveMgr , forestConfig );
9797 initQuery (query );
9898 }
99+
99100 public QueryBatcherImpl (Iterator <String > iterator , DataMovementManager moveMgr , ForestConfiguration forestConfig ) {
100101 this (moveMgr , forestConfig );
101102 this .iterator = iterator ;
102103 }
103- private QueryBatcherImpl ( DataMovementManager moveMgr , ForestConfiguration forestConfig ,
104- int maxDocToUriBatchRatio , int defaultDocBatchSize , int maxUriBatchSize ) {
105- this (moveMgr , forestConfig );
106- this .maxDocToUriBatchRatio = maxDocToUriBatchRatio ;
107- this .defaultDocBatchSize = defaultDocBatchSize ;
108- this .maxUriBatchSize = maxUriBatchSize ;
104+
105+ private QueryBatcherImpl ( DataMovementManager moveMgr , QueryConfig queryConfig ) {
106+ this (moveMgr , queryConfig . forestConfig () );
107+ this .maxDocToUriBatchRatio = queryConfig . maxDocToUriBatchRatio () ;
108+ this .defaultDocBatchSize = queryConfig . defaultDocBatchSize () ;
109+ this .maxUriBatchSize = queryConfig . maxUriBatchSize () ;
109110 withBatchSize (defaultDocBatchSize );
110111 }
111112 private QueryBatcherImpl (DataMovementManager moveMgr , ForestConfiguration forestConfig ) {
@@ -187,7 +188,7 @@ public void retryWithFailureListeners(QueryEvent queryEvent) {
187188 }
188189
189190 private void retry (QueryEvent queryEvent , boolean callFailListeners ) {
190- if ( isStopped () == true ) {
191+ if ( isStopped ()) {
191192 logger .warn ("Job is now stopped, aborting the retry" );
192193 return ;
193194 }
@@ -449,7 +450,7 @@ public synchronized void start(JobTicket ticket) {
449450
450451 private synchronized void initialize () {
451452 Forest [] forests = getForestConfig ().listForests ();
452- if ( threadCountSet == false ) {
453+ if ( ! threadCountSet ) {
453454 if ( query != null ) {
454455 logger .warn ("threadCount not set--defaulting to number of forests ({})" , forests .length );
455456 withThreadCount (forests .length * docToUriBatchRatio );
@@ -529,7 +530,7 @@ public synchronized QueryBatcher withForestConfig(ForestConfiguration forestConf
529530 List <DatabaseClient > newClientList = clients (hostNames );
530531 clientList .set (newClientList );
531532 boolean started = (threadPool != null );
532- if ( started == true && oldForests .size () > 0 ) calculateDeltas (oldForests , forests );
533+ if ( started && ! oldForests .isEmpty () ) calculateDeltas (oldForests , forests );
533534 return this ;
534535 }
535536
@@ -550,7 +551,7 @@ private synchronized void calculateDeltas(Set<Forest> oldForests, Forest[] fores
550551 // this forest is not black-listed
551552 blackListedForests .remove (forest );
552553 }
553- if ( blackListedForests .size () > 0 ) {
554+ if ( ! blackListedForests .isEmpty () ) {
554555 DataMovementManagerImpl moveMgrImpl = getMoveMgr ();
555556 String primaryHost = moveMgrImpl .getPrimaryClient ().getHost ();
556557 if ( getHostNames (blackListedForests ).contains (primaryHost ) ) {
@@ -562,7 +563,7 @@ private synchronized void calculateDeltas(Set<Forest> oldForests, Forest[] fores
562563 }
563564
564565 private synchronized void cleanupExistingTasks (Set <Forest > addedForests , Set <Forest > restartedForests , Set <Forest > blackListedForests ) {
565- if ( blackListedForests .size () > 0 ) {
566+ if ( ! blackListedForests .isEmpty () ) {
566567 logger .warn ("removing jobs related to hosts [{}] from the queue" , getHostNames (blackListedForests ));
567568 // since some forests have been removed, let's remove from the queue any jobs that were targeting that forest
568569 List <Runnable > tasks = new ArrayList <>();
0 commit comments