Skip to content

Commit c6ede32

Browse files
committed
fix(Android): Fixed connections being not concurrent
1 parent 2db5065 commit c6ede32

File tree

4 files changed

+56
-60
lines changed

4 files changed

+56
-60
lines changed

android/src/main/java/com/asterinet/react/tcpsocket/TcpSocketClient.java

Lines changed: 23 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@
1212
import java.net.InetSocketAddress;
1313
import java.net.Socket;
1414

15+
import androidx.annotation.Nullable;
16+
1517
class TcpSocketClient {
1618
private TcpReceiverTask receiverTask;
1719
private Socket socket;
@@ -23,23 +25,29 @@ class TcpSocketClient {
2325
this.id = id;
2426
}
2527

26-
/**
27-
* TcpSocketClient constructor
28-
*
29-
* @param address server address
30-
* @param port server port
31-
* @param options extra options
32-
*/
33-
public TcpSocketClient(final TcpReceiverTask.OnDataReceivedListener receiverListener, final Integer id,
34-
final String address, final Integer port, final ReadableMap options, final Network network)
35-
throws IOException {
28+
TcpSocketClient(final TcpReceiverTask.OnDataReceivedListener receiverListener, final Integer id, @Nullable final Socket socket) {
3629
this(id);
30+
this.socket = socket;
31+
receiverTask = new TcpReceiverTask();
32+
mReceiverListener = receiverListener;
33+
}
34+
35+
36+
public int getId() {
37+
return id;
38+
}
39+
40+
public Socket getSocket() {
41+
return socket;
42+
}
43+
44+
public void connect(final String address, final Integer port, final ReadableMap options, final Network network) throws IOException {
45+
if (socket != null) throw new IOException("Already connected");
46+
socket = new Socket();
3747
// Get the addresses
3848
String localAddress = options.getString("localAddress");
3949
InetAddress localInetAddress = InetAddress.getByName(localAddress);
4050
InetAddress remoteInetAddress = InetAddress.getByName(address);
41-
// Create the socket
42-
socket = new Socket();
4351
if (network != null)
4452
network.bindSocket(socket);
4553
// setReuseAddress
@@ -54,28 +62,12 @@ public TcpSocketClient(final TcpReceiverTask.OnDataReceivedListener receiverList
5462
int localPort = options.getInt("localPort");
5563
socket.bind(new InetSocketAddress(localInetAddress, localPort));
5664
socket.connect(new InetSocketAddress(remoteInetAddress, port));
57-
receiverTask = new TcpReceiverTask();
58-
mReceiverListener = receiverListener;
59-
//noinspection unchecked
60-
receiverTask.executeOnExecutor(AsyncTask.THREAD_POOL_EXECUTOR, new Pair<>(this, receiverListener));
65+
startListening();
6166
}
6267

63-
TcpSocketClient(final TcpReceiverTask.OnDataReceivedListener receiverListener, final Integer id, final Socket socket) {
64-
this(id);
65-
this.socket = socket;
66-
receiverTask = new TcpReceiverTask();
67-
mReceiverListener = receiverListener;
68+
public void startListening() {
6869
//noinspection unchecked
69-
receiverTask.executeOnExecutor(AsyncTask.THREAD_POOL_EXECUTOR, new Pair<>(this, receiverListener));
70-
}
71-
72-
73-
public int getId() {
74-
return id;
75-
}
76-
77-
public Socket getSocket() {
78-
return socket;
70+
receiverTask.executeOnExecutor(AsyncTask.THREAD_POOL_EXECUTOR, new Pair<>(this, mReceiverListener));
7971
}
8072

8173
/**
@@ -99,7 +91,6 @@ public void close() {
9991
// stop the receiving task
10092
receiverTask.cancel(true);
10193
}
102-
10394
// close the socket
10495
if (socket != null && !socket.isClosed()) {
10596
socket.close();

android/src/main/java/com/asterinet/react/tcpsocket/TcpSocketModule.java

Lines changed: 23 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
import android.net.ConnectivityManager;
77
import android.net.NetworkCapabilities;
88
import android.net.NetworkRequest;
9-
import android.util.SparseArray;
9+
import android.os.AsyncTask;
1010
import android.util.Base64;
1111
import android.net.Network;
1212

@@ -24,6 +24,7 @@
2424
import java.net.Inet6Address;
2525
import java.net.InetAddress;
2626
import java.net.InetSocketAddress;
27+
import java.util.concurrent.ConcurrentHashMap;
2728
import java.util.concurrent.CountDownLatch;
2829

2930
import androidx.annotation.NonNull;
@@ -32,8 +33,8 @@
3233
public class TcpSocketModule extends ReactContextBaseJavaModule implements TcpReceiverTask.OnDataReceivedListener {
3334

3435
private final ReactApplicationContext mReactContext;
35-
private final SparseArray<TcpSocketClient> socketClients = new SparseArray<>();
36-
private final SparseArray<Network> mNetworkMap = new SparseArray<>();
36+
private final ConcurrentHashMap<Integer, TcpSocketClient> socketClients = new ConcurrentHashMap<>();
37+
private final ConcurrentHashMap<String, Network> mNetworkMap = new ConcurrentHashMap<>();
3738
private Network mSelectedNetwork;
3839

3940
private static final String TAG = "TcpSockets";
@@ -44,7 +45,8 @@ public TcpSocketModule(ReactApplicationContext reactContext) {
4445
}
4546

4647
@Override
47-
public @NonNull String getName() {
48+
public @NonNull
49+
String getName() {
4850
return TAG;
4951
}
5052

@@ -63,9 +65,9 @@ private void sendEvent(String eventName, WritableMap params) {
6365
private void selectNetwork(@Nullable final String iface, @Nullable final String ipAddress) throws InterruptedException {
6466
if (iface == null) return;
6567
mSelectedNetwork = null;
66-
if (ipAddress != null){
67-
Network cachedNetwork = mNetworkMap.get(ipAddress.hashCode());
68-
if (cachedNetwork != null){
68+
if (ipAddress != null) {
69+
Network cachedNetwork = mNetworkMap.get(ipAddress);
70+
if (cachedNetwork != null) {
6971
mSelectedNetwork = cachedNetwork;
7072
return;
7173
}
@@ -81,7 +83,7 @@ private void selectNetwork(@Nullable final String iface, @Nullable final String
8183
public void onAvailable(Network network) {
8284
mSelectedNetwork = network;
8385
if (ipAddress != null && !ipAddress.equals("0.0.0.0"))
84-
mNetworkMap.put(ipAddress.hashCode(), mSelectedNetwork);
86+
mNetworkMap.put(ipAddress, mSelectedNetwork);
8587
awaitingNetwork.countDown(); // Stop waiting
8688
}
8789

@@ -97,16 +99,16 @@ public void onUnavailable() {
9799
mSelectedNetwork = null;
98100
break;
99101
}
100-
if (mSelectedNetwork!= null && ipAddress != null && !ipAddress.equals("0.0.0.0"))
101-
mNetworkMap.put(ipAddress.hashCode(), mSelectedNetwork);
102+
if (mSelectedNetwork != null && ipAddress != null && !ipAddress.equals("0.0.0.0"))
103+
mNetworkMap.put(ipAddress, mSelectedNetwork);
102104
}
103105

104106
/**
105107
* Creates a TCP Socket and establish a connection with the given host
106108
*
107-
* @param cId socket ID
108-
* @param host socket IP address
109-
* @param port socket port to be bound
109+
* @param cId socket ID
110+
* @param host socket IP address
111+
* @param port socket port to be bound
110112
* @param options extra options
111113
*/
112114
@SuppressLint("StaticFieldLeak")
@@ -131,14 +133,15 @@ protected void doInBackgroundGuarded(Void... params) {
131133
String localAddress = options.getString("localAddress");
132134
String iface = options.getString("interface");
133135
selectNetwork(iface, localAddress);
134-
client = new TcpSocketClient(TcpSocketModule.this, cId, host, port, options, mSelectedNetwork);
136+
client = new TcpSocketClient(TcpSocketModule.this, cId, null);
135137
socketClients.put(cId, client);
138+
client.connect(host, port, options, mSelectedNetwork);
136139
onConnect(cId, host, port);
137140
} catch (Exception e) {
138141
onError(cId, e.getMessage());
139142
}
140143
}
141-
}.execute();
144+
}.executeOnExecutor(AsyncTask.THREAD_POOL_EXECUTOR);
142145
}
143146

144147
@SuppressLint("StaticFieldLeak")
@@ -149,7 +152,7 @@ public void write(final Integer cId, final String base64String, final Callback c
149152
@Override
150153
protected void doInBackgroundGuarded(Void... params) {
151154
TcpSocketClient socketClient = socketClients.get(cId);
152-
if (socketClient == null){
155+
if (socketClient == null) {
153156
return;
154157
}
155158
try {
@@ -164,7 +167,7 @@ protected void doInBackgroundGuarded(Void... params) {
164167
callback.invoke();
165168
}
166169
}
167-
}.execute();
170+
}.executeOnExecutor(AsyncTask.THREAD_POOL_EXECUTOR);
168171
}
169172

170173
@SuppressLint("StaticFieldLeak")
@@ -181,7 +184,7 @@ protected void doInBackgroundGuarded(Void... params) {
181184
socketClient.close();
182185
socketClients.remove(cId);
183186
}
184-
}.execute();
187+
}.executeOnExecutor(AsyncTask.THREAD_POOL_EXECUTOR);
185188
}
186189

187190
@SuppressWarnings("unused")
@@ -207,7 +210,7 @@ protected void doInBackgroundGuarded(Void... params) {
207210
onError(cId, uhe.getMessage());
208211
}
209212
}
210-
}.execute();
213+
}.executeOnExecutor(AsyncTask.THREAD_POOL_EXECUTOR);
211214
}
212215

213216
// TcpReceiverTask.OnDataReceivedListener
@@ -255,7 +258,7 @@ public void onError(Integer id, String error) {
255258
}
256259

257260
@Override
258-
public void onConnection(Integer serverId, Integer clientId, InetSocketAddress socketAddress){
261+
public void onConnection(Integer serverId, Integer clientId, InetSocketAddress socketAddress) {
259262
WritableMap eventParams = Arguments.createMap();
260263
eventParams.putInt("id", serverId);
261264

android/src/main/java/com/asterinet/react/tcpsocket/TcpSocketServer.java

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22

33
import android.annotation.SuppressLint;
44
import android.os.AsyncTask;
5-
import android.util.SparseArray;
65

76
import com.facebook.react.bridge.ReadableMap;
87

@@ -11,13 +10,13 @@
1110
import java.net.InetSocketAddress;
1211
import java.net.ServerSocket;
1312
import java.net.Socket;
13+
import java.util.concurrent.ConcurrentHashMap;
1414

15-
public class TcpSocketServer extends TcpSocketClient {
15+
public final class TcpSocketServer extends TcpSocketClient {
1616
private ServerSocket serverSocket;
1717
private TcpReceiverTask.OnDataReceivedListener mReceiverListener;
1818
private int clientSocketIds;
19-
private final SparseArray<TcpSocketClient> socketClients;
20-
private final SparseArray<TcpSocketClient> serverSocketClients = new SparseArray<>();
19+
private final ConcurrentHashMap<Integer, TcpSocketClient> socketClients;
2120

2221
@SuppressLint("StaticFieldLeak")
2322
private final AsyncTask listening = new AsyncTask() {
@@ -28,8 +27,8 @@ protected Void doInBackground(Object[] objects) {
2827
Socket socket = serverSocket.accept();
2928
int clientId = getClientId();
3029
TcpSocketClient socketClient = new TcpSocketClient(mReceiverListener, clientId, socket);
31-
serverSocketClients.put(clientId, socketClient);
3230
socketClients.put(clientId, socketClient);
31+
socketClient.startListening();
3332
mReceiverListener.onConnection(getId(), clientId, new InetSocketAddress(socket.getInetAddress(), socket.getPort()));
3433
}
3534
} catch (IOException e) {
@@ -42,7 +41,7 @@ protected Void doInBackground(Object[] objects) {
4241
};
4342

4443

45-
public TcpSocketServer(final SparseArray<TcpSocketClient> socketClients, final TcpReceiverTask.OnDataReceivedListener receiverListener, final Integer id,
44+
public TcpSocketServer(final ConcurrentHashMap<Integer, TcpSocketClient> socketClients, final TcpReceiverTask.OnDataReceivedListener receiverListener, final Integer id,
4645
final ReadableMap options) throws IOException {
4746
super(id);
4847
// Get data from options
@@ -88,7 +87,7 @@ public void write(final byte[] data) {
8887
@Override
8988
public void close() {
9089
try {
91-
if (listening != null && !listening.isCancelled()) {
90+
if (!listening.isCancelled()) {
9291
// stop the receiving task
9392
listening.cancel(true);
9493
}

src/TcpSocket.js

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -165,7 +165,10 @@ export default class TcpSocket {
165165
}
166166

167167
_unregisterEvents() {
168-
this._eventEmitter.listeners().forEach((listener) => listener.remove());
168+
this._eventEmitter.removeAllListeners('connect');
169+
this._eventEmitter.removeAllListeners('close');
170+
this._eventEmitter.removeAllListeners('error');
171+
this._eventEmitter.removeAllListeners('data');
169172
}
170173

171174
_onConnect(address) {

0 commit comments

Comments
 (0)