-
-
Notifications
You must be signed in to change notification settings - Fork 1k
Add custom Capacitor plugin-socket for raw TCP support #4471
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Open
haslinghuis
wants to merge
21
commits into
betaflight:master
Choose a base branch
from
haslinghuis:capacitor-plugin-socket
base: master
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
+403
−1
Open
Changes from 18 commits
Commits
Show all changes
21 commits
Select commit
Hold shift + click to select a range
61dad5c
Add custom Capacitor plugin-socket
haslinghuis affbe27
Rabbit fixes
haslinghuis 8feb255
Refactor SocketPlugin.java
haslinghuis c5c83c9
Move receive operation to background thread
haslinghuis 83336c1
update isConnected flag in closeResources catch block
haslinghuis c1f0502
Handle null return from readLine
haslinghuis 1072211
guard against missing or null port parameter
haslinghuis cda48c0
fix inconsistent error handling in receive method
haslinghuis a8320d0
Rabbit keeps nitpicking
haslinghuis 7a83a79
Suggested by rabbit
haslinghuis 2f33e17
Add getStatus
haslinghuis 1f1b15e
Allow new connect attempt
haslinghuis c2f3c18
Replace file with rabbit suggestion again
haslinghuis b5717ae
Add back getStatus
haslinghuis 49ccabf
Add getStatus to web.ts too
haslinghuis 98362b9
Update message
haslinghuis 6948cee
Change data stream
haslinghuis a128d44
Continuous reading
haslinghuis 527b17e
Data Format
haslinghuis a11c50a
Guard against null PluginCall in handleCommunicationError
haslinghuis 924c183
Reset state to DISCONNECTED after disconnect failure
haslinghuis File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Some comments aren't visible on the classic Files Changed page.
There are no files selected for viewing
10 changes: 9 additions & 1 deletion
10
android/app/src/main/java/betaflight/configurator/MainActivity.java
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,5 +1,13 @@ | ||
| package betaflight.configurator; | ||
|
|
||
| import android.os.Bundle; | ||
| import com.getcapacitor.BridgeActivity; | ||
| import betaflight.configurator.plugin.SocketPlugin; | ||
|
|
||
| public class MainActivity extends BridgeActivity {} | ||
| public class MainActivity extends BridgeActivity { | ||
| @Override | ||
| public void onCreate(Bundle savedInstanceState) { | ||
| super.onCreate(savedInstanceState); | ||
| registerPlugin(SocketPlugin.class); | ||
| } | ||
| } |
315 changes: 315 additions & 0 deletions
315
android/app/src/main/java/betaflight/configurator/plugin/SocketPlugin.java
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,315 @@ | ||
| package betaflight.configurator.plugin; | ||
|
|
||
| import android.util.Log; | ||
| import com.getcapacitor.JSObject; | ||
| import com.getcapacitor.Plugin; | ||
| import com.getcapacitor.PluginCall; | ||
| import com.getcapacitor.PluginMethod; | ||
| import com.getcapacitor.annotation.CapacitorPlugin; | ||
| import java.io.ByteArrayOutputStream; | ||
| import java.io.IOException; | ||
| import java.io.InputStream; | ||
| import java.io.OutputStream; | ||
| import java.net.InetSocketAddress; | ||
| import java.net.Socket; | ||
| import java.nio.charset.StandardCharsets; | ||
| import java.util.concurrent.atomic.AtomicReference; | ||
| import java.util.concurrent.locks.ReentrantLock; | ||
|
|
||
| /** | ||
| * Capacitor plugin that provides raw TCP socket functionality with thread safety, | ||
| * robust resource management, and comprehensive error handling. | ||
| */ | ||
| @CapacitorPlugin(name = "SocketPlugin") | ||
| public class SocketPlugin extends Plugin { | ||
| private static final String TAG = "SocketPlugin"; | ||
|
|
||
| // Error messages | ||
| private static final String ERROR_IP_REQUIRED = "IP address is required"; | ||
| private static final String ERROR_INVALID_PORT = "Invalid port number"; | ||
| private static final String ERROR_ALREADY_CONNECTED = "Already connected; please disconnect first"; | ||
| private static final String ERROR_NOT_CONNECTED = "Not connected to any server"; | ||
| private static final String ERROR_DATA_REQUIRED = "Data is required"; | ||
| private static final String ERROR_CONNECTION_LOST = "Connection lost"; | ||
| private static final String ERROR_CONNECTION_CLOSED = "Connection closed by peer"; | ||
|
|
||
| // Connection settings | ||
| private static final int DEFAULT_TIMEOUT_MS = 30_000; | ||
| private static final int MIN_PORT = 1; | ||
| private static final int MAX_PORT = 65535; | ||
|
|
||
| private enum ConnectionState { | ||
| DISCONNECTED, | ||
| CONNECTING, | ||
| CONNECTED, | ||
| DISCONNECTING, | ||
| ERROR | ||
| } | ||
|
|
||
| // Thread-safe state and locks | ||
| private final AtomicReference<ConnectionState> state = new AtomicReference<>(ConnectionState.DISCONNECTED); | ||
| private final ReentrantLock socketLock = new ReentrantLock(); | ||
| private final ReentrantLock writerLock = new ReentrantLock(); | ||
|
|
||
| private Socket socket; | ||
| private InputStream input; | ||
| private OutputStream output; | ||
| private Thread readerThread; | ||
| private volatile boolean readerRunning = false; | ||
|
|
||
| @PluginMethod | ||
| public void connect(final PluginCall call) { | ||
| call.setKeepAlive(true); | ||
| String ip = call.getString("ip"); | ||
| int port = call.getInt("port", -1); | ||
|
|
||
| if (ip == null || ip.isEmpty()) { | ||
| call.reject(ERROR_IP_REQUIRED); | ||
| call.setKeepAlive(false); | ||
| return; | ||
| } | ||
| if (port < MIN_PORT || port > MAX_PORT) { | ||
| call.reject(ERROR_INVALID_PORT); | ||
| call.setKeepAlive(false); | ||
| return; | ||
| } | ||
| if (!compareAndSetState(ConnectionState.DISCONNECTED, ConnectionState.CONNECTING)) { | ||
| call.reject(ERROR_ALREADY_CONNECTED); | ||
| call.setKeepAlive(false); | ||
| return; | ||
| } | ||
|
|
||
| getBridge().getExecutor().execute(() -> { | ||
| socketLock.lock(); | ||
| try { | ||
| socket = new Socket(); | ||
| socket.connect(new InetSocketAddress(ip, port), DEFAULT_TIMEOUT_MS); | ||
| socket.setSoTimeout(DEFAULT_TIMEOUT_MS); | ||
|
|
||
| input = socket.getInputStream(); | ||
| output = socket.getOutputStream(); | ||
|
|
||
| state.set(ConnectionState.CONNECTED); | ||
| JSObject result = new JSObject(); | ||
| result.put("success", true); | ||
| call.resolve(result); | ||
| Log.d(TAG, "Connected to " + ip + ":" + port); | ||
|
|
||
| startReaderThread(); | ||
| } catch (Exception e) { | ||
| state.set(ConnectionState.ERROR); | ||
| closeResourcesInternal(); | ||
| call.reject("Connection failed: " + e.getMessage()); | ||
| Log.e(TAG, "Connection failed", e); | ||
| } finally { | ||
| socketLock.unlock(); | ||
| call.setKeepAlive(false); | ||
| } | ||
| }); | ||
| } | ||
|
|
||
| @PluginMethod | ||
| public void send(final PluginCall call) { | ||
| String data = call.getString("data"); | ||
| if (data == null || data.isEmpty()) { | ||
| call.reject(ERROR_DATA_REQUIRED); | ||
| return; | ||
| } | ||
| if (state.get() != ConnectionState.CONNECTED) { | ||
| call.reject(ERROR_NOT_CONNECTED); | ||
| return; | ||
| } | ||
| call.setKeepAlive(true); | ||
|
|
||
| getBridge().getExecutor().execute(() -> { | ||
| writerLock.lock(); | ||
| try { | ||
| if (output == null || state.get() != ConnectionState.CONNECTED) { | ||
| call.reject(ERROR_CONNECTION_LOST); | ||
| return; | ||
| } | ||
| byte[] payload = (data + "\n").getBytes(StandardCharsets.UTF_8); | ||
| output.write(payload); | ||
| output.flush(); | ||
|
|
||
| JSObject result = new JSObject(); | ||
| result.put("success", true); | ||
| call.resolve(result); | ||
| Log.d(TAG, "Sent data: " + truncateForLog(data)); | ||
| } catch (Exception e) { | ||
| handleCommunicationError(e, "Send failed", call); | ||
| } finally { | ||
| writerLock.unlock(); | ||
| call.setKeepAlive(false); | ||
| } | ||
| }); | ||
| } | ||
|
|
||
| @PluginMethod | ||
| public void receive(final PluginCall call) { | ||
| // Deprecated by continuous reader (Task 2) | ||
| JSObject result = new JSObject(); | ||
| result.put("data", ""); | ||
| call.reject("Continuous read active. Listen for 'dataReceived' events instead."); | ||
| } | ||
|
|
||
| @PluginMethod | ||
| public void disconnect(final PluginCall call) { | ||
| ConnectionState current = state.get(); | ||
| if (current == ConnectionState.DISCONNECTED) { | ||
| JSObject result = new JSObject(); | ||
| result.put("success", true); | ||
| call.resolve(result); | ||
| return; | ||
| } | ||
| if (!compareAndSetState(current, ConnectionState.DISCONNECTING)) { | ||
| call.reject("Invalid state for disconnect: " + current); | ||
| return; | ||
| } | ||
| call.setKeepAlive(true); | ||
|
|
||
| getBridge().getExecutor().execute(() -> { | ||
| socketLock.lock(); | ||
| try { | ||
| closeResourcesInternal(); | ||
| state.set(ConnectionState.DISCONNECTED); | ||
| JSObject result = new JSObject(); | ||
| result.put("success", true); | ||
| call.resolve(result); | ||
| Log.d(TAG, "Disconnected successfully"); | ||
| } catch (Exception e) { | ||
| state.set(ConnectionState.ERROR); | ||
| call.reject("Disconnect failed: " + e.getMessage()); | ||
| Log.e(TAG, "Disconnect failed", e); | ||
coderabbitai[bot] marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| } finally { | ||
| socketLock.unlock(); | ||
| call.setKeepAlive(false); | ||
| } | ||
| }); | ||
| } | ||
|
|
||
| @PluginMethod | ||
| public void getStatus(final PluginCall call) { | ||
| JSObject result = new JSObject(); | ||
| result.put("connected", state.get() == ConnectionState.CONNECTED); | ||
| result.put("state", state.get().toString()); | ||
| call.resolve(result); | ||
| } | ||
|
|
||
| @Override | ||
| protected void handleOnDestroy() { | ||
| socketLock.lock(); | ||
| try { | ||
| state.set(ConnectionState.DISCONNECTING); | ||
| closeResourcesInternal(); | ||
| state.set(ConnectionState.DISCONNECTED); | ||
| } catch (Exception e) { | ||
| Log.e(TAG, "Error cleaning up resources on destroy", e); | ||
| } finally { | ||
| socketLock.unlock(); | ||
| } | ||
| super.handleOnDestroy(); | ||
| } | ||
|
|
||
| private void startReaderThread() { | ||
| if (readerThread != null && readerThread.isAlive()) return; | ||
| readerRunning = true; | ||
| readerThread = new Thread(() -> { | ||
| Log.d(TAG, "Reader thread started"); | ||
| try { | ||
| ByteArrayOutputStream lineBuf = new ByteArrayOutputStream(); | ||
| while (readerRunning && state.get() == ConnectionState.CONNECTED && input != null) { | ||
| int b = input.read(); | ||
| if (b == -1) { | ||
| notifyDisconnectFromPeer(); | ||
| break; | ||
| } | ||
| if (b == '\n') { | ||
| String line = new String(lineBuf.toByteArray(), StandardCharsets.UTF_8); | ||
| lineBuf.reset(); | ||
| if (line.endsWith("\r")) { | ||
| line = line.substring(0, line.length() - 1); | ||
| } | ||
| JSObject payload = new JSObject(); | ||
| payload.put("data", line); | ||
| notifyListeners("dataReceived", payload); | ||
| } else { | ||
| lineBuf.write(b); | ||
| if (lineBuf.size() > 1024 * 1024) { // safety cap | ||
| lineBuf.reset(); | ||
| Log.w(TAG, "Dropped oversized line"); | ||
| } | ||
| } | ||
| } | ||
| } catch (Exception e) { | ||
| if (readerRunning) { | ||
| Log.e(TAG, "Reader thread error", e); | ||
| JSObject err = new JSObject(); | ||
| err.put("error", e.getMessage()); | ||
| notifyListeners("dataReceivedError", err); | ||
| handleCommunicationError(e, "Receive failed", null); | ||
| } | ||
coderabbitai[bot] marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| } finally { | ||
| Log.d(TAG, "Reader thread stopped"); | ||
| } | ||
| }, "SocketReaderThread"); | ||
| readerThread.start(); | ||
| } | ||
|
|
||
| private void notifyDisconnectFromPeer() { | ||
| Log.d(TAG, "Peer closed connection"); | ||
| JSObject evt = new JSObject(); | ||
| evt.put("reason", "peer_closed"); | ||
| notifyListeners("connectionClosed", evt); | ||
| socketLock.lock(); | ||
| try { | ||
| state.set(ConnectionState.ERROR); | ||
| closeResourcesInternal(); | ||
| state.set(ConnectionState.DISCONNECTED); | ||
| } finally { | ||
| socketLock.unlock(); | ||
| } | ||
| } | ||
|
|
||
| private void stopReaderThread() { | ||
| readerRunning = false; | ||
| if (readerThread != null) { | ||
| try { | ||
| readerThread.interrupt(); | ||
| readerThread.join(500); | ||
| } catch (InterruptedException ignored) {} | ||
| readerThread = null; | ||
| } | ||
| } | ||
|
|
||
| private void closeResourcesInternal() { | ||
| stopReaderThread(); | ||
| if (input != null) { try { input.close(); } catch (IOException e) { Log.e(TAG, "Error closing input stream", e); } finally { input = null; } } | ||
| if (output != null) { try { output.close(); } catch (IOException e) { Log.e(TAG, "Error closing output stream", e); } finally { output = null; } } | ||
| if (socket != null) { try { socket.close(); } catch (IOException e) { Log.e(TAG, "Error closing socket", e); } finally { socket = null; } } | ||
| } | ||
|
|
||
| private void handleCommunicationError(Exception error, String message, PluginCall call) { | ||
| socketLock.lock(); | ||
| try { | ||
| state.set(ConnectionState.ERROR); | ||
| closeResourcesInternal(); | ||
| state.set(ConnectionState.DISCONNECTED); | ||
| call.reject(message + ": " + error.getMessage()); | ||
| Log.e(TAG, message, error); | ||
| } finally { | ||
| socketLock.unlock(); | ||
| } | ||
| } | ||
|
|
||
| private boolean compareAndSetState(ConnectionState expected, ConnectionState newState) { | ||
| return state.compareAndSet(expected, newState); | ||
| } | ||
|
|
||
| private String truncateForLog(String data) { | ||
| if (data == null) return "null"; | ||
| final int maxLen = 100; | ||
| if (data.length() <= maxLen) return data; | ||
| return data.substring(0, maxLen) + "... (" + data.length() + " chars)"; | ||
| } | ||
| } | ||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,21 @@ | ||
| { | ||
| "name": "capacitor-plugin-socket", | ||
| "version": "1.0.0", | ||
| "description": "A Capacitor plugin for handling raw TCP sockets.", | ||
| "main": "dist/index.js", | ||
| "types": "dist/index.d.ts", | ||
| "files": ["dist/*", "package.json", "README.md"], | ||
| "scripts": { | ||
| "clean": "rimraf dist", | ||
| "build": "npm run clean && tsc -p tsconfig.json" | ||
| }, | ||
| "keywords": ["capacitor", "plugin", "tcp", "socket"], | ||
| "author": "Betaflight <dev.betaflight.com>", | ||
| "license": "MIT", | ||
| "peerDependencies": { | ||
| "@capacitor/core": "^5.0.0" | ||
| }, | ||
| "devDependencies": { | ||
| "typescript": "^5.0.0" | ||
| } | ||
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,7 @@ | ||
| export interface SocketPlugin { | ||
| connect(options: { ip: string; port: number }): Promise<{ success: boolean }>; | ||
| send(options: { data: string }): Promise<{ success: boolean }>; | ||
| receive(): Promise<{ data: string }>; | ||
| disconnect(): Promise<{ success: boolean }>; | ||
| getStatus(): Promise<{ connected: boolean, state: string }>; | ||
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,8 @@ | ||
| import { registerPlugin } from '@capacitor/core'; | ||
|
|
||
| const SocketPlugin = registerPlugin('SocketPlugin', { | ||
| web: () => import('./web').then(m => new m.SocketPluginWeb()), | ||
| }); | ||
|
|
||
| export * from './definitions'; | ||
| export { SocketPlugin }; |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.