
Commit cb0be5f

craig[bot], yuzefovich, and Nukitt committed
153951: sql/export: refactor processors to implement RowSource interface r=yuzefovich a=yuzefovich

**distsql: harden infrastructure in some edge cases**

As part of looking into a nil-pointer crash when rows produced by EXPORT were used as the input to a mutation (which is fixed in the following commit), I noticed some problems with the distsql infrastructure. In particular, in f397730 we didn't properly implement the `Close` method for some processors (the two EXPORT ones and the column backfiller), which could mean incomplete cleanup when the processor never runs. Additionally, the root cause of the panic was a `nil` txn that previously could silently be set at the very end of flow setup if only at that point we decided that we needed to create a leaf txn, yet LeafTxnInputState was nil. That condition is expected in some cases (when the flow is not running under a txn, like some bulk flows), yet for EXPORT, which runs under a txn, it was unexpected. This commit adds an assertion for this. Furthermore, if we hit an error at this point, we need to perform cleanup differently from earlier error paths since we've already fully set up the flow; this commit adds that as well.

**sql/export: refactor processors to implement RowSource interface**

We recently found an edge case where the output of EXPORT was used as the input to a mutation. Currently, in such a setup we have two goroutines, since the export processors don't implement the `execinfra.RowSource` interface, so we cannot fuse them with the mutation processor. The presence of concurrency forces the use of LeafTxns, yet mutations require access to the RootTxn, so we have conflicting requirements. Before the previous commit we'd get a nil-pointer crash; with the previous commit in place we instead get an assertion failure. This commit solves the issue by refactoring both processors to implement the `RowSource` interface. In the problematic query this allows all processors to be fused together and run in a single goroutine, so we can use the RootTxn as required by the mutation.

Release note (bug fix): Previously, EXPORT CSV and EXPORT PARQUET statements could result in a node crash when their result rows were used as the input to a mutation, such as an INSERT, within the same SQL statement. This bug has been present since before version 22.1 and has now been fixed.

Fixes: #153292.

154123: drpc, server: implement filtering rules for server initialization state r=cthumuluru-crdb,shubhamdhama a=Nukitt

Previously, in DRPC, the `batch` RPC was able to access `node.Descriptor` before it had been initialized. This led to a data race, described in #153948. This wasn't the case for gRPC because of an interceptor that filters RPCs depending on the server's state. In particular, it allows only the bootstrap, heartbeat, health, and gossip methods during initialization, preventing calls to potentially uninitialized services. This patch adds the same interceptor support to DRPC.

In-depth discussion on this: #153948 (comment)

Epic: None
Fixes: #153948
Release note: None

Co-authored-by: Yahor Yuzefovich <[email protected]>
Co-authored-by: Nukitt <[email protected]>
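To make the txn argument above concrete, the sketch below is a minimal, self-contained illustration of why a pull-based processor can share one goroutine (and therefore one root txn) with its consumer, while a push-based processor needs a second goroutine. The types here (`RowSource`, `sliceSource`, `fusedCount`, `channelCount`) are invented for the example and are not the actual `execinfra` definitions.

```go
package main

import "fmt"

// RowSource is a toy stand-in for a pull-based processor interface
// (loosely inspired by execinfra.RowSource): the consumer calls Next
// until it returns nil, all within a single goroutine.
type RowSource interface {
	Next() []string // returns nil when exhausted
}

// sliceSource yields pre-built rows; it plays the role of an export-like
// processor that has been refactored to be pullable.
type sliceSource struct {
	rows [][]string
	idx  int
}

func (s *sliceSource) Next() []string {
	if s.idx >= len(s.rows) {
		return nil
	}
	row := s.rows[s.idx]
	s.idx++
	return row
}

// fusedCount consumes the source directly: producer and consumer share one
// goroutine, so they could also share a single (root) transaction.
func fusedCount(src RowSource) int {
	n := 0
	for row := src.Next(); row != nil; row = src.Next() {
		n += len(row)
	}
	return n
}

// channelCount consumes the same rows via a channel fed by a second
// goroutine: the push-based setup that forces concurrency between
// producer and consumer.
func channelCount(src RowSource) int {
	ch := make(chan []string)
	go func() {
		defer close(ch)
		for row := src.Next(); row != nil; row = src.Next() {
			ch <- row
		}
	}()
	n := 0
	for row := range ch {
		n += len(row)
	}
	return n
}

func main() {
	rows := [][]string{{"export.csv", "2", "123"}, {"export2.csv", "3", "456"}}
	fmt.Println(fusedCount(&sliceSource{rows: rows}))   // 6
	fmt.Println(channelCount(&sliceSource{rows: rows})) // 6
}
```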
3 parents 19396ff + 5b31f49 + e005120; commit cb0be5f

File tree

11 files changed (+488, -404 lines)


pkg/server/drpc_server.go

Lines changed: 13 additions & 7 deletions
```diff
@@ -31,7 +31,17 @@ type drpcServer struct {
 // newDRPCServer creates and configures a new drpcServer instance. It enables
 // DRPC if the experimental setting is on, otherwise returns a dummy server.
 func newDRPCServer(ctx context.Context, rpcCtx *rpc.Context) (*drpcServer, error) {
-	d, err := rpc.NewDRPCServer(ctx, rpcCtx)
+	drpcServer := &drpcServer{}
+	drpcServer.setMode(modeInitializing)
+
+	d, err := rpc.NewDRPCServer(
+		ctx,
+		rpcCtx,
+		rpc.WithInterceptor(
+			func(path string) error {
+				return drpcServer.intercept(path)
+			}),
+	)
 	if err != nil {
 		return nil, err
 	}
@@ -41,12 +51,8 @@ func newDRPCServer(ctx context.Context, rpcCtx *rpc.Context) (*drpcServer, error
 		return nil, err
 	}
 
-	drpcServer := &drpcServer{
-		DRPCServer: d,
-		tlsCfg:     tlsCfg,
-	}
-
-	drpcServer.setMode(modeInitializing)
+	drpcServer.DRPCServer = d
+	drpcServer.tlsCfg = tlsCfg
 
 	if err := rpc.DRPCRegisterHeartbeat(drpcServer, rpcCtx.NewHeartbeatService()); err != nil {
 		return nil, err
```
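The reordering above is what makes the interceptor possible: the `drpcServer` value has to exist before `rpc.NewDRPCServer` is called so the closure passed via `rpc.WithInterceptor` can capture it. Below is a hedged sketch of that construct-then-capture pattern; `appServer`, `newRPCServer`, `withInterceptor`, and the method path are invented stand-ins, not the real rpc package API.

```go
package main

import "fmt"

// interceptor is a toy stand-in for the per-RPC filter hook.
type interceptor func(path string) error

type rpcServer struct {
	intercept interceptor
}

type option func(*rpcServer)

// withInterceptor installs the filter on the toy RPC server.
func withInterceptor(f interceptor) option {
	return func(s *rpcServer) { s.intercept = f }
}

func newRPCServer(opts ...option) *rpcServer {
	s := &rpcServer{}
	for _, opt := range opts {
		opt(s)
	}
	return s
}

// appServer mirrors the role of drpcServer: it owns the serve mode and is
// created *before* the RPC server so the interceptor closure can capture it.
type appServer struct {
	operational bool
	rpc         *rpcServer
}

func main() {
	app := &appServer{} // starts out "initializing"
	app.rpc = newRPCServer(withInterceptor(func(path string) error {
		if app.operational {
			return nil
		}
		return fmt.Errorf("%s: server is still initializing", path)
	}))

	fmt.Println(app.rpc.intercept("/example.Node/Batch")) // rejected while initializing
	app.operational = true
	fmt.Println(app.rpc.intercept("/example.Node/Batch")) // <nil> once operational
}
```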

pkg/server/grpc_server.go

Lines changed: 0 additions & 18 deletions
```diff
@@ -68,24 +68,6 @@ func (s *grpcServer) health(ctx context.Context) error {
 	}
 }
 
-var rpcsAllowedWhileBootstrapping = map[string]struct{}{
-	"/cockroach.rpc.Heartbeat/Ping":              {},
-	"/cockroach.gossip.Gossip/Gossip":            {},
-	"/cockroach.server.serverpb.Init/Bootstrap":  {},
-	"/cockroach.server.serverpb.Admin/Health":    {},
-}
-
-// intercept implements filtering rules for each server state.
-func (s *grpcServer) intercept(fullName string) error {
-	if s.operational() {
-		return nil
-	}
-	if _, allowed := rpcsAllowedWhileBootstrapping[fullName]; !allowed {
-		return NewWaitingForInitError(fullName)
-	}
-	return nil
-}
-
 // NewWaitingForInitError creates an error indicating that the server cannot run
 // the specified method until the node has been initialized.
 func NewWaitingForInitError(methodName string) error {
```

pkg/server/serve_mode.go

Lines changed: 18 additions & 0 deletions
```diff
@@ -36,6 +36,24 @@ const (
 	modeDraining
 )
 
+var rpcsAllowedWhileBootstrapping = map[string]struct{}{
+	"/cockroach.rpc.Heartbeat/Ping":              {},
+	"/cockroach.gossip.Gossip/Gossip":            {},
+	"/cockroach.server.serverpb.Init/Bootstrap":  {},
+	"/cockroach.server.serverpb.Admin/Health":    {},
+}
+
+// intercept implements filtering rules for each server state.
+func (s *serveModeHandler) intercept(fullName string) error {
+	if s.operational() {
+		return nil
+	}
+	if _, allowed := rpcsAllowedWhileBootstrapping[fullName]; !allowed {
+		return NewWaitingForInitError(fullName)
+	}
+	return nil
+}
+
 func (s *serveMode) set(mode serveMode) {
 	atomic.StoreInt32((*int32)(s), int32(mode))
 }
```
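Standing alone, the gating logic behaves as in the following self-contained sketch. The mode constants, method paths, and helper names are invented for the example rather than copied from the server package.

```go
package main

import (
	"errors"
	"fmt"
	"sync/atomic"
)

// Invented mode constants; the real ones live in pkg/server.
const (
	modeInitializing int32 = iota
	modeOperational
)

// allowedWhileBootstrapping lists the only RPC methods that may be served
// before the node is initialized (hypothetical paths).
var allowedWhileBootstrapping = map[string]struct{}{
	"/example.Heartbeat/Ping": {},
	"/example.Gossip/Gossip":  {},
	"/example.Init/Bootstrap": {},
	"/example.Admin/Health":   {},
}

type serveMode struct{ v atomic.Int32 }

func (m *serveMode) set(mode int32)    { m.v.Store(mode) }
func (m *serveMode) operational() bool { return m.v.Load() == modeOperational }

// intercept rejects any method not on the allowlist while the server is
// still bootstrapping and admits everything once it is operational.
func (m *serveMode) intercept(fullName string) error {
	if m.operational() {
		return nil
	}
	if _, ok := allowedWhileBootstrapping[fullName]; !ok {
		return errors.New(fullName + ": node waiting for init")
	}
	return nil
}

func main() {
	var m serveMode
	m.set(modeInitializing)
	fmt.Println(m.intercept("/example.Heartbeat/Ping")) // <nil>: allowed early
	fmt.Println(m.intercept("/example.Node/Batch"))     // error: blocked until init
	m.set(modeOperational)
	fmt.Println(m.intercept("/example.Node/Batch"))     // <nil>: now allowed
}
```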

pkg/sql/backfill/backfill.go

Lines changed: 4 additions & 1 deletion
```diff
@@ -280,9 +280,12 @@ func (cb *ColumnBackfiller) InitForDistributedUse(
 
 // Close frees the resources used by the ColumnBackfiller.
 func (cb *ColumnBackfiller) Close(ctx context.Context) {
-	cb.fetcher.Close(ctx)
 	if cb.mon != nil {
+		// fetcher is only initialized when mon has been set. If mon is nil,
+		// then Close has already been called.
+		cb.fetcher.Close(ctx)
 		cb.mon.Stop(ctx)
+		cb.mon = nil
 	}
 }
 
```
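With this change the `mon` pointer doubles as a "not yet closed" flag, making `Close` safe to call more than once (and safe when the processor never ran). A small sketch of the same pattern with toy types; all names are invented for the example.

```go
package main

import (
	"context"
	"fmt"
)

// Toy stand-ins for the fetcher/monitor pair owned by the backfiller.
type fetcher struct{}

func (f *fetcher) Close(context.Context) { fmt.Println("fetcher closed") }

type monitor struct{}

func (m *monitor) Stop(context.Context) { fmt.Println("monitor stopped") }

type backfiller struct {
	fetcher fetcher
	mon     *monitor
}

// Close mirrors the pattern above: the monitor pointer is nil once cleanup
// has run, so a second Close call is a harmless no-op.
func (b *backfiller) Close(ctx context.Context) {
	if b.mon != nil {
		b.fetcher.Close(ctx)
		b.mon.Stop(ctx)
		b.mon = nil
	}
}

func main() {
	b := &backfiller{mon: &monitor{}}
	b.Close(context.Background()) // releases resources
	b.Close(context.Background()) // safe: does nothing the second time
}
```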

pkg/sql/distsql/server.go

Lines changed: 28 additions & 16 deletions
```diff
@@ -200,26 +200,29 @@ func (ds *ServerImpl) setupFlow(
 	var sp *tracing.Span                       // will be Finish()ed by Flow.Cleanup()
 	var monitor, diskMonitor *mon.BytesMonitor // will be closed in Flow.Cleanup()
 	var onFlowCleanupEnd func(context.Context) // will be called at the very end of Flow.Cleanup()
+	var cleanupPerformed bool
 	// Make sure that we clean up all resources (which in the happy case are
 	// cleaned up in Flow.Cleanup()) if an error is encountered.
 	defer func() {
 		if retErr != nil {
-			if monitor != nil {
-				monitor.Stop(ctx)
-			}
-			if diskMonitor != nil {
-				diskMonitor.Stop(ctx)
-			}
-			if onFlowCleanupEnd != nil {
-				onFlowCleanupEnd(ctx)
-			} else {
-				reserved.Close(ctx)
-				onFlowCleanup.Do()
-			}
-			// We finish the span after performing other cleanup in case that
-			// cleanup accesses the context with the span.
-			if sp != nil {
-				sp.Finish()
+			if !cleanupPerformed {
+				if monitor != nil {
+					monitor.Stop(ctx)
+				}
+				if diskMonitor != nil {
+					diskMonitor.Stop(ctx)
+				}
+				if onFlowCleanupEnd != nil {
+					onFlowCleanupEnd(ctx)
+				} else {
+					reserved.Close(ctx)
+					onFlowCleanup.Do()
+				}
+				// We finish the span after performing other cleanup in case that
+				// cleanup accesses the context with the span.
+				if sp != nil {
+					sp.Finish()
+				}
 			}
 			retCtx = tracing.ContextWithSpan(ctx, nil)
 		}
@@ -267,6 +270,9 @@ func (ds *ServerImpl) setupFlow(
 	makeLeaf := func(ctx context.Context) (*kv.Txn, error) {
 		tis := req.LeafTxnInputState
 		if tis == nil {
+			if localState.Txn != nil {
+				return nil, errors.AssertionFailedf("nil LeafTxnInputState when trying to create the LeafTxn")
+			}
 			// This must be a flow running for some bulk-io operation that doesn't use
 			// a txn.
 			return nil, nil
@@ -447,6 +453,12 @@ func (ds *ServerImpl) setupFlow(
 	if leafTxn == nil {
 		leafTxn, err = makeLeaf(ctx)
 		if err != nil {
+			// Given that we've already fully set up the flow, we must do
+			// the full cleanup. This supersedes the cleanup done in the
+			// defer at the beginning of the method, so we mark
+			// cleanupPerformed accordingly.
+			f.Cleanup(ctx)
+			cleanupPerformed = true
 			return nil, nil, nil, err
 		}
 	}
```
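The `cleanupPerformed` flag distinguishes "failed before the flow was fully set up" (the deferred piecemeal cleanup applies) from "failed after full setup" (the flow's own cleanup has already run and must not be repeated). Below is a self-contained sketch of that guard; all names are invented for the example.

```go
package main

import (
	"errors"
	"fmt"
)

// setup sketches the guard used in setupFlow above: a deferred error-path
// cleanup that is skipped once a later step has already performed the full
// cleanup itself.
func setup(failLate bool) (err error) {
	cleanupPerformed := false
	releasePartial := func() { fmt.Println("releasing partially acquired resources") }
	fullCleanup := func() { fmt.Println("running full flow cleanup") }

	defer func() {
		if err != nil && !cleanupPerformed {
			// Early failure: only the piecemeal cleanup is needed.
			releasePartial()
		}
	}()

	if !failLate {
		return errors.New("early failure")
	}

	// At this point the flow counts as fully set up, so any further error
	// must run the flow's own cleanup, and the defer must not repeat it.
	if lateErr := errors.New("late failure"); lateErr != nil {
		fullCleanup()
		cleanupPerformed = true
		return lateErr
	}
	return nil
}

func main() {
	fmt.Println(setup(false)) // defer releases the partial resources
	fmt.Println(setup(true))  // full cleanup runs; the defer skips its work
}
```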

pkg/sql/execinfra/base.go

Lines changed: 0 additions & 39 deletions
```diff
@@ -332,45 +332,6 @@ func DrainAndClose(
 	dst.ProducerDone()
 }
 
-// NoMetadataRowSource is a wrapper on top of a RowSource that automatically
-// forwards metadata to a RowReceiver. Data rows are returned through an
-// interface similar to RowSource, except that, since metadata is taken care of,
-// only the data rows are returned.
-//
-// The point of this struct is that it'd be burdensome for some row consumers to
-// have to deal with metadata.
-type NoMetadataRowSource struct {
-	src          RowSource
-	metadataSink RowReceiver
-}
-
-// MakeNoMetadataRowSource builds a NoMetadataRowSource.
-func MakeNoMetadataRowSource(src RowSource, sink RowReceiver) NoMetadataRowSource {
-	return NoMetadataRowSource{src: src, metadataSink: sink}
-}
-
-// NextRow is analogous to RowSource.Next. If the producer sends an error, we
-// can't just forward it to metadataSink. We need to let the consumer know so
-// that it's not under the impression that everything is hunky-dory and it can
-// continue consuming rows. So, this interface returns the error. Just like with
-// a raw RowSource, the consumer should generally call ConsumerDone() and drain.
-func (rs *NoMetadataRowSource) NextRow() (rowenc.EncDatumRow, error) {
-	for {
-		row, meta := rs.src.Next()
-		if meta == nil {
-			return row, nil
-		}
-		if meta.Err != nil {
-			return nil, meta.Err
-		}
-		// We forward the metadata and ignore the returned ConsumerStatus. There's
-		// no good way to use that status here; eventually the consumer of this
-		// NoMetadataRowSource will figure out the same status and act on it as soon
-		// as a non-metadata row is received.
-		_ = rs.metadataSink.Push(nil /* row */, meta)
-	}
-}
-
 // RowChannelMsg is the message used in the channels that implement
 // local physical streams (i.e. the RowChannel's).
 type RowChannelMsg struct {
```

pkg/sql/export/BUILD.bazel

Lines changed: 2 additions & 1 deletion
```diff
@@ -15,16 +15,17 @@ go_library(
         "//pkg/settings",
         "//pkg/sql/catalog/colinfo",
         "//pkg/sql/execinfra",
+        "//pkg/sql/execinfra/execopnode",
         "//pkg/sql/execinfrapb",
         "//pkg/sql/pgwire/pgcode",
         "//pkg/sql/pgwire/pgerror",
         "//pkg/sql/rowenc",
         "//pkg/sql/sem/tree",
         "//pkg/sql/types",
         "//pkg/util/encoding/csv",
+        "//pkg/util/log",
         "//pkg/util/mon",
         "//pkg/util/parquet",
-        "//pkg/util/tracing",
         "//pkg/util/unique",
         "@com_github_cockroachdb_errors//:errors",
     ],
```
