
Commit b06765c

[AUTO-CHERRYPICK] [3.0] Fix CVE-2023-39325 and CVE-2023-44487 for containerized-data-importer - branch 3.0-dev (#12085)
Co-authored-by: Henry Li <[email protected]>
1 parent 7cef86d commit b06765c

3 files changed: +381 -1 lines changed
Lines changed: 117 additions & 0 deletions
@@ -0,0 +1,117 @@
diff --git a/vendor/golang.org/x/net/http2/server.go b/vendor/golang.org/x/net/http2/server.go
index 8cb14f3..6000140 100644
--- a/vendor/golang.org/x/net/http2/server.go
+++ b/vendor/golang.org/x/net/http2/server.go
@@ -581,9 +581,11 @@ type serverConn struct {
     advMaxStreams               uint32 // our SETTINGS_MAX_CONCURRENT_STREAMS advertised the client
     curClientStreams            uint32 // number of open streams initiated by the client
     curPushedStreams            uint32 // number of open streams initiated by server push
+    curHandlers                 uint32 // number of running handler goroutines
     maxClientStreamID           uint32 // max ever seen from client (odd), or 0 if there have been no client requests
     maxPushPromiseID            uint32 // ID of the last push promise (even), or 0 if there have been no pushes
     streams                     map[uint32]*stream
+    unstartedHandlers           []unstartedHandler
     initialStreamSendWindowSize int32
     maxFrameSize                int32
     peerMaxHeaderListSize       uint32 // zero means unknown (default)
@@ -981,6 +983,8 @@ func (sc *serverConn) serve() {
             return
         case gracefulShutdownMsg:
             sc.startGracefulShutdownInternal()
+        case handlerDoneMsg:
+            sc.handlerDone()
         default:
             panic("unknown timer")
         }
@@ -1028,6 +1032,7 @@ var (
     idleTimerMsg        = new(serverMessage)
     shutdownTimerMsg    = new(serverMessage)
     gracefulShutdownMsg = new(serverMessage)
+    handlerDoneMsg      = new(serverMessage)
 )
 
 func (sc *serverConn) onSettingsTimer() { sc.sendServeMsg(settingsTimerMsg) }
@@ -2022,8 +2027,7 @@ func (sc *serverConn) processHeaders(f *MetaHeadersFrame) error {
         }
     }
 
-    go sc.runHandler(rw, req, handler)
-    return nil
+    return sc.scheduleHandler(id, rw, req, handler)
 }
 
 func (sc *serverConn) upgradeRequest(req *http.Request) {
@@ -2043,6 +2047,10 @@ func (sc *serverConn) upgradeRequest(req *http.Request) {
         sc.conn.SetReadDeadline(time.Time{})
     }
 
+    // This is the first request on the connection,
+    // so start the handler directly rather than going
+    // through scheduleHandler.
+    sc.curHandlers++
     go sc.runHandler(rw, req, sc.handler.ServeHTTP)
 }
 
@@ -2283,8 +2291,62 @@ func (sc *serverConn) newResponseWriter(st *stream, req *http.Request) *response
     return &responseWriter{rws: rws}
 }
 
+type unstartedHandler struct {
+    streamID uint32
+    rw       *responseWriter
+    req      *http.Request
+    handler  func(http.ResponseWriter, *http.Request)
+}
+
+// scheduleHandler starts a handler goroutine,
+// or schedules one to start as soon as an existing handler finishes.
+func (sc *serverConn) scheduleHandler(streamID uint32, rw *responseWriter, req *http.Request, handler func(http.ResponseWriter, *http.Request)) error {
+    sc.serveG.check()
+    maxHandlers := sc.advMaxStreams
+    if sc.curHandlers < maxHandlers {
+        sc.curHandlers++
+        go sc.runHandler(rw, req, handler)
+        return nil
+    }
+    if len(sc.unstartedHandlers) > int(4*sc.advMaxStreams) {
+        return sc.countError("too_many_early_resets", ConnectionError(ErrCodeEnhanceYourCalm))
+    }
+    sc.unstartedHandlers = append(sc.unstartedHandlers, unstartedHandler{
+        streamID: streamID,
+        rw:       rw,
+        req:      req,
+        handler:  handler,
+    })
+    return nil
+}
+
+func (sc *serverConn) handlerDone() {
+    sc.serveG.check()
+    sc.curHandlers--
+    i := 0
+    maxHandlers := sc.advMaxStreams
+    for ; i < len(sc.unstartedHandlers); i++ {
+        u := sc.unstartedHandlers[i]
+        if sc.streams[u.streamID] == nil {
+            // This stream was reset before its goroutine had a chance to start.
+            continue
+        }
+        if sc.curHandlers >= maxHandlers {
+            break
+        }
+        sc.curHandlers++
+        go sc.runHandler(u.rw, u.req, u.handler)
+        sc.unstartedHandlers[i] = unstartedHandler{} // don't retain references
+    }
+    sc.unstartedHandlers = sc.unstartedHandlers[i:]
+    if len(sc.unstartedHandlers) == 0 {
+        sc.unstartedHandlers = nil
+    }
+}
+
 // Run on its own goroutine.
 func (sc *serverConn) runHandler(rw *responseWriter, req *http.Request, handler func(http.ResponseWriter, *http.Request)) {
+    defer sc.sendServeMsg(handlerDoneMsg)
     didPanic := true
     defer func() {
         rw.rws.stream.cancelCtx()
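
The x/net/http2 change above addresses the HTTP/2 Rapid Reset flood (CVE-2023-44487): handler goroutines are now counted against the server's advertised SETTINGS_MAX_CONCURRENT_STREAMS, excess requests are queued in unstartedHandlers until a running handler finishes, and a connection that accumulates more than 4x that many unstarted handlers is closed with ENHANCE_YOUR_CALM. Nothing new has to be configured; the cap is the existing MaxConcurrentStreams setting. As a minimal sketch (not part of this patch), a downstream HTTPS server could tune that cap as shown below; the handler, the value 250, and the certificate paths are illustrative assumptions.

package main

import (
    "fmt"
    "log"
    "net/http"

    "golang.org/x/net/http2"
)

func main() {
    mux := http.NewServeMux()
    mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
        fmt.Fprintln(w, "ok")
    })

    srv := &http.Server{Addr: ":8443", Handler: mux}

    // With the patched vendor code, MaxConcurrentStreams also caps how many
    // handler goroutines run at once; streams opened beyond it wait until an
    // earlier handler finishes.
    if err := http2.ConfigureServer(srv, &http2.Server{MaxConcurrentStreams: 250}); err != nil {
        log.Fatal(err)
    }

    // cert.pem and key.pem are placeholder paths; HTTP/2 is negotiated over TLS here.
    log.Fatal(srv.ListenAndServeTLS("cert.pem", "key.pem"))
}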
Lines changed: 258 additions & 0 deletions
@@ -0,0 +1,258 @@
diff --git a/vendor/google.golang.org/grpc/internal/transport/http2_server.go b/vendor/google.golang.org/grpc/internal/transport/http2_server.go
index 3dd1564..9d9a3fd 100644
--- a/vendor/google.golang.org/grpc/internal/transport/http2_server.go
+++ b/vendor/google.golang.org/grpc/internal/transport/http2_server.go
@@ -165,15 +165,10 @@ func NewServerTransport(conn net.Conn, config *ServerConfig) (_ ServerTransport,
         ID:  http2.SettingMaxFrameSize,
         Val: http2MaxFrameLen,
     }}
-    // TODO(zhaoq): Have a better way to signal "no limit" because 0 is
-    // permitted in the HTTP2 spec.
-    maxStreams := config.MaxStreams
-    if maxStreams == 0 {
-        maxStreams = math.MaxUint32
-    } else {
+    if config.MaxStreams != math.MaxUint32 {
         isettings = append(isettings, http2.Setting{
             ID:  http2.SettingMaxConcurrentStreams,
-            Val: maxStreams,
+            Val: config.MaxStreams,
         })
     }
     dynamicWindow := true
@@ -252,7 +247,7 @@ func NewServerTransport(conn net.Conn, config *ServerConfig) (_ ServerTransport,
         framer:            framer,
         readerDone:        make(chan struct{}),
         writerDone:        make(chan struct{}),
-        maxStreams:        maxStreams,
+        maxStreams:        config.MaxStreams,
         inTapHandle:       config.InTapHandle,
         fc:                &trInFlow{limit: uint32(icwz)},
         state:             reachable,
diff --git a/vendor/google.golang.org/grpc/server.go b/vendor/google.golang.org/grpc/server.go
index f4dde72..98839ad 100644
--- a/vendor/google.golang.org/grpc/server.go
+++ b/vendor/google.golang.org/grpc/server.go
@@ -43,7 +43,6 @@ import (
     "google.golang.org/grpc/internal"
     "google.golang.org/grpc/internal/binarylog"
     "google.golang.org/grpc/internal/channelz"
-    "google.golang.org/grpc/internal/grpcrand"
     "google.golang.org/grpc/internal/grpcsync"
     "google.golang.org/grpc/internal/transport"
     "google.golang.org/grpc/keepalive"
@@ -74,10 +73,10 @@ func init() {
         srv.drainServerTransports(addr)
     }
     internal.AddGlobalServerOptions = func(opt ...ServerOption) {
-        extraServerOptions = append(extraServerOptions, opt...)
+        globalServerOptions = append(globalServerOptions, opt...)
     }
     internal.ClearGlobalServerOptions = func() {
-        extraServerOptions = nil
+        globalServerOptions = nil
     }
     internal.BinaryLogger = binaryLogger
     internal.JoinServerOptions = newJoinServerOption
@@ -115,12 +114,6 @@ type serviceInfo struct {
     mdata       interface{}
 }
 
-type serverWorkerData struct {
-    st     transport.ServerTransport
-    wg     *sync.WaitGroup
-    stream *transport.Stream
-}
-
 // Server is a gRPC server to serve RPC requests.
 type Server struct {
     opts serverOptions
@@ -145,7 +138,7 @@ type Server struct {
     channelzID *channelz.Identifier
     czData     *channelzData
 
-    serverWorkerChannels []chan *serverWorkerData
+    serverWorkerChannel chan func()
 }
 
 type serverOptions struct {
@@ -177,13 +170,14 @@ type serverOptions struct {
 }
 
 var defaultServerOptions = serverOptions{
+    maxConcurrentStreams:  math.MaxUint32,
     maxReceiveMessageSize: defaultServerMaxReceiveMessageSize,
     maxSendMessageSize:    defaultServerMaxSendMessageSize,
     connectionTimeout:     120 * time.Second,
     writeBufferSize:       defaultWriteBufSize,
     readBufferSize:        defaultReadBufSize,
 }
-var extraServerOptions []ServerOption
+var globalServerOptions []ServerOption
 
 // A ServerOption sets options such as credentials, codec and keepalive parameters, etc.
 type ServerOption interface {
@@ -387,6 +381,9 @@ func MaxSendMsgSize(m int) ServerOption {
 // MaxConcurrentStreams returns a ServerOption that will apply a limit on the number
 // of concurrent streams to each ServerTransport.
 func MaxConcurrentStreams(n uint32) ServerOption {
+    if n == 0 {
+        n = math.MaxUint32
+    }
     return newFuncServerOption(func(o *serverOptions) {
         o.maxConcurrentStreams = n
     })
@@ -565,42 +562,35 @@ const serverWorkerResetThreshold = 1 << 16
 // re-allocations (see the runtime.morestack problem [1]).
 //
 // [1] https://github.com/golang/go/issues/18138
-func (s *Server) serverWorker(ch chan *serverWorkerData) {
-    // To make sure all server workers don't reset at the same time, choose a
-    // random number of iterations before resetting.
-    threshold := serverWorkerResetThreshold + grpcrand.Intn(serverWorkerResetThreshold)
-    for completed := 0; completed < threshold; completed++ {
-        data, ok := <-ch
+func (s *Server) serverWorker() {
+    for completed := 0; completed < serverWorkerResetThreshold; completed++ {
+        f, ok := <-s.serverWorkerChannel
         if !ok {
             return
         }
-        s.handleStream(data.st, data.stream, s.traceInfo(data.st, data.stream))
-        data.wg.Done()
+        f()
     }
-    go s.serverWorker(ch)
+    go s.serverWorker()
 }
 
 // initServerWorkers creates worker goroutines and channels to process incoming
 // connections to reduce the time spent overall on runtime.morestack.
 func (s *Server) initServerWorkers() {
-    s.serverWorkerChannels = make([]chan *serverWorkerData, s.opts.numServerWorkers)
+    s.serverWorkerChannel = make(chan func())
     for i := uint32(0); i < s.opts.numServerWorkers; i++ {
-        s.serverWorkerChannels[i] = make(chan *serverWorkerData)
-        go s.serverWorker(s.serverWorkerChannels[i])
+        go s.serverWorker()
     }
 }
 
 func (s *Server) stopServerWorkers() {
-    for i := uint32(0); i < s.opts.numServerWorkers; i++ {
-        close(s.serverWorkerChannels[i])
-    }
+    close(s.serverWorkerChannel)
 }
 
 // NewServer creates a gRPC server which has no service registered and has not
 // started to accept requests yet.
 func NewServer(opt ...ServerOption) *Server {
     opts := defaultServerOptions
-    for _, o := range extraServerOptions {
+    for _, o := range globalServerOptions {
         o.apply(&opts)
     }
     for _, o := range opt {
@@ -945,25 +935,26 @@ func (s *Server) serveStreams(st transport.ServerTransport) {
     defer st.Close()
     var wg sync.WaitGroup
 
-    var roundRobinCounter uint32
+    streamQuota := newHandlerQuota(s.opts.maxConcurrentStreams)
     st.HandleStreams(func(stream *transport.Stream) {
         wg.Add(1)
+
+        streamQuota.acquire()
+        f := func() {
+            defer streamQuota.release()
+            defer wg.Done()
+            s.handleStream(st, stream, s.traceInfo(st, stream))
+        }
+
         if s.opts.numServerWorkers > 0 {
-            data := &serverWorkerData{st: st, wg: &wg, stream: stream}
             select {
-            case s.serverWorkerChannels[atomic.AddUint32(&roundRobinCounter, 1)%s.opts.numServerWorkers] <- data:
+            case s.serverWorkerChannel <- f:
+                return
             default:
                 // If all stream workers are busy, fallback to the default code path.
-                go func() {
-                    s.handleStream(st, stream, s.traceInfo(st, stream))
-                    wg.Done()
-                }()
             }
         } else {
-            go func() {
-                defer wg.Done()
-                s.handleStream(st, stream, s.traceInfo(st, stream))
-            }()
+            go f()
         }
     }, func(ctx context.Context, method string) context.Context {
         if !EnableTracing {
@@ -1978,3 +1969,34 @@ type channelzServer struct {
 func (c *channelzServer) ChannelzMetric() *channelz.ServerInternalMetric {
     return c.s.channelzMetric()
 }
+
+// atomicSemaphore implements a blocking, counting semaphore. acquire should be
+// called synchronously; release may be called asynchronously.
+type atomicSemaphore struct {
+    n    atomic.Int64
+    wait chan struct{}
+}
+
+func (q *atomicSemaphore) acquire() {
+    if q.n.Add(-1) < 0 {
+        // We ran out of quota. Block until a release happens.
+        <-q.wait
+    }
+}
+
+func (q *atomicSemaphore) release() {
+    // N.B. the "<= 0" check below should allow for this to work with multiple
+    // concurrent calls to acquire, but also note that with synchronous calls to
+    // acquire, as our system does, n will never be less than -1. There are
+    // fairness issues (queuing) to consider if this was to be generalized.
+    if q.n.Add(1) <= 0 {
+        // An acquire was waiting on us. Unblock it.
+        q.wait <- struct{}{}
+    }
+}
+
+func newHandlerQuota(n uint32) *atomicSemaphore {
+    a := &atomicSemaphore{wait: make(chan struct{}, 1)}
+    a.n.Store(int64(n))
+    return a
+}
\ No newline at end of file
diff --git a/vendor/k8s.io/apimachinery/pkg/util/runtime/runtime.go b/vendor/k8s.io/apimachinery/pkg/util/runtime/runtime.go
index d738725..3674914 100644
--- a/vendor/k8s.io/apimachinery/pkg/util/runtime/runtime.go
+++ b/vendor/k8s.io/apimachinery/pkg/util/runtime/runtime.go
@@ -126,14 +126,17 @@ type rudimentaryErrorBackoff struct {
 // OnError will block if it is called more often than the embedded period time.
 // This will prevent overly tight hot error loops.
 func (r *rudimentaryErrorBackoff) OnError(error) {
+    now := time.Now() // start the timer before acquiring the lock
     r.lastErrorTimeLock.Lock()
-    defer r.lastErrorTimeLock.Unlock()
-    d := time.Since(r.lastErrorTime)
-    if d < r.minPeriod {
-        // If the time moves backwards for any reason, do nothing
-        time.Sleep(r.minPeriod - d)
-    }
+    d := now.Sub(r.lastErrorTime)
     r.lastErrorTime = time.Now()
+    r.lastErrorTimeLock.Unlock()
+
+    // Do not sleep with the lock held because that causes all callers of HandleError to block.
+    // We only want the current goroutine to block.
+    // A negative or zero duration causes time.Sleep to return immediately.
+    // If the time moves backwards for any reason, do nothing.
+    time.Sleep(r.minPeriod - d)
 }
 
 // GetCaller returns the caller of the function that calls it.
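
The gRPC change above takes the same approach for CVE-2023-39325/CVE-2023-44487: MaxConcurrentStreams(0) is now treated as unlimited (math.MaxUint32), and serveStreams gates handler dispatch through the newHandlerQuota counting semaphore, which is only released when the handler completes, so rapidly opening and resetting streams no longer lets a client exceed the limit. The accompanying apimachinery change simply moves the backoff sleep in OnError outside the lock so one slow caller cannot block every other HandleError caller. As a minimal sketch (not part of this patch), a server built against the patched module could bound concurrent handlers as shown below; the port, the limit of 100, and the commented-out service registration are illustrative assumptions.

package main

import (
    "log"
    "net"

    "google.golang.org/grpc"
)

func main() {
    lis, err := net.Listen("tcp", ":50051")
    if err != nil {
        log.Fatalf("listen: %v", err)
    }

    // With the patched server, this limit is enforced by the counting
    // semaphore around handler dispatch, so at most 100 RPC handlers run
    // concurrently per connection even if a client opens and immediately
    // resets streams.
    s := grpc.NewServer(grpc.MaxConcurrentStreams(100))

    // Register services before serving, e.g.
    // pb.RegisterGreeterServer(s, &greeterServer{})
    log.Fatal(s.Serve(lis))
}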
