2424import com .lmax .disruptor .dsl .ProducerType ;
2525import io .opencensus .implcore .internal .DaemonThreadFactory ;
2626import io .opencensus .implcore .internal .EventQueue ;
27- import java .util .concurrent .Executors ;
2827import javax .annotation .Nullable ;
2928import javax .annotation .concurrent .ThreadSafe ;
3029
@@ -101,19 +100,20 @@ public final class DisruptorEventQueue implements EventQueue {
101100 private final RingBuffer <DisruptorEvent > ringBuffer ;
102101
103102 // Creates a new EventQueue. Private to prevent creation of non-singleton instance.
104- // Suppress warnings for disruptor.handleEventsWith and Disruptor constructor
105- @ SuppressWarnings ({"deprecation" , " unchecked" , "varargs " })
103+ // Suppress warnings for disruptor.handleEventsWith.
104+ @ SuppressWarnings ({"unchecked" })
106105 private DisruptorEventQueue () {
107- // Create new Disruptor for processing. Note that this uses a single thread for processing; this
108- // ensures that the event handler can take unsynchronized actions whenever possible.
106+ // Create new Disruptor for processing. Note that Disruptor creates a single thread per
107+ // consumer (see https://github.com/LMAX-Exchange/disruptor/issues/121 for details);
108+ // this ensures that the event handler can take unsynchronized actions whenever possible.
109109 disruptor =
110- new Disruptor <DisruptorEvent >(
111- new DisruptorEventFactory () ,
110+ new Disruptor <>(
111+ DisruptorEventFactory . INSTANCE ,
112112 DISRUPTOR_BUFFER_SIZE ,
113- Executors . newSingleThreadExecutor ( new DaemonThreadFactory ("OpenCensus.Disruptor" ) ),
113+ new DaemonThreadFactory ("OpenCensus.Disruptor" ),
114114 ProducerType .MULTI ,
115115 new SleepingWaitStrategy ());
116- disruptor .handleEventsWith (new DisruptorEventHandler () );
116+ disruptor .handleEventsWith (DisruptorEventHandler . INSTANCE );
117117 disruptor .start ();
118118 ringBuffer = disruptor .getRingBuffer ();
119119 }
@@ -145,10 +145,12 @@ public void enqueue(Entry entry) {
145145
146146 // An event in the {@link EventQueue}. Just holds a reference to an EventQueue.Entry.
147147 private static final class DisruptorEvent {
148- @ Nullable private Entry entry ;
148+ // TODO(bdrutu): Investigate if volatile is needed. This object is shared between threads so
149+ // intuitively this variable must be volatile.
150+ @ Nullable private volatile Entry entry = null ;
149151
150152 // Sets the EventQueueEntry associated with this DisruptorEvent.
151- void setEntry (Entry entry ) {
153+ void setEntry (@ Nullable Entry entry ) {
152154 this .entry = entry ;
153155 }
154156
@@ -160,7 +162,9 @@ Entry getEntry() {
160162 }
161163
162164 // Factory for DisruptorEvent.
163- private static final class DisruptorEventFactory implements EventFactory <DisruptorEvent > {
165+ private enum DisruptorEventFactory implements EventFactory <DisruptorEvent > {
166+ INSTANCE ;
167+
164168 @ Override
165169 public DisruptorEvent newInstance () {
166170 return new DisruptorEvent ();
@@ -171,12 +175,17 @@ public DisruptorEvent newInstance() {
171175 * Every event that gets added to {@link EventQueue} will get processed here. Just calls the
172176 * underlying process() method.
173177 */
174- private static final class DisruptorEventHandler implements EventHandler <DisruptorEvent > {
178+ private enum DisruptorEventHandler implements EventHandler <DisruptorEvent > {
179+ INSTANCE ;
180+
175181 @ Override
176- // TODO(sebright): Fix the Checker Framework warning.
177- @ SuppressWarnings ("nullness" )
178182 public void onEvent (DisruptorEvent event , long sequence , boolean endOfBatch ) {
179- event .getEntry ().process ();
183+ Entry entry = event .getEntry ();
184+ if (entry != null ) {
185+ entry .process ();
186+ }
187+ // Remove the reference to the previous entry to allow the memory to be gc'ed.
188+ event .setEntry (null );
180189 }
181190 }
182191}
0 commit comments