@@ -91,6 +91,16 @@ type cdcTester struct {
9191 doneCh chan struct {}
9292}
9393
94+ // The node on which the webhook sink will be installed and run on.
95+ func (ct * cdcTester ) webhookSinkNode () option.NodeListOption {
96+ return ct .cluster .Node (ct .cluster .Spec ().NodeCount )
97+ }
98+
99+ // The node on which the kafka sink will be installed and run on.
100+ func (ct * cdcTester ) kafkaSinkNode () option.NodeListOption {
101+ return ct .cluster .Node (ct .cluster .Spec ().NodeCount )
102+ }
103+
94104// startStatsCollection sets the start point of the stats collection window
95105// and returns a function which should be called at the end of the test to dump a
96106// stats.json file to the artifacts directory.
@@ -158,7 +168,7 @@ func (ct *cdcTester) setupSink(args feedArgs) string {
158168 sinkURI = `experimental-gs://cockroach-tmp/roachtest/` + ts + "?AUTH=implicit"
159169 case webhookSink :
160170 ct .t .Status ("webhook install" )
161- webhookNode := ct .cluster . Node ( ct . cluster . Spec (). NodeCount )
171+ webhookNode := ct .webhookSinkNode ( )
162172 rootFolder := `/home/ubuntu`
163173 nodeIPs , _ := ct .cluster .ExternalIP (ct .ctx , ct .logger , webhookNode )
164174
@@ -184,21 +194,6 @@ func (ct *cdcTester) setupSink(args feedArgs) string {
184194 ct .t .Fatal (err )
185195 }
186196
187- // As seen in #107061, this can hit a 503 Service Unavailable when
188- // trying to download the package, so we retry every 30 seconds
189- // for up to 5 mins below.
190- err = retry .WithMaxAttempts (ct .ctx , retry.Options {
191- InitialBackoff : 30 * time .Second ,
192- Multiplier : 1 ,
193- }, 10 , func () error {
194- err = ct .cluster .RunE (ct .ctx , webhookNode , `sudo apt --yes install golang-go;` )
195- err = errors .Wrap (err , "infrastructure failure; could not install golang" )
196- return err
197- })
198- if err != nil {
199- ct .t .Skip (err )
200- }
201-
202197 // Start the server in its own monitor to not block ct.mon.Wait()
203198 serverExecCmd := fmt .Sprintf (`go run webhook-server-%d.go` , webhookPort )
204199 m := ct .cluster .NewMonitor (ct .ctx , ct .workloadNode )
@@ -219,7 +214,7 @@ func (ct *cdcTester) setupSink(args feedArgs) string {
219214 case pubsubSink :
220215 sinkURI = changefeedccl .GcpScheme + `://cockroach-ephemeral` + "?AUTH=implicit&topic_name=pubsubSink-roachtest®ion=us-east1"
221216 case kafkaSink :
222- kafkaNode := ct .cluster . Node ( ct . cluster . Spec (). NodeCount )
217+ kafkaNode := ct .kafkaSinkNode ( )
223218 kafka := kafkaManager {
224219 t : ct .t ,
225220 c : ct .cluster ,
@@ -1303,6 +1298,13 @@ func registerCDC(r registry.Registry) {
13031298 ct := newCDCTester (ctx , t , c )
13041299 defer ct .Close ()
13051300
1301+ // Consider an installation failure to be a flake which is out of
1302+ // our control. This should be rare.
1303+ err := c .Install (ctx , t .L (), ct .webhookSinkNode (), "go" )
1304+ if err != nil {
1305+ t .Skip (err )
1306+ }
1307+
13061308 ct .runTPCCWorkload (tpccArgs {warehouses : 100 , duration : "30m" })
13071309
13081310 // The deprecated webhook sink is unable to handle the throughput required for 100 warehouses
@@ -1351,7 +1353,7 @@ func registerCDC(r registry.Registry) {
13511353
13521354 ct .runTPCCWorkload (tpccArgs {warehouses : 1 })
13531355
1354- kafkaNode := ct .cluster . Node ( ct . cluster . Spec (). NodeCount )
1356+ kafkaNode := ct .kafkaSinkNode ( )
13551357 kafka := kafkaManager {
13561358 t : ct .t ,
13571359 c : ct .cluster ,
0 commit comments