Skip to content

Commit e5c04bc

Browse files
author
dxcity
committed
tagging release 3.274
1 parent be1fbad commit e5c04bc

File tree

66 files changed

+761
-272
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

66 files changed

+761
-272
lines changed

ReleaseNotes.txt

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,10 @@
1+
QDS 3.274:
2+
3+
* [QD-1140] Flaky test: FileConnectorCorruptedTest
4+
* [QD-1138] Data race in DXEndpoint state processing
5+
* [QD-1135] Support limit on the number of connections per server port
6+
* [QD-1133] Tests optimization
7+
18
QDS 3.273:
29

310
* [QD-1132] Tools.invoke involving a FileConnector always takes at least one second

auth/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
<parent>
1515
<artifactId>QD</artifactId>
1616
<groupId>com.devexperts.qd</groupId>
17-
<version>3.273</version>
17+
<version>3.274</version>
1818
<relativePath>../pom.xml</relativePath>
1919
</parent>
2020
<modelVersion>4.0.0</modelVersion>

dxfeed-api/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
<parent>
1515
<artifactId>QD</artifactId>
1616
<groupId>com.devexperts.qd</groupId>
17-
<version>3.273</version>
17+
<version>3.274</version>
1818
<relativePath>../pom.xml</relativePath>
1919
</parent>
2020
<modelVersion>4.0.0</modelVersion>

dxfeed-bin/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
<parent>
1515
<artifactId>QD</artifactId>
1616
<groupId>com.devexperts.qd</groupId>
17-
<version>3.273</version>
17+
<version>3.274</version>
1818
<relativePath>../pom.xml</relativePath>
1919
</parent>
2020
<modelVersion>4.0.0</modelVersion>

dxfeed-codegen-verify/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
<parent>
1515
<artifactId>QD</artifactId>
1616
<groupId>com.devexperts.qd</groupId>
17-
<version>3.273</version>
17+
<version>3.274</version>
1818
</parent>
1919
<modelVersion>4.0.0</modelVersion>
2020

dxfeed-codegen/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
<parent>
1515
<artifactId>QD</artifactId>
1616
<groupId>com.devexperts.qd</groupId>
17-
<version>3.273</version>
17+
<version>3.274</version>
1818
<relativePath>../pom.xml</relativePath>
1919
</parent>
2020
<modelVersion>4.0.0</modelVersion>

dxfeed-impl/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
<parent>
1515
<artifactId>QD</artifactId>
1616
<groupId>com.devexperts.qd</groupId>
17-
<version>3.273</version>
17+
<version>3.274</version>
1818
</parent>
1919
<modelVersion>4.0.0</modelVersion>
2020

dxfeed-impl/src/main/java/com/dxfeed/api/impl/DXEndpointImpl.java

Lines changed: 38 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -281,7 +281,7 @@ private void connectImpl(String address, boolean start) {
281281
if (address == null)
282282
throw new NullPointerException();
283283
synchronized (lock) {
284-
if (stateHolder.isClosed() || address.equals(this.address))
284+
if (stateHolder.state == State.CLOSED || address.equals(this.address))
285285
return;
286286
disconnect();
287287
qdEndpoint.initializeConnectorsForAddress(address);
@@ -419,25 +419,6 @@ private State makeClosed() {
419419
}
420420
}
421421

422-
// lock-free
423-
private State computeState() {
424-
if (stateHolder.isClosed())
425-
return State.CLOSED;
426-
State state = State.NOT_CONNECTED;
427-
for (MessageConnector connector : qdEndpoint.getConnectors()) {
428-
switch (connector.getState()) {
429-
case CONNECTING:
430-
if (state != State.CONNECTED)
431-
state = State.CONNECTING;
432-
break;
433-
case CONNECTED:
434-
state = State.CONNECTED;
435-
break;
436-
}
437-
}
438-
return state;
439-
}
440-
441422
public boolean hasProperty(String key) {
442423
return props.getProperty(key) != null;
443424
}
@@ -729,14 +710,11 @@ public boolean supportsProperty(String key) {
729710
private class StateHolder implements Runnable {
730711
// SYNC(lock+this) on write to State.CLOSE; SYNC(this) on any other writes
731712
volatile State state = State.NOT_CONNECTED;
732-
733-
// notify changes from oldState to newState
734-
State oldState; // null when no notification was delivered yet
735-
State newState = state; // new state in previous notification
713+
private State lastFiredState = state;
736714

737715
// at most one task is scheduled at any time
738-
int scheduled; // > 0 when run() is scheduled to run, increment only any subsequent change
739-
Thread processingThread; // current thread that processes notifications
716+
private int scheduled; // > 0 when run() is scheduled to run or is running, increment on any change
717+
private volatile Thread processingThread; // current thread that processes notifications
740718

741719
// SYNC(lock), for connect & disconnect method to immediately recompute state
742720
synchronized void updateNow() {
@@ -767,29 +745,46 @@ private void scheduleImpl() {
767745

768746
@Override
769747
public void run() {
748+
int lastScheduled = 0;
770749
while (true) { // loop while we need to fire state change events
771-
State computedState = computeState();
750+
State oldState;
751+
State newState;
772752
synchronized (this) {
773-
if (state == State.CLOSED)
774-
computedState = State.CLOSED; // closed while were computing state
775-
oldState = newState;
776-
newState = state = computedState;
777-
if (newState == oldState) {
753+
state = computeState();
754+
oldState = lastFiredState;
755+
newState = state;
756+
if (newState == oldState && scheduled == lastScheduled) {
778757
// no change in state -- leave event processing loop
779758
processingThread = null;
780759
scheduled = 0;
781760
notifyAll(); // wakeup awaitOuter
782761
return;
783762
}
763+
lastScheduled = scheduled;
764+
lastFiredState = newState;
784765
// keep reference to processing thread to catch inner wait
785766
processingThread = Thread.currentThread();
786767
}
787-
fireStateChangeEvent(oldState, newState);
768+
if (oldState != newState)
769+
fireStateChangeEvent(oldState, newState);
788770
}
789771
}
790772

791-
boolean isClosed() {
792-
return state == State.CLOSED;
773+
@GuardedBy("this")
774+
private State computeState() {
775+
if (state == State.CLOSED)
776+
return State.CLOSED;
777+
boolean hasConnecting = false;
778+
for (MessageConnector connector : qdEndpoint.getConnectors()) {
779+
switch (connector.getState()) {
780+
case CONNECTING:
781+
hasConnecting = true;
782+
break;
783+
case CONNECTED:
784+
return State.CONNECTED;
785+
}
786+
}
787+
return hasConnecting ? State.CONNECTING : State.NOT_CONNECTED;
793788
}
794789

795790
// SYNC(lock) (then syncs on this)
@@ -817,34 +812,27 @@ private void await(State condition) throws InterruptedException {
817812
awaitOuter(condition);
818813
}
819814

820-
private void awaitInner(State condition) throws InterruptedException {
815+
private synchronized void awaitInner(State condition) throws InterruptedException {
821816
// reenter from inside of run() event processing loop - run inner processing loop and wait
822-
int lastScheduled = 0;
823817
while (true) {
824-
State computedState = computeState();
825-
synchronized (this) {
826-
if (state != State.CLOSED) // make sure not closed while were computing state
827-
state = computedState;
828-
if (isCondition(condition))
829-
return; // condition satisfied at currently computed state and no pending changes -- wait no more
830-
if (scheduled == lastScheduled)
831-
wait(); // wait until anything changes (see notifyAll in scheduleImpl)
832-
lastScheduled = scheduled;
833-
}
818+
state = computeState();
819+
if (isCondition(condition))
820+
return;
821+
wait(); // wait until anything changes (see notifyAll in scheduleImpl)
834822
}
835823
}
836824

837825
private synchronized void awaitOuter(State condition) throws InterruptedException {
838826
// just wait normally from another thread
839827
// wait until processed state is in expected condition and no further changes scheduled
840-
while (!isCondition(condition))
828+
while (!isCondition(condition) || scheduled != 0)
841829
wait();
842830
}
843831

844832
@GuardedBy("this")
845-
// true when computed state is as expected or closed and no pending changes
833+
// true when computed state is as expected or closed
846834
private boolean isCondition(State condition) {
847-
return (state == State.CLOSED || state == condition) && scheduled == 0;
835+
return state == State.CLOSED || state == condition;
848836
}
849837
}
850838
}
Lines changed: 156 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,156 @@
1+
/*
2+
* !++
3+
* QDS - Quick Data Signalling Library
4+
* !-
5+
* Copyright (C) 2002 - 2019 Devexperts LLC
6+
* !-
7+
* This Source Code Form is subject to the terms of the Mozilla Public License, v. 2.0.
8+
* If a copy of the MPL was not distributed with this file, You can obtain one at
9+
* http://mozilla.org/MPL/2.0/.
10+
* !__
11+
*/
12+
package com.dxfeed.api.test;
13+
14+
import com.devexperts.logging.Logging;
15+
import com.devexperts.test.ThreadCleanCheck;
16+
import com.dxfeed.api.DXEndpoint;
17+
import org.junit.After;
18+
import org.junit.Before;
19+
import org.junit.Test;
20+
21+
import java.beans.PropertyChangeEvent;
22+
import java.util.concurrent.CountDownLatch;
23+
import java.util.concurrent.TimeUnit;
24+
25+
import static org.junit.Assert.assertEquals;
26+
import static org.junit.Assert.assertTrue;
27+
28+
public class DXEndpointListenerTest {
29+
private static final Logging log = Logging.getLogging(DXEndpointListenerTest.class);
30+
private static final int AWAIT_TIMEOUT = 10_000;
31+
32+
private volatile CountDownLatch pass;
33+
private volatile DXEndpoint.State expectedState;
34+
35+
@Before
36+
public void setUp() throws Exception {
37+
ThreadCleanCheck.before();
38+
}
39+
40+
@After
41+
public void tearDown() throws Exception {
42+
ThreadCleanCheck.after();
43+
}
44+
45+
@Test
46+
public void testSimpleListener() throws InterruptedException {
47+
DXEndpoint endpoint = DXEndpoint.create(DXEndpoint.Role.PUBLISHER);
48+
pass = new CountDownLatch(1);
49+
expectedState = DXEndpoint.State.CONNECTED;
50+
endpoint.addStateChangeListener(evt -> {
51+
logChangeEvent(evt);
52+
if (evt.getNewValue() == expectedState)
53+
pass.countDown();
54+
});
55+
endpoint.connect(":0");
56+
await("CONNECTED state reached", pass);
57+
pass = new CountDownLatch(1);
58+
expectedState = DXEndpoint.State.NOT_CONNECTED;
59+
endpoint.disconnect();
60+
await("NOT_CONNECTED state reached", pass);
61+
pass = new CountDownLatch(1);
62+
expectedState = DXEndpoint.State.CLOSED;
63+
endpoint.close();
64+
await("CLOSED state reached", pass);
65+
}
66+
67+
@Test
68+
public void testDisconnectFromListener() throws InterruptedException {
69+
DXEndpoint endpoint = DXEndpoint.create(DXEndpoint.Role.PUBLISHER);
70+
endpoint.addStateChangeListener(evt -> {
71+
logChangeEvent(evt);
72+
if (evt.getNewValue() == DXEndpoint.State.CONNECTED) {
73+
endpoint.disconnect();
74+
}
75+
});
76+
endpoint.connect(":0");
77+
endpoint.awaitNotConnected();
78+
endpoint.close();
79+
}
80+
81+
@Test
82+
public void testInnerAwaitNotConnected() throws InterruptedException {
83+
DXEndpoint endpoint = DXEndpoint.newBuilder()
84+
.withRole(DXEndpoint.Role.PUBLISHER)
85+
.withProperty(DXEndpoint.DXFEED_THREAD_POOL_SIZE_PROPERTY, "4")
86+
.build();
87+
pass = new CountDownLatch(1);
88+
CountDownLatch innerWaitSucceded = new CountDownLatch(1);
89+
endpoint.addStateChangeListener(evt -> {
90+
logChangeEvent(evt);
91+
if (evt.getNewValue() == DXEndpoint.State.CONNECTED) {
92+
pass.countDown();
93+
log.info("Inner wait for disconnect...");
94+
try {
95+
endpoint.awaitNotConnected();
96+
} catch (InterruptedException e) {
97+
e.printStackTrace();
98+
}
99+
innerWaitSucceded.countDown();
100+
}
101+
});
102+
endpoint.connect(":0");
103+
await("CONNECTED state reached", pass);
104+
log.info("Disconnecting...");
105+
endpoint.disconnect();
106+
await("NOT_CONNECTED witnessed by inner wait", innerWaitSucceded);
107+
log.info("Closing...");
108+
endpoint.close();
109+
}
110+
111+
@Test
112+
public void testInnerAwaitClosed() throws InterruptedException {
113+
DXEndpoint endpoint = DXEndpoint.newBuilder()
114+
.withRole(DXEndpoint.Role.PUBLISHER)
115+
.withProperty(DXEndpoint.DXFEED_THREAD_POOL_SIZE_PROPERTY, "4")
116+
.build();
117+
pass = new CountDownLatch(1);
118+
CountDownLatch innerWaitSucceded = new CountDownLatch(1);
119+
endpoint.addStateChangeListener(evt -> {
120+
logChangeEvent(evt);
121+
if (evt.getNewValue() == DXEndpoint.State.CONNECTED) {
122+
pass.countDown();
123+
}
124+
if (evt.getNewValue() == DXEndpoint.State.NOT_CONNECTED) {
125+
pass.countDown();
126+
log.info("Inner wait for disconnect...");
127+
try {
128+
endpoint.closeAndAwaitTermination();
129+
} catch (InterruptedException e) {
130+
e.printStackTrace();
131+
}
132+
innerWaitSucceded.countDown();
133+
}
134+
});
135+
endpoint.connect(":0");
136+
// It's important to get connected state here.
137+
// Fast disconnect may cause that listener will never be notified by current contract.
138+
await("CONNECTED state reached", pass);
139+
pass = new CountDownLatch(1);
140+
log.info("Disconnecting...");
141+
endpoint.disconnect();
142+
await("NOT_CONNECTED state reached", pass);
143+
log.info("Waiting inner close...");
144+
await("Closed by inner wait", innerWaitSucceded);
145+
assertEquals(DXEndpoint.State.CLOSED, endpoint.getState());
146+
}
147+
148+
private void await(String message, CountDownLatch pass) throws InterruptedException {
149+
assertTrue(message, pass.await(AWAIT_TIMEOUT, TimeUnit.MILLISECONDS));
150+
}
151+
152+
private void logChangeEvent(PropertyChangeEvent evt) {
153+
log.info("****** " + evt.getPropertyName() + ": " + evt.getOldValue() + " -> " + evt.getNewValue());
154+
}
155+
156+
}

dxfeed-ipf-filter/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
<parent>
1515
<artifactId>QD</artifactId>
1616
<groupId>com.devexperts.qd</groupId>
17-
<version>3.273</version>
17+
<version>3.274</version>
1818
</parent>
1919
<modelVersion>4.0.0</modelVersion>
2020

0 commit comments

Comments
 (0)