@@ -138,24 +138,111 @@ private static Releasable wrapReleasable(Releasable releasable) {
138138 };
139139 }
140140
141+ public Incremental startIncrementalCoordinating (int operations , long bytes , boolean forceExecution ) {
142+ Incremental coordinating = new Incremental (forceExecution );
143+ coordinating .coordinating .increment (operations , bytes );
144+ return coordinating ;
145+ }
146+
141147 public Coordinating markCoordinatingOperationStarted (int operations , long bytes , boolean forceExecution ) {
142148 Coordinating coordinating = new Coordinating (forceExecution );
143149 coordinating .increment (operations , bytes );
144150 return coordinating ;
145151 }
146152
153+ public class Incremental implements Releasable {
154+
155+ private final AtomicBoolean closed = new AtomicBoolean ();
156+ private final boolean forceExecution ;
157+ private long currentUnparsedSize = 0 ;
158+ private long totalParsedBytes = 0 ;
159+ private Coordinating coordinating ;
160+
161+ public Incremental (boolean forceExecution ) {
162+ this .forceExecution = forceExecution ;
163+ this .coordinating = new Coordinating (forceExecution );
164+ }
165+
166+ public long totalParsedBytes () {
167+ return totalParsedBytes ;
168+ }
169+
170+ public void incrementUnparsedBytes (long bytes ) {
171+ assert closed .get () == false ;
172+ // TODO: Implement integration with IndexingPressure for unparsed bytes
173+ currentUnparsedSize += bytes ;
174+ }
175+
176+ public void transferUnparsedBytesToParsed (long bytes ) {
177+ assert closed .get () == false ;
178+ assert currentUnparsedSize >= bytes ;
179+ currentUnparsedSize -= bytes ;
180+ totalParsedBytes += bytes ;
181+ }
182+
183+ public void increment (int operations , long bytes ) {
184+ // TODO: Eventually most of the memory will already be accounted for in unparsed.
185+ coordinating .increment (operations , bytes );
186+ }
187+
188+ public long currentOperationsSize () {
189+ return coordinating .currentOperationsSize ;
190+ }
191+
192+ public boolean shouldSplit () {
193+ long currentUsage = (currentCombinedCoordinatingAndPrimaryBytes .get () + currentReplicaBytes .get ());
194+ long currentOperationsSize = coordinating .currentOperationsSize ;
195+ if (currentUsage >= highWatermark && currentOperationsSize >= highWatermarkSize ) {
196+ highWaterMarkSplits .getAndIncrement ();
197+ logger .trace (
198+ () -> Strings .format (
199+ "Split bulk due to high watermark: current bytes [%d] and size [%d]" ,
200+ currentUsage ,
201+ currentOperationsSize
202+ )
203+ );
204+ return true ;
205+ }
206+ if (currentUsage >= lowWatermark && currentOperationsSize >= lowWatermarkSize ) {
207+ lowWaterMarkSplits .getAndIncrement ();
208+ logger .trace (
209+ () -> Strings .format (
210+ "Split bulk due to low watermark: current bytes [%d] and size [%d]" ,
211+ currentUsage ,
212+ currentOperationsSize
213+ )
214+ );
215+ return true ;
216+ }
217+ return false ;
218+ }
219+
220+ public Coordinating split () {
221+ Coordinating toReturn = coordinating ;
222+ coordinating = new Coordinating (forceExecution );
223+ return toReturn ;
224+ }
225+
226+ @ Override
227+ public void close () {
228+ coordinating .close ();
229+ }
230+ }
231+
232+ // TODO: Maybe this should be re-named and used for primary operations too. Eventually we will need to account for: ingest pipeline
233+ // expansions, reading updates, etc. This could just be a generic OP that could be expanded as appropriate
147234 public class Coordinating implements Releasable {
148235
149236 private final AtomicBoolean closed = new AtomicBoolean ();
150237 private final boolean forceExecution ;
151238 private int currentOperations = 0 ;
152- private long currentSize = 0 ;
239+ private long currentOperationsSize = 0 ;
153240
154241 public Coordinating (boolean forceExecution ) {
155242 this .forceExecution = forceExecution ;
156243 }
157244
158- public void increment (int operations , long bytes ) {
245+ private void increment (int operations , long bytes ) {
159246 assert closed .get () == false ;
160247 long combinedBytes = currentCombinedCoordinatingAndPrimaryBytes .addAndGet (bytes );
161248 long replicaWriteBytes = currentReplicaBytes .get ();
@@ -186,7 +273,7 @@ public void increment(int operations, long bytes) {
186273 );
187274 }
188275 currentOperations += operations ;
189- currentSize += bytes ;
276+ currentOperationsSize += bytes ;
190277 logger .trace (() -> Strings .format ("adding [%d] coordinating operations and [%d] bytes" , operations , bytes ));
191278 currentCoordinatingBytes .getAndAdd (bytes );
192279 currentCoordinatingOps .getAndAdd (operations );
@@ -196,43 +283,17 @@ public void increment(int operations, long bytes) {
196283 totalCoordinatingRequests .getAndIncrement ();
197284 }
198285
199- public long currentSize () {
200- return currentSize ;
201- }
202-
203- public void releaseCurrent () {
204- currentCombinedCoordinatingAndPrimaryBytes .getAndAdd (-currentSize );
205- currentCoordinatingBytes .getAndAdd (-currentSize );
206- currentCoordinatingOps .getAndAdd (-currentOperations );
207- currentSize = 0 ;
208- currentOperations = 0 ;
209- }
210-
211- public boolean shouldSplit () {
212- long currentUsage = (currentCombinedCoordinatingAndPrimaryBytes .get () + currentReplicaBytes .get ());
213- if (currentUsage >= highWatermark && currentSize >= highWatermarkSize ) {
214- highWaterMarkSplits .getAndIncrement ();
215- logger .trace (
216- () -> Strings .format ("Split bulk due to high watermark: current bytes [%d] and size [%d]" , currentUsage , currentSize )
217- );
218- return true ;
219- }
220- if (currentUsage >= lowWatermark && currentSize >= lowWatermarkSize ) {
221- lowWaterMarkSplits .getAndIncrement ();
222- logger .trace (
223- () -> Strings .format ("Split bulk due to low watermark: current bytes [%d] and size [%d]" , currentUsage , currentSize )
224- );
225- return true ;
226- }
227- return false ;
228-
229- }
230-
231286 @ Override
232287 public void close () {
233288 if (closed .compareAndSet (false , true )) {
234- logger .trace (() -> Strings .format ("removing [%d] coordinating operations and [%d] bytes" , currentOperations , currentSize ));
235- releaseCurrent ();
289+ logger .trace (
290+ () -> Strings .format ("removing [%d] coordinating operations and [%d] bytes" , currentOperations , currentOperationsSize )
291+ );
292+ currentCombinedCoordinatingAndPrimaryBytes .getAndAdd (-currentOperationsSize );
293+ currentCoordinatingBytes .getAndAdd (-currentOperationsSize );
294+ currentCoordinatingOps .getAndAdd (-currentOperations );
295+ currentOperationsSize = 0 ;
296+ currentOperations = 0 ;
236297 } else {
237298 logger .error ("IndexingPressure memory is adjusted twice" , new IllegalStateException ("Releasable is called twice" ));
238299 assert false : "IndexingPressure is adjusted twice" ;
0 commit comments