Skip to content

Commit 9ef2d51

Browse files
authored
Zmq Auth Regression test (#171)
* add tests to validate go zmq pubsub authentication behavior * adding a minimal floating change * linter fixes
1 parent 92166f1 commit 9ef2d51

File tree

4 files changed

+355
-18
lines changed

4 files changed

+355
-18
lines changed

pkg/regprocessor/auth_test.go

Lines changed: 292 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,292 @@
1+
package regprocessor
2+
3+
import (
4+
"fmt"
5+
"log"
6+
"net"
7+
"testing"
8+
"time"
9+
10+
zmq "github.com/pebbe/zmq4"
11+
"github.com/stretchr/testify/assert"
12+
"github.com/stretchr/testify/require"
13+
14+
"github.com/refraction-networking/conjure/application/transports/wrapping/min"
15+
pb "github.com/refraction-networking/gotapdance/protobuf"
16+
)
17+
18+
// TODO: Add monitor to RegProcessor and metrics / logging for connections
19+
// that get past the firewall but fail to connect for any reason.
20+
21+
// Mock Socket Configuration of stations connecting to the central ZMQ pubsub
22+
type socketConfig struct {
23+
address string
24+
authenticationType string
25+
centralPublicKey string
26+
stationPrivkeyZ85 string
27+
}
28+
29+
// NOTE: DO NOT DISABLE THIS TEST
30+
//
31+
// If this test is failing, go back and check the changes that you made. Proper authentication for
32+
// the ZMQ sockets is crucial - ensure that this test is passing. If the zmq library updates or the
33+
// RegProcessor interface changes update the test accordingly. In this test the client is the
34+
// stations and the server is the central registration API.
35+
func TestZMQAuth(t *testing.T) {
36+
nMessages := 10
37+
zmqBindAddr := "127.0.0.1"
38+
var zmqPort uint16 = 39_000
39+
serverAddr := "tcp://" + net.JoinHostPort(zmqBindAddr, fmt.Sprint(zmqPort))
40+
41+
serverPubkeyZ85, serverPrivkeyZ85, err := zmq.NewCurveKeypair()
42+
require.Nil(t, err)
43+
44+
clientPubkeyZ85, clientPrivkeyZ85, err := zmq.NewCurveKeypair()
45+
require.Nil(t, err)
46+
otherPubkeyZ85, otherPrivkeyZ85, err := zmq.NewCurveKeypair()
47+
require.Nil(t, err)
48+
49+
stationPublicKeys := []string{clientPubkeyZ85}
50+
51+
done := make(chan struct{})
52+
next := make(chan struct{})
53+
ready := make(chan struct{})
54+
exit := make(chan struct{})
55+
56+
connectSockets := []struct {
57+
s socketConfig
58+
err error
59+
c int
60+
}{
61+
{ // correct central server pubkey and registered station key pair. Should work.
62+
s: socketConfig{
63+
address: serverAddr,
64+
authenticationType: "CURVE",
65+
centralPublicKey: serverPubkeyZ85,
66+
stationPrivkeyZ85: clientPrivkeyZ85,
67+
},
68+
err: nil,
69+
c: nMessages,
70+
},
71+
{ // correct central server pubkey, but non-registered station key pair
72+
s: socketConfig{
73+
address: serverAddr,
74+
authenticationType: "CURVE",
75+
centralPublicKey: serverPubkeyZ85,
76+
stationPrivkeyZ85: otherPrivkeyZ85,
77+
},
78+
err: nil,
79+
c: 0,
80+
},
81+
{ // missing central server pubkey and properly registered station key pair
82+
s: socketConfig{
83+
address: serverAddr,
84+
authenticationType: "CURVE",
85+
centralPublicKey: "",
86+
stationPrivkeyZ85: clientPrivkeyZ85,
87+
},
88+
err: ErrZmqFault, //
89+
c: 0,
90+
},
91+
{ // incorrect central server pubkey and properly registered station public key, using CURVE
92+
s: socketConfig{
93+
address: serverAddr,
94+
authenticationType: "CURVE",
95+
centralPublicKey: otherPubkeyZ85,
96+
stationPrivkeyZ85: clientPrivkeyZ85,
97+
},
98+
err: nil,
99+
c: 0,
100+
},
101+
{ // incorrect central server pubkey and non-registered station public key, using CURVE
102+
s: socketConfig{
103+
address: serverAddr,
104+
authenticationType: "CURVE",
105+
centralPublicKey: otherPubkeyZ85,
106+
stationPrivkeyZ85: otherPrivkeyZ85,
107+
},
108+
err: nil,
109+
c: 0,
110+
},
111+
{ // missing central server pubkey and properly registered station public key, NULL
112+
s: socketConfig{
113+
address: serverAddr,
114+
authenticationType: "NULL",
115+
centralPublicKey: "",
116+
stationPrivkeyZ85: clientPrivkeyZ85,
117+
},
118+
err: nil,
119+
c: 0,
120+
},
121+
{ // incorrect central server pubkey and non-registered station public key, using NULL
122+
s: socketConfig{
123+
address: serverAddr,
124+
authenticationType: "NULL",
125+
centralPublicKey: "",
126+
stationPrivkeyZ85: otherPrivkeyZ85,
127+
},
128+
err: nil,
129+
c: 0,
130+
},
131+
}
132+
133+
// Run the RegProcessor as a thread listening on localhost. Sleep for one second then send
134+
// messages that we expect the station to hear. in production this will be new registrations,
135+
// here we don't care about the message contents.
136+
go func() {
137+
regProcessor, err := newRegProcessor(zmqBindAddr, zmqPort, serverPrivkeyZ85, true, stationPublicKeys)
138+
require.Nil(t, err)
139+
defer regProcessor.Close()
140+
errStation := regProcessor.AddTransport(pb.TransportType_Min, min.Transport{})
141+
if errStation != nil {
142+
t.Failed()
143+
done <- struct{}{}
144+
exit <- struct{}{}
145+
return
146+
}
147+
ready <- struct{}{}
148+
for j := 0; j < len(connectSockets); j++ {
149+
time.Sleep(1 * time.Second)
150+
151+
for i := 0; i < nMessages; i++ {
152+
message := fmt.Sprintf("encrypted??: %d", i+j*nMessages)
153+
// fmt.Printf("sending - %s\n", message)
154+
_, err := regProcessor.sock.SendBytes([]byte(message), zmq.DONTWAIT)
155+
if err != nil {
156+
panic(fmt.Errorf("Publish Error: %w", err))
157+
}
158+
}
159+
time.Sleep(1 * time.Second)
160+
next <- struct{}{}
161+
}
162+
time.Sleep(1 * time.Second)
163+
done <- struct{}{}
164+
// fmt.Println("server complete")
165+
exit <- struct{}{}
166+
}()
167+
168+
<-ready
169+
for i, peerCase := range connectSockets {
170+
// t.Log("STARTING ", i)
171+
connectSocket := peerCase.s
172+
173+
sock, err := zmq.NewSocket(zmq.SUB)
174+
require.Nil(t, err, "case %d: %s", i, err)
175+
176+
err = sock.SetHeartbeatIvl(30000 * time.Millisecond)
177+
require.Nil(t, err, "case %d: %s", i, err)
178+
179+
err = sock.SetHeartbeatTimeout(1000 * time.Millisecond)
180+
require.Nil(t, err, "case %d: %s", i, err)
181+
182+
stationPubkeyZ85, err := zmq.AuthCurvePublic(connectSocket.stationPrivkeyZ85)
183+
require.Nil(t, err, "case %d: %s", i, err)
184+
185+
if connectSocket.authenticationType == "CURVE" {
186+
err = sock.ClientAuthCurve(connectSocket.centralPublicKey, stationPubkeyZ85, connectSocket.stationPrivkeyZ85)
187+
if peerCase.err != nil {
188+
require.ErrorIs(t, err, peerCase.err, "case %d: %s", i, err)
189+
<-next
190+
// t.Log("NEXT eauth", i)
191+
continue
192+
}
193+
require.Nil(t, err, "case %d: %s", i, err)
194+
}
195+
196+
err = sock.SetSubscribe("")
197+
require.Nil(t, err, "case %d: %s", i, err)
198+
199+
err = sock.Connect(connectSocket.address)
200+
if peerCase.err == nil {
201+
require.Nil(t, err, "case %d: %s", i, err)
202+
} else {
203+
require.ErrorIs(t, err, peerCase.err, "expected: %s\n got: %s", peerCase.err, err)
204+
<-next
205+
// t.Log("NEXT econn", i)
206+
continue
207+
}
208+
209+
c := make(chan []byte)
210+
go func() {
211+
// This go-routine will. live to the end of the test, so as more things are pushed into
212+
// the zmq publish this will still receive it, HOWEVER, the client(station) portion will
213+
// have moved on.
214+
defer sock.Close()
215+
var err error = nil
216+
var msg []byte
217+
for err == nil {
218+
msg, err = sock.RecvBytes(0)
219+
if err != nil {
220+
break
221+
}
222+
c <- msg
223+
}
224+
}()
225+
226+
var j int
227+
L:
228+
for {
229+
select {
230+
case <-c:
231+
j++
232+
// t.Logf("%s\n", string(msg))
233+
case <-next:
234+
// t.Log("NEXT ", i)
235+
break L
236+
case <-done:
237+
t.Fatal("failed to receive messages")
238+
}
239+
}
240+
assert.Equal(t, peerCase.c, j)
241+
}
242+
// t.Log("Client complete")
243+
<-done
244+
<-exit
245+
}
246+
247+
func repSocketMonitor(addr string) {
248+
s, err := zmq.NewSocket(zmq.PAIR)
249+
if err != nil {
250+
log.Fatalln(err)
251+
}
252+
err = s.Connect(addr)
253+
if err != nil {
254+
log.Fatalln(err)
255+
}
256+
for {
257+
a, b, c, err := s.RecvEvent(0)
258+
if err != nil {
259+
log.Println(err)
260+
break
261+
}
262+
log.Println(a, b, c)
263+
}
264+
s.Close()
265+
}
266+
267+
func TestZmqMonitor(t *testing.T) {
268+
269+
// REP socket
270+
rep, err := zmq.NewSocket(zmq.REP)
271+
if err != nil {
272+
log.Fatalln(err)
273+
}
274+
275+
// REP socket monitor, all events
276+
err = rep.Monitor("inproc://monitor.rep", zmq.EVENT_ALL)
277+
if err != nil {
278+
log.Fatalln(err)
279+
}
280+
go repSocketMonitor("inproc://monitor.rep")
281+
282+
// Generate an event
283+
err = rep.Bind("tcp://*:5555")
284+
if err != nil {
285+
log.Fatalln(err)
286+
}
287+
288+
// Allow some time for event detection
289+
time.Sleep(time.Second)
290+
291+
rep.Close()
292+
}

0 commit comments

Comments
 (0)