Skip to content
Open
Show file tree
Hide file tree
Changes from 20 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,13 @@
package betaflight.configurator;

import android.os.Bundle;
import com.getcapacitor.BridgeActivity;
import betaflight.configurator.plugin.SocketPlugin;

public class MainActivity extends BridgeActivity {}
public class MainActivity extends BridgeActivity {
@Override
public void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
registerPlugin(SocketPlugin.class);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,319 @@
package betaflight.configurator.plugin;

import android.util.Base64;
import android.util.Log;
import com.getcapacitor.JSObject;
import com.getcapacitor.Plugin;
import com.getcapacitor.PluginCall;
import com.getcapacitor.PluginMethod;
import com.getcapacitor.annotation.CapacitorPlugin;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantLock;

/**
* Capacitor plugin that provides raw TCP socket functionality with thread safety,
* robust resource management, and comprehensive error handling.
*/
@CapacitorPlugin(name = "SocketPlugin")
public class SocketPlugin extends Plugin {
private static final String TAG = "SocketPlugin";

// Error messages
private static final String ERROR_IP_REQUIRED = "IP address is required";
private static final String ERROR_INVALID_PORT = "Invalid port number";
private static final String ERROR_ALREADY_CONNECTED = "Already connected; please disconnect first";
private static final String ERROR_NOT_CONNECTED = "Not connected to any server";
private static final String ERROR_DATA_REQUIRED = "Data is required";
private static final String ERROR_CONNECTION_LOST = "Connection lost";
private static final String ERROR_CONNECTION_CLOSED = "Connection closed by peer";

// Connection settings
private static final int DEFAULT_TIMEOUT_MS = 30_000;
private static final int MIN_PORT = 1;
private static final int MAX_PORT = 65535;

private enum ConnectionState {
DISCONNECTED,
CONNECTING,
CONNECTED,
DISCONNECTING,
ERROR
}

// Thread-safe state and locks
private final AtomicReference<ConnectionState> state = new AtomicReference<>(ConnectionState.DISCONNECTED);
private final ReentrantLock socketLock = new ReentrantLock();
private final ReentrantLock writerLock = new ReentrantLock();

private Socket socket;
private InputStream input;
private OutputStream output;
private Thread readerThread;
private volatile boolean readerRunning = false;

@PluginMethod
public void connect(final PluginCall call) {
call.setKeepAlive(true);
String ip = call.getString("ip");
int port = call.getInt("port", -1);

if (ip == null || ip.isEmpty()) {
call.reject(ERROR_IP_REQUIRED);
call.setKeepAlive(false);
return;
}
if (port < MIN_PORT || port > MAX_PORT) {
call.reject(ERROR_INVALID_PORT);
call.setKeepAlive(false);
return;
}
if (!compareAndSetState(ConnectionState.DISCONNECTED, ConnectionState.CONNECTING)) {
call.reject(ERROR_ALREADY_CONNECTED);
call.setKeepAlive(false);
return;
}

getBridge().getExecutor().execute(() -> {
socketLock.lock();
try {
socket = new Socket();
socket.connect(new InetSocketAddress(ip, port), DEFAULT_TIMEOUT_MS);
socket.setSoTimeout(DEFAULT_TIMEOUT_MS);

input = socket.getInputStream();
output = socket.getOutputStream();

state.set(ConnectionState.CONNECTED);
JSObject result = new JSObject();
result.put("success", true);
call.resolve(result);
Log.d(TAG, "Connected to " + ip + ":" + port);

startReaderThread();
} catch (Exception e) {
state.set(ConnectionState.ERROR);
closeResourcesInternal();
call.reject("Connection failed: " + e.getMessage());
Log.e(TAG, "Connection failed", e);
} finally {
socketLock.unlock();
call.setKeepAlive(false);
}
});
}

@PluginMethod
public void send(final PluginCall call) {
String data = call.getString("data");
if (data == null || data.isEmpty()) {
call.reject(ERROR_DATA_REQUIRED);
return;
}
if (state.get() != ConnectionState.CONNECTED) {
call.reject(ERROR_NOT_CONNECTED);
return;
}
call.setKeepAlive(true);

getBridge().getExecutor().execute(() -> {
writerLock.lock();
try {
if (output == null || state.get() != ConnectionState.CONNECTED) {
call.reject(ERROR_CONNECTION_LOST);
return;
}
byte[] payload = (data + "\n").getBytes(StandardCharsets.UTF_8);
output.write(payload);
output.flush();

JSObject result = new JSObject();
result.put("success", true);
call.resolve(result);
Log.d(TAG, "Sent data: " + truncateForLog(data));
} catch (Exception e) {
handleCommunicationError(e, "Send failed", call);
} finally {
writerLock.unlock();
call.setKeepAlive(false);
}
});
}

@PluginMethod
public void receive(final PluginCall call) {
// Deprecated by continuous reader (Task 2)
JSObject result = new JSObject();
result.put("data", "");
call.reject("Continuous read active. Listen for 'dataReceived' events instead.");
}

@PluginMethod
public void disconnect(final PluginCall call) {
ConnectionState current = state.get();
if (current == ConnectionState.DISCONNECTED) {
JSObject result = new JSObject();
result.put("success", true);
call.resolve(result);
return;
}
if (!compareAndSetState(current, ConnectionState.DISCONNECTING)) {
call.reject("Invalid state for disconnect: " + current);
return;
}
call.setKeepAlive(true);

getBridge().getExecutor().execute(() -> {
socketLock.lock();
try {
closeResourcesInternal();
state.set(ConnectionState.DISCONNECTED);
JSObject result = new JSObject();
result.put("success", true);
call.resolve(result);
Log.d(TAG, "Disconnected successfully");
} catch (Exception e) {
state.set(ConnectionState.ERROR);
call.reject("Disconnect failed: " + e.getMessage());
Log.e(TAG, "Disconnect failed", e);
} finally {
socketLock.unlock();
call.setKeepAlive(false);
}
});
}

@PluginMethod
public void getStatus(final PluginCall call) {
JSObject result = new JSObject();
result.put("connected", state.get() == ConnectionState.CONNECTED);
result.put("state", state.get().toString());
call.resolve(result);
}

@Override
protected void handleOnDestroy() {
socketLock.lock();
try {
state.set(ConnectionState.DISCONNECTING);
closeResourcesInternal();
state.set(ConnectionState.DISCONNECTED);
} catch (Exception e) {
Log.e(TAG, "Error cleaning up resources on destroy", e);
} finally {
socketLock.unlock();
}
super.handleOnDestroy();
}

private void startReaderThread() {
if (readerThread != null && readerThread.isAlive()) return;
readerRunning = true;
readerThread = new Thread(() -> {
Log.d(TAG, "Reader thread started");
try {
byte[] buf = new byte[4096];
while (readerRunning && state.get() == ConnectionState.CONNECTED && input != null) {
int read = input.read(buf);
if (read == -1) {
notifyDisconnectFromPeer();
break;
}
if (read > 0) {
byte[] chunk = Arrays.copyOf(buf, read);
String b64 = Base64.encodeToString(chunk, Base64.NO_WRAP);
JSObject payload = new JSObject();
payload.put("data", b64);
notifyListeners("dataReceived", payload);
}
}
} catch (Exception e) {
if (readerRunning) {
Log.e(TAG, "Reader thread error", e);
JSObject err = new JSObject();
err.put("error", e.getMessage());
notifyListeners("dataReceivedError", err);
handleCommunicationError(e, "Receive failed", null);
}
} finally {
Log.d(TAG, "Reader thread stopped");
}
}, "SocketReaderThread");
readerThread.start();
}

private void notifyDisconnectFromPeer() {
Log.d(TAG, "Peer closed connection");
JSObject evt = new JSObject();
evt.put("reason", "peer_closed");
notifyListeners("connectionClosed", evt);
socketLock.lock();
try {
state.set(ConnectionState.ERROR);
closeResourcesInternal();
state.set(ConnectionState.DISCONNECTED);
} finally {
socketLock.unlock();
}
}

private void stopReaderThread() {
readerRunning = false;
if (readerThread != null) {
try {
readerThread.interrupt();
readerThread.join(500);
} catch (InterruptedException ignored) {}
readerThread = null;
}
}

private void closeResourcesInternal() {
stopReaderThread();
if (input != null) { try { input.close(); } catch (IOException e) { Log.e(TAG, "Error closing input stream", e); } finally { input = null; } }
if (output != null) { try { output.close(); } catch (IOException e) { Log.e(TAG, "Error closing output stream", e); } finally { output = null; } }
if (socket != null) { try { socket.close(); } catch (IOException e) { Log.e(TAG, "Error closing socket", e); } finally { socket = null; } }
}

private void handleCommunicationError(Exception error, String message, PluginCall call) {
socketLock.lock();
try {
state.set(ConnectionState.ERROR);
closeResourcesInternal();
state.set(ConnectionState.DISCONNECTED);

String fullMsg = message + ": " + (error != null ? error.getMessage() : "unknown error");
if (call != null) {
call.reject(fullMsg);
} else {
// No PluginCall available (e.g., background reader thread). Log the error.
Log.e(TAG, fullMsg, error);
// Optionally notify listeners (commented to avoid duplicate notifications):
// JSObject err = new JSObject();
// err.put("error", fullMsg);
// notifyListeners("socketError", err);
}
Log.e(TAG, message, error);
} finally {
socketLock.unlock();
}
}

private boolean compareAndSetState(ConnectionState expected, ConnectionState newState) {
return state.compareAndSet(expected, newState);
}

private String truncateForLog(String data) {
if (data == null) return "null";
final int maxLen = 100;
if (data.length() <= maxLen) return data;
return data.substring(0, maxLen) + "... (" + data.length() + " chars)";
}
}
21 changes: 21 additions & 0 deletions capacitor-plugin-socket/package.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
{
"name": "capacitor-plugin-socket",
"version": "1.0.0",
"description": "A Capacitor plugin for handling raw TCP sockets.",
"main": "dist/index.js",
"types": "dist/index.d.ts",
"files": ["dist/*", "package.json", "README.md"],
"scripts": {
"clean": "rimraf dist",
"build": "npm run clean && tsc -p tsconfig.json"
},
"keywords": ["capacitor", "plugin", "tcp", "socket"],
"author": "Betaflight <dev.betaflight.com>",
"license": "MIT",
"peerDependencies": {
"@capacitor/core": "^5.0.0"
},
"devDependencies": {
"typescript": "^5.0.0"
}
}
7 changes: 7 additions & 0 deletions capacitor-plugin-socket/src/definitions.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
export interface SocketPlugin {
connect(options: { ip: string; port: number }): Promise<{ success: boolean }>;
send(options: { data: string }): Promise<{ success: boolean }>;
receive(): Promise<{ data: string }>;
disconnect(): Promise<{ success: boolean }>;
getStatus(): Promise<{ connected: boolean, state: string }>;
}
8 changes: 8 additions & 0 deletions capacitor-plugin-socket/src/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
import { registerPlugin } from '@capacitor/core';

const SocketPlugin = registerPlugin('SocketPlugin', {
web: () => import('./web').then(m => new m.SocketPluginWeb()),
});

export * from './definitions';
export { SocketPlugin };
Loading