@@ -41,6 +41,7 @@ public class DXFeedImpl extends DXFeed {
4141 private static final ThreadLocal <LocalRemoveBatch > LOCAL_REMOVE_BATCH = new ThreadLocal <>();
4242
4343 private final DXEndpointImpl endpoint ;
44+ private final QDFilter filter ;
4445 private final RecordMode retrieveMode ;
4546 private final QDAgent .Builder [] eventProcessorAgentBuilders = new QDAgent .Builder [N_CONTRACTS ];
4647 private final IndexedSet <DXFeedSubscription <?>, EventProcessor <?, ?>> eventProcessors =
@@ -49,23 +50,40 @@ public class DXFeedImpl extends DXFeed {
4950 private final LastEventsProcessor lastEventsProcessor ; // != null when we have QDTicker in endpoint
5051 private final long aggregationPeriodMillis ;
5152
52- DXFeedImpl (DXEndpointImpl endpoint ) {
53+ // Default constructor for DXEndpointImpl
54+ @ SuppressWarnings ("deprecation" )
55+ public DXFeedImpl (DXEndpointImpl endpoint ) {
56+ this (endpoint , QDFilter .ANYTHING );
57+ }
58+
59+ /**
60+ * Allows to create DXFeed instance with the specific filter from the given endpoint. Endpoint will not handle
61+ * this feed's lifecycle - call closeImpl() or awaitTerminationAndCloseImpl() to clean up resources!
62+ * @param endpoint
63+ * @param filter
64+ * @deprecated For internal use only, do not use!
65+ */
66+ @ Deprecated
67+ public DXFeedImpl (DXEndpointImpl endpoint , QDFilter filter ) {
5368 this .endpoint = endpoint ;
69+ this .filter = filter ;
70+
5471 RecordMode mode = RecordMode .FLAGGED_DATA .withAttachment ();
5572 if (endpoint .getQDEndpoint ().hasEventTimeSequence ())
5673 mode = mode .withEventTimeSequence ();
5774 retrieveMode = mode ;
5875 for (QDContract contract : endpoint .getContracts ()) {
5976 QDAgent .Builder builder = endpoint .getCollector (contract ).agentBuilder ()
6077 .withHistorySnapshot (true )
78+ .withFilter (filter )
6179 .withAttachmentStrategy (EVENT_PROCESSOR_ATTACHMENT_STRATEGY );
6280 eventProcessorAgentBuilders [contract .ordinal ()] = builder ;
6381 }
6482 QDTicker ticker = (QDTicker ) endpoint .getCollector (QDContract .TICKER );
6583 if (ticker == null ) {
6684 lastEventsProcessor = null ;
6785 } else {
68- lastEventsProcessor = new LastEventsProcessor (ticker );
86+ lastEventsProcessor = new LastEventsProcessor (ticker , filter );
6987 lastEventsProcessor .start ();
7088 }
7189 aggregationPeriodMillis = endpoint .hasProperty (DXEndpoint .DXFEED_AGGREGATION_PERIOD_PROPERTY ) ?
@@ -92,15 +110,24 @@ public void awaitTerminationAndCloseImpl() throws InterruptedException {
92110 }
93111
94112 public void closeImpl () {
95- assert endpoint .isClosed (); // assert that close impl is called on endpoint that is already marked as closed
96- // no need to sync, because nothing is added or removed when endpoint is closed
113+ // cannot be sure that endpoint is already closed
114+ if (!endpoint .isClosed ()) {
115+ synchronized (endpoint .getLock ()) {
116+ closeComponents ();
117+ }
118+ } else {
119+ // no need to sync, because nothing is added or removed when endpoint is closed
120+ closeComponents ();
121+ }
122+ eventProcessors .clear ();
123+ closeables .clear ();
124+ }
125+
126+ private void closeComponents () {
97127 for (EventProcessor <?, ?> processor : eventProcessors )
98128 processor .close (false );
99- eventProcessors .clear ();
100- // no need to sync, because nothing is added or removed when endpoint is closed
101129 for (Closeable c : closeables )
102130 c .close ();
103- closeables .clear ();
104131 if (lastEventsProcessor != null )
105132 lastEventsProcessor .close ();
106133 }
@@ -148,9 +175,13 @@ public <E extends LastingEvent<?>> E getLastEvent(E event) {
148175 if (delegate == null )
149176 return event ;
150177 QDTicker ticker = (QDTicker )endpoint .getCollector (QDContract .TICKER ); // assert != null (if we have delegate)
151- LocalAddBatch lb = getLocalAddBatch ();
152178 String qdSymbol = delegate .getQDSymbolByEvent (event );
153179 int cipher = endpoint .encode (qdSymbol );
180+
181+ // check if symbol is accepted by filter
182+ if (!filter .accept (QDContract .TICKER , delegate .getRecord (), cipher , qdSymbol ))
183+ return event ;
184+ LocalAddBatch lb = getLocalAddBatch ();
154185 if (ticker .getDataIfAvailable (lb .owner , delegate .getRecord (), cipher , qdSymbol ))
155186 return delegate .getEvent (event , lb .owner .cursor ());
156187 // not found -- return unmodified event
@@ -163,9 +194,13 @@ public <E extends LastingEvent<?>> E getLastEventIfSubscribed(Class<E> eventType
163194 if (delegate == null )
164195 return null ;
165196 assert lastEventsProcessor != null ; // if we have delegate
166- LocalAddBatch lb = getLocalAddBatch ();
167197 String qdSymbol = delegate .getQDSymbolByEventSymbol (symbol );
168198 int cipher = endpoint .encode (qdSymbol );
199+
200+ // check if symbol is accepted by filter
201+ if (!filter .accept (QDContract .TICKER , delegate .getRecord (), cipher , qdSymbol ))
202+ return null ;
203+ LocalAddBatch lb = getLocalAddBatch ();
169204 if (lastEventsProcessor .ticker .getDataIfSubscribed (lb .owner , delegate .getRecord (), cipher , qdSymbol ))
170205 return delegate .createEvent (symbol , lb .owner .cursor ());
171206 // not subscribed -- return null
@@ -180,10 +215,14 @@ public <E extends LastingEvent<?>> Promise<E> getLastEventPromise(Class<E> event
180215 if (delegate == null )
181216 return Promise .failed (new IllegalArgumentException (INVALID_EVENT_MSG ));
182217 assert lastEventsProcessor != null ; // if we have delegate
183- LocalAddBatch lb = getLocalAddBatch ();
184218 String qdSymbol = delegate .getQDSymbolByEventSymbol (symbol );
185219 int cipher = endpoint .encode (qdSymbol );
220+
221+ // check if symbol is accepted by filter
222+ if (!filter .accept (QDContract .TICKER , delegate .getRecord (), cipher , qdSymbol ))
223+ return Promise .failed (new CancellationException ("cancel" ));
186224 // optimization for single event -- check that it is immediately available without subscription
225+ LocalAddBatch lb = getLocalAddBatch ();
187226 if (lastEventsProcessor .ticker .getDataIfAvailable (lb .owner , delegate .getRecord (), cipher , qdSymbol ))
188227 return Promise .completed (delegate .createEvent (symbol , lb .owner .cursor ()));
189228 // not found -- need to subscribe
@@ -202,20 +241,28 @@ public <E extends LastingEvent<?>> List<Promise<E>> getLastEventsPromises(Class<
202241 if (eventType == null )
203242 throw new NullPointerException ();
204243 List <Promise <E >> result = new ArrayList <>(symbols .size ());
205- assert lastEventsProcessor != null ; // if we have delegate
244+
206245 LocalAddBatch lb = null ;
207246 for (Object symbol : symbols ) {
208247 EventDelegate <E > delegate = getLastingEventDelegateOrNull (eventType , symbol );
209248 if (delegate == null ) {
210249 result .add (Promise .failed (new IllegalArgumentException (INVALID_EVENT_MSG )));
211250 continue ;
212251 }
252+ assert lastEventsProcessor != null ; // if we have delegate
253+ String qdSymbol = delegate .getQDSymbolByEventSymbol (symbol );
254+ int cipher = endpoint .encode (qdSymbol );
255+
256+ // check if symbol is accepted by filter
257+ if (!filter .accept (QDContract .TICKER , delegate .getRecord (), cipher , qdSymbol )) {
258+ result .add (Promise .failed (new CancellationException ("cancel" )));
259+ continue ;
260+ }
261+
213262 if (lb == null ) {
214263 lb = getLocalAddBatch ();
215264 lb .subscribeStartBatch ();
216265 }
217- String qdSymbol = delegate .getQDSymbolByEventSymbol (symbol );
218- int cipher = endpoint .encode (qdSymbol );
219266 LastEventPromise <E > promise = new LastEventPromise <>(symbol , delegate , cipher , qdSymbol );
220267 result .add (promise );
221268 // optimization for single event -- check that it is immediately available without subscription
@@ -351,6 +398,10 @@ private <E extends IndexedEvent<?>> List<E> fetchFromHistoryIfSubscribed(Object
351398 QDHistory history = (QDHistory )endpoint .getCollector (QDContract .HISTORY ); // assert != null (if we have delegate)
352399 String qdSymbol = delegate .getQDSymbolByEventSymbol (symbol );
353400 int cipher = endpoint .encode (qdSymbol );
401+
402+ // check if symbol is accepted by filter
403+ if (!filter .accept (QDContract .HISTORY , delegate .getRecord (), cipher , qdSymbol ))
404+ return Collections .emptyList ();
354405 // check subscription
355406 if (!history .isSubscribed (delegate .getRecord (), cipher , qdSymbol , fromQDTime ))
356407 return null ; // not subscribed
@@ -366,6 +417,10 @@ private <E extends IndexedEvent<?>> Promise<List<E>> fetchOrSubscribeFromHistory
366417 QDHistory history = (QDHistory )endpoint .getCollector (QDContract .HISTORY ); // assert != null (if we have delegate)
367418 String qdSymbol = delegate .getQDSymbolByEventSymbol (symbol );
368419 int cipher = endpoint .encode (qdSymbol );
420+
421+ // check if symbol is accepted by filter
422+ if (!filter .accept (QDContract .HISTORY , delegate .getRecord (), cipher , qdSymbol ))
423+ return Promise .failed (new CancellationException ("cancel" ));
369424 // check if data is available in history without subscription
370425 HistoryFetchResult <E > fetch = new HistoryFetchResult <>(symbol , fromQDTime , delegate , true );
371426 history .examineData (delegate .getRecord (), cipher , qdSymbol , fetchQDTime , toQDTime , fetch );
@@ -1074,10 +1129,11 @@ private class LastEventsProcessor extends RecordProcessor {
10741129 final QDTicker ticker ;
10751130 final QDAgent tickerAgent ;
10761131
1077- LastEventsProcessor (QDTicker ticker ) {
1132+ LastEventsProcessor (QDTicker ticker , QDFilter filter ) {
10781133 super (endpoint .getOrCreateExecutor ());
10791134 this .ticker = ticker ;
10801135 tickerAgent = ticker .agentBuilder () // anonymous, so that JMX beans are not registered (see QD-445)
1136+ .withFilter (filter )
10811137 .withAttachmentStrategy (LAST_EVENT_ATTACHMENT_STRATEGY )
10821138 .build ();
10831139 }
@@ -1087,7 +1143,6 @@ void start() {
10871143 }
10881144
10891145 void close () {
1090- assert endpoint .isClosed (); // should have been already closed
10911146 stopProcessing ();
10921147 tickerAgent .close ();
10931148 }
0 commit comments