@@ -42,6 +42,7 @@ public class DXFeedImpl extends DXFeed {
4242
4343 private final DXEndpointImpl endpoint ;
4444 private final QDFilter filter ;
45+ private final QDFilter .UpdateListener filterListener ;
4546 private final RecordMode retrieveMode ;
4647 private final QDAgent .Builder [] eventProcessorAgentBuilders = new QDAgent .Builder [N_CONTRACTS ];
4748 private final IndexedSet <DXFeedSubscription <?>, EventProcessor <?, ?>> eventProcessors =
@@ -67,6 +68,13 @@ public DXFeedImpl(DXEndpointImpl endpoint) {
6768 public DXFeedImpl (DXEndpointImpl endpoint , QDFilter filter ) {
6869 this .endpoint = endpoint ;
6970 this .filter = filter ;
71+ if (filter .isDynamic ()) {
72+ // Add filter to force automatic update
73+ filterListener = this ::updateSubscriptionsOnFilterUpdate ;
74+ filter .getUpdated ().addUpdateListener (filterListener );
75+ } else {
76+ filterListener = null ;
77+ }
7078
7179 RecordMode mode = RecordMode .FLAGGED_DATA .withAttachment ();
7280 if (endpoint .getQDEndpoint ().hasEventTimeSequence ())
@@ -102,11 +110,13 @@ public static void clearDataInBuffer(RecordBuffer buf, boolean keepTime) {
102110 }
103111
104112 public void awaitTerminationAndCloseImpl () throws InterruptedException {
113+ // Async iteration
105114 for (EventProcessor <?, ?> processor : eventProcessors .toArray (new EventProcessor [eventProcessors .size ()])) {
106115 if (processor == null )
107116 break ;
108- processor .awaitTerminationAndClose ();
117+ processor .awaitTermination ();
109118 }
119+ closeImpl ();
110120 }
111121
112122 public void closeImpl () {
@@ -119,17 +129,23 @@ public void closeImpl() {
119129 // no need to sync, because nothing is added or removed when endpoint is closed
120130 closeComponents ();
121131 }
122- eventProcessors .clear ();
123- closeables .clear ();
124132 }
125133
126134 private void closeComponents () {
127135 for (EventProcessor <?, ?> processor : eventProcessors )
128136 processor .close (false );
137+ eventProcessors .clear ();
129138 for (Closeable c : closeables )
130139 c .close ();
140+ closeables .clear ();
131141 if (lastEventsProcessor != null )
132142 lastEventsProcessor .close ();
143+
144+ // Remove listener for dynamic filter
145+ if (filterListener != null ) {
146+ assert filter .isDynamic (); // filter listener is created only for dynamic filters
147+ filter .getUpdated ().removeUpdateListener (filterListener );
148+ }
133149 }
134150
135151 private void removeEventProcessor (DXFeedSubscription <?> subscription ) {
@@ -153,18 +169,21 @@ private void removeCloseable(Closeable c) {
153169 @ Override
154170 @ SuppressWarnings ("unchecked" )
155171 public void attachSubscription (DXFeedSubscription <?> subscription ) {
172+ //FIXME Store listener for later removal on detach. Check for double addition.
156173 subscription .addChangeListener (new SubscriptionChangeListener (subscription , false ));
157174 }
158175
159176 @ Override
160177 @ SuppressWarnings ("unchecked" )
161178 public void detachSubscription (DXFeedSubscription <?> subscription ) {
179+ //FIXME Remove the same listener that was added before.
162180 subscription .removeChangeListener (new SubscriptionChangeListener (subscription , false ));
163181 }
164182
165183 @ Override
166184 @ SuppressWarnings ("unchecked" )
167185 public void detachSubscriptionAndClear (DXFeedSubscription <?> subscription ) {
186+ //FIXME Remove the same listener that was added before.
168187 subscription .removeChangeListener (new SubscriptionChangeListener (subscription , true ));
169188 }
170189
@@ -179,7 +198,7 @@ public <E extends LastingEvent<?>> E getLastEvent(E event) {
179198 int cipher = endpoint .encode (qdSymbol );
180199
181200 // check if symbol is accepted by filter
182- if (!filter .accept (QDContract .TICKER , delegate .getRecord (), cipher , qdSymbol ))
201+ if (!filter .getUpdatedFilter (). accept (QDContract .TICKER , delegate .getRecord (), cipher , qdSymbol ))
183202 return event ;
184203 LocalAddBatch lb = getLocalAddBatch ();
185204 if (ticker .getDataIfAvailable (lb .owner , delegate .getRecord (), cipher , qdSymbol ))
@@ -198,7 +217,7 @@ public <E extends LastingEvent<?>> E getLastEventIfSubscribed(Class<E> eventType
198217 int cipher = endpoint .encode (qdSymbol );
199218
200219 // check if symbol is accepted by filter
201- if (!filter .accept (QDContract .TICKER , delegate .getRecord (), cipher , qdSymbol ))
220+ if (!filter .getUpdatedFilter (). accept (QDContract .TICKER , delegate .getRecord (), cipher , qdSymbol ))
202221 return null ;
203222 LocalAddBatch lb = getLocalAddBatch ();
204223 if (lastEventsProcessor .ticker .getDataIfSubscribed (lb .owner , delegate .getRecord (), cipher , qdSymbol ))
@@ -219,7 +238,7 @@ public <E extends LastingEvent<?>> Promise<E> getLastEventPromise(Class<E> event
219238 int cipher = endpoint .encode (qdSymbol );
220239
221240 // check if symbol is accepted by filter
222- if (!filter .accept (QDContract .TICKER , delegate .getRecord (), cipher , qdSymbol ))
241+ if (!filter .getUpdatedFilter (). accept (QDContract .TICKER , delegate .getRecord (), cipher , qdSymbol ))
223242 return Promise .failed (new CancellationException ("cancel" ));
224243 // optimization for single event -- check that it is immediately available without subscription
225244 LocalAddBatch lb = getLocalAddBatch ();
@@ -254,7 +273,7 @@ public <E extends LastingEvent<?>> List<Promise<E>> getLastEventsPromises(Class<
254273 int cipher = endpoint .encode (qdSymbol );
255274
256275 // check if symbol is accepted by filter
257- if (!filter .accept (QDContract .TICKER , delegate .getRecord (), cipher , qdSymbol )) {
276+ if (!filter .getUpdatedFilter (). accept (QDContract .TICKER , delegate .getRecord (), cipher , qdSymbol )) {
258277 result .add (Promise .failed (new CancellationException ("cancel" )));
259278 continue ;
260279 }
@@ -400,7 +419,7 @@ private <E extends IndexedEvent<?>> List<E> fetchFromHistoryIfSubscribed(Object
400419 int cipher = endpoint .encode (qdSymbol );
401420
402421 // check if symbol is accepted by filter
403- if (!filter .accept (QDContract .HISTORY , delegate .getRecord (), cipher , qdSymbol ))
422+ if (!filter .getUpdatedFilter (). accept (QDContract .HISTORY , delegate .getRecord (), cipher , qdSymbol ))
404423 return Collections .emptyList ();
405424 // check subscription
406425 if (!history .isSubscribed (delegate .getRecord (), cipher , qdSymbol , fromQDTime ))
@@ -419,7 +438,7 @@ private <E extends IndexedEvent<?>> Promise<List<E>> fetchOrSubscribeFromHistory
419438 int cipher = endpoint .encode (qdSymbol );
420439
421440 // check if symbol is accepted by filter
422- if (!filter .accept (QDContract .HISTORY , delegate .getRecord (), cipher , qdSymbol ))
441+ if (!filter .getUpdatedFilter (). accept (QDContract .HISTORY , delegate .getRecord (), cipher , qdSymbol ))
423442 return Promise .failed (new CancellationException ("cancel" ));
424443 // check if data is available in history without subscription
425444 HistoryFetchResult <E > fetch = new HistoryFetchResult <>(symbol , fromQDTime , delegate , true );
@@ -524,6 +543,19 @@ public long getAggregationPeriodMillis() {
524543 return aggregationPeriodMillis ;
525544 }
526545
546+ private void updateSubscriptionsOnFilterUpdate (QDFilter updatedFilter ) {
547+ // Filter parameter is ignored since agents will always use latest filter version on subscription
548+ for (EventProcessor <?, ?> processor : eventProcessors ) {
549+ Set <?> symbols = processor .subscription .getSymbols ();
550+ EnumMap <QDContract , RecordBuffer > sub = toSubscription (processor .subscription , symbols , true );
551+ for (QDContract contract : sub .keySet ()) {
552+ RecordBuffer buffer = sub .get (contract );
553+ processor .getOrCreateAgent (contract ).setSubscription (buffer );
554+ buffer .release ();
555+ }
556+ }
557+ }
558+
527559 private <E > void executePromiseHandler (final Promise <E > promise , final PromiseHandler <? super E > handler ) {
528560 if (handler != null )
529561 endpoint .getOrCreateExecutor ().execute (() -> handler .promiseDone (promise ));
@@ -884,7 +916,7 @@ QDAgent getOrCreateAgent(QDContract contract) {
884916 QDAgent agent = agents [contract .ordinal ()];
885917 if (agent != null )
886918 return agent ;
887- agent = eventProcessorAgentBuilders [contract .ordinal ()].build ();
919+ agent = eventProcessorAgentBuilders [contract .ordinal ()].withFilter ( filter . getUpdatedFilter ()). build ();
888920 if (endpoint .getRole () == DXEndpoint .Role .STREAM_FEED )
889921 agent .setBufferOverflowStrategy (QDAgent .BufferOverflowStrategy .BLOCK );
890922 agents [contract .ordinal ()] = agent ;
@@ -942,11 +974,6 @@ private void signalNoMoreDataToProcess() {
942974 terminationLatch .countDown ();
943975 }
944976
945- void awaitTerminationAndClose () throws InterruptedException {
946- awaitTermination ();
947- close (false );
948- }
949-
950977 private void awaitTermination () throws InterruptedException {
951978 terminationLatch = new CountDownLatch (1 );
952979 if (hasMoreDataToProcess ()) {
0 commit comments