Skip to content

Commit 1200625

Browse files
authored
Merge branch 'grpc:master' into subchannel-disconnection-unknown-reason
2 parents dcaf507 + e5563c6 commit 1200625

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

71 files changed

+2571
-1473
lines changed

.github/workflows/testing.yml

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ jobs:
2727
- name: Setup Go
2828
uses: actions/setup-go@v5
2929
with:
30-
go-version: '1.25'
30+
go-version: '1.26'
3131
cache-dependency-path: "**/go.sum"
3232

3333
# Run the vet-proto checks.
@@ -45,29 +45,29 @@ jobs:
4545
matrix:
4646
include:
4747
- type: vet
48-
goversion: '1.24'
48+
goversion: '1.25'
4949

5050
- type: extras
51-
goversion: '1.25'
51+
goversion: '1.26'
5252

5353
- type: tests
54-
goversion: '1.25'
54+
goversion: '1.26'
5555

5656
- type: tests
57-
goversion: '1.25'
57+
goversion: '1.26'
5858
testflags: -race
5959

6060
- type: tests
61-
goversion: '1.25'
61+
goversion: '1.26'
6262
goarch: 386
6363

6464
- type: tests
65-
goversion: '1.25'
65+
goversion: '1.26'
6666
goarch: arm64
6767
runner: ubuntu-24.04-arm
6868

6969
- type: tests
70-
goversion: '1.24'
70+
goversion: '1.25'
7171

7272
steps:
7373
# Setup the environment.

balancer/rls/balancer.go

Lines changed: 29 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -79,14 +79,14 @@ var (
7979
dataCachePurgeHook = func() {}
8080
resetBackoffHook = func() {}
8181

82-
cacheEntriesMetric = estats.RegisterInt64Gauge(estats.MetricDescriptor{
82+
cacheEntriesMetric = estats.RegisterInt64AsyncGauge(estats.MetricDescriptor{
8383
Name: "grpc.lb.rls.cache_entries",
8484
Description: "EXPERIMENTAL. Number of entries in the RLS cache.",
8585
Unit: "{entry}",
8686
Labels: []string{"grpc.target", "grpc.lb.rls.server_target", "grpc.lb.rls.instance_uuid"},
8787
Default: false,
8888
})
89-
cacheSizeMetric = estats.RegisterInt64Gauge(estats.MetricDescriptor{
89+
cacheSizeMetric = estats.RegisterInt64AsyncGauge(estats.MetricDescriptor{
9090
Name: "grpc.lb.rls.cache_size",
9191
Description: "EXPERIMENTAL. The current size of the RLS cache.",
9292
Unit: "By",
@@ -140,7 +140,9 @@ func (rlsBB) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.
140140
updateCh: buffer.NewUnbounded(),
141141
}
142142
lb.logger = internalgrpclog.NewPrefixLogger(logger, fmt.Sprintf("[rls-experimental-lb %p] ", lb))
143-
lb.dataCache = newDataCache(maxCacheSize, lb.logger, cc.MetricsRecorder(), opts.Target.String())
143+
lb.dataCache = newDataCache(maxCacheSize, lb.logger, opts.Target.String())
144+
metricsRecorder := cc.MetricsRecorder()
145+
lb.unregisterMetricHandler = metricsRecorder.RegisterAsyncReporter(lb, cacheEntriesMetric, cacheSizeMetric)
144146
lb.bg = balancergroup.New(balancergroup.Options{
145147
CC: cc,
146148
BuildOpts: opts,
@@ -162,6 +164,9 @@ type rlsBalancer struct {
162164
dataCachePurgeHook func()
163165
logger *internalgrpclog.PrefixLogger
164166

167+
// unregisterMetricHandler is the function to deregister the async metric reporter.
168+
unregisterMetricHandler func()
169+
165170
// If both cacheMu and stateMu need to be acquired, the former must be
166171
// acquired first to prevent a deadlock. This order restriction is due to the
167172
// fact that in places where we need to acquire both the locks, we always
@@ -488,6 +493,7 @@ func (b *rlsBalancer) Close() {
488493
if b.ctrlCh != nil {
489494
b.ctrlCh.close()
490495
}
496+
b.unregisterMetricHandler()
491497
b.bg.Close()
492498
b.stateMu.Unlock()
493499

@@ -702,3 +708,23 @@ func (b *rlsBalancer) releaseChildPolicyReferences(targets []string) {
702708
}
703709
b.stateMu.Unlock()
704710
}
711+
712+
// Report reports the metrics data to the provided recorder.
713+
func (b *rlsBalancer) Report(r estats.AsyncMetricsRecorder) error {
714+
b.cacheMu.Lock()
715+
currentSize := b.dataCache.currentSize
716+
entriesLen := int64(len(b.dataCache.entries))
717+
rlsServerTarget := b.dataCache.rlsServerTarget
718+
grpcTarget := b.dataCache.grpcTarget
719+
uuid := b.dataCache.uuid
720+
shutdown := b.dataCache.shutdown.HasFired()
721+
b.cacheMu.Unlock()
722+
723+
if shutdown {
724+
return nil
725+
}
726+
727+
cacheSizeMetric.Record(r, currentSize, grpcTarget, rlsServerTarget, uuid)
728+
cacheEntriesMetric.Record(r, entriesLen, grpcTarget, rlsServerTarget, uuid)
729+
return nil
730+
}

balancer/rls/cache.go

Lines changed: 13 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@ import (
2323
"time"
2424

2525
"github.com/google/uuid"
26-
estats "google.golang.org/grpc/experimental/stats"
2726
"google.golang.org/grpc/internal/backoff"
2827
internalgrpclog "google.golang.org/grpc/internal/grpclog"
2928
"google.golang.org/grpc/internal/grpcsync"
@@ -174,21 +173,19 @@ type dataCache struct {
174173
rlsServerTarget string
175174

176175
// Read only after initialization.
177-
grpcTarget string
178-
uuid string
179-
metricsRecorder estats.MetricsRecorder
176+
grpcTarget string
177+
uuid string
180178
}
181179

182-
func newDataCache(size int64, logger *internalgrpclog.PrefixLogger, metricsRecorder estats.MetricsRecorder, grpcTarget string) *dataCache {
180+
func newDataCache(size int64, logger *internalgrpclog.PrefixLogger, grpcTarget string) *dataCache {
183181
return &dataCache{
184-
maxSize: size,
185-
keys: newLRU(),
186-
entries: make(map[cacheKey]*cacheEntry),
187-
logger: logger,
188-
shutdown: grpcsync.NewEvent(),
189-
grpcTarget: grpcTarget,
190-
uuid: uuid.New().String(),
191-
metricsRecorder: metricsRecorder,
182+
maxSize: size,
183+
keys: newLRU(),
184+
entries: make(map[cacheKey]*cacheEntry),
185+
logger: logger,
186+
shutdown: grpcsync.NewEvent(),
187+
grpcTarget: grpcTarget,
188+
uuid: uuid.New().String(),
192189
}
193190
}
194191

@@ -327,8 +324,7 @@ func (dc *dataCache) addEntry(key cacheKey, entry *cacheEntry) (backoffCancelled
327324
if dc.currentSize > dc.maxSize {
328325
backoffCancelled = dc.resize(dc.maxSize)
329326
}
330-
cacheSizeMetric.Record(dc.metricsRecorder, dc.currentSize, dc.grpcTarget, dc.rlsServerTarget, dc.uuid)
331-
cacheEntriesMetric.Record(dc.metricsRecorder, int64(len(dc.entries)), dc.grpcTarget, dc.rlsServerTarget, dc.uuid)
327+
332328
return backoffCancelled, true
333329
}
334330

@@ -338,7 +334,7 @@ func (dc *dataCache) updateEntrySize(entry *cacheEntry, newSize int64) {
338334
dc.currentSize -= entry.size
339335
entry.size = newSize
340336
dc.currentSize += entry.size
341-
cacheSizeMetric.Record(dc.metricsRecorder, dc.currentSize, dc.grpcTarget, dc.rlsServerTarget, dc.uuid)
337+
342338
}
343339

344340
func (dc *dataCache) getEntry(key cacheKey) *cacheEntry {
@@ -371,8 +367,7 @@ func (dc *dataCache) deleteAndCleanup(key cacheKey, entry *cacheEntry) {
371367
delete(dc.entries, key)
372368
dc.currentSize -= entry.size
373369
dc.keys.removeEntry(key)
374-
cacheSizeMetric.Record(dc.metricsRecorder, dc.currentSize, dc.grpcTarget, dc.rlsServerTarget, dc.uuid)
375-
cacheEntriesMetric.Record(dc.metricsRecorder, int64(len(dc.entries)), dc.grpcTarget, dc.rlsServerTarget, dc.uuid)
370+
376371
}
377372

378373
func (dc *dataCache) stop() {

balancer/rls/cache_test.go

Lines changed: 5 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@ import (
2525
"github.com/google/go-cmp/cmp"
2626
"github.com/google/go-cmp/cmp/cmpopts"
2727
"google.golang.org/grpc/internal/backoff"
28-
"google.golang.org/grpc/internal/testutils/stats"
2928
)
3029

3130
var (
@@ -120,7 +119,7 @@ func (s) TestLRU_BasicOperations(t *testing.T) {
120119

121120
func (s) TestDataCache_BasicOperations(t *testing.T) {
122121
initCacheEntries()
123-
dc := newDataCache(5, nil, &stats.NoopMetricsRecorder{}, "")
122+
dc := newDataCache(5, nil, "")
124123
for i, k := range cacheKeys {
125124
dc.addEntry(k, cacheEntries[i])
126125
}
@@ -134,7 +133,7 @@ func (s) TestDataCache_BasicOperations(t *testing.T) {
134133

135134
func (s) TestDataCache_AddForcesResize(t *testing.T) {
136135
initCacheEntries()
137-
dc := newDataCache(1, nil, &stats.NoopMetricsRecorder{}, "")
136+
dc := newDataCache(1, nil, "")
138137

139138
// The first entry in cacheEntries has a minimum expiry time in the future.
140139
// This entry would stop the resize operation since we do not evict entries
@@ -163,7 +162,7 @@ func (s) TestDataCache_AddForcesResize(t *testing.T) {
163162

164163
func (s) TestDataCache_Resize(t *testing.T) {
165164
initCacheEntries()
166-
dc := newDataCache(5, nil, &stats.NoopMetricsRecorder{}, "")
165+
dc := newDataCache(5, nil, "")
167166
for i, k := range cacheKeys {
168167
dc.addEntry(k, cacheEntries[i])
169168
}
@@ -194,7 +193,7 @@ func (s) TestDataCache_Resize(t *testing.T) {
194193

195194
func (s) TestDataCache_EvictExpiredEntries(t *testing.T) {
196195
initCacheEntries()
197-
dc := newDataCache(5, nil, &stats.NoopMetricsRecorder{}, "")
196+
dc := newDataCache(5, nil, "")
198197
for i, k := range cacheKeys {
199198
dc.addEntry(k, cacheEntries[i])
200199
}
@@ -221,7 +220,7 @@ func (s) TestDataCache_ResetBackoffState(t *testing.T) {
221220
}
222221

223222
initCacheEntries()
224-
dc := newDataCache(5, nil, &stats.NoopMetricsRecorder{}, "")
223+
dc := newDataCache(5, nil, "")
225224
for i, k := range cacheKeys {
226225
dc.addEntry(k, cacheEntries[i])
227226
}
@@ -242,61 +241,3 @@ func (s) TestDataCache_ResetBackoffState(t *testing.T) {
242241
t.Fatalf("unexpected diff in backoffState for cache entry after dataCache.resetBackoffState(): %s", diff)
243242
}
244243
}
245-
246-
func (s) TestDataCache_Metrics(t *testing.T) {
247-
cacheEntriesMetricsTests := []*cacheEntry{
248-
{size: 1},
249-
{size: 2},
250-
{size: 3},
251-
{size: 4},
252-
{size: 5},
253-
}
254-
tmr := stats.NewTestMetricsRecorder()
255-
dc := newDataCache(50, nil, tmr, "")
256-
257-
dc.updateRLSServerTarget("rls-server-target")
258-
for i, k := range cacheKeys {
259-
dc.addEntry(k, cacheEntriesMetricsTests[i])
260-
}
261-
262-
const cacheEntriesKey = "grpc.lb.rls.cache_entries"
263-
const cacheSizeKey = "grpc.lb.rls.cache_size"
264-
// 5 total entries which add up to 15 size, so should record that.
265-
if got, _ := tmr.Metric(cacheEntriesKey); got != 5 {
266-
t.Fatalf("Unexpected data for metric %v, got: %v, want: %v", cacheEntriesKey, got, 5)
267-
}
268-
if got, _ := tmr.Metric(cacheSizeKey); got != 15 {
269-
t.Fatalf("Unexpected data for metric %v, got: %v, want: %v", cacheSizeKey, got, 15)
270-
}
271-
272-
// Resize down the cache to 2 entries (deterministic as based of LRU).
273-
dc.resize(9)
274-
if got, _ := tmr.Metric(cacheEntriesKey); got != 2 {
275-
t.Fatalf("Unexpected data for metric %v, got: %v, want: %v", cacheEntriesKey, got, 2)
276-
}
277-
if got, _ := tmr.Metric(cacheSizeKey); got != 9 {
278-
t.Fatalf("Unexpected data for metric %v, got: %v, want: %v", cacheSizeKey, got, 9)
279-
}
280-
281-
// Update an entry to have size 6. This should reflect in the size metrics,
282-
// which will increase by 1 to 11, while the number of cache entries should
283-
// stay same. This write is deterministic and writes to the last one.
284-
dc.updateEntrySize(cacheEntriesMetricsTests[4], 6)
285-
286-
if got, _ := tmr.Metric(cacheEntriesKey); got != 2 {
287-
t.Fatalf("Unexpected data for metric %v, got: %v, want: %v", cacheEntriesKey, got, 2)
288-
}
289-
if got, _ := tmr.Metric(cacheSizeKey); got != 10 {
290-
t.Fatalf("Unexpected data for metric %v, got: %v, want: %v", cacheSizeKey, got, 10)
291-
}
292-
293-
// Delete this scaled up cache key. This should scale down the cache to 1
294-
// entries, and remove 6 size so cache size should be 4.
295-
dc.deleteAndCleanup(cacheKeys[4], cacheEntriesMetricsTests[4])
296-
if got, _ := tmr.Metric(cacheEntriesKey); got != 1 {
297-
t.Fatalf("Unexpected data for metric %v, got: %v, want: %v", cacheEntriesKey, got, 1)
298-
}
299-
if got, _ := tmr.Metric(cacheSizeKey); got != 4 {
300-
t.Fatalf("Unexpected data for metric %v, got: %v, want: %v", cacheSizeKey, got, 4)
301-
}
302-
}

benchmark/benchmain/main.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,7 @@ var (
117117
sleepBetweenRPCs = flags.DurationSlice("sleepBetweenRPCs", []time.Duration{0}, "Configures the maximum amount of time the client should sleep between consecutive RPCs - may be a comma-separated list")
118118
connections = flag.Int("connections", 1, "The number of connections. Each connection will handle maxConcurrentCalls RPC streams")
119119
recvBufferPool = flags.StringWithAllowedValues("recvBufferPool", recvBufferPoolSimple, "Configures the shared receive buffer pool. One of: nil, simple, all", allRecvBufferPools)
120-
sharedWriteBuffer = flags.StringWithAllowedValues("sharedWriteBuffer", toggleModeOff,
120+
sharedWriteBuffer = flags.StringWithAllowedValues("sharedWriteBuffer", toggleModeOn,
121121
fmt.Sprintf("Configures both client and server to share write buffer - One of: %v", strings.Join(allToggleModes, ", ")), allToggleModes)
122122

123123
logger = grpclog.Component("benchmark")

cmd/protoc-gen-go-grpc/go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
module google.golang.org/grpc/cmd/protoc-gen-go-grpc
22

3-
go 1.24.0
3+
go 1.25.0
44

55
require (
66
google.golang.org/grpc v1.70.0

credentials/xds/xds.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@ import (
2525
"crypto/tls"
2626
"errors"
2727
"net"
28-
"sync/atomic"
2928
"time"
3029

3130
"google.golang.org/grpc/credentials"
@@ -114,8 +113,10 @@ func (c *credsImpl) ClientHandshake(ctx context.Context, authority string, rawCo
114113
return c.fallback.ClientHandshake(ctx, authority, rawConn)
115114
}
116115

117-
uPtr := xdsinternal.GetHandshakeInfo(chi.Attributes)
118-
hi := (*xdsinternal.HandshakeInfo)(atomic.LoadPointer(uPtr))
116+
hi := xdsinternal.HandshakeInfoFromAttributes(chi.Attributes).Load()
117+
if hi == nil {
118+
return c.fallback.ClientHandshake(ctx, authority, rawConn)
119+
}
119120
if hi.UseFallbackCreds() {
120121
return c.fallback.ClientHandshake(ctx, authority, rawConn)
121122
}

credentials/xds/xds_client_test.go

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@ import (
3030
"sync/atomic"
3131
"testing"
3232
"time"
33-
"unsafe"
3433

3534
"google.golang.org/grpc/credentials"
3635
"google.golang.org/grpc/credentials/tls/certprovider"
@@ -228,9 +227,10 @@ func newTestContextWithHandshakeInfo(parent context.Context, root, identity cert
228227
if sanExactMatch != "" {
229228
sms = []matcher.StringMatcher{matcher.NewExactStringMatcher(sanExactMatch, false)}
230229
}
230+
var hiPtr atomic.Pointer[xdsinternal.HandshakeInfo]
231231
info := xdsinternal.NewHandshakeInfo(root, identity, sms, false)
232-
uPtr := unsafe.Pointer(info)
233-
addr := xdsinternal.SetHandshakeInfo(resolver.Address{}, &uPtr)
232+
hiPtr.Store(info)
233+
addr := xdsinternal.SetHandshakeInfo(resolver.Address{}, &hiPtr)
234234

235235
// Moving the attributes from the resolver.Address to the context passed to
236236
// the handshaker is done in the transport layer. Since we directly call the
@@ -546,8 +546,9 @@ func (s) TestClientCredsProviderSwitch(t *testing.T) {
546546
// We need to repeat most of what newTestContextWithHandshakeInfo() does
547547
// here because we need access to the underlying HandshakeInfo so that we
548548
// can update it before the next call to ClientHandshake().
549-
uPtr := unsafe.Pointer(handshakeInfo)
550-
addr := xdsinternal.SetHandshakeInfo(resolver.Address{}, &uPtr)
549+
var hiPtr atomic.Pointer[xdsinternal.HandshakeInfo]
550+
hiPtr.Store(handshakeInfo)
551+
addr := xdsinternal.SetHandshakeInfo(resolver.Address{}, &hiPtr)
551552
ctx = icredentials.NewClientHandshakeInfoContext(ctx, credentials.ClientHandshakeInfo{Attributes: addr.Attributes})
552553
if _, _, err := creds.ClientHandshake(ctx, authority, conn); err == nil {
553554
t.Fatal("ClientHandshake() succeeded when expected to fail")
@@ -571,7 +572,7 @@ func (s) TestClientCredsProviderSwitch(t *testing.T) {
571572
handshakeInfo = xdsinternal.NewHandshakeInfo(root2, nil, []matcher.StringMatcher{matcher.NewExactStringMatcher(defaultTestCertSAN, false)}, false)
572573
// Update the existing pointer, which address attribute will continue to
573574
// point to.
574-
atomic.StorePointer(&uPtr, unsafe.Pointer(handshakeInfo))
575+
hiPtr.Store(handshakeInfo)
575576
_, ai, err := creds.ClientHandshake(ctx, authority, conn)
576577
if err != nil {
577578
t.Fatalf("ClientHandshake() returned failed: %q", err)

dialoptions.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -705,10 +705,11 @@ func WithDisableHealthCheck() DialOption {
705705
func defaultDialOptions() dialOptions {
706706
return dialOptions{
707707
copts: transport.ConnectOptions{
708-
ReadBufferSize: defaultReadBufSize,
709-
WriteBufferSize: defaultWriteBufSize,
710-
UserAgent: grpcUA,
711-
BufferPool: mem.DefaultBufferPool(),
708+
ReadBufferSize: defaultReadBufSize,
709+
WriteBufferSize: defaultWriteBufSize,
710+
SharedWriteBuffer: true,
711+
UserAgent: grpcUA,
712+
BufferPool: mem.DefaultBufferPool(),
712713
},
713714
bs: internalbackoff.DefaultExponential,
714715
idleTimeout: 30 * time.Minute,

0 commit comments

Comments
 (0)