@@ -8,17 +8,17 @@ import (
88 "sync"
99 "time"
1010
11- "github.com/getsentry/sentry-go"
12- "go.opentelemetry.io/otel/attribute"
13- "go.opentelemetry.io/otel/codes"
14- "go.opentelemetry.io/otel/trace"
15- clientset "k8s.io/client-go/kubernetes"
11+ "github.com/opslevel/opslevel-runner/signal"
1612
13+ "github.com/getsentry/sentry-go"
1714 "github.com/opslevel/opslevel-go/v2024"
1815 "github.com/opslevel/opslevel-runner/pkg"
1916 "github.com/rs/zerolog/log"
2017 "github.com/spf13/cobra"
2118 "github.com/spf13/viper"
19+ "go.opentelemetry.io/otel/attribute"
20+ "go.opentelemetry.io/otel/codes"
21+ "go.opentelemetry.io/otel/trace"
2222)
2323
2424// previewCmd represents the preview command
@@ -58,20 +58,17 @@ func doRun(cmd *cobra.Command, args []string) {
 
 	pkg.StartMetricsServer(string(runner.Id), viper.GetInt("metrics-port"))
 
-	if viper.GetBool("scaling-enabled") {
-		config, err := pkg.GetKubernetesConfig()
-		pkg.CheckErr(err)
-
-		k8sClient := clientset.NewForConfigOrDie(config)
+	ctx := signal.Init(context.Background())
 
-		log.Info().Msgf("electing leader...")
-		go electLeader(k8sClient, runner.Id)
+	if viper.GetBool("scaling-enabled") {
+		leaseLockName := viper.GetString("runner-deployment")
+		leaseLockNamespace := viper.GetString("runner-pod-namespace")
+		lockIdentity := viper.GetString("runner-pod-name")
+		cobra.CheckErr(pkg.RunLeaderElection(ctx, runner.Id, leaseLockName, lockIdentity, leaseLockNamespace))
 	}
 
-	stop := pkg.InitSignalHandler()
-	wg := startWorkers(runner.Id, stop)
-	<-stop // Enter Forever Loop
-	log.Info().Msgf("interupt - waiting for jobs to complete ...")
+	wg := startWorkers(ctx, runner.Id)
+	time.Sleep(1 * time.Second)
 	wg.Wait()
 	log.Info().Msgf("Unregister runner for id '%s'...", runner.Id)
 	err = client.RunnerUnregister(runner.Id)
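The stop channel from pkg.InitSignalHandler is replaced by a signal-scoped context. The new signal package's source is not part of this diff, so the helper below is only a sketch of what a context-returning Init built on the standard library's signal.NotifyContext could look like:

```go
// Sketch only: the real opslevel-runner/signal package is not shown in this diff.
package signal

import (
	"context"
	"os"
	ossignal "os/signal"
	"syscall"
)

// Init returns a child of parent that is cancelled when the process receives
// SIGINT or SIGTERM. The stop function from NotifyContext is deliberately
// discarded because this context is meant to live for the whole process.
func Init(parent context.Context) context.Context {
	ctx, _ := ossignal.NotifyContext(parent, os.Interrupt, syscall.SIGTERM)
	return ctx
}
```

From doRun's point of view the behavior matches the old stop channel: ctx.Done() closes when the process is interrupted or terminated.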
@@ -81,23 +78,15 @@ func doRun(cmd *cobra.Command, args []string) {
 	}
 }
 
-func electLeader(k8sClient *clientset.Clientset, runnerId opslevel.ID) {
-	leaseLockName := viper.GetString("runner-deployment")
-	leaseLockNamespace := viper.GetString("runner-pod-namespace")
-	lockIdentity := viper.GetString("runner-pod-name")
-
-	pkg.RunLeaderElection(k8sClient, runnerId, leaseLockName, lockIdentity, leaseLockNamespace)
-}
-
-func startWorkers(runnerId opslevel.ID, stop <-chan struct{}) *sync.WaitGroup {
+func startWorkers(ctx context.Context, runnerId opslevel.ID) *sync.WaitGroup {
 	wg := sync.WaitGroup{}
 	concurrency := getConcurrency()
 	wg.Add(concurrency)
 	jobQueue := make(chan opslevel.RunnerJob)
 	for w := 1; w <= concurrency; w++ {
-		go jobWorker(&wg, w, runnerId, jobQueue)
+		go jobWorker(ctx, &wg, w, runnerId, jobQueue)
 	}
-	go jobPoller(runnerId, stop, jobQueue)
+	go jobPoller(ctx, runnerId, jobQueue)
 	return &wg
 }
 
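With electLeader and the in-file Kubernetes client construction removed, pkg.RunLeaderElection now takes the context plus the lease coordinates and is assumed to build its own client and return an error (it is wrapped in cobra.CheckErr, and doRun continues on to startWorkers, so the election itself presumably runs in the background). Its body is not in this diff; a plausible lease-based sketch with client-go, with the deployment-scaling callback left as a placeholder, would be:

```go
// Sketch only, assuming pkg.RunLeaderElection wraps client-go's lease-based
// leader election; the real implementation lives in pkg and is not shown here.
package pkg

import (
	"context"
	"time"

	"github.com/opslevel/opslevel-go/v2024"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/leaderelection"
	"k8s.io/client-go/tools/leaderelection/resourcelock"
)

func RunLeaderElection(ctx context.Context, runnerId opslevel.ID, leaseName, identity, namespace string) error {
	config, err := GetKubernetesConfig()
	if err != nil {
		return err
	}
	client, err := kubernetes.NewForConfig(config)
	if err != nil {
		return err
	}
	lock := &resourcelock.LeaseLock{
		LeaseMeta:  metav1.ObjectMeta{Name: leaseName, Namespace: namespace},
		Client:     client.CoordinationV1(),
		LockConfig: resourcelock.ResourceLockConfig{Identity: identity},
	}
	// Run the election in the background so the caller can keep starting workers;
	// cancelling ctx releases the lease on shutdown.
	go leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{
		Lock:            lock,
		ReleaseOnCancel: true,
		LeaseDuration:   15 * time.Second,
		RenewDeadline:   10 * time.Second,
		RetryPeriod:     2 * time.Second,
		Callbacks: leaderelection.LeaderCallbacks{
			OnStartedLeading: func(ctx context.Context) {
				// The elected runner would drive scaling of the runner deployment here.
			},
			OnStoppedLeading: func() {},
		},
	})
	return nil
}
```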
@@ -109,7 +98,7 @@ func getConcurrency() int {
 	return concurrency
 }
 
-func jobWorker(wg *sync.WaitGroup, index int, runnerId opslevel.ID, jobQueue <-chan opslevel.RunnerJob) {
+func jobWorker(ctx context.Context, wg *sync.WaitGroup, index int, runnerId opslevel.ID, jobQueue <-chan opslevel.RunnerJob) {
 	logMaxBytes := viper.GetInt("job-pod-log-max-size")
 	logMaxDuration := time.Duration(viper.GetInt("job-pod-log-max-interval")) * time.Second
 	logPrefix := func() string { return fmt.Sprintf("%s [%d] ", time.Now().UTC().Format(time.RFC3339), index) }
@@ -122,7 +111,6 @@ func jobWorker(wg *sync.WaitGroup, index int, runnerId opslevel.ID, jobQueue <-c
 	logger.Info().Msgf("Starting job processor %d ...", index)
 	defer wg.Done()
 	for job := range jobQueue {
-		ctx := context.Background()
 		jobId := job.Id
 		jobNumber := job.Number()
 
@@ -142,13 +130,13 @@ func jobWorker(wg *sync.WaitGroup, index int, runnerId opslevel.ID, jobQueue <-c
 		pkg.MetricJobsProcessing.Inc()
 		logger.Info().Msgf("Starting job '%s'", jobNumber)
 
-		go streamer.Run()
-		ctx, spanStart := tracer.Start(ctx, "start-job",
+		go streamer.Run(ctx)
+		traceCtx, spanStart := tracer.Start(ctx, "start-job",
 			trace.WithSpanKind(trace.SpanKindConsumer),
 			trace.WithAttributes(attribute.String("job", jobNumber)),
 		)
-		outcome := runner.Run(job, streamer.Stdout, streamer.Stderr)
-		_, spanFinish := tracer.Start(ctx,
+		outcome := runner.Run(ctx, job, streamer.Stdout, streamer.Stderr)
+		_, spanFinish := tracer.Start(traceCtx,
 			"finish-job",
 			trace.WithSpanKind(trace.SpanKindConsumer),
 			trace.WithAttributes(attribute.String("job", jobNumber)))
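The span context returned by the first tracer.Start call is now bound to a new traceCtx variable instead of shadowing ctx, so the worker's shared signal context keeps flowing into streamer.Run and runner.Run while "finish-job" is still parented under "start-job". A minimal standalone illustration of that parent/child linking (it uses the default no-op tracer, since no exporter is configured):

```go
package main

import (
	"context"

	"go.opentelemetry.io/otel"
	"go.opentelemetry.io/otel/trace"
)

func main() {
	tracer := otel.Tracer("example")
	ctx := context.Background() // stands in for the worker's shared signal context

	// Start returns a derived context that carries the new span.
	traceCtx, spanStart := tracer.Start(ctx, "start-job",
		trace.WithSpanKind(trace.SpanKindConsumer))

	// Starting from traceCtx (not ctx) parents "finish-job" under "start-job",
	// while ctx itself stays untouched and remains usable for cancellation.
	_, spanFinish := tracer.Start(traceCtx, "finish-job",
		trace.WithSpanKind(trace.SpanKindConsumer))

	spanFinish.End()
	spanStart.End()
}
```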
@@ -185,15 +173,15 @@ func jobWorker(wg *sync.WaitGroup, index int, runnerId opslevel.ID, jobQueue <-c
 	logger.Info().Msgf("Shutting down job processor %d ...", index)
 }
 
-func jobPoller(runnerId opslevel.ID, stop <-chan struct{}, jobQueue chan<- opslevel.RunnerJob) {
+func jobPoller(ctx context.Context, runnerId opslevel.ID, jobQueue chan<- opslevel.RunnerJob) {
 	logger := log.With().Int("worker", 0).Logger()
 	client := pkg.NewGraphClient()
 	token := opslevel.ID("")
 	pollWaitTime := time.Second * time.Duration(viper.GetInt("poll-interval"))
 	logger.Info().Msg("Starting polling for jobs")
 	for {
 		select {
-		case <-stop:
+		case <-ctx.Done():
 			logger.Info().Msg("Stopped Polling for jobs ...")
 			close(jobQueue)
 			return