Skip to content
This repository was archived by the owner on Jan 21, 2020. It is now read-only.

Commit c31b741

Browse files
author
David Chung
authored
Mux reverse proxy support for events (#454)
Signed-off-by: David Chung <[email protected]>
1 parent 74b6130 commit c31b741

File tree

16 files changed

+304
-112
lines changed

16 files changed

+304
-112
lines changed

cmd/cli/event/event.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -282,11 +282,13 @@ func Command(plugins func() discovery.Plugins) *cobra.Command {
282282

283283
log.Info("Subscribing", "topic", eventTopic)
284284

285-
stream, err := client.SubscribeOn(eventTopic)
285+
stream, stop, err := client.SubscribeOn(eventTopic)
286286
if err != nil {
287287
return fmt.Errorf("cannot subscribe: %s, err=%v", topic, err)
288288
}
289289

290+
defer close(stop)
291+
290292
go func() {
291293
defer func() { done <- -1 }()
292294

pkg/broker/client/sse.go

Lines changed: 53 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -8,13 +8,16 @@ import (
88
"net/http"
99
"net/url"
1010
"os"
11-
"path/filepath"
11+
"path"
1212

13+
logutil "github.com/docker/infrakit/pkg/log"
1314
"github.com/docker/infrakit/pkg/types"
1415
)
1516

1617
var (
1718
headerData = []byte("data:")
19+
20+
log = logutil.New("module", "broker/client")
1821
)
1922

2023
// Options contain client options
@@ -48,61 +51,74 @@ func trimHeader(size int, data []byte) []byte {
4851
return data
4952
}
5053

51-
func socketClient(u *url.URL, socketDir string) (*http.Client, error) {
52-
socketPath := filepath.Join(socketDir, u.Host)
54+
func socketClient(u *url.URL, socketDir string) (*http.Client, *http.Transport, error) {
55+
socketPath := path.Join(socketDir, u.Host)
5356
if f, err := os.Stat(socketPath); err != nil {
54-
return nil, err
57+
return nil, nil, err
5558
} else if f.Mode()&os.ModeSocket == 0 {
56-
return nil, fmt.Errorf("not-a-socket:%v", socketPath)
59+
return nil, nil, fmt.Errorf("not-a-socket:%v", socketPath)
5760
}
58-
return &http.Client{
59-
Transport: &http.Transport{
60-
Dial: func(proto, addr string) (conn net.Conn, err error) {
61-
return net.Dial("unix", socketPath)
62-
},
61+
62+
tsport := http.Transport{
63+
Dial: func(proto, addr string) (conn net.Conn, err error) {
64+
return net.Dial("unix", socketPath)
6365
},
64-
}, nil
66+
}
67+
return &http.Client{
68+
Transport: &tsport,
69+
}, &tsport, nil
6570
}
6671

67-
func httpClient(urlString string, opt Options) (*url.URL, *http.Client, error) {
72+
func httpClient(urlString string, opt Options) (*url.URL, *http.Client, *http.Transport, error) {
6873
u, err := url.Parse(urlString)
6974
if err != nil {
70-
return nil, nil, err
75+
return nil, nil, nil, err
7176
}
7277
switch u.Scheme {
7378

7479
case "http", "https":
75-
return u, &http.Client{}, nil
80+
tsport := http.DefaultTransport
81+
return u, &http.Client{
82+
Transport: tsport,
83+
}, tsport.(*http.Transport), nil
7684
case "unix":
7785
// unix: will look for a socket that matches the host name at a
7886
// directory path set by environment variable.
79-
c, err := socketClient(u, opt.SocketDir)
87+
c, tsport, err := socketClient(u, opt.SocketDir)
8088
if err != nil {
81-
return nil, nil, err
89+
return nil, nil, tsport, err
8290
}
8391
u.Scheme = "http"
92+
u.Host = "e"
8493
u.Path = "/"
85-
return u, c, nil
94+
return u, c, tsport, nil
8695
}
8796

88-
return nil, nil, fmt.Errorf("unsupported url:%s", urlString)
97+
return nil, nil, nil, fmt.Errorf("unsupported url:%s", urlString)
8998

9099
}
91100

92101
// Subscribe subscribes to a topic hosted at given url. It returns a channel of incoming events and errors
93-
func Subscribe(url, topic string, opt Options) (<-chan *types.Any, <-chan error, error) {
94-
u, connection, err := httpClient(url, opt)
102+
// as well as done which will close the connection and exit the subscriber.
103+
func Subscribe(url, topic string, opt Options) (
104+
messages <-chan *types.Any,
105+
errors <-chan error,
106+
done chan<- struct{},
107+
err error,
108+
) {
109+
110+
u, connection, tsport, err := httpClient(url, opt)
95111
if err != nil {
96-
return nil, nil, err
112+
return nil, nil, nil, err
97113
}
98114

99115
if opt.Path != "" {
100-
u.Path = opt.Path
116+
u.Path = path.Join(u.Path, opt.Path)
101117
}
102118

103119
req, err := http.NewRequest("GET", u.String(), nil)
104120
if err != nil {
105-
return nil, nil, err
121+
return nil, nil, nil, err
106122
}
107123

108124
// Setup request, specify stream to connect to
@@ -117,6 +133,7 @@ func Subscribe(url, topic string, opt Options) (<-chan *types.Any, <-chan error,
117133
req.Header.Set("Connection", "keep-alive")
118134

119135
streamCh := make(chan *types.Any)
136+
doneCh := make(chan struct{})
120137
errCh := make(chan error)
121138

122139
go func() {
@@ -131,6 +148,8 @@ func Subscribe(url, topic string, opt Options) (<-chan *types.Any, <-chan error,
131148

132149
defer func() {
133150
resp.Body.Close()
151+
log.Debug("canceling request", "req", req)
152+
tsport.CancelRequest(req)
134153
close(streamCh)
135154
close(errCh)
136155
}()
@@ -143,8 +162,18 @@ func Subscribe(url, topic string, opt Options) (<-chan *types.Any, <-chan error,
143162
reader := bufio.NewReader(resp.Body)
144163

145164
for {
165+
select {
166+
167+
case <-doneCh:
168+
log.Info("close", "connection", connection)
169+
return
170+
171+
default:
172+
}
173+
146174
// Read each new line and process the type of event
147175
line, err := reader.ReadBytes('\n')
176+
148177
if err != nil {
149178
errCh <- err
150179
return
@@ -167,5 +196,5 @@ func Subscribe(url, topic string, opt Options) (<-chan *types.Any, <-chan error,
167196
}
168197
}()
169198

170-
return streamCh, errCh, nil
199+
return streamCh, errCh, doneCh, nil
171200
}

pkg/broker/client/sse_test.go

Lines changed: 33 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ func TestBrokerMultiSubscribersEarlyDisconnects(t *testing.T) {
5353

5454
// Note that two clients are subscribing to the same topic:
5555

56-
topic0, _, err := Subscribe(socket, "local/", opts)
56+
topic0, _, done0, err := Subscribe(socket, "local/", opts)
5757
require.NoError(t, err)
5858
go func() {
5959
<-sync
@@ -69,7 +69,7 @@ func TestBrokerMultiSubscribersEarlyDisconnects(t *testing.T) {
6969
}
7070
}
7171
}()
72-
topic1, _, err := Subscribe(socket, "local/", opts)
72+
topic1, _, done1, err := Subscribe(socket, "local/", opts)
7373
require.NoError(t, err)
7474
go func() {
7575
<-sync
@@ -86,7 +86,7 @@ func TestBrokerMultiSubscribersEarlyDisconnects(t *testing.T) {
8686
}
8787
}()
8888

89-
topic2, _, err := Subscribe(socket+"/?topic=/local/time/", "", opts)
89+
topic2, _, done2, err := Subscribe(socket+"/?topic=/local/time/", "", opts)
9090
require.NoError(t, err)
9191
go func() {
9292
<-sync
@@ -130,6 +130,10 @@ func TestBrokerMultiSubscribersEarlyDisconnects(t *testing.T) {
130130
require.Equal(t, 20, len(values2))
131131
require.Equal(t, 10, len(values1))
132132
require.Equal(t, 10, len(values0))
133+
134+
close(done0)
135+
close(done1)
136+
close(done2)
133137
}
134138

135139
func TestBrokerMultiSubscriberCustomObject(t *testing.T) {
@@ -150,7 +154,7 @@ func TestBrokerMultiSubscriberCustomObject(t *testing.T) {
150154

151155
opts := Options{SocketDir: filepath.Dir(socketFile)}
152156

153-
topic1, errs1, err := Subscribe(socket, "local/", opts)
157+
topic1, errs1, done1, err := Subscribe(socket, "local/", opts)
154158
require.NoError(t, err)
155159
go func() {
156160
for {
@@ -170,7 +174,7 @@ func TestBrokerMultiSubscriberCustomObject(t *testing.T) {
170174
}
171175
}()
172176

173-
topic2, errs2, err := Subscribe(socket, "local/instance1", opts)
177+
topic2, errs2, done2, err := Subscribe(socket, "local/instance1", opts)
174178
require.NoError(t, err)
175179
go func() {
176180
for {
@@ -209,6 +213,9 @@ func TestBrokerMultiSubscriberCustomObject(t *testing.T) {
209213
require.Equal(t, a, b)
210214
}
211215

216+
close(done1)
217+
close(done2)
218+
212219
broker.Stop()
213220

214221
}
@@ -231,12 +238,15 @@ func TestBrokerMultiSubscriberPartialMatchTopic(t *testing.T) {
231238
opts := Options{SocketDir: filepath.Dir(socketFile)}
232239

233240
start := make(chan struct{})
241+
234242
go func() {
235243
<-start
236244

237-
topic1, errs1, err := Subscribe(socket, "local/instance", opts)
245+
topic1, errs1, done1, err := Subscribe(socket, "local/instance", opts)
238246
require.NoError(t, err)
239247

248+
defer close(done1)
249+
240250
for {
241251
select {
242252
case e := <-errs1:
@@ -256,9 +266,11 @@ func TestBrokerMultiSubscriberPartialMatchTopic(t *testing.T) {
256266
go func() {
257267
<-start
258268

259-
topic2, errs2, err := Subscribe(socket, "local/instancetest", opts)
269+
topic2, errs2, done2, err := Subscribe(socket, "local/instancetest", opts)
260270
require.NoError(t, err)
261271

272+
defer close(done2)
273+
262274
for {
263275
select {
264276
case e := <-errs2:
@@ -273,6 +285,7 @@ func TestBrokerMultiSubscriberPartialMatchTopic(t *testing.T) {
273285
}
274286
}
275287
}
288+
276289
}()
277290

278291
go func() {
@@ -320,7 +333,7 @@ func TestBrokerSubscriberExactMatchTopic(t *testing.T) {
320333

321334
opts := Options{SocketDir: filepath.Dir(socketFile)}
322335

323-
topic1, errs1, err := Subscribe(socket, "local/instance", opts)
336+
topic1, errs1, done1, err := Subscribe(socket, "local/instance", opts)
324337
require.NoError(t, err)
325338
go func() {
326339
for {
@@ -339,7 +352,7 @@ func TestBrokerSubscriberExactMatchTopic(t *testing.T) {
339352
}
340353
}()
341354

342-
topic2, errs2, err := Subscribe(socket, "local/", opts)
355+
topic2, errs2, done2, err := Subscribe(socket, "local/", opts)
343356
require.NoError(t, err)
344357
go func() {
345358
for {
@@ -358,7 +371,7 @@ func TestBrokerSubscriberExactMatchTopic(t *testing.T) {
358371
}
359372
}()
360373

361-
topic3, errs3, err := Subscribe(socket, "local", opts)
374+
topic3, errs3, done3, err := Subscribe(socket, "local", opts)
362375
require.NoError(t, err)
363376
go func() {
364377
for {
@@ -401,6 +414,11 @@ func TestBrokerSubscriberExactMatchTopic(t *testing.T) {
401414
require.NotEqual(t, b, c)
402415
require.NotEqual(t, a, b)
403416
}
417+
418+
close(done1)
419+
close(done2)
420+
close(done3)
421+
404422
broker.Stop()
405423
}
406424

@@ -425,7 +443,7 @@ func TestBrokerMultiSubscriberCustomObjectConnectAtURLPrefix(t *testing.T) {
425443

426444
opts := Options{SocketDir: filepath.Dir(socketFile)}
427445

428-
topic1, errs1, err := Subscribe(socket, "local/", opts)
446+
topic1, errs1, done1, err := Subscribe(socket, "local/", opts)
429447
require.NoError(t, err)
430448
go func() {
431449
for {
@@ -440,7 +458,7 @@ func TestBrokerMultiSubscriberCustomObjectConnectAtURLPrefix(t *testing.T) {
440458
}()
441459

442460
opts.Path = "/events"
443-
topic2, errs2, err := Subscribe(socket, "local/instance1", opts)
461+
topic2, errs2, done2, err := Subscribe(socket, "local/instance1", opts)
444462
require.NoError(t, err)
445463
go func() {
446464
for {
@@ -470,6 +488,9 @@ func TestBrokerMultiSubscriberCustomObjectConnectAtURLPrefix(t *testing.T) {
470488

471489
<-got404
472490
<-gotData
491+
492+
close(done1)
493+
close(done2)
473494
broker.Stop()
474495

475496
}

pkg/broker/server/server_test.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ func TestListenAndServeOnSocket(t *testing.T) {
3131
received2 := make(chan interface{})
3232

3333
opts := client.Options{SocketDir: filepath.Dir(socketFile)}
34-
topic1, _, err := client.Subscribe(socket, "local/", opts)
34+
topic1, _, stop1, err := client.Subscribe(socket, "local/", opts)
3535
require.NoError(t, err)
3636
go func() {
3737
for {
@@ -42,7 +42,7 @@ func TestListenAndServeOnSocket(t *testing.T) {
4242
}
4343
}()
4444

45-
topic2, _, err := client.Subscribe(socket, "local/time/", opts)
45+
topic2, _, stop2, err := client.Subscribe(socket, "local/time/", opts)
4646
require.NoError(t, err)
4747
go func() {
4848
for {
@@ -69,6 +69,8 @@ func TestListenAndServeOnSocket(t *testing.T) {
6969
require.Equal(t, a, b)
7070
}
7171

72+
close(stop1)
73+
close(stop2)
7274
broker.Stop()
7375

7476
}

0 commit comments

Comments
 (0)