Skip to content

Commit e45c6e4

Browse files
committed
fix: EstablishLink yields a link.MountedLink
Fixes #276 Signed-off-by: Christian Stewart <christian@aperture.us>
1 parent 9db9713 commit e45c6e4

File tree

2 files changed

+149
-2
lines changed

2 files changed

+149
-2
lines changed

link/hold-open/establish_link.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ func newEstablishLinkHandler(
4747

4848
// HandleValueAdded is called when a value is added to the directive.
4949
func (e *establishLinkHandler) HandleValueAdded(inst directive.Instance, val directive.AttachedValue) {
50-
vl, ok := val.GetValue().(link.Link)
50+
vl, ok := val.GetValue().(link.MountedLink)
5151
if !ok || vl == nil {
5252
return
5353
}
@@ -58,7 +58,7 @@ func (e *establishLinkHandler) HandleValueAdded(inst directive.Instance, val dir
5858

5959
if nrr {
6060
e.le.
61-
WithField("link-uuid", vl.GetUUID()).
61+
WithField("link-uuid", vl.GetLinkUUID()).
6262
WithField("local-peer", vl.GetLocalPeer().String()).
6363
Debug("starting peer hold-open tracking")
6464
go func() {

link/hold-open/hold_open_test.go

Lines changed: 147 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,147 @@
1+
package link_holdopen_controller
2+
3+
import (
4+
"context"
5+
"testing"
6+
7+
"github.com/aperturerobotics/bifrost/link"
8+
"github.com/aperturerobotics/bifrost/peer"
9+
"github.com/aperturerobotics/bifrost/stream"
10+
stream_echo "github.com/aperturerobotics/bifrost/stream/echo"
11+
"github.com/aperturerobotics/bifrost/testbed"
12+
"github.com/aperturerobotics/bifrost/transport/common/dialer"
13+
transport_controller "github.com/aperturerobotics/bifrost/transport/controller"
14+
"github.com/aperturerobotics/bifrost/transport/inproc"
15+
"github.com/aperturerobotics/controllerbus/bus"
16+
"github.com/aperturerobotics/controllerbus/controller/loader"
17+
"github.com/aperturerobotics/controllerbus/controller/resolver"
18+
"github.com/aperturerobotics/controllerbus/directive"
19+
"github.com/sirupsen/logrus"
20+
)
21+
22+
func buildTestbed(t *testing.T, ctx context.Context) (*testbed.Testbed, *logrus.Entry) {
23+
log := logrus.New()
24+
log.SetLevel(logrus.DebugLevel)
25+
le := logrus.NewEntry(log)
26+
27+
tb, err := testbed.NewTestbed(ctx, le, testbed.TestbedOpts{})
28+
if err != nil {
29+
t.Fatal(err.Error())
30+
}
31+
tb.StaticResolver.AddFactory(inproc.NewFactory(tb.Bus))
32+
tb.StaticResolver.AddFactory(NewFactory(tb.Bus))
33+
return tb, le
34+
}
35+
36+
func execPeer(ctx context.Context, t *testing.T, tb *testbed.Testbed, conf *inproc.Config) (
37+
*transport_controller.Controller,
38+
*inproc.Inproc,
39+
directive.Reference,
40+
) {
41+
peerId, err := peer.IDFromPrivateKey(tb.PrivKey)
42+
if err != nil {
43+
t.Fatal(err.Error())
44+
}
45+
46+
if conf == nil {
47+
conf = &inproc.Config{}
48+
}
49+
conf.TransportPeerId = peerId.String()
50+
51+
tpci1, _, tp1Ref, err := loader.WaitExecControllerRunning(
52+
ctx,
53+
tb.Bus,
54+
resolver.NewLoadControllerWithConfig(conf),
55+
nil,
56+
)
57+
if err != nil {
58+
t.Fatal(err.Error())
59+
}
60+
tpc1 := tpci1.(*transport_controller.Controller)
61+
tpt1, err := tpc1.GetTransport(ctx)
62+
if err != nil {
63+
t.Fatal(err.Error())
64+
}
65+
return tpc1, tpt1.(*inproc.Inproc), tp1Ref
66+
}
67+
68+
// TestHoldOpenWithMountedLink tests that the hold-open controller correctly
69+
// handles MountedLink values from EstablishLinkWithPeer.
70+
//
71+
// This verifies the fix for issue #276 where the type assertion for link.Link
72+
// failed because EstablishLinkWithPeerValue was changed to MountedLink.
73+
func TestHoldOpenWithMountedLink(t *testing.T) {
74+
ctx, ctxCancel := context.WithCancel(context.Background())
75+
defer ctxCancel()
76+
77+
tb1, le1 := buildTestbed(t, ctx)
78+
le1 = le1.WithField("testbed", 0)
79+
tb2, le2 := buildTestbed(t, ctx)
80+
le2 = le2.WithField("testbed", 1)
81+
82+
_, tp1, tp1Ref := execPeer(ctx, t, tb1, nil)
83+
peerId1 := tp1.GetPeerID()
84+
defer tp1Ref.Release()
85+
86+
_, tp2, tp2Ref := execPeer(ctx, t, tb2, &inproc.Config{
87+
Dialers: map[string]*dialer.DialerOpts{
88+
peerId1.String(): {
89+
Address: tp1.LocalAddr().String(),
90+
},
91+
},
92+
})
93+
peerId2 := tp2.GetPeerID()
94+
defer tp2Ref.Release()
95+
96+
le1.Infof("constructed peer 1 with id %s", peerId1.String())
97+
le2.Infof("constructed peer 2 with id %s", peerId2.String())
98+
99+
tp2.ConnectToInproc(ctx, tp1)
100+
tp1.ConnectToInproc(ctx, tp2)
101+
102+
// Start the hold-open controller on tb2
103+
_, _, holdOpenRef, err := bus.ExecOneOff(
104+
ctx,
105+
tb2.Bus,
106+
resolver.NewLoadControllerWithConfig(&Config{}),
107+
nil,
108+
nil,
109+
)
110+
if err != nil {
111+
t.Fatal(err.Error())
112+
}
113+
defer holdOpenRef.Release()
114+
115+
// Establish a link - the hold-open controller should handle the MountedLink
116+
lnk, lnkRel, err := link.EstablishLinkWithPeerEx(ctx, tb2.Bus, "", peerId1, false)
117+
if err != nil {
118+
t.Fatal(err.Error())
119+
}
120+
defer lnkRel()
121+
122+
le2.Infof("opened link from 2 -> 1 with uuid %v", lnk.GetLinkUUID())
123+
124+
// Verify the link works by using the echo stream
125+
ms, err := lnk.OpenMountedStream(ctx, stream_echo.DefaultProtocolID, stream.OpenOpts{})
126+
if err != nil {
127+
t.Fatal(err.Error())
128+
}
129+
defer ms.GetStream().Close()
130+
131+
data := []byte("hold-open test")
132+
_, err = ms.GetStream().Write(data)
133+
if err != nil {
134+
t.Fatal(err.Error())
135+
}
136+
137+
outData := make([]byte, len(data)*2)
138+
n, err := ms.GetStream().Read(outData)
139+
if err != nil {
140+
t.Fatal(err.Error())
141+
}
142+
if n != len(data) {
143+
t.Fatalf("expected %d bytes, got %d", len(data), n)
144+
}
145+
146+
le2.Infof("echoed data successfully: %s", string(outData[:n]))
147+
}

0 commit comments

Comments
 (0)