Skip to content

Commit 2297d46

Browse files
tutman96Nevexo
authored andcommitted
wip: Plugin RPC with status reporting to the UI
1 parent db9698b commit 2297d46

File tree

9 files changed

+411
-74
lines changed

9 files changed

+411
-74
lines changed

internal/jsonrpc/rpc_server.go

Lines changed: 134 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -5,31 +5,152 @@ import (
55
"errors"
66
"fmt"
77
"io"
8+
"log"
89
"reflect"
10+
"sync"
11+
"sync/atomic"
12+
"time"
913
)
1014

1115
type JSONRPCServer struct {
1216
writer io.Writer
1317

1418
handlers map[string]*RPCHandler
19+
nextId atomic.Int64
20+
21+
responseChannelsMutex sync.Mutex
22+
responseChannels map[int64]chan JSONRPCResponse
1523
}
1624

1725
func NewJSONRPCServer(writer io.Writer, handlers map[string]*RPCHandler) *JSONRPCServer {
1826
return &JSONRPCServer{
19-
writer: writer,
20-
handlers: handlers,
27+
writer: writer,
28+
handlers: handlers,
29+
responseChannels: make(map[int64]chan JSONRPCResponse),
30+
nextId: atomic.Int64{},
31+
}
32+
}
33+
34+
func (s *JSONRPCServer) Request(method string, params map[string]interface{}, result interface{}) *JSONRPCResponseError {
35+
id := s.nextId.Add(1)
36+
request := JSONRPCRequest{
37+
JSONRPC: "2.0",
38+
Method: method,
39+
Params: params,
40+
ID: id,
41+
}
42+
requestBytes, err := json.Marshal(request)
43+
if err != nil {
44+
return &JSONRPCResponseError{
45+
Code: -32700,
46+
Message: "Parse error",
47+
Data: err,
48+
}
49+
}
50+
51+
// log.Printf("Sending RPC request: Method=%s, Params=%v, ID=%d", method, params, id)
52+
53+
responseChan := make(chan JSONRPCResponse, 1)
54+
s.responseChannelsMutex.Lock()
55+
s.responseChannels[id] = responseChan
56+
s.responseChannelsMutex.Unlock()
57+
defer func() {
58+
s.responseChannelsMutex.Lock()
59+
delete(s.responseChannels, id)
60+
s.responseChannelsMutex.Unlock()
61+
}()
62+
63+
_, err = s.writer.Write(requestBytes)
64+
if err != nil {
65+
return &JSONRPCResponseError{
66+
Code: -32603,
67+
Message: "Internal error",
68+
Data: err,
69+
}
70+
}
71+
72+
timeout := time.After(5 * time.Second)
73+
select {
74+
case response := <-responseChan:
75+
if response.Error != nil {
76+
return response.Error
77+
}
78+
79+
rawResult, err := json.Marshal(response.Result)
80+
if err != nil {
81+
return &JSONRPCResponseError{
82+
Code: -32603,
83+
Message: "Internal error",
84+
Data: err,
85+
}
86+
}
87+
88+
if err := json.Unmarshal(rawResult, result); err != nil {
89+
return &JSONRPCResponseError{
90+
Code: -32603,
91+
Message: "Internal error",
92+
Data: err,
93+
}
94+
}
95+
96+
return nil
97+
case <-timeout:
98+
return &JSONRPCResponseError{
99+
Code: -32603,
100+
Message: "Internal error",
101+
Data: "timeout waiting for response",
102+
}
21103
}
22104
}
23105

106+
type JSONRPCMessage struct {
107+
Method *string `json:"method,omitempty"`
108+
ID *int64 `json:"id,omitempty"`
109+
}
110+
24111
func (s *JSONRPCServer) HandleMessage(data []byte) error {
112+
// Data will either be a JSONRPCRequest or JSONRPCResponse object
113+
// We need to determine which one it is
114+
var raw JSONRPCMessage
115+
err := json.Unmarshal(data, &raw)
116+
if err != nil {
117+
errorResponse := JSONRPCResponse{
118+
JSONRPC: "2.0",
119+
Error: &JSONRPCResponseError{
120+
Code: -32700,
121+
Message: "Parse error",
122+
},
123+
ID: 0,
124+
}
125+
return s.writeResponse(errorResponse)
126+
}
127+
128+
if raw.Method == nil && raw.ID != nil {
129+
var resp JSONRPCResponse
130+
if err := json.Unmarshal(data, &resp); err != nil {
131+
fmt.Println("error unmarshalling response", err)
132+
return err
133+
}
134+
135+
s.responseChannelsMutex.Lock()
136+
responseChan, ok := s.responseChannels[*raw.ID]
137+
s.responseChannelsMutex.Unlock()
138+
if ok {
139+
responseChan <- resp
140+
} else {
141+
log.Println("No response channel found for ID", resp.ID)
142+
}
143+
return nil
144+
}
145+
25146
var request JSONRPCRequest
26-
err := json.Unmarshal(data, &request)
147+
err = json.Unmarshal(data, &request)
27148
if err != nil {
28149
errorResponse := JSONRPCResponse{
29150
JSONRPC: "2.0",
30-
Error: map[string]interface{}{
31-
"code": -32700,
32-
"message": "Parse error",
151+
Error: &JSONRPCResponseError{
152+
Code: -32700,
153+
Message: "Parse error",
33154
},
34155
ID: 0,
35156
}
@@ -41,9 +162,9 @@ func (s *JSONRPCServer) HandleMessage(data []byte) error {
41162
if !ok {
42163
errorResponse := JSONRPCResponse{
43164
JSONRPC: "2.0",
44-
Error: map[string]interface{}{
45-
"code": -32601,
46-
"message": "Method not found",
165+
Error: &JSONRPCResponseError{
166+
Code: -32601,
167+
Message: "Method not found",
47168
},
48169
ID: request.ID,
49170
}
@@ -54,10 +175,10 @@ func (s *JSONRPCServer) HandleMessage(data []byte) error {
54175
if err != nil {
55176
errorResponse := JSONRPCResponse{
56177
JSONRPC: "2.0",
57-
Error: map[string]interface{}{
58-
"code": -32603,
59-
"message": "Internal error",
60-
"data": err.Error(),
178+
Error: &JSONRPCResponseError{
179+
Code: -32603,
180+
Message: "Internal error",
181+
Data: err.Error(),
61182
},
62183
ID: request.ID,
63184
}

internal/jsonrpc/types.go

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,10 +8,16 @@ type JSONRPCRequest struct {
88
}
99

1010
type JSONRPCResponse struct {
11-
JSONRPC string `json:"jsonrpc"`
12-
Result interface{} `json:"result,omitempty"`
13-
Error interface{} `json:"error,omitempty"`
14-
ID interface{} `json:"id"`
11+
JSONRPC string `json:"jsonrpc"`
12+
Result interface{} `json:"result,omitempty"`
13+
Error *JSONRPCResponseError `json:"error,omitempty"`
14+
ID interface{} `json:"id"`
15+
}
16+
17+
type JSONRPCResponseError struct {
18+
Code int `json:"code"`
19+
Message string `json:"message"`
20+
Data interface{} `json:"data,omitempty"`
1521
}
1622

1723
type JSONRPCEvent struct {

internal/plugin/install.go

Lines changed: 28 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ package plugin
33
import (
44
"fmt"
55
"log"
6-
"net"
76
"os"
87
"os/exec"
98
"path"
@@ -22,7 +21,7 @@ type PluginInstall struct {
2221
manifest *PluginManifest
2322
runningVersion *string
2423
processManager *ProcessManager
25-
rpcListener net.Listener
24+
rpcServer *PluginRpcServer
2625
}
2726

2827
func (p *PluginInstall) GetManifest() (*PluginManifest, error) {
@@ -54,13 +53,24 @@ func (p *PluginInstall) GetStatus() (*PluginStatus, error) {
5453
Enabled: p.Enabled,
5554
}
5655

57-
status.Status = "stopped"
58-
if p.processManager != nil {
59-
status.Status = "running"
60-
if p.processManager.LastError != nil {
61-
status.Status = "errored"
62-
status.Error = p.processManager.LastError.Error()
56+
if p.rpcServer != nil && p.rpcServer.status.Status != "disconnected" {
57+
log.Printf("Status from RPC: %v", p.rpcServer.status)
58+
status.Status = p.rpcServer.status.Status
59+
status.Message = p.rpcServer.status.Message
60+
61+
if status.Status == "error" {
62+
status.Message = p.rpcServer.status.Message
63+
}
64+
} else {
65+
status.Status = "stopped"
66+
if p.processManager != nil {
67+
status.Status = "running"
68+
if p.processManager.LastError != nil {
69+
status.Status = "errored"
70+
status.Message = p.processManager.LastError.Error()
71+
}
6372
}
73+
log.Printf("Status from process manager: %v", status.Status)
6474
}
6575

6676
return &status, nil
@@ -94,34 +104,33 @@ func (p *PluginInstall) ReconcileSubprocess() error {
94104
p.processManager.Disable()
95105
p.processManager = nil
96106
p.runningVersion = nil
97-
p.rpcListener.Close()
98-
p.rpcListener = nil
107+
err = p.rpcServer.Stop()
108+
if err != nil {
109+
return fmt.Errorf("failed to stop rpc server: %v", err)
110+
}
99111
}
100112

101113
if versionShouldBeRunning == "" {
102114
return nil
103115
}
104116

105117
workingDir := path.Join(pluginsFolder, "working_dirs", p.manifest.Name)
106-
socketPath := path.Join(workingDir, "plugin.sock")
107-
108-
os.Remove(socketPath)
109118
err = os.MkdirAll(workingDir, 0755)
110119
if err != nil {
111120
return fmt.Errorf("failed to create working directory: %v", err)
112121
}
113122

114-
listener, err := net.Listen("unix", socketPath)
123+
p.rpcServer = NewPluginRpcServer(p, workingDir)
124+
err = p.rpcServer.Start()
115125
if err != nil {
116-
return fmt.Errorf("failed to listen on socket: %v", err)
126+
return fmt.Errorf("failed to start rpc server: %v", err)
117127
}
118-
p.rpcListener = listener
119128

120129
p.processManager = NewProcessManager(func() *exec.Cmd {
121130
cmd := exec.Command(manifest.BinaryPath)
122131
cmd.Dir = p.GetExtractedFolder()
123132
cmd.Env = append(cmd.Env,
124-
"JETKVM_PLUGIN_SOCK="+socketPath,
133+
"JETKVM_PLUGIN_SOCK="+p.rpcServer.SocketPath(),
125134
"JETKVM_PLUGIN_WORKING_DIR="+workingDir,
126135
)
127136
cmd.Stdout = os.Stdout
@@ -147,8 +156,7 @@ func (p *PluginInstall) Shutdown() {
147156
p.runningVersion = nil
148157
}
149158

150-
if p.rpcListener != nil {
151-
p.rpcListener.Close()
152-
p.rpcListener = nil
159+
if p.rpcServer != nil {
160+
p.rpcServer.Stop()
153161
}
154162
}

0 commit comments

Comments
 (0)