
Commit deccf63

sql: clean up some uses of Unlock
This is a minor cleanup change to use a deferred `Unlock` when there are multiple exit paths that need to unlock.

Epic: none

Release note: None
1 parent: f295bd8
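For context, the pattern this commit applies can be sketched as follows. This is an illustrative example, not code from the repository; the `counter` type, its fields, and the limit check are hypothetical.

package lockdemo

import "sync"

type counter struct {
	mu    sync.Mutex
	n     int
	limit int
}

// incrManual shows the shape being cleaned up: every exit path has to
// remember its own Unlock call, which is easy to miss when new early
// returns are added.
func (c *counter) incrManual() bool {
	c.mu.Lock()
	if c.n >= c.limit {
		c.mu.Unlock()
		return false
	}
	c.n++
	c.mu.Unlock()
	return true
}

// incrDeferred shows the cleaned-up shape: a single deferred Unlock
// covers every exit path.
func (c *counter) incrDeferred() bool {
	c.mu.Lock()
	defer c.mu.Unlock()
	if c.n >= c.limit {
		return false
	}
	c.n++
	return true
}

Where the mutex must be released before the end of the function (so that later work runs unlocked), the diffs below instead wrap the critical section in an immediately invoked function literal; see the sketch after the slstorage.go diff.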

8 files changed: +77 −58 lines changed


pkg/sql/flowinfra/flow_registry.go

Lines changed: 1 addition & 1 deletion
@@ -366,13 +366,13 @@ func (fr *FlowRegistry) cancelPendingStreams(
 // ConnectInboundStream calls for the flow will fail to find it and time out.
 func (fr *FlowRegistry) UnregisterFlow(id execinfrapb.FlowID) {
 	fr.Lock()
+	defer fr.Unlock()
 	entry := fr.flows[id]
 	if entry.streamTimer != nil {
 		entry.streamTimer.Stop()
 		entry.streamTimer = nil
 	}
 	fr.releaseEntryLocked(id)
-	fr.Unlock()
 }

 // waitForFlow waits until the flow with the given id gets registered - up to

pkg/sql/pgwire/server.go

Lines changed: 9 additions & 4 deletions
@@ -450,6 +450,14 @@ func (s *Server) setDrainingLocked(drain bool) bool {
 	return true
 }

+// setDraining sets the server's draining state and returns whether the
+// state changed (i.e. drain != s.mu.draining).
+func (s *Server) setDraining(drain bool) bool {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+	return s.setDrainingLocked(drain)
+}
+
 // setRejectNewConnectionsLocked sets the server's rejectNewConnections state.
 // s.mu must be locked when setRejectNewConnectionsLocked is called.
 func (s *Server) setRejectNewConnectionsLocked(rej bool) {
@@ -567,13 +575,10 @@ func (s *Server) drainImpl(
 	stopper *stop.Stopper,
 ) error {

-	s.mu.Lock()
-	if !s.setDrainingLocked(true) {
+	if !s.setDraining(true) {
 		// We are already draining.
-		s.mu.Unlock()
 		return nil
 	}
-	s.mu.Unlock()

 	// If there is no open SQL connections to drain, just return.
 	if s.GetConnCancelMapLen() == 0 {

pkg/sql/sqlliveness/slstorage/slstorage.go

Lines changed: 31 additions & 23 deletions
@@ -202,33 +202,41 @@ const (
 func (s *Storage) isAlive(
 	ctx context.Context, sid sqlliveness.SessionID, syncOrAsync readType,
 ) (alive bool, _ error) {
-	s.mu.Lock()
-	if !s.mu.started {
-		s.mu.Unlock()
-		return false, sqlliveness.NotStartedError
-	}
-	if _, ok := s.mu.deadSessions.Get(sid); ok {
-		s.mu.Unlock()
-		s.metrics.IsAliveCacheHits.Inc(1)
-		return false, nil
-	}
-	if expiration, ok := s.mu.liveSessions.Get(sid); ok {
-		expiration := expiration.(hlc.Timestamp)
-		// The record exists and is valid.
-		if s.clock.Now().Less(expiration) {
-			s.mu.Unlock()
+
+	// If wait is false, alive is set and future is unset.
+	// If wait is true, alive is unset and future is set.
+	alive, wait, future, err := func() (bool, bool, singleflight.Future, error) {
+		s.mu.Lock()
+		defer s.mu.Unlock()
+
+		if !s.mu.started {
+			return false, false, singleflight.Future{}, sqlliveness.NotStartedError
+		}
+		if _, ok := s.mu.deadSessions.Get(sid); ok {
 			s.metrics.IsAliveCacheHits.Inc(1)
-			return true, nil
+			return false, false, singleflight.Future{}, nil
 		}
-	}
+		if expiration, ok := s.mu.liveSessions.Get(sid); ok {
+			expiration := expiration.(hlc.Timestamp)
+			// The record exists and is valid.
+			if s.clock.Now().Less(expiration) {
+				s.metrics.IsAliveCacheHits.Inc(1)
+				return true, false, singleflight.Future{}, nil
+			}
+		}
+
+		// We think that the session is expired; check, and maybe delete it.
+		future := s.deleteOrFetchSessionSingleFlightLocked(ctx, sid)

-	// We think that the session is expired; check, and maybe delete it.
-	future := s.deleteOrFetchSessionSingleFlightLocked(ctx, sid)
+		// At this point, we know that the singleflight goroutine has been launched.
+		// Releasing the lock when we return ensures that callers will either join
+		// the singleflight or see the result.
+		return false, true, future, nil
+	}()
+	if err != nil || !wait {
+		return alive, err
+	}

-	// At this point, we know that the singleflight goroutine has been launched.
-	// Releasing the lock here ensures that callers will either join the single-
-	// flight or see the result.
-	s.mu.Unlock()
 	s.metrics.IsAliveCacheMisses.Inc(1)

 	// If we do not want to wait for the result, assume that the session is
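The isAlive change above, like the SaveToLog, RecordTransaction, and swap changes below, cannot simply defer the unlock for the whole function: work after the critical section has to run with the mutex released. The commit instead wraps the locked region in an immediately invoked function literal, so the deferred Unlock fires when the closure returns. A minimal sketch of that shape, with hypothetical names (`cache`, `lookupThenFetch`, `fetch`), not code from the repository:

package lockdemo

import "sync"

type cache struct {
	mu   sync.Mutex
	data map[string]string
}

// lookupThenFetch reads the cache under the lock inside a closure, then
// does the slow fallback work after the closure returns, with the mutex
// already released.
func (c *cache) lookupThenFetch(key string, fetch func(string) string) string {
	val, ok := func() (string, bool) {
		c.mu.Lock()
		defer c.mu.Unlock() // covers every return inside the closure
		v, ok := c.data[key]
		return v, ok
	}()
	if ok {
		return val
	}
	// The mutex is no longer held here, so the fallback does not block
	// other callers.
	return fetch(key)
}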

pkg/sql/sqlstats/ssmemstorage/ss_mem_storage.go

Lines changed: 5 additions & 3 deletions
@@ -664,9 +664,11 @@ func (s *Container) SaveToLog(ctx context.Context, appName string) {
 	}
 	var buf bytes.Buffer
 	for key, stats := range s.mu.stmts {
-		stats.mu.Lock()
-		json, err := json.Marshal(stats.mu.data)
-		stats.mu.Unlock()
+		json, err := func() ([]byte, error) {
+			stats.mu.Lock()
+			defer stats.mu.Unlock()
+			return json.Marshal(stats.mu.data)
+		}()
 		if err != nil {
 			log.Errorf(ctx, "error while marshaling stats for %q // %q: %v", appName, key.String(), err)
 			continue

pkg/sql/sqlstats/ssmemstorage/ss_mem_writer.go

Lines changed: 13 additions & 9 deletions
@@ -335,17 +335,21 @@ func (s *Container) RecordTransaction(
 	if created {
 		estimatedMemAllocBytes :=
 			stats.sizeUnsafe() + key.Size() + 8 /* hash of transaction key */
-		s.mu.Lock()
-
-		// If the monitor is nil, we do not track memory usage.
-		if s.mu.acc.Monitor() != nil {
-			if err := s.mu.acc.Grow(ctx, estimatedMemAllocBytes); err != nil {
-				delete(s.mu.txns, key)
-				s.mu.Unlock()
-				return ErrMemoryPressure
+		if err := func() error {
+			s.mu.Lock()
+			defer s.mu.Unlock()
+
+			// If the monitor is nil, we do not track memory usage.
+			if s.mu.acc.Monitor() != nil {
+				if err := s.mu.acc.Grow(ctx, estimatedMemAllocBytes); err != nil {
+					delete(s.mu.txns, key)
+					return ErrMemoryPressure
+				}
 			}
+			return nil
+		}(); err != nil {
+			return err
 		}
-		s.mu.Unlock()
 	}

 	stats.mu.data.Count++

pkg/util/tracing/tracer.go

Lines changed: 13 additions & 11 deletions
@@ -527,17 +527,19 @@ func (r *SpanRegistry) testingAll() []*crdbSpan {
 // concurrently with this call. swap takes ownership of the spanRefs, and will
 // release() them.
 func (r *SpanRegistry) swap(parentID tracingpb.SpanID, children []spanRef) {
-	r.mu.Lock()
-	r.removeSpanLocked(parentID)
-	for _, c := range children {
-		sp := c.Span.i.crdb
-		sp.withLock(func() {
-			if !sp.mu.finished {
-				r.addSpanLocked(sp)
-			}
-		})
-	}
-	r.mu.Unlock()
+	func() {
+		r.mu.Lock()
+		defer r.mu.Unlock()
+		r.removeSpanLocked(parentID)
+		for _, c := range children {
+			sp := c.Span.i.crdb
+			sp.withLock(func() {
+				if !sp.mu.finished {
+					r.addSpanLocked(sp)
+				}
+			})
+		}
+	}()
 	for _, c := range children {
 		c.release()
 	}

pkg/workload/histogram/histogram.go

Lines changed: 3 additions & 4 deletions
@@ -74,17 +74,16 @@ func (w *Registry) newNamedHistogramLocked(name string) *NamedHistogram {
 func (w *NamedHistogram) Record(elapsed time.Duration) {
 	w.prometheusHistogram.Observe(float64(elapsed.Nanoseconds()) / float64(time.Second))
 	w.mu.Lock()
+	defer w.mu.Unlock()
+
 	maxLatency := time.Duration(w.mu.current.HighestTrackableValue())
 	if elapsed < minLatency {
 		elapsed = minLatency
 	} else if elapsed > maxLatency {
 		elapsed = maxLatency
 	}

-	err := w.mu.current.RecordValue(elapsed.Nanoseconds())
-	w.mu.Unlock()
-
-	if err != nil {
+	if err := w.mu.current.RecordValue(elapsed.Nanoseconds()); err != nil {
 		// Note that a histogram only drops recorded values that are out of range,
 		// but we clamp the latency value to the configured range to prevent such
 		// drops. This code path should never happen.

pkg/workload/ycsb/zipfgenerator.go

Lines changed: 2 additions & 3 deletions
@@ -129,6 +129,7 @@ func computeZetaFromScratch(n uint64, theta float64) (float64, error) {
 // according to the Zipf distribution.
 func (z *ZipfGenerator) Uint64() uint64 {
 	z.zipfGenMu.mu.Lock()
+	defer z.zipfGenMu.mu.Unlock()
 	u := z.zipfGenMu.r.Float64()
 	uz := u * z.zipfGenMu.zetaN
 	var result uint64
@@ -143,24 +144,22 @@ func (z *ZipfGenerator) Uint64() uint64 {
 	if z.verbose {
 		fmt.Printf("Uint64[%d, %d] -> %d\n", z.iMin, z.zipfGenMu.iMax, result)
 	}
-	z.zipfGenMu.mu.Unlock()
 	return result
 }

 // IncrementIMax increments iMax by count and recomputes the internal values
 // that depend on it. It throws an error if the recomputation failed.
 func (z *ZipfGenerator) IncrementIMax(count uint64) error {
 	z.zipfGenMu.mu.Lock()
+	defer z.zipfGenMu.mu.Unlock()
 	zetaN, err := computeZetaIncrementally(
 		z.zipfGenMu.iMax+1-z.iMin, z.zipfGenMu.iMax+count+1-z.iMin, z.theta, z.zipfGenMu.zetaN)
 	if err != nil {
-		z.zipfGenMu.mu.Unlock()
 		return errors.Wrap(err, "Could not incrementally compute zeta")
 	}
 	z.zipfGenMu.iMax += count
 	eta := (1 - math.Pow(2.0/float64(z.zipfGenMu.iMax+1-z.iMin), 1.0-z.theta)) / (1.0 - z.zeta2/zetaN)
 	z.zipfGenMu.eta = eta
 	z.zipfGenMu.zetaN = zetaN
-	z.zipfGenMu.mu.Unlock()
 	return nil
 }
