@@ -138,48 +138,167 @@ private static Releasable wrapReleasable(Releasable releasable) {
138138 };
139139 }
140140
141- public Releasable markCoordinatingOperationStarted (int operations , long bytes , boolean forceExecution ) {
142- long combinedBytes = this .currentCombinedCoordinatingAndPrimaryBytes .addAndGet (bytes );
143- long replicaWriteBytes = this .currentReplicaBytes .get ();
144- long totalBytes = combinedBytes + replicaWriteBytes ;
145- if (forceExecution == false && totalBytes > coordinatingLimit ) {
146- long bytesWithoutOperation = combinedBytes - bytes ;
147- long totalBytesWithoutOperation = totalBytes - bytes ;
148- this .currentCombinedCoordinatingAndPrimaryBytes .getAndAdd (-bytes );
149- this .coordinatingRejections .getAndIncrement ();
150- throw new EsRejectedExecutionException (
151- "rejected execution of coordinating operation ["
152- + "coordinating_and_primary_bytes="
153- + bytesWithoutOperation
154- + ", "
155- + "replica_bytes="
156- + replicaWriteBytes
157- + ", "
158- + "all_bytes="
159- + totalBytesWithoutOperation
160- + ", "
161- + "coordinating_operation_bytes="
162- + bytes
163- + ", "
164- + "max_coordinating_bytes="
165- + coordinatingLimit
166- + "]" ,
167- false
168- );
141+ public Incremental startIncrementalCoordinating (int operations , long bytes , boolean forceExecution ) {
142+ Incremental coordinating = new Incremental (forceExecution );
143+ coordinating .coordinating .increment (operations , bytes );
144+ return coordinating ;
145+ }
146+
147+ public Coordinating markCoordinatingOperationStarted (int operations , long bytes , boolean forceExecution ) {
148+ Coordinating coordinating = new Coordinating (forceExecution );
149+ coordinating .increment (operations , bytes );
150+ return coordinating ;
151+ }
152+
153+ public class Incremental implements Releasable {
154+
155+ private final AtomicBoolean closed = new AtomicBoolean ();
156+ private final boolean forceExecution ;
157+ private long currentUnparsedSize = 0 ;
158+ private long totalParsedBytes = 0 ;
159+ private Coordinating coordinating ;
160+
161+ public Incremental (boolean forceExecution ) {
162+ this .forceExecution = forceExecution ;
163+ this .coordinating = new Coordinating (forceExecution );
164+ }
165+
166+ public long totalParsedBytes () {
167+ return totalParsedBytes ;
168+ }
169+
170+ public void incrementUnparsedBytes (long bytes ) {
171+ assert closed .get () == false ;
172+ // TODO: Implement integration with IndexingPressure for unparsed bytes
173+ currentUnparsedSize += bytes ;
174+ }
175+
176+ public void transferUnparsedBytesToParsed (long bytes ) {
177+ assert closed .get () == false ;
178+ assert currentUnparsedSize >= bytes ;
179+ currentUnparsedSize -= bytes ;
180+ totalParsedBytes += bytes ;
181+ }
182+
183+ public void increment (int operations , long bytes ) {
184+ // TODO: Eventually most of the memory will already be accounted for in unparsed.
185+ coordinating .increment (operations , bytes );
186+ }
187+
188+ public long currentOperationsSize () {
189+ return coordinating .currentOperationsSize ;
190+ }
191+
192+ public boolean shouldSplit () {
193+ long currentUsage = (currentCombinedCoordinatingAndPrimaryBytes .get () + currentReplicaBytes .get ());
194+ long currentOperationsSize = coordinating .currentOperationsSize ;
195+ if (currentUsage >= highWatermark && currentOperationsSize >= highWatermarkSize ) {
196+ highWaterMarkSplits .getAndIncrement ();
197+ logger .trace (
198+ () -> Strings .format (
199+ "Split bulk due to high watermark: current bytes [%d] and size [%d]" ,
200+ currentUsage ,
201+ currentOperationsSize
202+ )
203+ );
204+ return true ;
205+ }
206+ if (currentUsage >= lowWatermark && currentOperationsSize >= lowWatermarkSize ) {
207+ lowWaterMarkSplits .getAndIncrement ();
208+ logger .trace (
209+ () -> Strings .format (
210+ "Split bulk due to low watermark: current bytes [%d] and size [%d]" ,
211+ currentUsage ,
212+ currentOperationsSize
213+ )
214+ );
215+ return true ;
216+ }
217+ return false ;
218+ }
219+
220+ public Coordinating split () {
221+ Coordinating toReturn = coordinating ;
222+ coordinating = new Coordinating (forceExecution );
223+ return toReturn ;
224+ }
225+
226+ @ Override
227+ public void close () {
228+ coordinating .close ();
229+ }
230+ }
231+
232+ // TODO: Maybe this should be re-named and used for primary operations too. Eventually we will need to account for: ingest pipeline
233+ // expansions, reading updates, etc. This could just be a generic OP that could be expanded as appropriate
234+ public class Coordinating implements Releasable {
235+
236+ private final AtomicBoolean closed = new AtomicBoolean ();
237+ private final boolean forceExecution ;
238+ private int currentOperations = 0 ;
239+ private long currentOperationsSize = 0 ;
240+
241+ public Coordinating (boolean forceExecution ) {
242+ this .forceExecution = forceExecution ;
243+ }
244+
245+ private void increment (int operations , long bytes ) {
246+ assert closed .get () == false ;
247+ long combinedBytes = currentCombinedCoordinatingAndPrimaryBytes .addAndGet (bytes );
248+ long replicaWriteBytes = currentReplicaBytes .get ();
249+ long totalBytes = combinedBytes + replicaWriteBytes ;
250+ if (forceExecution == false && totalBytes > coordinatingLimit ) {
251+ long bytesWithoutOperation = combinedBytes - bytes ;
252+ long totalBytesWithoutOperation = totalBytes - bytes ;
253+ currentCombinedCoordinatingAndPrimaryBytes .getAndAdd (-bytes );
254+ coordinatingRejections .getAndIncrement ();
255+ throw new EsRejectedExecutionException (
256+ "rejected execution of coordinating operation ["
257+ + "coordinating_and_primary_bytes="
258+ + bytesWithoutOperation
259+ + ", "
260+ + "replica_bytes="
261+ + replicaWriteBytes
262+ + ", "
263+ + "all_bytes="
264+ + totalBytesWithoutOperation
265+ + ", "
266+ + "coordinating_operation_bytes="
267+ + bytes
268+ + ", "
269+ + "max_coordinating_bytes="
270+ + coordinatingLimit
271+ + "]" ,
272+ false
273+ );
274+ }
275+ currentOperations += operations ;
276+ currentOperationsSize += bytes ;
277+ logger .trace (() -> Strings .format ("adding [%d] coordinating operations and [%d] bytes" , operations , bytes ));
278+ currentCoordinatingBytes .getAndAdd (bytes );
279+ currentCoordinatingOps .getAndAdd (operations );
280+ totalCombinedCoordinatingAndPrimaryBytes .getAndAdd (bytes );
281+ totalCoordinatingBytes .getAndAdd (bytes );
282+ totalCoordinatingOps .getAndAdd (operations );
283+ totalCoordinatingRequests .getAndIncrement ();
284+ }
285+
286+ @ Override
287+ public void close () {
288+ if (closed .compareAndSet (false , true )) {
289+ logger .trace (
290+ () -> Strings .format ("removing [%d] coordinating operations and [%d] bytes" , currentOperations , currentOperationsSize )
291+ );
292+ currentCombinedCoordinatingAndPrimaryBytes .getAndAdd (-currentOperationsSize );
293+ currentCoordinatingBytes .getAndAdd (-currentOperationsSize );
294+ currentCoordinatingOps .getAndAdd (-currentOperations );
295+ currentOperationsSize = 0 ;
296+ currentOperations = 0 ;
297+ } else {
298+ logger .error ("IndexingPressure memory is adjusted twice" , new IllegalStateException ("Releasable is called twice" ));
299+ assert false : "IndexingPressure is adjusted twice" ;
300+ }
169301 }
170- logger .trace (() -> Strings .format ("adding [%d] coordinating operations and [%d] bytes" , operations , bytes ));
171- currentCoordinatingBytes .getAndAdd (bytes );
172- currentCoordinatingOps .getAndAdd (operations );
173- totalCombinedCoordinatingAndPrimaryBytes .getAndAdd (bytes );
174- totalCoordinatingBytes .getAndAdd (bytes );
175- totalCoordinatingOps .getAndAdd (operations );
176- totalCoordinatingRequests .getAndIncrement ();
177- return wrapReleasable (() -> {
178- logger .trace (() -> Strings .format ("removing [%d] coordinating operations and [%d] bytes" , operations , bytes ));
179- this .currentCombinedCoordinatingAndPrimaryBytes .getAndAdd (-bytes );
180- this .currentCoordinatingBytes .getAndAdd (-bytes );
181- this .currentCoordinatingOps .getAndAdd (-operations );
182- });
183302 }
184303
185304 public Releasable markPrimaryOperationLocalToCoordinatingNodeStarted (int operations , long bytes ) {
@@ -266,21 +385,6 @@ public Releasable markReplicaOperationStarted(int operations, long bytes, boolea
266385 });
267386 }
268387
269- public boolean shouldSplitBulk (long size ) {
270- long currentUsage = (currentCombinedCoordinatingAndPrimaryBytes .get () + currentReplicaBytes .get ());
271- if (currentUsage >= highWatermark && size >= highWatermarkSize ) {
272- highWaterMarkSplits .getAndIncrement ();
273- logger .trace (() -> Strings .format ("Split bulk due to high watermark: current bytes [%d] and size [%d]" , currentUsage , size ));
274- return (true );
275- }
276- if (currentUsage >= lowWatermark && size >= lowWatermarkSize ) {
277- lowWaterMarkSplits .getAndIncrement ();
278- logger .trace (() -> Strings .format ("Split bulk due to low watermark: current bytes [%d] and size [%d]" , currentUsage , size ));
279- return (true );
280- }
281- return (false );
282- }
283-
284388 public IndexingPressureStats stats () {
285389 return new IndexingPressureStats (
286390 totalCombinedCoordinatingAndPrimaryBytes .get (),
0 commit comments