Skip to content

Commit 92d33c3

Browse files
authored
Merge branch 'develop' into chore/clean-codes
2 parents 64e5d38 + 62c52c0 commit 92d33c3

File tree

13 files changed

+31322
-31163
lines changed

13 files changed

+31322
-31163
lines changed

.github/workflows/release.yml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -164,6 +164,8 @@ jobs:
164164
with:
165165
builder: ${{ steps.buildx.outputs.name }}
166166
context: .
167+
build-args: |
168+
ANSIBLE_VERSION=9.4.0
167169
file: deployment/docker/runner/Dockerfile
168170
platforms: linux/amd64,linux/arm64 #,linux/arm/v6
169171
push: ${{ github.event_name != 'pull_request' }}

api/sockets/handler.go

Lines changed: 51 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@ import (
99

1010
"github.com/gorilla/context"
1111
"github.com/gorilla/websocket"
12-
"github.com/semaphoreui/semaphore/util"
1312
log "github.com/sirupsen/logrus"
1413
)
1514

@@ -33,6 +32,10 @@ const (
3332

3433
// Maximum message size allowed from peer.
3534
maxMessageSize = 512
35+
36+
// Maximum size of the connection.send channel.
37+
// When the channel is full, the hub closes it (see method hub.run).
38+
connectionChannelSize = 256
3639
)
3740

3841
type connection struct {
@@ -41,6 +44,25 @@ type connection struct {
4144
userID int
4245
}
4346

47+
func (c *connection) log(level log.Level, err error, msg string) {
48+
log.WithError(err).WithFields(log.Fields{
49+
"context": "websocket",
50+
"user_id": c.userID,
51+
}).Log(level, msg)
52+
}
53+
54+
func (c *connection) logError(err error, msg string) {
55+
c.log(log.ErrorLevel, err, msg)
56+
}
57+
58+
func (c *connection) logWarn(err error, msg string) {
59+
c.log(log.DebugLevel, err, msg)
60+
}
61+
62+
func (c *connection) logDebug(err error, msg string) {
63+
c.log(log.DebugLevel, err, msg)
64+
}
65+
4466
// readPump pumps messages from the websocket connection to the hub.
4567
func (c *connection) readPump() {
4668
defer func() {
@@ -50,11 +72,17 @@ func (c *connection) readPump() {
5072

5173
c.ws.SetReadLimit(maxMessageSize)
5274

53-
util.LogErrorF(c.ws.SetReadDeadline(tz.Now().Add(pongWait)), log.Fields{"error": "Cannot set read deadline"})
75+
if err := c.ws.SetReadDeadline(tz.Now().Add(pongWait)); err != nil {
76+
c.logWarn(err, "Failed to set read deadline")
77+
}
5478

5579
c.ws.SetPongHandler(func(string) error {
56-
err := c.ws.SetReadDeadline(tz.Now().Add(pongWait))
57-
util.LogErrorF(err, log.Fields{"error": "Cannot set read deadline"})
80+
deadline := tz.Now().Add(pongWait)
81+
82+
if err := c.ws.SetReadDeadline(deadline); err != nil {
83+
c.logWarn(err, "Failed to set read deadline")
84+
}
85+
5886
return nil
5987
})
6088

@@ -64,7 +92,7 @@ func (c *connection) readPump() {
6492

6593
if err != nil {
6694
if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway) {
67-
util.LogError(err)
95+
c.logDebug(err, "Failed to read message from client")
6896
}
6997
break
7098
}
@@ -74,9 +102,11 @@ func (c *connection) readPump() {
74102
// write writes a message with the given message type and payload.
75103
func (c *connection) write(mt int, payload []byte) error {
76104

77-
err := c.ws.SetWriteDeadline(tz.Now().Add(writeWait))
105+
deadline := tz.Now().Add(writeWait)
78106

79-
util.LogErrorF(err, log.Fields{"error": "Cannot set write deadline"})
107+
if err := c.ws.SetWriteDeadline(deadline); err != nil {
108+
c.logWarn(err, "Cannot set write deadline")
109+
}
80110

81111
return c.ws.WriteMessage(mt, payload)
82112
}
@@ -93,28 +123,23 @@ func (c *connection) writePump() {
93123
for {
94124
select {
95125
case message, ok := <-c.send:
126+
96127
if !ok {
97128
if err := c.write(websocket.CloseMessage, []byte{}); err != nil {
98-
log.WithError(err).WithFields(log.Fields{
99-
"context": "websocket",
100-
"user_id": c.userID,
101-
}).Debug("Cannot send close message")
129+
c.logDebug(err, "Failed to write close message to client")
102130
}
103131
return
104132
}
133+
105134
if err := c.write(websocket.TextMessage, message); err != nil {
106-
log.WithError(err).WithFields(log.Fields{
107-
"context": "websocket",
108-
"user_id": c.userID,
109-
}).Debug("Cannot send text message")
135+
c.logDebug(err, "Failed to write message to client")
110136
return
111137
}
138+
112139
case <-ticker.C:
140+
113141
if err := c.write(websocket.PingMessage, []byte{}); err != nil {
114-
log.WithError(err).WithFields(log.Fields{
115-
"context": "websocket",
116-
"user_id": c.userID,
117-
}).Debug("Cannot send ping message")
142+
c.logDebug(err, "Failed to write ping message to client")
118143
return
119144
}
120145
}
@@ -131,13 +156,18 @@ func Handler(w http.ResponseWriter, r *http.Request) {
131156
user := usr.(*db.User)
132157
ws, err := upgrader.Upgrade(w, r, nil)
133158
if err != nil {
134-
log.Error(err)
159+
160+
log.WithError(err).WithFields(log.Fields{
161+
"context": "websocket",
162+
"user_id": user.ID,
163+
}).Error("Failed to upgrade connection to websocket")
164+
135165
w.WriteHeader(http.StatusInternalServerError)
136166
return
137167
}
138168

139169
c := &connection{
140-
send: make(chan []byte, 256),
170+
send: make(chan []byte, connectionChannelSize),
141171
ws: ws,
142172
userID: user.ID,
143173
}

api/sockets/pool.go

Lines changed: 15 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,11 @@
11
package sockets
22

3+
import log "github.com/sirupsen/logrus"
4+
35
// hub maintains the set of active connections and broadcasts messages to the
46
// connections.
57
type hub struct {
6-
// Registered connections.
8+
// Registered websocket connections.
79
connections map[*connection]bool
810

911
// Inbound messages from the connections.
@@ -28,7 +30,6 @@ var h = hub{
2830
connections: make(map[*connection]bool),
2931
}
3032

31-
// nolint: gocyclo
3233
func (h *hub) run() {
3334
for {
3435
select {
@@ -40,16 +41,23 @@ func (h *hub) run() {
4041
close(c.send)
4142
}
4243
case m := <-h.broadcast:
43-
for c := range h.connections {
44-
if m.userID > 0 && m.userID != c.userID {
44+
for conn := range h.connections {
45+
if m.userID > 0 && m.userID != conn.userID {
4546
continue
4647
}
4748

4849
select {
49-
case c.send <- m.msg:
50+
case conn.send <- m.msg:
5051
default:
51-
close(c.send)
52-
delete(h.connections, c)
52+
53+
log.WithFields(log.Fields{
54+
"context": "websocket",
55+
"user_id": conn.userID,
56+
}).Error("Connection send channel is full, connection closing")
57+
58+
close(conn.send)
59+
delete(h.connections, conn)
60+
_ = conn.ws.Close() // Close the WebSocket connection first
5361
}
5462
}
5563
}

cli/cmd/root.go

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,16 +36,22 @@ Complete documentation is available at https://semaphoreui.com.`,
3636
_ = cmd.Help()
3737
os.Exit(0)
3838
},
39+
3940
PersistentPreRun: func(cmd *cobra.Command, args []string) {
40-
if persistentFlags.logLevel == "" {
41+
str := persistentFlags.logLevel
42+
if str == "" {
43+
str = os.Getenv("SEMAPHORE_LOG_LEVEL")
44+
}
45+
if str == "" {
4146
return
4247
}
4348

44-
lvl, err := log.ParseLevel(persistentFlags.logLevel)
49+
lvl, err := log.ParseLevel(str)
4550
if err != nil {
4651
log.Panic(err)
4752
}
4853

54+
fmt.Println("Log level set to", lvl)
4955
log.SetLevel(lvl)
5056
},
5157
}

services/runners/running_job.go

Lines changed: 25 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -96,17 +96,37 @@ func (p *runningJob) logPipe(reader io.Reader) {
9696
defer p.logWG.Done()
9797

9898
scanner := bufio.NewScanner(reader)
99+
const maxCapacity = 10 * 1024 * 1024 // 10 MB
100+
buf := make([]byte, maxCapacity)
101+
scanner.Buffer(buf, maxCapacity)
99102

100103
for scanner.Scan() {
101104
line := scanner.Text()
102105
p.Log(line)
103106
}
104107

105-
if scanner.Err() != nil && scanner.Err().Error() != "EOF" {
106-
//don't panic on these errors, sometimes it throws not dangerous "read |0: file already closed" error
107-
log.WithError(scanner.Err()).WithFields(log.Fields{
108-
"context": "task_log",
108+
err := scanner.Err()
109+
110+
if err != nil {
111+
msg := "Failed to read TaskRunner output"
112+
113+
switch err.Error() {
114+
case "EOF",
115+
"os: process already finished",
116+
"read |0: file already closed":
117+
return // it is ok
118+
case "bufio.Scanner: token too long":
119+
msg = "TaskRunner output exceeds the maximum allowed size of 10MB"
120+
break
121+
}
122+
123+
p.job.Kill() // kill the job because stdout cannot be read.
124+
125+
log.WithError(err).WithFields(log.Fields{
109126
"task_id": p.job.Task.ID,
110-
}).Debug("failed to read log")
127+
"context": "task_logger",
128+
}).Error(msg)
129+
130+
p.Log("Fatal error: " + msg)
111131
}
112132
}

services/tasks/TaskRunner.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -192,7 +192,11 @@ func (t *TaskRunner) run() {
192192
if t.job.IsKilled() {
193193
t.SetStatus(task_logger.TaskStoppedStatus)
194194
} else {
195-
log.WithError(err).Warn("Failed to run task")
195+
log.WithError(err).WithFields(log.Fields{
196+
"task_id": t.Task.ID,
197+
"context": "task_runner",
198+
"task_status": t.Task.Status,
199+
}).Warn("Failed to run task")
196200
t.Log("Failed to run task: " + err.Error())
197201
t.SetStatus(task_logger.TaskFailStatus)
198202
}

services/tasks/TaskRunner_logging.go

Lines changed: 26 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -153,6 +153,9 @@ func (t *TaskRunner) logPipe(reader io.Reader) {
153153
}()
154154

155155
scanner := bufio.NewScanner(reader)
156+
const maxCapacity = 10 * 1024 * 1024 // 10 MB
157+
buf := make([]byte, maxCapacity)
158+
scanner.Buffer(buf, maxCapacity)
156159

157160
for scanner.Scan() {
158161
line := scanner.Text()
@@ -161,7 +164,28 @@ func (t *TaskRunner) logPipe(reader io.Reader) {
161164

162165
close(linesCh)
163166

164-
if scanner.Err() != nil && scanner.Err().Error() != "EOF" {
165-
util.LogDebugF(scanner.Err(), log.Fields{"error": "Failed to read TaskRunner output"})
167+
err := scanner.Err()
168+
169+
if err != nil {
170+
msg := "Failed to read TaskRunner output"
171+
172+
switch err.Error() {
173+
case "EOF",
174+
"os: process already finished",
175+
"read |0: file already closed":
176+
return // it is ok
177+
case "bufio.Scanner: token too long":
178+
msg = "TaskRunner output exceeds the maximum allowed size of 10MB"
179+
break
180+
}
181+
182+
t.kill() // kill the job because stdout cannot be read.
183+
184+
log.WithError(err).WithFields(log.Fields{
185+
"task_id": t.Task.ID,
186+
"context": "task_logger",
187+
}).Error(msg)
188+
189+
t.Log("Fatal error: " + msg)
166190
}
167191
}

0 commit comments

Comments
 (0)