Skip to content

Commit acb670c

Browse files
author
saul-data
committed
fix for incorrect error stopping the pipeline
1 parent 7696d52 commit acb670c

File tree

2 files changed

+36
-5
lines changed

2 files changed

+36
-5
lines changed

app/workers/runtask/runtask.go

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,10 @@ func worker(ctx context.Context, msg modelmain.WorkerTaskSend) {
8787
log.Println(errl.Error())
8888
}
8989

90-
WSLogError("Lock for run and node exists:"+errl.Error(), msg, TaskUpdate)
90+
log.Println("Lock for run and node exists:", msg.RunID, msg.NodeID)
91+
92+
// == NB: this should be a silent fail and continue, the below will fail the entire graph
93+
SilentWSLogError("Lock for run and node exists:"+errl.Error(), msg)
9194

9295
return
9396
}
@@ -96,14 +99,14 @@ func worker(ctx context.Context, msg modelmain.WorkerTaskSend) {
9699
var lockCheck modelmain.WorkerTasks
97100
err2 := database.DBConn.Select("task_id", "status").Where("task_id = ? and environment_id= ?", msg.TaskID, msg.EnvironmentID).First(&lockCheck).Error
98101
if err2 != nil {
99-
log.Println(err2.Error())
100-
WSLogError("Task already running:"+err2.Error(), msg, TaskUpdate)
102+
log.Println("Task already running", err2.Error())
103+
SilentWSLogError("Task already running:"+err2.Error(), msg)
101104
return
102105
}
103106

104107
if lockCheck.Status != "Queue" {
105108
log.Println("Skipping not in queue", msg.RunID, msg.NodeID)
106-
WSLogError("Skipping not in queue - runid:"+msg.RunID+" - node:"+msg.NodeID, msg, TaskUpdate)
109+
SilentWSLogError("Skipping not in queue - runid:"+msg.RunID+" - node:"+msg.NodeID, msg)
107110
return
108111
}
109112

@@ -114,7 +117,7 @@ func worker(ctx context.Context, msg modelmain.WorkerTaskSend) {
114117
err2 = database.DBConn.Select("run_id", "status").Where("run_id = ?", msg.RunID).First(&pipelineCheck).Error
115118
if err2 != nil {
116119
log.Println(err2.Error())
117-
WSLogError("Skipping not in queue - runid:"+msg.RunID+" - node:"+msg.NodeID, msg, TaskUpdate)
120+
WSLogError("Pipeline marked as failed - runid:"+msg.RunID+" - node:"+msg.NodeID, msg, TaskUpdate)
118121
return
119122
}
120123

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
package runtask
2+
3+
import (
4+
"time"
5+
6+
"github.com/dataplane-app/dataplane/app/mainapp/database"
7+
"github.com/dataplane-app/dataplane/app/mainapp/database/models"
8+
"github.com/dataplane-app/dataplane/app/workers/mqworker"
9+
"github.com/google/uuid"
10+
)
11+
12+
/* Return errors to the logging console without killing the pipeline (letting it complete) */
13+
func SilentWSLogError(logline string, msg models.WorkerTaskSend) {
14+
15+
uidstring := uuid.NewString()
16+
sendmsg := models.LogsSend{
17+
CreatedAt: time.Now().UTC(),
18+
UID: uidstring,
19+
Log: logline,
20+
LogType: "error",
21+
EnvironmentID: msg.EnvironmentID,
22+
RunID: msg.RunID,
23+
}
24+
25+
mqworker.MsgSend("workerlogs."+msg.EnvironmentID+"."+msg.RunID+"."+msg.NodeID, sendmsg)
26+
database.DBConn.Create(&sendmsg)
27+
28+
}

0 commit comments

Comments
 (0)