@@ -25,6 +25,7 @@ import (
2525
2626 "github.com/codefresh-io/go/venona/pkg/codefresh"
2727 "github.com/codefresh-io/go/venona/pkg/logger"
28+ "github.com/codefresh-io/go/venona/pkg/monitoring"
2829 "github.com/codefresh-io/go/venona/pkg/runtime"
2930 "github.com/codefresh-io/go/venona/pkg/task"
3031 retryablehttp "github.com/hashicorp/go-retryablehttp"
3940 errIDRequired = errors .New ("ID options is required" )
4041 errRuntimesRequired = errors .New ("Runtimes options is required" )
4142 errLoggerRequired = errors .New ("Logger options is required" )
43+ errRuntimeNotFound = errors .New ("Runtime environment not found" )
4244 errFailedToParseAgentTask = errors .New ("Failed to parse agent task spec" )
4345 errUknownAgentTaskType = errors .New ("Agent task has unknown type" )
4446 errAgentTaskMalformedParams = errors .New ("failed to marshal agent task params" )
// Package-level defaults for the agent.
// NOTE(review): this block appears in diff form and lists defaultProxyRequestTimeout
// twice: the "-" line carries the old value (10s) and the "+" line the new value (20s).
4951const (
// presumably the fallback for Options.TaskPullingSecondsInterval — TODO confirm against New.
5052 defaultTaskPullingInterval = time .Second * 3
// presumably the fallback for Options.StatusReportingSecondsInterval — TODO confirm against New.
5153 defaultStatusReportingInterval = time .Second * 10
// Assigned to httpClient.HTTPClient.Timeout in init.
52- defaultProxyRequestTimeout = time .Second * 10
54+ defaultProxyRequestTimeout = time .Second * 20
// Assigned to httpClient.RetryMax in init.
5355 defaultProxyRequestRetries = 3
5456)
5557
6264 Logger logger.Logger
6365 TaskPullingSecondsInterval time.Duration
6466 StatusReportingSecondsInterval time.Duration
67+ Monitor monitoring.Monitor
6568 }
6669
6770 // Agent holds all the references from Codefresh
7780 lastStatus Status
7881 terminationChan chan struct {}
7982 wg * sync.WaitGroup
83+ monitor monitoring.Monitor
8084 }
8185
8286 // Status of the agent
@@ -121,6 +125,11 @@ func New(opt *Options) (*Agent, error) {
121125 terminationChan := make (chan struct {})
122126 wg := & sync.WaitGroup {}
123127
128+ if opt .Monitor == nil {
129+ opt .Monitor = monitoring .NewEmpty ()
130+ }
131+ httpClient .HTTPClient .Transport = opt .Monitor .NewRoundTripper (httpClient .HTTPClient .Transport )
132+
124133 return & Agent {
125134 id ,
126135 cf ,
@@ -132,6 +141,7 @@ func New(opt *Options) (*Agent, error) {
132141 Status {},
133142 terminationChan ,
134143 wg ,
144+ opt .Monitor ,
135145 }, nil
136146}
137147
@@ -180,12 +190,12 @@ func (a *Agent) startTaskPullerRoutine() {
180190 return
181191 case <- a .taskPullerTicker .C :
182192 a .wg .Add (1 )
183- go func (client codefresh.Codefresh , runtimes map [string ]runtime.Runtime , wg * sync.WaitGroup , logger logger.Logger ) {
193+ go func (client codefresh.Codefresh , runtimes map [string ]runtime.Runtime , wg * sync.WaitGroup , logger logger.Logger , monitor monitoring. Monitor ) {
184194 tasks := pullTasks (client , logger )
185- startTasks (tasks , runtimes , logger )
195+ startTasks (tasks , runtimes , logger , monitor )
186196 time .Sleep (time .Second * 10 )
187197 wg .Done ()
188- }(a .cf , a .runtimes , a .wg , a .log )
198+ }(a .cf , a .runtimes , a .wg , a .log , a . monitor )
189199 }
190200 }
191201}
@@ -229,7 +239,7 @@ func pullTasks(client codefresh.Codefresh, logger logger.Logger) []task.Task {
229239 return tasks
230240}
231241
232- func startTasks (tasks []task.Task , runtimes map [string ]runtime.Runtime , logger logger.Logger ) {
242+ func startTasks (tasks []task.Task , runtimes map [string ]runtime.Runtime , logger logger.Logger , monitor monitoring. Monitor ) {
233243 creationTasks := []task.Task {}
234244 deletionTasks := []task.Task {}
235245 agentTasks := []task.Task {}
@@ -253,39 +263,54 @@ func startTasks(tasks []task.Task, runtimes map[string]runtime.Runtime, logger l
253263 for i := range agentTasks {
254264 t := agentTasks [i ]
255265 logger .Info ("executing agent task" , "tid" , t .Metadata .Workflow )
266+ txn := newTransaction (monitor , t .Type , t .Metadata .Workflow , t .Metadata .ReName )
256267 if err := executeAgentTask (& t , logger ); err != nil {
257268 logger .Error (err .Error ())
269+ noticeError (txn , err , logger )
258270 }
271+ endTransaction (txn , logger )
259272 }
260273
261274 // process creation tasks
262275 for _ , tasks := range groupTasks (creationTasks ) {
263276 reName := tasks [0 ].Metadata .ReName
264277 runtime , ok := runtimes [reName ]
278+ txn := newTransaction (monitor , "start-workflow" , tasks [0 ].Metadata .Workflow , reName )
279+
265280 if ! ok {
266281 logger .Error ("Runtime not found" , "workflow" , tasks [0 ].Metadata .Workflow , "runtime" , reName )
282+ noticeError (txn , errRuntimeNotFound , logger )
283+ endTransaction (txn , logger )
267284 continue
268285 }
269286 logger .Info ("Starting workflow" , "workflow" , tasks [0 ].Metadata .Workflow , "runtime" , reName )
270287 if err := runtime .StartWorkflow (tasks ); err != nil {
271288 logger .Error (err .Error ())
289+ noticeError (txn , err , logger )
272290 }
291+ endTransaction (txn , logger )
273292 }
274293
275294 // process deletion tasks
276295 for _ , tasks := range groupTasks (deletionTasks ) {
277296 reName := tasks [0 ].Metadata .ReName
278297 runtime , ok := runtimes [reName ]
298+ txn := newTransaction (monitor , "terminate-workflow" , tasks [0 ].Metadata .Workflow , reName )
299+
279300 if ! ok {
280301 logger .Error ("Runtime not found" , "workflow" , tasks [0 ].Metadata .Workflow , "runtime" , reName )
302+ noticeError (txn , errRuntimeNotFound , logger )
303+ endTransaction (txn , logger )
281304 continue
282305 }
283306 logger .Info ("Terminating workflow" , "workflow" , tasks [0 ].Metadata .Workflow , "runtime" , reName )
284307 if errs := runtime .TerminateWorkflow (tasks ); len (errs ) != 0 {
285308 for _ , err := range errs {
286309 logger .Error (err .Error ())
310+ noticeError (txn , err , logger )
287311 }
288312 }
313+ endTransaction (txn , logger )
289314 }
290315}
291316
@@ -388,6 +413,26 @@ func checkOptions(opt *Options) error {
388413 return nil
389414}
390415
416+ func newTransaction (monitor monitoring.Monitor , taskType , tid , runtime string ) monitoring.Transaction {
417+ txn := monitor .NewTransaction ("runner-tasks-execution" , nil , nil )
418+ _ = txn .AddAttribute ("task-type" , taskType )
419+ _ = txn .AddAttribute ("tid" , tid )
420+ _ = txn .AddAttribute ("runtime-environment" , runtime )
421+ return txn
422+ }
423+
424+ func noticeError (txn monitoring.Transaction , error error , log logger.Logger ) {
425+ if err := txn .NoticeError (error ); err != nil {
426+ log .Error ("Failed to report error to monitor" , "err" , err )
427+ }
428+ }
429+
430+ func endTransaction (txn monitoring.Transaction , log logger.Logger ) {
431+ if err := txn .End (); err != nil {
432+ log .Error ("Failed to end transaction" , "err" , err )
433+ }
434+ }
435+
391436func init () {
392437 httpClient .RetryMax = defaultProxyRequestRetries
393438 httpClient .HTTPClient .Timeout = defaultProxyRequestTimeout
0 commit comments