Skip to content

Commit 830157c

Browse files
author
anahan
committed
Add pipe proxy for logs
1 parent 62a3fe4 commit 830157c

File tree

5 files changed

+75
-19
lines changed

5 files changed

+75
-19
lines changed

cmd/docker-fpm-wrapper/config.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ type Config struct {
2020
FpmNoSlowlogProxy bool `mapstructure:"fpm-no-slowlog"`
2121

2222
// Logging proxy section
23+
WrapperPipe string `mapstructure:"wrapper-pipe"`
2324
WrapperSocket string `mapstructure:"wrapper-socket"`
2425
LineBufferSize int `mapstructure:"line-buffer-size"`
2526

@@ -41,6 +42,7 @@ func parseCommandLineFlags() {
4142
pflag.Bool("fpm-no-slowlog", false, "Disable php-fpm slowlog parsing and proxy")
4243

4344
// Logging proxy section
45+
pflag.StringP("wrapper-pipe", "p", "/tmp/fpm-wrapper-pipe", "path to logging pipe, set '' to disable")
4446
pflag.StringP("wrapper-socket", "s", "/tmp/fpm-wrapper.sock", "path to logging socket, set null to disable")
4547
pflag.Uint("line-buffer-size", 16*1024, "Max log line size (in bytes)")
4648

cmd/docker-fpm-wrapper/main.go

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -56,14 +56,26 @@ func main() {
5656

5757
if cfg.WrapperSocket != "null" {
5858
env = append(env, fmt.Sprintf("FPM_WRAPPER_SOCK=unix://%s", cfg.WrapperSocket))
59-
dataListener := applog.NewSockDataListener(cfg.WrapperSocket, breader.NewPool(cfg.LineBufferSize), syncStderr, errCh)
59+
sockDataListener := applog.NewSockDataListener(cfg.WrapperSocket, breader.NewPool(cfg.LineBufferSize), syncStderr, errCh)
6060

61-
if err = dataListener.Start(); err != nil {
61+
if err = sockDataListener.Start(); err != nil {
6262
log.Error("Can't start listen", zap.Error(err))
6363
os.Exit(1)
6464
}
6565

66-
defer dataListener.Stop()
66+
defer sockDataListener.Stop()
67+
}
68+
69+
if cfg.WrapperPipe != "" {
70+
env = append(env, fmt.Sprintf("FPM_WRAPPER_PIPE=%s", cfg.WrapperPipe))
71+
72+
wrapperPipe, err := createFIFOByPathCtx(ctx, cfg.WrapperPipe)
73+
if err != nil {
74+
log.Error("can't create pipe", zap.Error(err), zap.String("path", cfg.WrapperPipe))
75+
os.Exit(1)
76+
}
77+
78+
go applog.NewPipeProxy(log.Named("pipe-proxy"), syncStderr).Proxy(wrapperPipe)
6779
}
6880

6981
fpmConfig, err := phpfpm.ParseConfig(cfg.FpmConfigPath)

internal/applog/data_listener.go

Lines changed: 1 addition & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -24,21 +24,6 @@ func NewSockDataListener(sockPath string, rPool *breader.Pool, writer io.Writer,
2424
return &SockDataListener{socketPath: sockPath, rPool: rPool, writer: writer, errorChan: errorChan}
2525
}
2626

27-
func (l *SockDataListener) normalizeLine(line []byte) []byte {
28-
ll := len(line)
29-
if ll > 0 && line[ll-1] != '\n' {
30-
ll += 1
31-
line = append(line, '\n')
32-
}
33-
34-
if ll > 1 && line[ll-2] == '\r' {
35-
line[ll-2] = line[ll-1]
36-
line = line[:ll-1]
37-
}
38-
39-
return line
40-
}
41-
4227
func (l *SockDataListener) handleConnection(conn net.Conn) {
4328
defer conn.Close()
4429

@@ -48,7 +33,7 @@ func (l *SockDataListener) handleConnection(conn net.Conn) {
4833
for {
4934
buf, err := line.ReadOne(reader, true)
5035
if len(buf) > 0 {
51-
_, _ = l.writer.Write(l.normalizeLine(buf))
36+
_, _ = l.writer.Write(normalizeLine(buf))
5237
}
5338

5439
if err == nil {

internal/applog/normalize.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
package applog
2+
3+
func normalizeLine(line []byte) []byte {
4+
ll := len(line)
5+
if ll > 0 && line[ll-1] != '\n' {
6+
ll += 1
7+
line = append(line, '\n')
8+
}
9+
10+
if ll > 1 && line[ll-2] == '\r' {
11+
line[ll-2] = line[ll-1]
12+
line = line[:ll-1]
13+
}
14+
15+
return line
16+
}

internal/applog/pipe_proxy.go

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
package applog
2+
3+
import (
4+
"bufio"
5+
"errors"
6+
"io"
7+
8+
"go.uber.org/zap"
9+
10+
"github.com/code-tool/docker-fpm-wrapper/pkg/line"
11+
)
12+
13+
type PipeProxy struct {
14+
log *zap.Logger
15+
writer io.Writer
16+
}
17+
18+
func NewPipeProxy(log *zap.Logger, writer io.Writer) *PipeProxy {
19+
return &PipeProxy{log: log, writer: writer}
20+
}
21+
22+
func (p *PipeProxy) Proxy(r io.Reader) {
23+
bufioReader := bufio.NewReader(r)
24+
25+
for {
26+
buf, err := line.ReadOne(bufioReader, true)
27+
if len(buf) > 0 {
28+
_, _ = p.writer.Write(normalizeLine(buf))
29+
}
30+
31+
if err == nil {
32+
continue
33+
}
34+
35+
if errors.Is(err, io.EOF) {
36+
return
37+
}
38+
39+
p.log.Error("can't read line from pipe", zap.Error(err))
40+
}
41+
}

0 commit comments

Comments
 (0)