@@ -336,11 +336,7 @@ func registerDecommissionBenchSpec(r registry.Registry, benchSpec decommissionBe
 			return aggregatedPerfMetrics, nil
 		},
 		Run: func(ctx context.Context, t test.Test, c cluster.Cluster) {
-			if benchSpec.duration > 0 {
-				runDecommissionBenchLong(ctx, t, c, benchSpec, timeout)
-			} else {
-				runDecommissionBench(ctx, t, c, benchSpec, timeout)
-			}
+			runDecommissionBench(ctx, t, c, benchSpec, timeout)
 		},
 	})
 }
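The duration-based dispatch removed from the registration above is not dropped; the next hunk folds it into `runDecommissionBench` itself. Below is a self-contained sketch of the resulting control flow, not the real roachtest code: `monitor`, `decommissionOnce`, and `runBench` are hypothetical stand-ins for the cluster monitor, `runSingleDecommission`, and the merged benchmark body, and the 1-minute buffer sleeps and histogram ticking are omitted.

```go
package main

import (
	"fmt"
	"time"
)

// monitor mimics the ExpectDeath/ResetDeaths bookkeeping used around each
// decommission so the harness tolerates the node going away.
type monitor struct{ expectedDeaths int }

func (m *monitor) ExpectDeath() { m.expectedDeaths++ }
func (m *monitor) ResetDeaths() { m.expectedDeaths = 0 }

// decommissionOnce stands in for runSingleDecommission. reuse mirrors the
// flag flipped in the diff: a reused node is wiped and re-added so the loop
// can decommission again, while a one-shot run leaves it decommissioned.
func decommissionOnce(reuse bool) error {
	fmt.Printf("decommissioning one node (reuse=%t)\n", reuse)
	return nil
}

// runBench mirrors the merged benchmark body: a positive duration means
// "decommission repeatedly until the time budget is spent"; otherwise a
// single decommission is benchmarked.
func runBench(m *monitor, duration time.Duration) error {
	if duration > 0 {
		for tBegin := time.Now(); time.Since(tBegin) <= duration; {
			m.ExpectDeath()
			err := decommissionOnce(true /* reuse */)
			m.ResetDeaths()
			if err != nil {
				return err
			}
		}
		return nil
	}
	m.ExpectDeath()
	defer m.ResetDeaths()
	return decommissionOnce(false /* reuse */)
}

func main() {
	// A zero duration exercises the single-decommission path.
	if err := runBench(&monitor{}, 0); err != nil {
		fmt.Println("bench failed:", err)
	}
}
```

With the branch living inside the benchmark body, the registration's Run closure shrinks to a single call, as the hunk above shows.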
@@ -709,151 +705,34 @@ func runDecommissionBench(
 			time.Sleep(1 * time.Minute)
 		}
 
-		m.ExpectDeath()
-		defer m.ResetDeaths()
-		err := runSingleDecommission(ctx, c, h, pinnedNode, benchSpec.decommissionNode, &targetNodeAtomic, benchSpec.snapshotRate,
-			benchSpec.whileDown, benchSpec.drainFirst, false /* reuse */, benchSpec.whileUpreplicating,
-			true /* estimateDuration */, benchSpec.slowWrites, tickByName,
-		)
-
-		// Include an additional minute of buffer time post-decommission to gather
-		// workload stats.
-		time.Sleep(1 * time.Minute)
-
-		return err
-	})
-
-	m.Go(func(ctx context.Context) error {
-		hists := reg.GetHandle()
-
-		db := c.Conn(ctx, t.L(), pinnedNode)
-		defer db.Close()
-
-		return trackBytesUsed(ctx, db, &targetNodeAtomic, hists, tickByName)
-	})
-
-	if err := m.WaitE(); err != nil {
-		t.Fatal(err)
-	}
-}
-
-// runDecommissionBenchLong initializes a cluster with TPCC and attempts to
-// benchmark the decommissioning of nodes picked at random before subsequently
-// wiping them and re-adding them to the cluster to continually execute the
-// decommissioning process over the runtime of the test. The cluster may or may
-// not be running under load.
-func runDecommissionBenchLong(
-	ctx context.Context,
-	t test.Test,
-	c cluster.Cluster,
-	benchSpec decommissionBenchSpec,
-	testTimeout time.Duration,
-) {
-	// node1 is kept pinned (i.e. not decommissioned/restarted), and is the node
-	// through which we run decommissions. The last node is used for the workload.
-	pinnedNode := 1
-	workloadNode := benchSpec.nodes + 1
-	crdbNodes := c.Range(pinnedNode, benchSpec.nodes)
-	t.L().Printf("nodes %d - %d are crdb nodes", crdbNodes[0], crdbNodes[len(crdbNodes)-1])
-	t.L().Printf("node %d is the workload node", workloadNode)
-
-	maxRate := tpccMaxRate(benchSpec.warehouses)
-	rampDuration := 3 * time.Minute
-	rampStarted := make(chan struct{})
-	importCmd := fmt.Sprintf(
-		`./cockroach workload fixtures import tpcc --warehouses=%d`,
-		benchSpec.warehouses,
-	)
-	workloadCmd := fmt.Sprintf("./cockroach workload run tpcc --warehouses=%d --max-rate=%d --duration=%s "+
-		"%s --ramp=%s --tolerate-errors {pgurl:1-%d}", maxRate, benchSpec.warehouses,
-		testTimeout, roachtestutil.GetWorkloadHistogramString(t, c, nil, true), rampDuration, benchSpec.nodes)
-
-	setupDecommissionBench(ctx, t, c, benchSpec, pinnedNode, importCmd)
-
-	workloadCtx, workloadCancel := context.WithCancel(ctx)
-	m := c.NewDeprecatedMonitor(workloadCtx, crdbNodes)
-
-	if !benchSpec.noLoad {
-		m.Go(
-			func(ctx context.Context) error {
-				close(rampStarted)
-
-				// Run workload indefinitely, to be later killed by context
-				// cancellation once decommission has completed.
-				err := c.RunE(ctx, option.WithNodes(c.Node(workloadNode)), workloadCmd)
-				if errors.Is(ctx.Err(), context.Canceled) {
-					// Workload intentionally cancelled via context, so don't return error.
-					return nil
-				}
+		if benchSpec.duration > 0 {
+			for tBegin := timeutil.Now(); timeutil.Since(tBegin) <= benchSpec.duration; {
+				m.ExpectDeath()
+				err := runSingleDecommission(ctx, c, h, pinnedNode, benchSpec.decommissionNode, &targetNodeAtomic, benchSpec.snapshotRate,
+					benchSpec.whileDown, benchSpec.drainFirst, true /* reuse */, benchSpec.whileUpreplicating,
+					true /* estimateDuration */, benchSpec.slowWrites, tickByName,
+				)
+				m.ResetDeaths()
 				if err != nil {
-					t.L().Printf("workload error: %s", err)
+					return err
 				}
-				return err
-			},
-		)
-	}
-
-	// Setup Prometheus/Grafana using workload node.
-	cleanupFunc := setupGrafana(ctx, t, c, crdbNodes, workloadNode)
-	defer cleanupFunc()
-
-	// Create a histogram registry for recording multiple decommission metrics.
-	// Note that "decommission.*" metrics are special in that they are
-	// long-running metrics measured by the elapsed time between each tick,
-	// as opposed to the histograms of workload operation latencies or other
-	// recorded values that are typically output in a "tick" each second.
-	reg, tickByName, perfBuf, exporter := createDecommissionBenchPerfArtifacts(t, c,
-		decommissionMetric, upreplicateMetric, bytesUsedMetric,
-	)
-
-	defer func() {
-		if err := exporter.Close(func() error {
-			uploadPerfArtifacts(ctx, t, c, workloadNode, perfBuf)
-			return nil
-		}); err != nil {
-			t.Errorf("error closing perf exporter: %s", err)
-		}
-	}()
-
-	// The logical node id of the current decommissioning node.
-	var targetNodeAtomic uint32
-
-	m.Go(func(ctx context.Context) error {
-		defer workloadCancel()
-
-		h := newDecommTestHelper(t, c)
-		h.blockFromRandNode(workloadNode)
-
-		// If we are running a workload, wait until it has started and completed its
-		// ramp time before initiating a decommission.
-		if !benchSpec.noLoad {
-			<-rampStarted
-			t.Status("Waiting for workload to ramp up...")
-			select {
-			case <-ctx.Done():
-				return ctx.Err()
-			case <-time.After(rampDuration + 1*time.Minute):
-				// Workload ramp-up complete, plus 1 minute of recording workload stats.
 			}
-		}
-
-		for tBegin := timeutil.Now(); timeutil.Since(tBegin) <= benchSpec.duration; {
+			// Include an additional minute of buffer time post-decommission to gather
+			// workload stats.
+			time.Sleep(1 * time.Minute)
+			return nil
+		} else {
 			m.ExpectDeath()
+			defer m.ResetDeaths()
 			err := runSingleDecommission(ctx, c, h, pinnedNode, benchSpec.decommissionNode, &targetNodeAtomic, benchSpec.snapshotRate,
-				benchSpec.whileDown, benchSpec.drainFirst, true /* reuse */, benchSpec.whileUpreplicating,
+				benchSpec.whileDown, benchSpec.drainFirst, false /* reuse */, benchSpec.whileUpreplicating,
 				true /* estimateDuration */, benchSpec.slowWrites, tickByName,
 			)
-			m.ResetDeaths()
-			if err != nil {
-				return err
-			}
+			// Include an additional minute of buffer time post-decommission to gather
+			// workload stats.
+			time.Sleep(1 * time.Minute)
+			return err
 		}
-
-		// Include an additional minute of buffer time post-decommission to gather
-		// workload stats.
-		time.Sleep(1 * time.Minute)
-
-		return nil
 	})
 
 	m.Go(func(ctx context.Context) error {
@@ -868,7 +747,6 @@ func runDecommissionBenchLong(
 	if err := m.WaitE(); err != nil {
 		t.Fatal(err)
 	}
-
 }
 
 // runSingleDecommission picks a random node and attempts to decommission that
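One pattern worth calling out in the deleted `runDecommissionBenchLong`: its workload goroutine ran the TPCC command under a cancellable context and treated an intentional cancellation as a clean exit rather than an error. Below is a minimal, self-contained sketch of that pattern under assumed names: `runWorkload` stands in for the `c.RunE` call that drives the workload, and a plain goroutine plus channel replaces the roachtest monitor.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// runWorkload blocks until ctx is cancelled, standing in for the
// long-running workload command.
func runWorkload(ctx context.Context) error {
	<-ctx.Done()
	return ctx.Err()
}

func main() {
	workloadCtx, workloadCancel := context.WithCancel(context.Background())

	done := make(chan error, 1)
	go func() {
		err := runWorkload(workloadCtx)
		if errors.Is(workloadCtx.Err(), context.Canceled) {
			// Workload intentionally cancelled once the benchmark finished,
			// so don't report it as a failure.
			done <- nil
			return
		}
		done <- err
	}()

	// Pretend the decommission benchmark ran for a while, then stop the workload.
	time.Sleep(100 * time.Millisecond)
	workloadCancel()

	if err := <-done; err != nil {
		fmt.Println("workload error:", err)
		return
	}
	fmt.Println("workload stopped cleanly")
}
```

Swallowing context.Canceled keeps a deliberately stopped workload from failing the test while genuine workload errors still propagate.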