Skip to content

Commit 35df220

Browse files
修复部分问题
1 parent dedcd58 commit 35df220

File tree

6 files changed

+130
-102
lines changed

6 files changed

+130
-102
lines changed

examples/js_ws/index.js

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
async function run() {
1+
(async ()=>{
22
let ws = websocket();
33
let shouldExit = false;
44
while (!shouldExit) {
@@ -17,5 +17,4 @@ async function run() {
1717
break;
1818
}
1919
}
20-
}
21-
run().then(r => {});
20+
})()

examples/js_ws_event/event.js

Lines changed: 16 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,28 +1,31 @@
1-
const name = (await request.getQuery("name"))?.trim();
1+
const name = (request.getQuery("name"))?.trim();
22

33
if (!name) {
44
throw new Error(`Missing or empty name parameter`);
55
}
66

77
const ws = websocket();
88

9-
try {
10-
// 事件处理
11-
event.subscribe("messages").on((msg) => {
12-
ws.writeText(msg);
13-
});
9+
async function eventPull() {
10+
while (true) {
11+
const data = await event.pull('messages')
12+
ws.writeText(data);
13+
}
14+
}
1415

15-
// 主循环
16-
for await (const data of ws.readText()) {
16+
async function messagePull() {
17+
while (true) {
18+
const data = await ws.readText()
1719
if (data === "exit") break;
18-
1920
if (data?.trim()) {
2021
await event.put("messages", JSON.stringify({
21-
name,
22+
name:name,
2223
data: data.trim()
2324
}));
2425
}
2526
}
26-
} finally {
27-
ws.close();
28-
}
27+
}
28+
29+
(async () => {
30+
await Promise.all(eventPull(), messagePull())
31+
})()

pkg/core/filter.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,9 @@ func NextCallWrapper(call FilterCall, parentCall NextCall, stack Filter) NextCal
5151
return func(ctx FilterContext, writer http.ResponseWriter, request *http.Request) error {
5252
zap.L().Debug(fmt.Sprintf("call filter(%s) before", stack.Type), zap.Any("filter", stack))
5353
err := call(ctx, writer, request, parentCall)
54-
zap.L().Debug(fmt.Sprintf("call filter(%s) after", stack.Type), zap.Any("filter", stack), zap.Error(err))
54+
zap.L().Debug(fmt.Sprintf("call filter(%s) after", stack.Type),
55+
zap.Any("filter", stack),
56+
zap.Error(err))
5557
return err
5658
}
5759
}

pkg/filters/goja/goja.go

Lines changed: 78 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,9 @@
11
package goja
22

33
import (
4-
"context"
4+
"encoding/json"
55
"errors"
6+
"fmt"
67
"io"
78
"net/http"
89
"path/filepath"
@@ -14,6 +15,7 @@ import (
1415
"github.com/dop251/goja_nodejs/eventloop"
1516
"github.com/dop251/goja_nodejs/require"
1617
"github.com/dop251/goja_nodejs/url"
18+
"go.uber.org/zap"
1719
"gopkg.d7z.net/gitea-pages/pkg/core"
1820
)
1921

@@ -47,75 +49,102 @@ func FilterInstGoJa(gl core.Params) (core.FilterInstance, error) {
4749
if err != nil {
4850
return err
4951
}
50-
prg, err := goja.Compile("main.js", js, false)
52+
53+
debug := NewDebug(global.EnableDebug && param.Debug && request.URL.Query().
54+
Get("debug") == "true", request, w)
55+
program, err := goja.Compile("main.js", js, false)
5156
if err != nil {
52-
return err
57+
return debug.Flush(err)
5358
}
54-
debug := NewDebug(global.EnableDebug && param.Debug && request.URL.Query().Get("debug") == "true", request, w)
55-
registry := newRegistry(ctx)
56-
registry.RegisterNativeModule(console.ModuleName, console.RequireWithPrinter(debug))
57-
loop := eventloop.NewEventLoop(eventloop.WithRegistry(registry), eventloop.EnableConsole(true))
58-
stop := make(chan struct{}, 1)
59-
shutdown := make(chan struct{}, 1)
60-
defer close(shutdown)
61-
timeout, timeoutCancelFunc := context.WithTimeout(ctx, global.Timeout)
62-
defer timeoutCancelFunc()
63-
count := 0
59+
registry := newRegistry(ctx, debug)
60+
jsLoop := eventloop.NewEventLoop(eventloop.WithRegistry(registry),
61+
eventloop.EnableConsole(true))
62+
63+
jsLoop.Start()
64+
defer jsLoop.Stop()
65+
6466
closers := NewClosers()
6567
defer closers.Close()
66-
go func() {
67-
defer func() {
68-
shutdown <- struct{}{}
68+
69+
stop := make(chan error, 1)
70+
defer close(stop)
71+
72+
jsLoop.RunOnLoop(func(vm *goja.Runtime) {
73+
err := func() error {
74+
url.Enable(vm)
75+
if err = RequestInject(ctx, vm, request); err != nil {
76+
return err
77+
}
78+
if err = ResponseInject(vm, debug, request); err != nil {
79+
return err
80+
}
81+
if err = KVInject(ctx, vm); err != nil {
82+
return err
83+
}
84+
if err = EventInject(ctx, vm, jsLoop); err != nil {
85+
return err
86+
}
87+
if global.EnableWebsocket {
88+
var closer io.Closer
89+
closer, err = WebsocketInject(ctx, vm, debug, request, jsLoop)
90+
if err != nil {
91+
return err
92+
}
93+
closers.AddCloser(closer.Close)
94+
}
95+
return nil
6996
}()
70-
select {
71-
case <-timeout.Done():
72-
case <-stop:
73-
}
74-
count = loop.Stop()
75-
}()
76-
loop.Run(func(vm *goja.Runtime) {
77-
url.Enable(vm)
78-
if err = RequestInject(ctx, vm, request); err != nil {
79-
panic(err)
97+
if err != nil {
98+
stop <- errors.Join(err, errors.New("js init failed"))
99+
return
80100
}
81-
if err = ResponseInject(vm, debug, request); err != nil {
82-
panic(err)
101+
result, err := vm.RunProgram(program)
102+
if err != nil {
103+
stop <- err
104+
return
83105
}
84-
if err = KVInject(ctx, vm); err != nil {
85-
panic(err)
86-
}
87-
if err = EventInject(ctx, vm); err != nil {
88-
panic(err)
89-
}
90-
if global.EnableWebsocket {
91-
var closer io.Closer
92-
closer, err = WebsocketInject(ctx, vm, debug, request, loop, timeoutCancelFunc)
93-
if err != nil {
94-
panic(err)
106+
export := result.Export()
107+
if export != nil {
108+
if promise, ok := export.(*goja.Promise); ok {
109+
go func() {
110+
for {
111+
switch promise.State() {
112+
case goja.PromiseStateFulfilled, goja.PromiseStateRejected:
113+
result := promise.Result().Export()
114+
switch data := result.(type) {
115+
case error:
116+
stop <- data
117+
default:
118+
marshal, _ := json.Marshal(result)
119+
zap.L().Debug(fmt.Sprintf("js promise result %s", string(marshal)),
120+
zap.Any("result", promise.Result().ExportType()))
121+
stop <- nil
122+
}
123+
return
124+
default:
125+
time.Sleep(time.Millisecond * 5)
126+
}
127+
}
128+
}()
95129
}
96-
closers.AddCloser(closer.Close)
97130
}
98-
_, err = vm.RunProgram(prg)
99131
})
100-
stop <- struct{}{}
101-
close(stop)
102-
<-shutdown
103-
if count != 0 {
104-
err = errors.Join(context.DeadlineExceeded, err)
105-
}
106-
return debug.Flush(err)
132+
resultErr := <-stop
133+
return debug.Flush(resultErr)
107134
}, nil
108135
}, nil
109136
}
110137

111-
func newRegistry(ctx core.FilterContext) *require.Registry {
138+
func newRegistry(ctx core.FilterContext, printer console.Printer) *require.Registry {
112139
registry := require.NewRegistry(
113140
require.WithLoader(func(path string) ([]byte, error) {
114141
return ctx.PageVFS.Read(ctx, path)
115142
}),
116143
require.WithPathResolver(func(base, path string) string {
117144
return filepath.Join(base, filepath.FromSlash(path))
118145
}))
146+
registry.RegisterNativeModule(console.ModuleName, console.RequireWithPrinter(printer))
147+
119148
return registry
120149
}
121150

pkg/filters/goja/var_event.go

Lines changed: 23 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -2,39 +2,33 @@ package goja
22

33
import (
44
"github.com/dop251/goja"
5+
"github.com/dop251/goja_nodejs/eventloop"
56
"gopkg.d7z.net/gitea-pages/pkg/core"
67
)
78

8-
func EventInject(ctx core.FilterContext, jsCtx *goja.Runtime) error {
9+
func EventInject(ctx core.FilterContext, jsCtx *goja.Runtime, loop *eventloop.EventLoop) error {
910
return jsCtx.GlobalObject().Set("event", map[string]interface{}{
10-
"subscribe": func(key string) (map[string]any, error) {
11-
subscribe, err := ctx.Event.Subscribe(ctx, key)
12-
if err != nil {
13-
return nil, err
14-
}
15-
return map[string]any{
16-
"on": func(f func(string)) {
17-
go func() {
18-
z:
19-
for {
20-
select {
21-
case <-ctx.Done():
22-
break z
23-
case data := <-subscribe:
24-
f(data)
25-
}
26-
}
27-
}()
28-
},
29-
"get": func() (string, error) {
30-
select {
31-
case <-ctx.Done():
32-
return "", ctx.Err()
33-
case data := <-subscribe:
34-
return data, nil
35-
}
36-
},
37-
}, nil
11+
"load": func(key string) *goja.Promise {
12+
promise, resolve, reject := jsCtx.NewPromise()
13+
go func() {
14+
subscribe, err := ctx.Event.Subscribe(ctx, key)
15+
if err != nil {
16+
loop.RunOnLoop(func(runtime *goja.Runtime) {
17+
_ = reject(runtime.ToValue(err))
18+
})
19+
}
20+
select {
21+
case data := <-subscribe:
22+
loop.RunOnLoop(func(runtime *goja.Runtime) {
23+
_ = resolve(runtime.ToValue(data))
24+
})
25+
case <-ctx.Done():
26+
loop.RunOnLoop(func(runtime *goja.Runtime) {
27+
_ = reject(runtime.ToValue(ctx.Err()))
28+
})
29+
}
30+
}()
31+
return promise
3832
},
3933
"put": func(key, value string) error {
4034
return ctx.Event.Publish(ctx, key, value)

pkg/filters/goja/var_websocket.go

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
package goja
22

33
import (
4-
"context"
54
"io"
65
"net/http"
76
"time"
@@ -14,15 +13,14 @@ import (
1413
"gopkg.d7z.net/gitea-pages/pkg/core"
1514
)
1615

17-
func WebsocketInject(ctx core.FilterContext, jsCtx *goja.Runtime, w http.ResponseWriter, request *http.Request, loop *eventloop.EventLoop, cancelFunc context.CancelFunc) (io.Closer, error) {
16+
func WebsocketInject(ctx core.FilterContext, jsCtx *goja.Runtime, w http.ResponseWriter, request *http.Request, loop *eventloop.EventLoop) (io.Closer, error) {
1817
closers := NewClosers()
1918
return closers, jsCtx.GlobalObject().Set("websocket", func() (any, error) {
2019
upgrader := websocket.Upgrader{}
2120
conn, err := upgrader.Upgrade(w, request, nil)
2221
if err != nil {
2322
return nil, err
2423
}
25-
cancelFunc()
2624
go func() {
2725
ticker := time.NewTicker(15 * time.Second)
2826
defer ticker.Stop()
@@ -63,32 +61,35 @@ func WebsocketInject(ctx core.FilterContext, jsCtx *goja.Runtime, w http.Respons
6361
},
6462
"TypeTextMessage": websocket.TextMessage,
6563
"TypeBinaryMessage": websocket.BinaryMessage,
66-
"readText": func() goja.Value {
64+
"readText": func() *goja.Promise {
6765
promise, resolve, reject := jsCtx.NewPromise()
6866
go func() {
6967
select {
7068
case <-ctx.Done():
69+
loop.RunOnLoop(func(runtime *goja.Runtime) {
70+
_ = reject(runtime.ToValue(ctx.Err()))
71+
})
7172
return
7273
default:
7374
}
7475
defer func() {
7576
if r := recover(); r != nil {
7677
zap.L().Debug("websocket panic", zap.Any("panic", r))
77-
loop.Run(func(runtime *goja.Runtime) {
78+
loop.RunOnLoop(func(runtime *goja.Runtime) {
7879
_ = reject(runtime.ToValue(r))
7980
})
8081
}
8182
}()
8283
_, p, err := conn.ReadMessage()
83-
loop.Run(func(runtime *goja.Runtime) {
84+
loop.RunOnLoop(func(runtime *goja.Runtime) {
8485
if err != nil {
8586
_ = reject(runtime.ToValue(err))
8687
} else {
8788
_ = resolve(runtime.ToValue(string(p)))
8889
}
8990
})
9091
}()
91-
return promise.Result()
92+
return promise
9293
},
9394
"read": func() (any, error) {
9495
messageType, p, err := conn.ReadMessage()

0 commit comments

Comments
 (0)