Skip to content

Commit b464ea9

Browse files
craig[bot]arulajmani
andcommitted
Merge #148534
148534: kv: add more tracing on the commit path r=arulajmani a=arulajmani See individual commits for details. `@tbg` I don't imagine any of this to have made a big difference in the investigation FWIW. I went through the entire KV commit path and added tracing for bits that may be interesting -- happy to take things out if any of them seem overkill or useless. Co-authored-by: Arul Ajmani <[email protected]>
2 parents f217a8f + 848fd26 commit b464ea9

File tree

5 files changed

+53
-24
lines changed

5 files changed

+53
-24
lines changed

pkg/kv/kvclient/kvcoord/dist_sender.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1472,6 +1472,7 @@ func (ds *DistSender) divideAndSendParallelCommit(
14721472
}
14731473
if swapIdx == -1 {
14741474
// No pre-commit QueryIntents. Nothing to split.
1475+
log.VEvent(ctx, 3, "no pre-commit QueryIntents found, sending batch as-is")
14751476
return ds.divideAndSendBatchToRanges(ctx, ba, rs, isReverse, true /* withCommit */, batchIdx)
14761477
}
14771478

@@ -1504,6 +1505,7 @@ func (ds *DistSender) divideAndSendParallelCommit(
15041505
qiResponseCh := make(chan response, 1)
15051506

15061507
sendPreCommit := func(ctx context.Context) {
1508+
log.VEvent(ctx, 3, "sending split out pre-commit QueryIntent batch")
15071509
// Map response index to the original un-swapped batch index.
15081510
// Remember that we moved the last QueryIntent in this batch
15091511
// from swapIdx to the end.
@@ -1562,7 +1564,9 @@ func (ds *DistSender) divideAndSendParallelCommit(
15621564

15631565
// Wait for the QueryIntent-only batch to complete and stitch
15641566
// the responses together.
1567+
log.VEvent(ctx, 3, "waiting for pre-commit QueryIntent batch response")
15651568
qiReply := <-qiResponseCh
1569+
log.VEventf(ctx, 3, "received pre-commit QueryIntent batch response")
15661570

15671571
// Handle error conditions.
15681572
if pErr != nil {

pkg/kv/kvclient/kvcoord/txn_interceptor_committer.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -195,6 +195,7 @@ func (tc *txnCommitter) SendLocked(
195195
// so interceptors above the txnCommitter in the stack don't need to be
196196
// made aware that the record is staging.
197197
pErr = maybeRemoveStagingStatusInErr(pErr)
198+
log.VEventf(ctx, 2, "batch with EndTxn(commit=true) failed: %v", pErr)
198199
return nil, pErr
199200
}
200201

@@ -216,6 +217,7 @@ func (tc *txnCommitter) SendLocked(
216217
// the EndTxn request, either because canCommitInParallel returned false
217218
// or because there were no unproven in-flight writes (see txnPipeliner)
218219
// and there were no writes in the batch request.
220+
log.VEventf(ctx, 2, "parallel commit attempt for transaction %s resulted in explicit commit", br.Txn)
219221
return br, nil
220222
default:
221223
return nil, kvpb.NewErrorf("unexpected response status without error: %v", br.Txn)
@@ -296,6 +298,8 @@ func (tc *txnCommitter) validateEndTxnBatch(ba *kvpb.BatchRequest) error {
296298
func (tc *txnCommitter) sendLockedWithElidedEndTxn(
297299
ctx context.Context, ba *kvpb.BatchRequest, et *kvpb.EndTxnRequest,
298300
) (br *kvpb.BatchResponse, pErr *kvpb.Error) {
301+
log.VEventf(ctx, 2, "eliding EndTxn request for read-only, non-locking transaction")
302+
299303
// Send the batch without its final request, which we know to be the EndTxn
300304
// request that we're eliding. If this would result in us sending an empty
301305
// batch, mock out a reply instead of sending anything.

pkg/kv/kvpb/api.proto

Lines changed: 16 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1027,18 +1027,24 @@ message EndTxnResponse {
10271027
// The commit timestamp of the STAGING transaction record written
10281028
// by the request. Only set if the transaction record was staged.
10291029
util.hlc.Timestamp staging_timestamp = 5 [(gogoproto.nullable) = false];
1030-
// ReplicatedLocksReleasedOnCommit, if non-empty, indicate that replicated
1031-
// locks with strength Shared or Exclusive were released in the referenced key
1032-
// spans when committing this transaction. Notably, this field is left unset
1033-
// if only write intents were resolved. The field is also left unset for
1034-
// transactions that aborted.
1030+
// ReplicatedLocalLocksReleasedOnCommit, if non-empty, indicate that
1031+
// replicated locks with strength Shared or Exclusive were released in the
1032+
// referenced key spans when committing this transaction. These locks are
1033+
// local to the range on which the EndTxn request evaluated. Notably, this
1034+
// field is left unset if only write intents were resolved. The field is only
1035+
// set when transactions are explicitly marked as committed.
10351036
//
10361037
// The caller must bump the timestamp cache across these spans to the
1037-
// transaction's commit timestamp. Doing so ensures that the released locks
1038-
// (acquired by the now committed transaction) continue to provide protection
1039-
// against other writers up to the commit timestamp, even after the locks have
1040-
// been released.
1041-
repeated Span replicated_locks_released_on_commit = 6 [(gogoproto.nullable) = false];
1038+
// transaction's commit timestamp. Doing so ensures that the released local[1]
1039+
// locks (acquired by the now committed transaction) continue to provide
1040+
// protection against other writers up to the commit timestamp, even after the
1041+
// locks have been released.
1042+
//
1043+
// [1] Non-local replicated locks provide the same protection, however, the
1044+
// mechanism of bumping the timestamp cache is different there. See the
1045+
// ReplicatedLocksReleasedCommitTimestamp field on
1046+
// ResolveIntent{,Range}Response.
1047+
repeated Span replicated_local_locks_released_on_commit = 6 [(gogoproto.nullable) = false];
10421048
}
10431049

10441050
// An AdminSplitRequest is the argument to the AdminSplit() method. The

pkg/kv/kvserver/batcheval/cmd_end_transaction.go

Lines changed: 23 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -273,6 +273,9 @@ func EndTxn(
273273

274274
// Fetch existing transaction.
275275
var existingTxn roachpb.Transaction
276+
log.VEventf(
277+
ctx, 2, "checking to see if transaction record already exists for txn: %s", h.Txn,
278+
)
276279
recordAlreadyExisted, err := storage.MVCCGetProto(
277280
ctx, readWriter, key, hlc.Timestamp{}, &existingTxn, storage.MVCCGetOptions{
278281
ReadCategory: fs.BatchEvalReadCategory,
@@ -281,6 +284,7 @@ func EndTxn(
281284
if err != nil {
282285
return result.Result{}, err
283286
} else if !recordAlreadyExisted {
287+
log.VEvent(ctx, 2, "no existing txn record found")
284288
// No existing transaction record was found - create one by writing it
285289
// below in updateFinalizedTxn.
286290
reply.Txn = h.Txn.Clone()
@@ -290,10 +294,12 @@ func EndTxn(
290294
// an aborted txn record.
291295
if args.Commit {
292296
if err := CanCreateTxnRecord(ctx, cArgs.EvalCtx, reply.Txn); err != nil {
297+
log.VEventf(ctx, 2, "cannot create transaction record: %v", err)
293298
return result.Result{}, err
294299
}
295300
}
296301
} else {
302+
log.VEventf(ctx, 2, "existing transaction record found: %s", existingTxn)
297303
// We're using existingTxn on the reply, although it can be stale
298304
// compared to the Transaction in the request (e.g. the Sequence,
299305
// and various timestamps). We must be careful to update it with the
@@ -317,8 +323,11 @@ func EndTxn(
317323
"already committed")
318324

319325
case roachpb.ABORTED:
326+
// The transaction has already been aborted by someone else.
327+
log.VEventf(
328+
ctx, 2, "transaction %s found to have be already aborted (by someone else)", reply.Txn,
329+
)
320330
if !args.Commit {
321-
// The transaction has already been aborted by other.
322331
// Do not return TransactionAbortedError since the client anyway
323332
// wanted to abort the transaction.
324333
resolvedLocks, _, externalLocks, err := resolveLocalLocks(ctx, readWriter, cArgs.EvalCtx, ms, args, reply.Txn)
@@ -379,6 +388,7 @@ func EndTxn(
379388
// not consider the transaction to be performing a parallel commit and
380389
// potentially already implicitly committed because we know that the
381390
// transaction restarted since entering the STAGING state.
391+
log.VEventf(ctx, 2, "request with newer epoch %d than STAGING txn record; parallel commit must have failed", h.Txn.Epoch)
382392
reply.Txn.Status = roachpb.PENDING
383393
default:
384394
panic("unreachable")
@@ -543,13 +553,18 @@ func EndTxn(
543553
txnResult.Local.ResolvedLocks = resolvedLocks
544554

545555
if reply.Txn.Status == roachpb.COMMITTED {
546-
// Return whether replicated {shared, exclusive} locks were released by
547-
// the committing transaction. If such locks were released, we still
548-
// need to make sure other transactions can't write underneath the
549-
// transaction's commit timestamp to the key spans previously protected
550-
// by the locks. We return the spans on the response and update the
551-
// timestamp cache a few layers above to ensure this.
552-
reply.ReplicatedLocksReleasedOnCommit = releasedReplLocks
556+
if len(releasedReplLocks) != 0 {
557+
// Return that local replicated {shared, exclusive} locks were released by
558+
// the committing transaction. If such locks were released, we still need
559+
// to make sure other transactions can't write underneath the
560+
// transaction's commit timestamp to the key spans previously protected by
561+
// the locks. We return the spans on the response and update the timestamp
562+
// cache a few layers above to ensure this.
563+
reply.ReplicatedLocalLocksReleasedOnCommit = releasedReplLocks
564+
log.VEventf(
565+
ctx, 2, "committed transaction released local replicated shared/exclusive locks",
566+
)
567+
}
553568

554569
// Run the commit triggers if successfully committed.
555570
triggerResult, err := RunCommitTrigger(

pkg/kv/kvserver/replica_tscache.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -170,12 +170,12 @@ func (r *Replica) updateTimestampCache(
170170
// transaction's MinTimestamp, which is consulted in CanCreateTxnRecord.
171171
key := transactionTombstoneMarker(start, txnID)
172172
addToTSCache(key, nil, ts, txnID)
173-
// Additionally, EndTxn requests that release replicated locks for
174-
// committed transactions bump the timestamp cache over those lock
175-
// spans to the commit timestamp of the transaction to ensure that
176-
// the released locks continue to provide protection against writes
177-
// underneath the transaction's commit timestamp.
178-
for _, sp := range resp.(*kvpb.EndTxnResponse).ReplicatedLocksReleasedOnCommit {
173+
// Additionally, EndTxn requests that release local replicated locks for
174+
// committed transactions bump the timestamp cache over those lock spans
175+
// to the commit timestamp of the transaction to ensure that the released
176+
// locks continue to provide protection against writes underneath the
177+
// transaction's commit timestamp.
178+
for _, sp := range resp.(*kvpb.EndTxnResponse).ReplicatedLocalLocksReleasedOnCommit {
179179
addToTSCache(sp.Key, sp.EndKey, br.Txn.WriteTimestamp, txnID)
180180
}
181181
case *kvpb.HeartbeatTxnRequest:

0 commit comments

Comments
 (0)