Skip to content

Commit 79fb2a9

Browse files
committed
sql: create new innerInsertStatementDiagnostics function
The original implementation of InsertStatementDiagnostics now lives in a new `innerInsertStatementDiagnostics` func that takes an addition `isql.Txn` argument. Now, `InsertStatementDiagnostics` starts a new transaction and calls `innerInsertStatementDiagnostics`, maintaining the same functionality. This is being done in preperation for transaction diagnostics which need to insert multiple statement diagnostics within the same transasction. Part of: CRDB-5342 Epic: None Release note: None
1 parent d8d00d8 commit 79fb2a9

File tree

1 file changed

+144
-107
lines changed

1 file changed

+144
-107
lines changed

pkg/sql/stmtdiagnostics/statement_diagnostics.go

Lines changed: 144 additions & 107 deletions
Original file line numberDiff line numberDiff line change
@@ -210,6 +210,33 @@ func (r *Registry) poll(ctx context.Context) {
210210
}
211211
}
212212

213+
type StmtDiagnostic struct {
214+
requestID RequestID
215+
req Request
216+
stmtFingerprint string
217+
stmt string
218+
bundle []byte
219+
collectionErr error
220+
}
221+
222+
func NewStmtDiagnostic(
223+
requestID RequestID,
224+
req Request,
225+
stmtFingerprint string,
226+
stmt string,
227+
bundle []byte,
228+
collectionErr error,
229+
) StmtDiagnostic {
230+
return StmtDiagnostic{
231+
requestID: requestID,
232+
req: req,
233+
stmtFingerprint: stmtFingerprint,
234+
stmt: stmt,
235+
bundle: bundle,
236+
collectionErr: collectionErr,
237+
}
238+
}
239+
213240
// RequestID is the ID of a diagnostics request, corresponding to the id
214241
// column in statement_diagnostics_requests.
215242
// A zero ID is invalid.
@@ -553,127 +580,137 @@ func (r *Registry) InsertStatementDiagnostics(
553580
var diagID CollectedInstanceID
554581
err := r.db.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
555582
txn.KV().SetDebugName("stmt-diag-insert-bundle")
556-
if requestID != 0 {
557-
row, err := txn.QueryRowEx(ctx, "stmt-diag-check-completed", txn.KV(),
558-
sessiondata.NodeUserSessionDataOverride,
559-
"SELECT count(1) FROM system.statement_diagnostics_requests WHERE id = $1 AND completed = false",
560-
requestID)
561-
if err != nil {
562-
return err
563-
}
564-
if row == nil {
565-
return errors.New("failed to check completed statement diagnostics")
566-
}
567-
cnt := int(*row[0].(*tree.DInt))
568-
if cnt == 0 {
569-
// Someone else already marked the request as completed. We've traced for nothing.
570-
// This can only happen once per node, per request since we're going to
571-
// remove the request from the registry.
572-
return nil
573-
}
583+
id, err := r.innerInsertStatementDiagnostics(ctx, NewStmtDiagnostic(requestID, req, stmtFingerprint, stmt, bundle, collectionErr), txn)
584+
if err != nil {
585+
return err
574586
}
587+
diagID = id
588+
return nil
589+
})
575590

576-
// Generate the values that will be inserted.
577-
errorVal := tree.DNull
578-
if collectionErr != nil {
579-
errorVal = tree.NewDString(collectionErr.Error())
580-
}
591+
return diagID, err
592+
}
581593

582-
bundleChunksVal := tree.NewDArray(types.Int)
583-
bundleToUpload := bundle
584-
for len(bundleToUpload) > 0 {
585-
chunkSize := int(bundleChunkSize.Get(&r.st.SV))
586-
chunk := bundleToUpload
587-
if len(chunk) > chunkSize {
588-
chunk = chunk[:chunkSize]
589-
}
590-
bundleToUpload = bundleToUpload[len(chunk):]
591-
592-
// Insert the chunk into system.statement_bundle_chunks.
593-
row, err := txn.QueryRowEx(
594-
ctx, "stmt-bundle-chunks-insert", txn.KV(),
595-
sessiondata.NodeUserSessionDataOverride,
596-
"INSERT INTO system.statement_bundle_chunks(description, data) VALUES ($1, $2) RETURNING id",
597-
"statement diagnostics bundle",
598-
tree.NewDBytes(tree.DBytes(chunk)),
599-
)
600-
if err != nil {
601-
return err
602-
}
603-
if row == nil {
604-
return errors.New("failed to check statement bundle chunk")
605-
}
606-
chunkID := row[0].(*tree.DInt)
607-
if err := bundleChunksVal.Append(chunkID); err != nil {
608-
return err
609-
}
594+
func (r *Registry) innerInsertStatementDiagnostics(
595+
ctx context.Context, diagnostic StmtDiagnostic, txn isql.Txn,
596+
) (CollectedInstanceID, error) {
597+
var diagID CollectedInstanceID
598+
if diagnostic.requestID != 0 {
599+
row, err := txn.QueryRowEx(ctx, "stmt-diag-check-completed", txn.KV(),
600+
sessiondata.NodeUserSessionDataOverride,
601+
"SELECT count(1) FROM system.statement_diagnostics_requests WHERE id = $1 AND completed = false",
602+
diagnostic.requestID)
603+
if err != nil {
604+
return diagID, err
605+
}
606+
if row == nil {
607+
return diagID, errors.New("failed to check completed statement diagnostics")
610608
}
609+
cnt := int(*row[0].(*tree.DInt))
610+
if cnt == 0 {
611+
// Someone else already marked the request as completed. We've traced for nothing.
612+
// This can only happen once per node, per request since we're going to
613+
// remove the request from the registry.
614+
return diagID, nil
615+
}
616+
}
617+
618+
// Generate the values that will be inserted.
619+
errorVal := tree.DNull
620+
if diagnostic.collectionErr != nil {
621+
errorVal = tree.NewDString(diagnostic.collectionErr.Error())
622+
}
611623

612-
collectionTime := timeutil.Now()
624+
bundleChunksVal := tree.NewDArray(types.Int)
625+
bundleToUpload := diagnostic.bundle
626+
for len(bundleToUpload) > 0 {
627+
chunkSize := int(bundleChunkSize.Get(&r.st.SV))
628+
chunk := bundleToUpload
629+
if len(chunk) > chunkSize {
630+
chunk = chunk[:chunkSize]
631+
}
632+
bundleToUpload = bundleToUpload[len(chunk):]
613633

614-
// Insert the collection metadata into system.statement_diagnostics.
634+
// Insert the chunk into system.statement_bundle_chunks.
615635
row, err := txn.QueryRowEx(
616-
ctx, "stmt-diag-insert", txn.KV(),
636+
ctx, "stmt-bundle-chunks-insert", txn.KV(),
617637
sessiondata.NodeUserSessionDataOverride,
618-
"INSERT INTO system.statement_diagnostics "+
619-
"(statement_fingerprint, statement, collected_at, bundle_chunks, error) "+
620-
"VALUES ($1, $2, $3, $4, $5) RETURNING id",
621-
stmtFingerprint, stmt, collectionTime, bundleChunksVal, errorVal,
638+
"INSERT INTO system.statement_bundle_chunks(description, data) VALUES ($1, $2) RETURNING id",
639+
"statement diagnostics bundle",
640+
tree.NewDBytes(tree.DBytes(chunk)),
622641
)
623642
if err != nil {
624-
return err
643+
return diagID, err
625644
}
626645
if row == nil {
627-
return errors.New("failed to insert statement diagnostics")
628-
}
629-
diagID = CollectedInstanceID(*row[0].(*tree.DInt))
630-
631-
if requestID != 0 {
632-
// Link the request from system.statement_diagnostics_request to the
633-
// diagnostic ID we just collected, marking it as completed if we're
634-
// able.
635-
shouldMarkCompleted := true
636-
if collectUntilExpiration.Get(&r.st.SV) {
637-
// Two other conditions need to hold true for us to continue
638-
// capturing future traces, i.e. not mark this request as
639-
// completed.
640-
// - Requests need to be of the sampling sort (also implies
641-
// there's a latency threshold) -- a crude measure to prevent
642-
// against unbounded collection;
643-
// - Requests need to have an expiration set -- same reason as
644-
// above.
645-
if req.samplingProbability > 0 && !req.expiresAt.IsZero() {
646-
shouldMarkCompleted = false
647-
}
648-
}
649-
_, err := txn.ExecEx(ctx, "stmt-diag-mark-completed", txn.KV(),
650-
sessiondata.NodeUserSessionDataOverride,
651-
"UPDATE system.statement_diagnostics_requests "+
652-
"SET completed = $1, statement_diagnostics_id = $2 WHERE id = $3",
653-
shouldMarkCompleted, diagID, requestID)
654-
if err != nil {
655-
return err
656-
}
657-
} else {
658-
// Insert a completed request into system.statement_diagnostics_request.
659-
// This is necessary because the UI uses this table to discover completed
660-
// diagnostics.
661-
//
662-
// This bundle was collected via explicit EXPLAIN ANALYZE (DEBUG).
663-
_, err := txn.ExecEx(ctx, "stmt-diag-add-completed", txn.KV(),
664-
sessiondata.NodeUserSessionDataOverride,
665-
"INSERT INTO system.statement_diagnostics_requests"+
666-
" (completed, statement_fingerprint, statement_diagnostics_id, requested_at)"+
667-
" VALUES (true, $1, $2, $3)",
668-
stmtFingerprint, diagID, collectionTime)
669-
if err != nil {
670-
return err
671-
}
646+
return diagID, errors.New("failed to check statement bundle chunk")
672647
}
673-
return nil
674-
})
648+
chunkID := row[0].(*tree.DInt)
649+
if err := bundleChunksVal.Append(chunkID); err != nil {
650+
return diagID, err
651+
}
652+
}
653+
654+
collectionTime := timeutil.Now()
655+
656+
// Insert the collection metadata into system.statement_diagnostics.
657+
row, err := txn.QueryRowEx(
658+
ctx, "stmt-diag-insert", txn.KV(),
659+
sessiondata.NodeUserSessionDataOverride,
660+
"INSERT INTO system.statement_diagnostics "+
661+
"(statement_fingerprint, statement, collected_at, bundle_chunks, error) "+
662+
"VALUES ($1, $2, $3, $4, $5) RETURNING id",
663+
diagnostic.stmtFingerprint, diagnostic.stmt, collectionTime, bundleChunksVal, errorVal,
664+
)
675665
if err != nil {
676-
return 0, err
666+
return diagID, err
667+
}
668+
if row == nil {
669+
return diagID, errors.New("failed to insert statement diagnostics")
670+
}
671+
diagID = CollectedInstanceID(*row[0].(*tree.DInt))
672+
673+
if diagnostic.requestID != 0 {
674+
// Link the request from system.statement_diagnostics_request to the
675+
// diagnostic ID we just collected, marking it as completed if we're
676+
// able.
677+
shouldMarkCompleted := true
678+
if collectUntilExpiration.Get(&r.st.SV) {
679+
// Two other conditions need to hold true for us to continue
680+
// capturing future traces, i.e. not mark this request as
681+
// completed.
682+
// - Requests need to be of the sampling sort (also implies
683+
// there's a latency threshold) -- a crude measure to prevent
684+
// against unbounded collection;
685+
// - Requests need to have an expiration set -- same reason as
686+
// above.
687+
if diagnostic.req.samplingProbability > 0 && !diagnostic.req.expiresAt.IsZero() {
688+
shouldMarkCompleted = false
689+
}
690+
}
691+
_, err := txn.ExecEx(ctx, "stmt-diag-mark-completed", txn.KV(),
692+
sessiondata.NodeUserSessionDataOverride,
693+
"UPDATE system.statement_diagnostics_requests "+
694+
"SET completed = $1, statement_diagnostics_id = $2 WHERE id = $3",
695+
shouldMarkCompleted, diagID, diagnostic.requestID)
696+
if err != nil {
697+
return diagID, err
698+
}
699+
} else {
700+
// Insert a completed request into system.statement_diagnostics_request.
701+
// This is necessary because the UI uses this table to discover completed
702+
// diagnostics.
703+
//
704+
// This bundle was collected via explicit EXPLAIN ANALYZE (DEBUG).
705+
_, err := txn.ExecEx(ctx, "stmt-diag-add-completed", txn.KV(),
706+
sessiondata.NodeUserSessionDataOverride,
707+
"INSERT INTO system.statement_diagnostics_requests"+
708+
" (completed, statement_fingerprint, statement_diagnostics_id, requested_at)"+
709+
" VALUES (true, $1, $2, $3)",
710+
diagnostic.stmtFingerprint, diagID, collectionTime)
711+
if err != nil {
712+
return diagID, err
713+
}
677714
}
678715
return diagID, nil
679716
}

0 commit comments

Comments
 (0)