Skip to content

Commit 827f9bd

Browse files
重构 websocket
1 parent 35df220 commit 827f9bd

File tree

10 files changed

+161
-84
lines changed

10 files changed

+161
-84
lines changed

cmd/local/main.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ func main() {
7878

7979
type nopCache struct{}
8080

81-
func (n *nopCache) Child(_ string) cache.Cache {
81+
func (n *nopCache) Child(_ ...string) cache.Cache {
8282
return n
8383
}
8484

examples/js_ws/index.js

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,17 @@
11
(async ()=>{
22
let ws = websocket();
3-
let shouldExit = false;
4-
while (!shouldExit) {
3+
while (true) {
54
let data = await ws.readText();
65
switch (data) {
76
case "exit":
8-
shouldExit = true;
9-
break;
7+
return
108
case "panic":
119
throw Error("错误");
1210
case "date":
13-
ws.writeText(new Date().toJSON())
11+
await ws.writeText(new Date().toJSON())
1412
break
1513
default:
16-
ws.writeText("收到信息:" + data)
14+
await ws.writeText("收到信息:" + data)
1715
break;
1816
}
1917
}

examples/js_ws_event/event.js

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,8 @@ const ws = websocket();
88

99
async function eventPull() {
1010
while (true) {
11-
const data = await event.pull('messages')
12-
ws.writeText(data);
11+
const data = await event.load('messages')
12+
await ws.writeText(data);
1313
}
1414
}
1515

@@ -27,5 +27,5 @@ async function messagePull() {
2727
}
2828

2929
(async () => {
30-
await Promise.all(eventPull(), messagePull())
30+
await Promise.all([eventPull(), messagePull()])
3131
})()

go.mod

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,8 @@ require (
1414
github.com/hashicorp/golang-lru/v2 v2.0.7
1515
github.com/pkg/errors v0.9.1
1616
github.com/stretchr/testify v1.11.1
17-
go.uber.org/zap v1.27.0
18-
gopkg.d7z.net/middleware v0.0.0-20251119134829-0c55a98e6495
17+
go.uber.org/zap v1.27.1
18+
gopkg.d7z.net/middleware v0.0.0-20251120123709-5d4e16e0d6fb
1919
gopkg.in/yaml.v3 v3.0.1
2020
)
2121

@@ -53,7 +53,7 @@ require (
5353
go.etcd.io/etcd/client/pkg/v3 v3.6.6 // indirect
5454
go.etcd.io/etcd/client/v3 v3.6.6 // indirect
5555
go.uber.org/multierr v1.11.0 // indirect
56-
golang.org/x/crypto v0.44.0 // indirect
56+
golang.org/x/crypto v0.45.0 // indirect
5757
golang.org/x/net v0.47.0 // indirect
5858
golang.org/x/sys v0.38.0 // indirect
5959
golang.org/x/text v0.31.0 // indirect

go.sum

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -142,12 +142,16 @@ go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0=
142142
go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y=
143143
go.uber.org/zap v1.27.0 h1:aJMhYGrd5QSmlpLMr2MftRKl7t8J8PTZPA732ud/XR8=
144144
go.uber.org/zap v1.27.0/go.mod h1:GB2qFLM7cTU87MWRP2mPIjqfIDnGu+VIO4V/SdhGo2E=
145+
go.uber.org/zap v1.27.1 h1:08RqriUEv8+ArZRYSTXy1LeBScaMpVSTBhCeaZYfMYc=
146+
go.uber.org/zap v1.27.1/go.mod h1:GB2qFLM7cTU87MWRP2mPIjqfIDnGu+VIO4V/SdhGo2E=
145147
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
146148
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
147149
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
148150
golang.org/x/crypto v0.0.0-20210513164829-c07d793c2f9a/go.mod h1:P+XmwS30IXTQdn5tA2iutPOUgjI07+tq3H3K9MVA1s8=
149151
golang.org/x/crypto v0.44.0 h1:A97SsFvM3AIwEEmTBiaxPPTYpDC47w720rdiiUvgoAU=
150152
golang.org/x/crypto v0.44.0/go.mod h1:013i+Nw79BMiQiMsOPcVCB5ZIJbYkerPrGnOa00tvmc=
153+
golang.org/x/crypto v0.45.0 h1:jMBrvKuj23MTlT0bQEOBcAE0mjg8mK9RXFhRH6nyF3Q=
154+
golang.org/x/crypto v0.45.0/go.mod h1:XTGrrkGJve7CYK7J8PEww4aY7gM3qMCElcJQ8n8JdX4=
151155
golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
152156
golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
153157
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
@@ -197,6 +201,8 @@ gopkg.d7z.net/middleware v0.0.0-20251114145539-bb74bd940f32 h1:3JvqnWFLWzAoS57vL
197201
gopkg.d7z.net/middleware v0.0.0-20251114145539-bb74bd940f32/go.mod h1:/1/EuissKhUbuhUe01rcWuwpA5mt7jASb4uKVNOLtR8=
198202
gopkg.d7z.net/middleware v0.0.0-20251119134829-0c55a98e6495 h1:LvjpmL0nkZZtrUXCFZGyoh8O2X9l2B7ZXFldOzN8ShI=
199203
gopkg.d7z.net/middleware v0.0.0-20251119134829-0c55a98e6495/go.mod h1:/1/EuissKhUbuhUe01rcWuwpA5mt7jASb4uKVNOLtR8=
204+
gopkg.d7z.net/middleware v0.0.0-20251120123709-5d4e16e0d6fb h1:2+IskB2qGQshl67tHdnzEXCm46+9E/QevYL3xpMul0E=
205+
gopkg.d7z.net/middleware v0.0.0-20251120123709-5d4e16e0d6fb/go.mod h1:/1/EuissKhUbuhUe01rcWuwpA5mt7jASb4uKVNOLtR8=
200206
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
201207
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
202208
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q=

pkg/core/alias.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package core
22

33
import (
44
"context"
5+
"encoding/base64"
56
"encoding/json"
67
"fmt"
78

@@ -36,7 +37,7 @@ func (a *DomainAlias) Query(ctx context.Context, domain string) (*Alias, error)
3637

3738
func (a *DomainAlias) Bind(ctx context.Context, domains []string, owner, repo, branch string) error {
3839
oldDomains := make([]string, 0)
39-
rKey := fmt.Sprintf("%s/%s/%s", owner, repo, branch)
40+
rKey := base64.URLEncoding.EncodeToString([]byte(fmt.Sprintf("%s/%s/%s", owner, repo, branch)))
4041
if oldStr, err := a.config.Get(ctx, rKey); err == nil {
4142
_ = json.Unmarshal([]byte(oldStr), &oldDomains)
4243
}

pkg/filters/goja/goja.go

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,7 @@
11
package goja
22

33
import (
4-
"encoding/json"
54
"errors"
6-
"fmt"
75
"io"
86
"net/http"
97
"path/filepath"
@@ -15,7 +13,6 @@ import (
1513
"github.com/dop251/goja_nodejs/eventloop"
1614
"github.com/dop251/goja_nodejs/require"
1715
"github.com/dop251/goja_nodejs/url"
18-
"go.uber.org/zap"
1916
"gopkg.d7z.net/gitea-pages/pkg/core"
2017
)
2118

@@ -109,24 +106,27 @@ func FilterInstGoJa(gl core.Params) (core.FilterInstance, error) {
109106
go func() {
110107
for {
111108
switch promise.State() {
112-
case goja.PromiseStateFulfilled, goja.PromiseStateRejected:
113-
result := promise.Result().Export()
114-
switch data := result.(type) {
109+
case goja.PromiseStateFulfilled:
110+
stop <- nil
111+
return
112+
case goja.PromiseStateRejected:
113+
switch data := promise.Result().Export().(type) {
115114
case error:
116115
stop <- data
117116
default:
118-
marshal, _ := json.Marshal(result)
119-
zap.L().Debug(fmt.Sprintf("js promise result %s", string(marshal)),
120-
zap.Any("result", promise.Result().ExportType()))
121-
stop <- nil
117+
stop <- errors.New(promise.Result().String())
122118
}
123119
return
124120
default:
125121
time.Sleep(time.Millisecond * 5)
126122
}
127123
}
128124
}()
125+
} else {
126+
stop <- nil
129127
}
128+
} else {
129+
stop <- nil
130130
}
131131
})
132132
resultErr := <-stop

pkg/filters/goja/var_event.go

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,19 @@ func EventInject(ctx core.FilterContext, jsCtx *goja.Runtime, loop *eventloop.Ev
3030
}()
3131
return promise
3232
},
33-
"put": func(key, value string) error {
34-
return ctx.Event.Publish(ctx, key, value)
33+
"put": func(key, value string) *goja.Promise {
34+
promise, resolve, reject := jsCtx.NewPromise()
35+
go func() {
36+
err := ctx.Event.Publish(ctx, key, value)
37+
loop.RunOnLoop(func(runtime *goja.Runtime) {
38+
if err != nil {
39+
_ = reject(runtime.ToValue(err))
40+
} else {
41+
_ = resolve(goja.Undefined())
42+
}
43+
})
44+
}()
45+
return promise
3546
},
3647
})
3748
}

pkg/filters/goja/var_websocket.go

Lines changed: 109 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@ func WebsocketInject(ctx core.FilterContext, jsCtx *goja.Runtime, w http.Respons
2121
if err != nil {
2222
return nil, err
2323
}
24+
zap.L().Debug("websocket upgrader created")
25+
closers.AddCloser(conn.Close)
2426
go func() {
2527
ticker := time.NewTicker(15 * time.Second)
2628
defer ticker.Stop()
@@ -37,28 +39,7 @@ func WebsocketInject(ctx core.FilterContext, jsCtx *goja.Runtime, w http.Respons
3739
}
3840
}
3941
}()
40-
zap.L().Debug("websocket upgrader created")
41-
closers.AddCloser(conn.Close)
4242
return map[string]interface{}{
43-
"on": func(f func(mType int, message string)) {
44-
go func() {
45-
z:
46-
for {
47-
select {
48-
case <-ctx.Done():
49-
break z
50-
default:
51-
messageType, p, err := conn.ReadMessage()
52-
if err != nil {
53-
break z
54-
}
55-
f(messageType, string(p))
56-
}
57-
58-
}
59-
60-
}()
61-
},
6243
"TypeTextMessage": websocket.TextMessage,
6344
"TypeBinaryMessage": websocket.BinaryMessage,
6445
"readText": func() *goja.Promise {
@@ -91,36 +72,115 @@ func WebsocketInject(ctx core.FilterContext, jsCtx *goja.Runtime, w http.Respons
9172
}()
9273
return promise
9374
},
94-
"read": func() (any, error) {
95-
messageType, p, err := conn.ReadMessage()
96-
if err != nil {
97-
return nil, err
98-
}
99-
return map[string]interface{}{
100-
"type": messageType,
101-
"data": p,
102-
}, nil
103-
},
104-
"writeText": func(data string) error {
105-
return conn.WriteMessage(websocket.TextMessage, []byte(data))
75+
"read": func() *goja.Promise {
76+
promise, resolve, reject := jsCtx.NewPromise()
77+
go func() {
78+
select {
79+
case <-ctx.Done():
80+
loop.RunOnLoop(func(runtime *goja.Runtime) {
81+
_ = reject(runtime.ToValue(ctx.Err()))
82+
})
83+
return
84+
default:
85+
}
86+
defer func() {
87+
if r := recover(); r != nil {
88+
zap.L().Debug("websocket panic", zap.Any("panic", r))
89+
loop.RunOnLoop(func(runtime *goja.Runtime) {
90+
_ = reject(runtime.ToValue(r))
91+
})
92+
}
93+
}()
94+
messageType, p, err := conn.ReadMessage()
95+
loop.RunOnLoop(func(runtime *goja.Runtime) {
96+
if err != nil {
97+
_ = reject(runtime.ToValue(err))
98+
} else {
99+
_ = resolve(runtime.ToValue(map[string]interface{}{
100+
"type": messageType,
101+
"data": p,
102+
}))
103+
}
104+
})
105+
}()
106+
return promise
106107
},
107-
"write": func(mType int, data any) error {
108-
if item, ok := data.(goja.Value); ok {
109-
data = item.Export()
110-
}
111-
var dataRaw []byte
112-
switch it := data.(type) {
113-
case []byte:
114-
dataRaw = it
115-
case string:
116-
dataRaw = []byte(it)
117-
default:
118-
return errors.Errorf("invalid type for websocket text: %T", data)
119-
}
120-
return conn.WriteMessage(mType, dataRaw)
108+
"writeText": func(data string) *goja.Promise {
109+
promise, resolve, reject := jsCtx.NewPromise()
110+
go func() {
111+
select {
112+
case <-ctx.Done():
113+
loop.RunOnLoop(func(runtime *goja.Runtime) {
114+
_ = reject(runtime.ToValue(ctx.Err()))
115+
})
116+
return
117+
default:
118+
}
119+
defer func() {
120+
if r := recover(); r != nil {
121+
zap.L().Debug("websocket panic", zap.Any("panic", r))
122+
loop.RunOnLoop(func(runtime *goja.Runtime) {
123+
_ = reject(runtime.ToValue(r))
124+
})
125+
}
126+
}()
127+
err := conn.WriteMessage(websocket.TextMessage, []byte(data))
128+
loop.RunOnLoop(func(runtime *goja.Runtime) {
129+
if err != nil {
130+
_ = reject(runtime.ToValue(err))
131+
} else {
132+
_ = resolve(runtime.ToValue(nil))
133+
}
134+
})
135+
}()
136+
return promise
121137
},
122-
"ping": func() error {
123-
return conn.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(1*time.Second))
138+
"write": func(mType int, data any) *goja.Promise {
139+
promise, resolve, reject := jsCtx.NewPromise()
140+
go func() {
141+
select {
142+
case <-ctx.Done():
143+
loop.RunOnLoop(func(runtime *goja.Runtime) {
144+
_ = reject(runtime.ToValue(ctx.Err()))
145+
})
146+
return
147+
default:
148+
}
149+
defer func() {
150+
if r := recover(); r != nil {
151+
zap.L().Debug("websocket panic", zap.Any("panic", r))
152+
loop.RunOnLoop(func(runtime *goja.Runtime) {
153+
_ = reject(runtime.ToValue(r))
154+
})
155+
}
156+
}()
157+
158+
if item, ok := data.(goja.Value); ok {
159+
data = item.Export()
160+
}
161+
var dataRaw []byte
162+
switch it := data.(type) {
163+
case []byte:
164+
dataRaw = it
165+
case string:
166+
dataRaw = []byte(it)
167+
default:
168+
loop.RunOnLoop(func(runtime *goja.Runtime) {
169+
_ = reject(runtime.ToValue(errors.Errorf("invalid type for websocket text: %T", data)))
170+
})
171+
return
172+
}
173+
174+
err := conn.WriteMessage(mType, dataRaw)
175+
loop.RunOnLoop(func(runtime *goja.Runtime) {
176+
if err != nil {
177+
_ = reject(runtime.ToValue(err))
178+
} else {
179+
_ = resolve(goja.Undefined())
180+
}
181+
})
182+
}()
183+
return promise
124184
},
125185
}, nil
126186
})

0 commit comments

Comments
 (0)