@@ -55,6 +55,8 @@ private static class DelayingPendingTraceBuffer extends PendingTraceBuffer {
5555 private static final long FORCE_SEND_DELAY_MS = TimeUnit .SECONDS .toMillis (5 );
5656 private static final long SEND_DELAY_NS = TimeUnit .MILLISECONDS .toNanos (500 );
5757 private static final long SLEEP_TIME_MS = 100 ;
58+ private static final CommandElement FLUSH_ELEMENT = new CommandElement ();
59+ private static final CommandElement DUMP_ELEMENT = new CommandElement ();
5860
5961 private final MpscBlockingConsumerArrayQueue <Element > queue ;
6062 private final Thread worker ;
@@ -118,10 +120,10 @@ public void flush() {
118120 if (worker .isAlive ()) {
119121 int count = flushCounter .get ();
120122 int loop = 1 ;
121- boolean signaled = queue .offer (FlushElement . FLUSH_ELEMENT );
123+ boolean signaled = queue .offer (FLUSH_ELEMENT );
122124 while (!closed && !signaled ) {
123125 yieldOrSleep (loop ++);
124- signaled = queue .offer (FlushElement . FLUSH_ELEMENT );
126+ signaled = queue .offer (FLUSH_ELEMENT );
125127 }
126128 int newCount = flushCounter .get ();
127129 while (!closed && count >= newCount ) {
@@ -161,41 +163,7 @@ public Element get() {
161163 }
162164 }
163165
164- private static final class FlushElement implements Element {
165- static FlushElement FLUSH_ELEMENT = new FlushElement ();
166-
167- @ Override
168- public long oldestFinishedTime () {
169- return 0 ;
170- }
171-
172- @ Override
173- public boolean lastReferencedNanosAgo (long nanos ) {
174- return false ;
175- }
176-
177- @ Override
178- public void write () {}
179-
180- @ Override
181- public DDSpan getRootSpan () {
182- return null ;
183- }
184-
185- @ Override
186- public boolean setEnqueued (boolean enqueued ) {
187- return true ;
188- }
189-
190- @ Override
191- public boolean writeOnBufferFull () {
192- return true ;
193- }
194- }
195-
196- private static final class DumpElement implements Element {
197- static DumpElement DUMP_ELEMENT = new DumpElement ();
198-
166+ private static final class CommandElement implements Element {
199167 @ Override
200168 public long oldestFinishedTime () {
201169 return 0 ;
@@ -243,14 +211,14 @@ public void run() {
243211 pendingTrace = queue .take (); // block until available;
244212 }
245213
246- if (pendingTrace instanceof FlushElement ) {
214+ if (pendingTrace == FLUSH_ELEMENT ) {
247215 // Since this is an MPSC queue, the drain needs to be called on the consumer thread
248216 queue .drain (WriteDrain .WRITE_DRAIN );
249217 flushCounter .incrementAndGet ();
250218 continue ;
251219 }
252220
253- if (pendingTrace instanceof DumpElement ) {
221+ if (pendingTrace == DUMP_ELEMENT ) {
254222 queue .drain (DumpDrain .DUMP_DRAIN );
255223 queue .fill (DumpDrain .DUMP_DRAIN , DumpDrain .DATA .size ());
256224 dumpCounter .incrementAndGet ();
@@ -364,10 +332,10 @@ public void prepareForFlare() {
364332 if (buffer .worker .isAlive ()) {
365333 int count = buffer .dumpCounter .get ();
366334 int loop = 1 ;
367- boolean signaled = buffer .queue .offer (DelayingPendingTraceBuffer .DumpElement . DUMP_ELEMENT );
335+ boolean signaled = buffer .queue .offer (DelayingPendingTraceBuffer .DUMP_ELEMENT );
368336 while (!buffer .closed && !signaled ) {
369337 buffer .yieldOrSleep (loop ++);
370- signaled = buffer .queue .offer (DelayingPendingTraceBuffer .DumpElement . DUMP_ELEMENT );
338+ signaled = buffer .queue .offer (DelayingPendingTraceBuffer .DUMP_ELEMENT );
371339 }
372340 int newCount = buffer .dumpCounter .get ();
373341 while (!buffer .closed && count >= newCount ) {
0 commit comments