Skip to content

Commit 3381205

Browse files
Merge pull request #27061 from Luap99/bindings-upgrade
pkg/bindings: use HTTP 101 upgrade request for attach
2 parents e95fd4f + 2702156 commit 3381205

File tree

1 file changed

+70
-64
lines changed

1 file changed

+70
-64
lines changed

pkg/bindings/containers/attach.go

Lines changed: 70 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -111,37 +111,11 @@ func Attach(ctx context.Context, nameOrID string, stdin io.Reader, stdout io.Wri
111111
}()
112112
}
113113

114-
headers := make(http.Header)
115-
headers.Add("Connection", "Upgrade")
116-
headers.Add("Upgrade", "tcp")
117-
118-
var socket net.Conn
119-
socketSet := false
120-
dialContext := conn.Client.Transport.(*http.Transport).DialContext
121-
t := &http.Transport{
122-
DialContext: func(ctx context.Context, network, address string) (net.Conn, error) {
123-
c, err := dialContext(ctx, network, address)
124-
if err != nil {
125-
return nil, err
126-
}
127-
if !socketSet {
128-
socket = c
129-
socketSet = true
130-
}
131-
return c, err
132-
},
133-
IdleConnTimeout: time.Duration(0),
134-
}
135-
conn.Client.Transport = t
136-
response, err := conn.DoRequest(ctx, nil, http.MethodPost, "/containers/%s/attach", params, headers, nameOrID)
114+
cw, socket, err := newUpgradeRequest(ctx, conn, nil, fmt.Sprintf("/containers/%s/attach", nameOrID), params)
137115
if err != nil {
138116
return err
139117
}
140-
141-
if !response.IsSuccess() && !response.IsInformational() {
142-
defer response.Body.Close()
143-
return response.Process(nil)
144-
}
118+
defer socket.Close()
145119

146120
if needTTY {
147121
winChange := make(chan os.Signal, 1)
@@ -173,11 +147,7 @@ func Attach(ctx context.Context, nameOrID string, stdin io.Reader, stdout io.Wri
173147
logrus.Errorf("Failed to write input to service: %v", err)
174148
}
175149
if err == nil {
176-
if closeWrite, ok := socket.(CloseWriter); ok {
177-
if err := closeWrite.CloseWrite(); err != nil {
178-
logrus.Warnf("Failed to close STDIN for writing: %v", err)
179-
}
180-
}
150+
cw.CloseWrite()
181151
}
182152
stdinChan <- err
183153
}()
@@ -210,7 +180,7 @@ func Attach(ctx context.Context, nameOrID string, stdin io.Reader, stdout io.Wri
210180
return err
211181
}
212182

213-
return nil
183+
return <-stdoutChan
214184
}
215185
}
216186
} else {
@@ -464,33 +434,11 @@ func ExecStartAndAttach(ctx context.Context, sessionID string, options *ExecStar
464434
return err
465435
}
466436

467-
var socket net.Conn
468-
socketSet := false
469-
dialContext := conn.Client.Transport.(*http.Transport).DialContext
470-
t := &http.Transport{
471-
DialContext: func(ctx context.Context, network, address string) (net.Conn, error) {
472-
c, err := dialContext(ctx, network, address)
473-
if err != nil {
474-
return nil, err
475-
}
476-
if !socketSet {
477-
socket = c
478-
socketSet = true
479-
}
480-
return c, err
481-
},
482-
IdleConnTimeout: time.Duration(0),
483-
}
484-
conn.Client.Transport = t
485-
response, err := conn.DoRequest(ctx, bytes.NewReader(bodyJSON), http.MethodPost, "/exec/%s/start", nil, nil, sessionID)
437+
cw, socket, err := newUpgradeRequest(ctx, conn, bytes.NewReader(bodyJSON), fmt.Sprintf("/exec/%s/start", sessionID), nil)
486438
if err != nil {
487439
return err
488440
}
489-
defer response.Body.Close()
490-
491-
if !response.IsSuccess() && !response.IsInformational() {
492-
return response.Process(nil)
493-
}
441+
defer socket.Close()
494442

495443
if needTTY {
496444
winChange := make(chan os.Signal, 1)
@@ -513,12 +461,7 @@ func ExecStartAndAttach(ctx context.Context, sessionID string, options *ExecStar
513461
}
514462

515463
if err == nil {
516-
if closeWrite, ok := socket.(CloseWriter); ok {
517-
logrus.Debugf("Closing STDIN")
518-
if err := closeWrite.CloseWrite(); err != nil {
519-
logrus.Warnf("Failed to close STDIN for writing: %v", err)
520-
}
521-
}
464+
cw.CloseWrite()
522465
}
523466
}()
524467
}
@@ -580,3 +523,66 @@ func ExecStartAndAttach(ctx context.Context, sessionID string, options *ExecStar
580523
}
581524
return nil
582525
}
526+
527+
type closeWrite struct {
528+
// sock is the underlying socket of the connection.
529+
// Do not use that field directly.
530+
sock net.Conn
531+
}
532+
533+
func (cw *closeWrite) CloseWrite() {
534+
if closeWrite, ok := cw.sock.(CloseWriter); ok {
535+
logrus.Debugf("Closing STDIN")
536+
if err := closeWrite.CloseWrite(); err != nil {
537+
logrus.Warnf("Failed to close STDIN for writing: %v", err)
538+
}
539+
}
540+
}
541+
542+
// newUpgradeRequest performs a new http Upgrade request, it return the closeWrite which should be used
543+
// to close the STDIN side used and the ReadWriter which MUST be uses to write/read from the connection
544+
// and which must closed when finished. Do not access the new.Conn in closeWrite directly.
545+
func newUpgradeRequest(ctx context.Context, conn *bindings.Connection, body io.Reader, path string, params url.Values) (*closeWrite, io.ReadWriteCloser, error) {
546+
headers := http.Header{
547+
"Connection": []string{"Upgrade"},
548+
"Upgrade": []string{"tcp"},
549+
}
550+
551+
var socket net.Conn
552+
socketSet := false
553+
dialContext := conn.Client.Transport.(*http.Transport).DialContext
554+
t := &http.Transport{
555+
DialContext: func(ctx context.Context, network, address string) (net.Conn, error) {
556+
c, err := dialContext(ctx, network, address)
557+
if err != nil {
558+
return nil, err
559+
}
560+
if !socketSet {
561+
socket = c
562+
socketSet = true
563+
}
564+
return c, err
565+
},
566+
IdleConnTimeout: time.Duration(0),
567+
}
568+
conn.Client.Transport = t
569+
response, err := conn.DoRequest(ctx, body, http.MethodPost, path, params, headers)
570+
if err != nil {
571+
return nil, nil, err
572+
}
573+
574+
if response.StatusCode != http.StatusSwitchingProtocols {
575+
defer response.Body.Close()
576+
if err := response.Process(nil); err != nil {
577+
return nil, nil, err
578+
}
579+
return nil, nil, fmt.Errorf("incorrect server response code %d, expected %d", response.StatusCode, http.StatusSwitchingProtocols)
580+
}
581+
rw, ok := response.Body.(io.ReadWriteCloser)
582+
if !ok {
583+
response.Body.Close()
584+
return nil, nil, errors.New("internal error: cannot cast to http response Body to io.ReadWriteCloser")
585+
}
586+
587+
return &closeWrite{sock: socket}, rw, nil
588+
}

0 commit comments

Comments
 (0)