Skip to content

Commit b7c39f7

Browse files
committed
feat: Port to new Dudirekta version
1 parent 421d1e1 commit b7c39f7

File tree

7 files changed

+147
-36
lines changed

7 files changed

+147
-36
lines changed

cmd/r3map-benchmark-direct-mount/main.go

Lines changed: 31 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"bytes"
55
"context"
66
"crypto/rand"
7+
"encoding/json"
78
"errors"
89
"flag"
910
"fmt"
@@ -138,14 +139,12 @@ func main() {
138139

139140
case backendTypeDudirekta:
140141
ready := make(chan struct{})
141-
registry := rpc.NewRegistry(
142-
&struct{}{},
143-
services.BackendRemote{},
142+
registry := rpc.NewRegistry[services.BackendRemote, json.RawMessage](
143+
struct{}{},
144144

145145
time.Second*10,
146146
ctx,
147147
&rpc.Options{
148-
ResponseBufferLen: rpc.DefaultResponseBufferLen,
149148
OnClientConnect: func(remoteID string) {
150149
ready <- struct{}{}
151150
},
@@ -159,7 +158,29 @@ func main() {
159158
defer conn.Close()
160159

161160
go func() {
162-
if err := registry.Link(conn); err != nil {
161+
encoder := json.NewEncoder(conn)
162+
decoder := json.NewDecoder(conn)
163+
164+
if err := registry.LinkStream(
165+
func(v rpc.Message[json.RawMessage]) error {
166+
return encoder.Encode(v)
167+
},
168+
func(v *rpc.Message[json.RawMessage]) error {
169+
return decoder.Decode(v)
170+
},
171+
172+
func(v any) (json.RawMessage, error) {
173+
b, err := json.Marshal(v)
174+
if err != nil {
175+
return nil, err
176+
}
177+
178+
return json.RawMessage(b), nil
179+
},
180+
func(data json.RawMessage, v any) error {
181+
return json.Unmarshal([]byte(data), v)
182+
},
183+
); err != nil {
163184
if !utils.IsClosedErr(err) {
164185
panic(err)
165186
}
@@ -169,11 +190,12 @@ func main() {
169190
<-ready
170191

171192
var peer *services.BackendRemote
172-
for _, candidate := range registry.Peers() {
173-
peer = &candidate
174193

175-
break
176-
}
194+
_ = registry.ForRemotes(func(remoteID string, remote services.BackendRemote) error {
195+
peer = &remote
196+
197+
return nil
198+
})
177199

178200
if peer == nil {
179201
panic(errNoPeerFound)

cmd/r3map-benchmark-managed-mount/main.go

Lines changed: 31 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"bytes"
55
"context"
66
"crypto/rand"
7+
"encoding/json"
78
"errors"
89
"flag"
910
"fmt"
@@ -162,14 +163,12 @@ func main() {
162163

163164
case backendTypeDudirekta:
164165
ready := make(chan struct{})
165-
registry := rpc.NewRegistry(
166-
&struct{}{},
167-
services.BackendRemote{},
166+
registry := rpc.NewRegistry[services.BackendRemote, json.RawMessage](
167+
struct{}{},
168168

169169
time.Second*10,
170170
ctx,
171171
&rpc.Options{
172-
ResponseBufferLen: rpc.DefaultResponseBufferLen,
173172
OnClientConnect: func(remoteID string) {
174173
ready <- struct{}{}
175174
},
@@ -183,7 +182,29 @@ func main() {
183182
defer conn.Close()
184183

185184
go func() {
186-
if err := registry.Link(conn); err != nil {
185+
encoder := json.NewEncoder(conn)
186+
decoder := json.NewDecoder(conn)
187+
188+
if err := registry.LinkStream(
189+
func(v rpc.Message[json.RawMessage]) error {
190+
return encoder.Encode(v)
191+
},
192+
func(v *rpc.Message[json.RawMessage]) error {
193+
return decoder.Decode(v)
194+
},
195+
196+
func(v any) (json.RawMessage, error) {
197+
b, err := json.Marshal(v)
198+
if err != nil {
199+
return nil, err
200+
}
201+
202+
return json.RawMessage(b), nil
203+
},
204+
func(data json.RawMessage, v any) error {
205+
return json.Unmarshal([]byte(data), v)
206+
},
207+
); err != nil {
187208
if !utils.IsClosedErr(err) {
188209
panic(err)
189210
}
@@ -193,11 +214,12 @@ func main() {
193214
<-ready
194215

195216
var peer *services.BackendRemote
196-
for _, candidate := range registry.Peers() {
197-
peer = &candidate
198217

199-
break
200-
}
218+
_ = registry.ForRemotes(func(remoteID string, remote services.BackendRemote) error {
219+
peer = &remote
220+
221+
return nil
222+
})
201223

202224
if peer == nil {
203225
panic(errNoPeerFound)

cmd/r3map-benchmark-migration-server/main.go

Lines changed: 25 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"bufio"
55
"context"
66
"crypto/rand"
7+
"encoding/json"
78
"errors"
89
"flag"
910
"fmt"
@@ -258,14 +259,12 @@ func main() {
258259
}()
259260
} else {
260261
clients := 0
261-
registry := rpc.NewRegistry(
262+
registry := rpc.NewRegistry[struct{}, json.RawMessage](
262263
svc,
263-
struct{}{},
264264

265265
time.Second*10,
266266
ctx,
267267
&rpc.Options{
268-
ResponseBufferLen: rpc.DefaultResponseBufferLen,
269268
OnClientConnect: func(remoteID string) {
270269
clients++
271270

@@ -309,7 +308,29 @@ func main() {
309308
}
310309
}()
311310

312-
if err := registry.Link(conn); err != nil {
311+
encoder := json.NewEncoder(conn)
312+
decoder := json.NewDecoder(conn)
313+
314+
if err := registry.LinkStream(
315+
func(v rpc.Message[json.RawMessage]) error {
316+
return encoder.Encode(v)
317+
},
318+
func(v *rpc.Message[json.RawMessage]) error {
319+
return decoder.Decode(v)
320+
},
321+
322+
func(v any) (json.RawMessage, error) {
323+
b, err := json.Marshal(v)
324+
if err != nil {
325+
return nil, err
326+
}
327+
328+
return json.RawMessage(b), nil
329+
},
330+
func(data json.RawMessage, v any) error {
331+
return json.Unmarshal([]byte(data), v)
332+
},
333+
); err != nil {
313334
panic(err)
314335
}
315336
}()

cmd/r3map-benchmark-migration/main.go

Lines changed: 29 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"bytes"
66
"context"
77
"crypto/rand"
8+
"encoding/json"
89
"errors"
910
"flag"
1011
"fmt"
@@ -216,22 +217,42 @@ func main() {
216217
defer conn.Close()
217218

218219
ready := make(chan struct{})
219-
registry := rpc.NewRegistry(
220+
registry := rpc.NewRegistry[services.SeederRemote, json.RawMessage](
220221
&struct{}{},
221-
services.SeederRemote{},
222222

223223
time.Second*10,
224224
ctx,
225225
&rpc.Options{
226-
ResponseBufferLen: rpc.DefaultResponseBufferLen,
227226
OnClientConnect: func(remoteID string) {
228227
ready <- struct{}{}
229228
},
230229
},
231230
)
232231

233232
go func() {
234-
if err := registry.Link(conn); err != nil {
233+
encoder := json.NewEncoder(conn)
234+
decoder := json.NewDecoder(conn)
235+
236+
if err := registry.LinkStream(
237+
func(v rpc.Message[json.RawMessage]) error {
238+
return encoder.Encode(v)
239+
},
240+
func(v *rpc.Message[json.RawMessage]) error {
241+
return decoder.Decode(v)
242+
},
243+
244+
func(v any) (json.RawMessage, error) {
245+
b, err := json.Marshal(v)
246+
if err != nil {
247+
return nil, err
248+
}
249+
250+
return json.RawMessage(b), nil
251+
},
252+
func(data json.RawMessage, v any) error {
253+
return json.Unmarshal([]byte(data), v)
254+
},
255+
); err != nil {
235256
if !utils.IsClosedErr(err) {
236257
seederErrs <- err
237258

@@ -244,11 +265,11 @@ func main() {
244265

245266
<-ready
246267

247-
for _, candidate := range registry.Peers() {
248-
peer = &candidate
268+
_ = registry.ForRemotes(func(remoteID string, remote services.SeederRemote) error {
269+
peer = &remote
249270

250-
break
251-
}
271+
return nil
272+
})
252273

253274
if peer == nil {
254275
panic(errNoPeerFound)

cmd/r3map-benchmark-mount-server/main.go

Lines changed: 25 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package main
22

33
import (
44
"context"
5+
"encoding/json"
56
"errors"
67
"flag"
78
"fmt"
@@ -145,14 +146,12 @@ func main() {
145146
}()
146147
} else {
147148
clients := 0
148-
registry := rpc.NewRegistry(
149+
registry := rpc.NewRegistry[struct{}, json.RawMessage](
149150
svc,
150-
struct{}{},
151151

152152
time.Second*10,
153153
ctx,
154154
&rpc.Options{
155-
ResponseBufferLen: rpc.DefaultResponseBufferLen,
156155
OnClientConnect: func(remoteID string) {
157156
clients++
158157

@@ -196,7 +195,29 @@ func main() {
196195
}
197196
}()
198197

199-
if err := registry.Link(conn); err != nil {
198+
encoder := json.NewEncoder(conn)
199+
decoder := json.NewDecoder(conn)
200+
201+
if err := registry.LinkStream(
202+
func(v rpc.Message[json.RawMessage]) error {
203+
return encoder.Encode(v)
204+
},
205+
func(v *rpc.Message[json.RawMessage]) error {
206+
return decoder.Decode(v)
207+
},
208+
209+
func(v any) (json.RawMessage, error) {
210+
b, err := json.Marshal(v)
211+
if err != nil {
212+
return nil, err
213+
}
214+
215+
return json.RawMessage(b), nil
216+
},
217+
func(data json.RawMessage, v any) error {
218+
return json.Unmarshal([]byte(data), v)
219+
},
220+
); err != nil {
200221
panic(err)
201222
}
202223
}()

go.mod

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,9 @@ require (
99
github.com/loopholelabs/frisbee-go v0.7.1
1010
github.com/loopholelabs/polyglot-go v0.5.1
1111
github.com/minio/minio-go v6.0.14+incompatible
12-
github.com/pojntfx/dudirekta v0.5.1
12+
github.com/pojntfx/dudirekta v0.6.0
1313
github.com/pojntfx/go-nbd v0.3.2
14-
github.com/redis/go-redis/v9 v9.0.5
14+
github.com/redis/go-redis/v9 v9.2.1
1515
github.com/rs/zerolog v1.30.0
1616
github.com/schollz/progressbar/v3 v3.13.1
1717
google.golang.org/grpc v1.57.0

go.sum

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,10 +72,14 @@ github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZb
7272
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
7373
github.com/pojntfx/dudirekta v0.5.1 h1:omrLk+lFJTQd6F/FGhW/tXEJF6Q/Zb4x++6zgWu8Ahw=
7474
github.com/pojntfx/dudirekta v0.5.1/go.mod h1:2G79XDOe1c3Nz3G+LQfiNZ5K/SS3b2TP1K9JyRt8woI=
75+
github.com/pojntfx/dudirekta v0.6.0 h1:bNO5jrHDCSiVf+YCjU6GiEyY7S9/KtgzsGC0KruobUc=
76+
github.com/pojntfx/dudirekta v0.6.0/go.mod h1:zNquHCePC+378rWnJ0SWs1p9qdZdrJ4PFGNyVdQWD1U=
7577
github.com/pojntfx/go-nbd v0.3.2 h1:qI6S4qsHD87V9fTH6jiS4DIqq/rWmI0El0xSToMUDeg=
7678
github.com/pojntfx/go-nbd v0.3.2/go.mod h1:SehHnbi2e8NiSAKby42Itm8SIoS7b+wAprsfPH3qgYk=
7779
github.com/redis/go-redis/v9 v9.0.5 h1:CuQcn5HIEeK7BgElubPP8CGtE0KakrnbBSTLjathl5o=
7880
github.com/redis/go-redis/v9 v9.0.5/go.mod h1:WqMKv5vnQbRuZstUwxQI195wHy+t4PuXDOjzMvcuQHk=
81+
github.com/redis/go-redis/v9 v9.2.1 h1:WlYJg71ODF0dVspZZCpYmoF1+U1Jjk9Rwd7pq6QmlCg=
82+
github.com/redis/go-redis/v9 v9.2.1/go.mod h1:hdY0cQFCN4fnSYT6TkisLufl/4W5UIXyv0b/CLO2V2M=
7983
github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc=
8084
github.com/rivo/uniseg v0.4.4 h1:8TfxU8dW6PdqD27gjM8MVNuicgxIjxpm4K7x4jp8sis=
8185
github.com/rivo/uniseg v0.4.4/go.mod h1:FN3SvrM+Zdj16jyLfmOkMNblXMcoc8DfTHruCPUcx88=

0 commit comments

Comments
 (0)