@@ -17,8 +17,6 @@ import (
17
17
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol"
18
18
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb"
19
19
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol/replica_rac2"
20
- "github.com/cockroachdb/cockroach/pkg/kv/kvserver/raftlog"
21
- "github.com/cockroachdb/cockroach/pkg/raft/raftpb"
22
20
"github.com/cockroachdb/cockroach/pkg/roachpb"
23
21
"github.com/cockroachdb/cockroach/pkg/settings"
24
22
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
@@ -173,11 +171,6 @@ type Controller interface {
173
171
// replicated to a raft follower, that have not been subject to admission
174
172
// control.
175
173
FollowerStoreWriteBytes (roachpb.StoreID , FollowerStoreWriteBytes )
176
- // AdmitRaftEntry informs admission control of a raft log entry being
177
- // written to storage.
178
- AdmitRaftEntry (
179
- _ context.Context , _ roachpb.TenantID , _ roachpb.StoreID , _ roachpb.RangeID , _ roachpb.ReplicaID ,
180
- leaderTerm uint64 , _ raftpb.Entry )
181
174
replica_rac2.ACWorkQueue
182
175
// GetSnapshotQueue returns the SnapshotQueue which is used for ingesting raft
183
176
// snapshots.
@@ -570,88 +563,6 @@ func (n *controllerImpl) FollowerStoreWriteBytes(
570
563
followerWriteBytes .NumEntries , followerWriteBytes .StoreWorkDoneInfo )
571
564
}
572
565
573
- // AdmitRaftEntry implements the Controller interface. It is only used for the
574
- // RACv1 protocol.
575
- func (n * controllerImpl ) AdmitRaftEntry (
576
- ctx context.Context ,
577
- tenantID roachpb.TenantID ,
578
- storeID roachpb.StoreID ,
579
- rangeID roachpb.RangeID ,
580
- replicaID roachpb.ReplicaID ,
581
- leaderTerm uint64 ,
582
- entry raftpb.Entry ,
583
- ) {
584
- typ , _ , err := raftlog .EncodingOf (entry )
585
- if err != nil {
586
- log .Errorf (ctx , "unable to determine raft command encoding: %v" , err )
587
- return
588
- }
589
- if ! typ .UsesAdmissionControl () {
590
- return // nothing to do
591
- }
592
- meta , err := raftlog .DecodeRaftAdmissionMeta (entry .Data )
593
- if err != nil {
594
- log .Errorf (ctx , "unable to decode raft command admission data: %v" , err )
595
- return
596
- }
597
-
598
- if log .V (1 ) {
599
- log .Infof (ctx , "decoded raft admission meta below-raft: pri=%s create-time=%d proposer=n%s receiver=[n%d,s%s] tenant=t%d tokens≈%d sideloaded=%t raft-entry=%d/%d" ,
600
- admissionpb .WorkPriority (meta .AdmissionPriority ),
601
- meta .AdmissionCreateTime ,
602
- meta .AdmissionOriginNode ,
603
- n .nodeID .Get (),
604
- storeID ,
605
- tenantID .ToUint64 (),
606
- kvflowcontrol .Tokens (len (entry .Data )),
607
- typ .IsSideloaded (),
608
- entry .Term ,
609
- entry .Index ,
610
- )
611
- }
612
-
613
- storeAdmissionQ := n .storeGrantCoords .TryGetQueueForStore (storeID )
614
- if storeAdmissionQ == nil {
615
- log .Errorf (ctx , "unable to find queue for store: %s" , storeID )
616
- return // nothing to do
617
- }
618
-
619
- if len (entry .Data ) == 0 {
620
- log .Fatal (ctx , "found (unexpected) empty raft command for below-raft admission" )
621
- }
622
- wi := admission.WorkInfo {
623
- TenantID : tenantID ,
624
- Priority : admissionpb .WorkPriority (meta .AdmissionPriority ),
625
- CreateTime : meta .AdmissionCreateTime ,
626
- BypassAdmission : false ,
627
- RequestedCount : int64 (len (entry .Data )),
628
- }
629
- wi .ReplicatedWorkInfo = admission.ReplicatedWorkInfo {
630
- Enabled : true ,
631
- RangeID : rangeID ,
632
- ReplicaID : replicaID ,
633
- LeaderTerm : leaderTerm ,
634
- LogPosition : admission.LogPosition {
635
- Term : entry .Term ,
636
- Index : entry .Index ,
637
- },
638
- Origin : meta .AdmissionOriginNode ,
639
- IsV2Protocol : false ,
640
- Ingested : typ .IsSideloaded (),
641
- }
642
-
643
- handle , err := storeAdmissionQ .Admit (ctx , admission.StoreWriteWorkInfo {
644
- WorkInfo : wi ,
645
- })
646
- if err != nil {
647
- log .Errorf (ctx , "error while admitting to store admission queue: %v" , err )
648
- return
649
- }
650
- if handle .UseAdmittedWorkDone () {
651
- log .Fatalf (ctx , "unexpected handle.UseAdmittedWorkDone" )
652
- }
653
- }
654
-
655
566
var _ replica_rac2.ACWorkQueue = & controllerImpl {}
656
567
657
568
// Admit implements replica_rac2.ACWorkQueue. It is only used for the RACv2 protocol.
@@ -681,10 +592,8 @@ func (n *controllerImpl) Admit(ctx context.Context, entry replica_rac2.EntryForA
681
592
Term : 0 , // Ignored by callback in RACv2.
682
593
Index : entry .CallbackState .Mark .Index ,
683
594
},
684
- Origin : 0 ,
685
- RaftPri : entry .CallbackState .Priority ,
686
- IsV2Protocol : true ,
687
- Ingested : entry .Ingested ,
595
+ RaftPri : entry .CallbackState .Priority ,
596
+ Ingested : entry .Ingested ,
688
597
}
689
598
690
599
handle , err := storeAdmissionQ .Admit (ctx , admission.StoreWriteWorkInfo {
0 commit comments