Skip to content

Commit 732f097

Browse files
authored
[-] add additional checks to LogParse() (#784)
Fix CPU consumption introduced in #fb33d09d. Add additional timer check. Add `pprof` profile to demo Docker image. Add additional check if pgwatch and monitored Postgres are running on the same host. Do not start `LogParse` if pgwatch is unable to read logs.
1 parent c5383f6 commit 732f097

File tree

7 files changed

+199
-4
lines changed

7 files changed

+199
-4
lines changed

docker/demo/Dockerfile

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ ARG GIT_TIME
1616

1717
COPY . /pgwatch
1818
COPY --from=uibuilder /webui/build /pgwatch/internal/webui/build
19-
RUN cd /pgwatch && CGO_ENABLED=0 go build -ldflags "-X 'main.commit=${GIT_HASH}' -X 'main.date=${GIT_TIME}' -X 'main.version=${VERSION}'" ./cmd/pgwatch
19+
RUN cd /pgwatch && CGO_ENABLED=0 go build -tags=pprof -ldflags "-X 'main.commit=${GIT_HASH}' -X 'main.date=${GIT_TIME}' -X 'main.version=${VERSION}'" ./cmd/pgwatch
2020

2121
# ----------------------------------------------------------------
2222
# 3. Build the final image
@@ -80,6 +80,8 @@ EXPOSE 5432
8080
EXPOSE 3000
8181
# Prometheus scraping port
8282
EXPOSE 9187
83+
# pprof port
84+
EXPOSE 6060
8385

8486
### Volumes for easier updating to newer to newer pgwatch containers
8587
### Backwards compatibility is not 100% guaranteed so a backup

docs/tutorial/docker_installation.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -192,8 +192,8 @@ As mentioned in the [Components](../concept/components.md) chapter, remember tha
192192
one example how your monitoring setup around the pgwatch metrics
193193
collector could be organized. For another example how various components
194194
(as Docker images here) can work together, see a *Docker Compose*
195-
example with loosely coupled components
196-
[here](https://github.com/cybertec-postgresql/pgwatch/blob/master/docker-compose.yml).
195+
[example](https://github.com/cybertec-postgresql/pgwatch/blob/master/docker-compose.yml)
196+
with loosely coupled components.
197197

198198
## Example of advanced setup using YAML files and dual sinks
199199

internal/db/conn.go

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,9 @@ package db
22

33
import (
44
"context"
5+
"encoding/binary"
6+
"os"
7+
"path/filepath"
58
"reflect"
69

710
jsoniter "github.com/json-iterator/go"
@@ -63,3 +66,41 @@ func MarshallParamToJSONB(v any) any {
6366
}
6467
return nil
6568
}
69+
70+
// Function to determine if the client is connected to the same host as the PostgreSQL server
71+
func IsClientOnSameHost(conn PgxIface) (bool, error) {
72+
ctx := context.Background()
73+
74+
// Step 1: Check connection type using SQL
75+
var isUnixSocket bool
76+
err := conn.QueryRow(ctx, "SELECT COALESCE(inet_client_addr(), inet_server_addr()) IS NULL").Scan(&isUnixSocket)
77+
if err != nil || isUnixSocket {
78+
return isUnixSocket, err
79+
}
80+
81+
// Step 2: Retrieve unique cluster identifier
82+
var dataDirectory string
83+
if err := conn.QueryRow(ctx, "SHOW data_directory").Scan(&dataDirectory); err != nil {
84+
return false, err
85+
}
86+
87+
var systemIdentifier uint64
88+
if err := conn.QueryRow(ctx, "SELECT system_identifier FROM pg_control_system()").Scan(&systemIdentifier); err != nil {
89+
return false, err
90+
}
91+
92+
// Step 3: Compare system identifier from file system
93+
pgControlFile := filepath.Join(dataDirectory, "global", "pg_control")
94+
file, err := os.Open(pgControlFile)
95+
if err != nil {
96+
return false, err
97+
}
98+
defer file.Close()
99+
100+
var fileSystemIdentifier uint64
101+
if err := binary.Read(file, binary.LittleEndian, &fileSystemIdentifier); err != nil {
102+
return false, err
103+
}
104+
105+
return fileSystemIdentifier == systemIdentifier, nil
106+
}

internal/db/conn_test.go

Lines changed: 135 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,15 @@
11
package db_test
22

33
import (
4+
"encoding/binary"
5+
"os"
6+
"path/filepath"
47
"reflect"
58
"testing"
69

710
"github.com/cybertec-postgresql/pgwatch/v3/internal/db"
11+
"github.com/pashagolub/pgxmock/v4"
12+
"github.com/stretchr/testify/require"
813
)
914

1015
func TestMarshallParam(t *testing.T) {
@@ -62,3 +67,133 @@ func TestMarshallParam(t *testing.T) {
6267
})
6368
}
6469
}
70+
71+
func TestIsClientOnSameHost(t *testing.T) {
72+
// Create a pgxmock pool
73+
mock, err := pgxmock.NewPool()
74+
if err != nil {
75+
t.Fatalf("failed to create pgxmock pool: %v", err)
76+
}
77+
defer mock.Close()
78+
dataDir := t.TempDir()
79+
pgControl := filepath.Join(dataDir, "global")
80+
require.NoError(t, os.MkdirAll(pgControl, 0755))
81+
file, err := os.OpenFile(filepath.Join(pgControl, "pg_control"), os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644)
82+
require.NoError(t, err)
83+
err = binary.Write(file, binary.LittleEndian, uint64(12345))
84+
require.NoError(t, err)
85+
require.NoError(t, file.Close())
86+
87+
// Test cases
88+
tests := []struct {
89+
name string
90+
setupMock func()
91+
want bool
92+
wantErr bool
93+
}{
94+
{
95+
name: "UNIX socket connection",
96+
setupMock: func() {
97+
mock.ExpectQuery(`SELECT COALESCE`).WillReturnRows(
98+
pgxmock.NewRows([]string{"is_unix_socket"}).AddRow(true),
99+
)
100+
},
101+
want: true,
102+
wantErr: false,
103+
},
104+
{
105+
name: "Matching system identifier",
106+
setupMock: func() {
107+
mock.ExpectQuery(`SELECT COALESCE`).WillReturnRows(
108+
pgxmock.NewRows([]string{"is_unix_socket"}).AddRow(false),
109+
)
110+
mock.ExpectQuery(`SHOW`).WillReturnRows(
111+
pgxmock.NewRows([]string{"data_directory"}).AddRow(dataDir),
112+
)
113+
mock.ExpectQuery(`SELECT`).WillReturnRows(
114+
pgxmock.NewRows([]string{"system_identifier"}).AddRow(uint64(12345)),
115+
)
116+
},
117+
want: true,
118+
wantErr: false,
119+
},
120+
{
121+
name: "Non-matching system identifier",
122+
setupMock: func() {
123+
mock.ExpectQuery(`SELECT COALESCE`).WillReturnRows(
124+
pgxmock.NewRows([]string{"is_unix_socket"}).AddRow(false),
125+
)
126+
mock.ExpectQuery(`SHOW`).WillReturnRows(
127+
pgxmock.NewRows([]string{"data_directory"}).AddRow(dataDir),
128+
)
129+
mock.ExpectQuery(`SELECT`).WillReturnRows(
130+
pgxmock.NewRows([]string{"system_identifier"}).AddRow(uint64(42)),
131+
)
132+
},
133+
want: false,
134+
wantErr: false,
135+
},
136+
{
137+
name: "Error on COALESCE query",
138+
setupMock: func() {
139+
mock.ExpectQuery(`SELECT COALESCE`).WillReturnError(os.ErrInvalid)
140+
},
141+
want: false,
142+
wantErr: true,
143+
},
144+
{
145+
name: "Error on SHOW query",
146+
setupMock: func() {
147+
mock.ExpectQuery(`SELECT COALESCE`).WillReturnRows(
148+
pgxmock.NewRows([]string{"is_unix_socket"}).AddRow(false),
149+
)
150+
mock.ExpectQuery(`SHOW`).WillReturnError(os.ErrInvalid)
151+
},
152+
want: false,
153+
wantErr: true,
154+
},
155+
{
156+
name: "Error on SELECT system_identifier query",
157+
setupMock: func() {
158+
mock.ExpectQuery(`SELECT COALESCE`).WillReturnRows(
159+
pgxmock.NewRows([]string{"is_unix_socket"}).AddRow(false),
160+
)
161+
mock.ExpectQuery(`SHOW`).WillReturnRows(
162+
pgxmock.NewRows([]string{"data_directory"}).AddRow(dataDir),
163+
)
164+
mock.ExpectQuery(`SELECT`).WillReturnError(os.ErrInvalid)
165+
},
166+
want: false,
167+
wantErr: true,
168+
},
169+
{
170+
name: "Error on os.Open",
171+
setupMock: func() {
172+
mock.ExpectQuery(`SELECT COALESCE`).WillReturnRows(
173+
pgxmock.NewRows([]string{"is_unix_socket"}).AddRow(false),
174+
)
175+
mock.ExpectQuery(`SHOW`).WillReturnRows(
176+
pgxmock.NewRows([]string{"data_directory"}).AddRow("invalid/path"),
177+
)
178+
mock.ExpectQuery(`SELECT`).WillReturnRows(
179+
pgxmock.NewRows([]string{"system_identifier"}).AddRow(uint64(12345)),
180+
)
181+
},
182+
want: false,
183+
wantErr: true,
184+
},
185+
}
186+
187+
for _, tt := range tests {
188+
t.Run(tt.name, func(t *testing.T) {
189+
tt.setupMock()
190+
got, err := db.IsClientOnSameHost(mock)
191+
if (err != nil) != tt.wantErr {
192+
t.Errorf("IsClientOnSameHost() error = %v, wantErr %v", err, tt.wantErr)
193+
}
194+
if got != tt.want {
195+
t.Errorf("IsClientOnSameHost() = %v, want %v", got, tt.want)
196+
}
197+
})
198+
}
199+
}

internal/metrics/logparse.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,14 @@ func ParseLogs(ctx context.Context, mdb *sources.SourceConn, realDbname string,
126126

127127
logger := log.GetLogger(ctx).WithField("source", mdb.Name).WithField("metric", specialMetricServerLogEventCounts)
128128

129+
if ok, err := db.IsClientOnSameHost(mdb.Conn); !ok || err != nil {
130+
if err != nil {
131+
logger = logger.WithError(err)
132+
}
133+
logger.Warning("Cannot parse logs, client is not on the same host as the Postgres server")
134+
return
135+
}
136+
129137
csvlogRegex, err = regexp.Compile(cmp.Or(mdb.HostConfig.LogsMatchRegex, CSVLogDefaultRegEx))
130138
if err != nil {
131139
logger.WithError(err).Print("Invalid regex: ", logsMatchRegex)
@@ -220,6 +228,12 @@ func ParseLogs(ctx context.Context, mdb *sources.SourceConn, realDbname string,
220228
}
221229

222230
if err == io.EOF {
231+
// // EOF reached, wait for new files to be added
232+
select {
233+
case <-ctx.Done():
234+
return
235+
case <-time.After(currInterval):
236+
}
223237
// check for newly opened logfiles
224238
file, _ := getFileWithNextModTimestamp(logsGlobPath, latest)
225239
if file != "" {

internal/metrics/logparse_test.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -384,6 +384,9 @@ func TestLogParse(t *testing.T) {
384384
require.NoError(t, err)
385385
defer mock.Close()
386386

387+
// pretend we're connected via UNIX socket
388+
mock.ExpectQuery(`SELECT COALESCE`).WillReturnRows(
389+
pgxmock.NewRows([]string{"is_unix_socket"}).AddRow(true))
387390
// Mock the language detection query
388391
mock.ExpectQuery(`select current_setting\('lc_messages'\)::varchar\(2\) as lc_messages;`).
389392
WillReturnRows(pgxmock.NewRows([]string{"lc_messages"}).AddRow("en"))

internal/reaper/reaper.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -290,7 +290,7 @@ func (r *Reaper) reapMetricMeasurements(ctx context.Context, md *sources.SourceC
290290

291291
l := r.logger.WithField("source", md.Name).WithField("metric", metricName)
292292
if metricName == specialMetricServerLogEventCounts {
293-
metrics.ParseLogs(ctx, md, md.RealDbname, md.GetMetricInterval(metricName), r.measurementCh) // no return
293+
metrics.ParseLogs(ctx, md, md.RealDbname, md.GetMetricInterval(metricName), r.measurementCh)
294294
return
295295
}
296296

0 commit comments

Comments
 (0)