Skip to content

Commit a128d44

Browse files
committed
Continuous reading
1 parent 6948cee commit a128d44

File tree

1 file changed

+82
-34
lines changed

1 file changed

+82
-34
lines changed

android/app/src/main/java/betaflight/configurator/plugin/SocketPlugin.java

Lines changed: 82 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,8 @@ private enum ConnectionState {
5454
private Socket socket;
5555
private InputStream input;
5656
private OutputStream output;
57+
private Thread readerThread;
58+
private volatile boolean readerRunning = false;
5759

5860
@PluginMethod
5961
public void connect(final PluginCall call) {
@@ -92,6 +94,8 @@ public void connect(final PluginCall call) {
9294
result.put("success", true);
9395
call.resolve(result);
9496
Log.d(TAG, "Connected to " + ip + ":" + port);
97+
98+
startReaderThread();
9599
} catch (Exception e) {
96100
state.set(ConnectionState.ERROR);
97101
closeResourcesInternal();
@@ -143,34 +147,10 @@ public void send(final PluginCall call) {
143147

144148
@PluginMethod
145149
public void receive(final PluginCall call) {
146-
if (state.get() != ConnectionState.CONNECTED || input == null) {
147-
call.reject(ERROR_NOT_CONNECTED);
148-
return;
149-
}
150-
call.setKeepAlive(true);
151-
152-
getBridge().getExecutor().execute(() -> {
153-
try {
154-
ByteArrayOutputStream baos = new ByteArrayOutputStream();
155-
int b;
156-
while ((b = input.read()) != -1 && b != '\n') {
157-
baos.write(b);
158-
}
159-
String data = new String(baos.toByteArray(), StandardCharsets.UTF_8);
160-
if (data == null) {
161-
handleCommunicationError(new IOException("End of stream"), ERROR_CONNECTION_CLOSED, call);
162-
return;
163-
}
164-
JSObject result = new JSObject();
165-
result.put("data", data);
166-
call.resolve(result);
167-
Log.d(TAG, "Received data: " + truncateForLog(data));
168-
} catch (Exception e) {
169-
handleCommunicationError(e, "Receive failed", call);
170-
} finally {
171-
call.setKeepAlive(false);
172-
}
173-
});
150+
// Deprecated by continuous reader (Task 2)
151+
JSObject result = new JSObject();
152+
result.put("data", "");
153+
call.reject("Continuous read active. Listen for 'dataReceived' events instead.");
174154
}
175155

176156
@PluginMethod
@@ -231,16 +211,84 @@ protected void handleOnDestroy() {
231211
super.handleOnDestroy();
232212
}
233213

234-
private void closeResourcesInternal() {
235-
if (input != null) { try { input.close(); } catch (IOException e) { Log.e(TAG, "Error closing input stream", e); } finally { input = null; }
236-
}
237-
if (output != null) { try { output.close(); } catch (IOException e) { Log.e(TAG, "Error closing output stream", e); } finally { output = null; }
214+
private void startReaderThread() {
215+
if (readerThread != null && readerThread.isAlive()) return;
216+
readerRunning = true;
217+
readerThread = new Thread(() -> {
218+
Log.d(TAG, "Reader thread started");
219+
try {
220+
ByteArrayOutputStream lineBuf = new ByteArrayOutputStream();
221+
while (readerRunning && state.get() == ConnectionState.CONNECTED && input != null) {
222+
int b = input.read();
223+
if (b == -1) {
224+
notifyDisconnectFromPeer();
225+
break;
226+
}
227+
if (b == '\n') {
228+
String line = new String(lineBuf.toByteArray(), StandardCharsets.UTF_8);
229+
lineBuf.reset();
230+
if (line.endsWith("\r")) {
231+
line = line.substring(0, line.length() - 1);
232+
}
233+
JSObject payload = new JSObject();
234+
payload.put("data", line);
235+
notifyListeners("dataReceived", payload);
236+
} else {
237+
lineBuf.write(b);
238+
if (lineBuf.size() > 1024 * 1024) { // safety cap
239+
lineBuf.reset();
240+
Log.w(TAG, "Dropped oversized line");
241+
}
242+
}
243+
}
244+
} catch (Exception e) {
245+
if (readerRunning) {
246+
Log.e(TAG, "Reader thread error", e);
247+
JSObject err = new JSObject();
248+
err.put("error", e.getMessage());
249+
notifyListeners("dataReceivedError", err);
250+
handleCommunicationError(e, "Receive failed", null);
251+
}
252+
} finally {
253+
Log.d(TAG, "Reader thread stopped");
254+
}
255+
}, "SocketReaderThread");
256+
readerThread.start();
257+
}
258+
259+
private void notifyDisconnectFromPeer() {
260+
Log.d(TAG, "Peer closed connection");
261+
JSObject evt = new JSObject();
262+
evt.put("reason", "peer_closed");
263+
notifyListeners("connectionClosed", evt);
264+
socketLock.lock();
265+
try {
266+
state.set(ConnectionState.ERROR);
267+
closeResourcesInternal();
268+
state.set(ConnectionState.DISCONNECTED);
269+
} finally {
270+
socketLock.unlock();
238271
}
239-
if (socket != null) {
240-
try { socket.close(); } catch (IOException e) { Log.e(TAG, "Error closing socket", e); } finally { socket = null; }
272+
}
273+
274+
private void stopReaderThread() {
275+
readerRunning = false;
276+
if (readerThread != null) {
277+
try {
278+
readerThread.interrupt();
279+
readerThread.join(500);
280+
} catch (InterruptedException ignored) {}
281+
readerThread = null;
241282
}
242283
}
243284

285+
private void closeResourcesInternal() {
286+
stopReaderThread();
287+
if (input != null) { try { input.close(); } catch (IOException e) { Log.e(TAG, "Error closing input stream", e); } finally { input = null; } }
288+
if (output != null) { try { output.close(); } catch (IOException e) { Log.e(TAG, "Error closing output stream", e); } finally { output = null; } }
289+
if (socket != null) { try { socket.close(); } catch (IOException e) { Log.e(TAG, "Error closing socket", e); } finally { socket = null; } }
290+
}
291+
244292
private void handleCommunicationError(Exception error, String message, PluginCall call) {
245293
socketLock.lock();
246294
try {

0 commit comments

Comments
 (0)