Skip to content

Commit 0683eeb

Browse files
committed
Merge remote-tracking branch 'origin/develop'
2 parents 8d73b4c + cd2e4e4 commit 0683eeb

File tree

13 files changed

+229
-118
lines changed

13 files changed

+229
-118
lines changed

.github/workflows/golang.yaml

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,9 @@ jobs:
3636
- name: Build
3737
run: go build -v ./...
3838
- name: Test
39-
env:
40-
TEST_DATABASE_URL: "postgres://postgres:password@localhost:5432/jaeger"
41-
run: go test -test.parallel=1 ./...
39+
uses: nick-fields/retry@v2
40+
with:
41+
max_attempts: 3
42+
timeout_minutes: 5
43+
command: |
44+
TEST_DATABASE_URL='postgres://postgres:password@localhost:5432/jaeger' go test -test.parallel=1 ./...

.github/workflows/release.yaml

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,6 @@ jobs:
2222
env:
2323
flags: ""
2424
steps:
25-
- if: ${{ !startsWith(github.ref, 'refs/tags/v') }}
26-
run: echo "flags=--snapshot" >> $GITHUB_ENV
2725
- name: Checkout
2826
uses: actions/checkout@v4
2927
with:

.gitignore

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,4 +16,6 @@ vendor/
1616

1717
jaeger/
1818
jaeger-all-in-one
19-
jaeger-postgresql
19+
20+
jaeger-postgresql
21+
jaeger-postgresql-cleaner

.goreleaser.yaml

Lines changed: 0 additions & 41 deletions
This file was deleted.
Lines changed: 131 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,131 @@
1+
package main
2+
3+
import (
4+
"context"
5+
"flag"
6+
"fmt"
7+
"log/slog"
8+
"time"
9+
10+
"github.com/jackc/pgx/v5/pgtype"
11+
"github.com/jackc/pgx/v5/pgxpool"
12+
"github.com/robbert229/fxslog"
13+
"github.com/robbert229/jaeger-postgresql/internal/logger"
14+
"github.com/robbert229/jaeger-postgresql/internal/sql"
15+
"go.uber.org/fx"
16+
)
17+
18+
var (
19+
databaseURLFlag = flag.String("database.url", "", "the postgres connection url to use to connect to the database")
20+
databaseMaxConnsFlag = flag.Int("database.max-conns", 20, "Max number of database connections of which the plugin will try to maintain at any given time")
21+
loglevelFlag = flag.String("log-level", "warn", "Minimal allowed log level")
22+
maxSpanAgeFlag = flag.Duration("max-span-age", time.Hour*24, "Maximum age of a span before it will be cleaned")
23+
)
24+
25+
// ProvideLogger returns a function that provides a logger
26+
func ProvideLogger() any {
27+
return func() (*slog.Logger, error) {
28+
return logger.New(loglevelFlag)
29+
}
30+
}
31+
32+
// ProvidePgxPool returns a function that provides a pgx pool
33+
func ProvidePgxPool() any {
34+
return func(logger *slog.Logger, lc fx.Lifecycle) (*pgxpool.Pool, error) {
35+
if databaseURLFlag == nil {
36+
return nil, fmt.Errorf("invalid database url")
37+
}
38+
39+
databaseURL := *databaseURLFlag
40+
if databaseURL == "" {
41+
return nil, fmt.Errorf("invalid database url")
42+
}
43+
44+
err := sql.Migrate(logger, databaseURL)
45+
if err != nil {
46+
return nil, fmt.Errorf("failed to migrate database: %w", err)
47+
}
48+
49+
pgxconfig, err := pgxpool.ParseConfig(databaseURL)
50+
if err != nil {
51+
return nil, fmt.Errorf("failed to parse database url")
52+
}
53+
54+
// handle max conns
55+
{
56+
var maxConns int32
57+
if databaseMaxConnsFlag == nil {
58+
maxConns = 20
59+
} else {
60+
maxConns = int32(*databaseMaxConnsFlag)
61+
}
62+
63+
pgxconfig.MaxConns = maxConns
64+
}
65+
66+
// handle timeout duration
67+
connectTimeoutDuration := time.Second * 10
68+
pgxconfig.ConnConfig.ConnectTimeout = connectTimeoutDuration
69+
70+
ctx, cancelFn := context.WithTimeout(context.Background(), connectTimeoutDuration)
71+
defer cancelFn()
72+
73+
pool, err := pgxpool.NewWithConfig(ctx, pgxconfig)
74+
if err != nil {
75+
return nil, fmt.Errorf("failed to connect to the postgres database: %w", err)
76+
}
77+
78+
logger.Info("connected to postgres")
79+
80+
lc.Append(fx.Hook{
81+
OnStop: func(ctx context.Context) error {
82+
pool.Close()
83+
return nil
84+
},
85+
})
86+
87+
return pool, nil
88+
}
89+
}
90+
91+
// clean purges the old roles from the database
92+
func clean(ctx context.Context, pool *pgxpool.Pool) (int64, error) {
93+
q := sql.New(pool)
94+
result, err := q.CleanSpans(ctx, pgtype.Timestamp{Time: time.Now().Add(-1 * *maxSpanAgeFlag), Valid: true})
95+
if err != nil {
96+
return 0, err
97+
}
98+
99+
return result, nil
100+
}
101+
102+
func main() {
103+
flag.Parse()
104+
105+
fx.New(
106+
fxslog.WithLogger(func(logger *slog.Logger) *slog.Logger {
107+
return logger.With("component", "uber/fx")
108+
}),
109+
fx.Provide(
110+
ProvideLogger(),
111+
ProvidePgxPool(),
112+
),
113+
fx.Invoke(func(pool *pgxpool.Pool, lc fx.Lifecycle, logger *slog.Logger, stopper fx.Shutdowner) error {
114+
go func(ctx context.Context) {
115+
ctx, cancelFn := context.WithTimeout(ctx, time.Minute)
116+
defer cancelFn()
117+
118+
count, err := clean(ctx, pool)
119+
if err != nil {
120+
logger.Error("failed to clean database", "err", err)
121+
stopper.Shutdown(fx.ExitCode(1))
122+
return
123+
}
124+
125+
logger.Info("successfully cleaned database", "spans", count)
126+
stopper.Shutdown(fx.ExitCode(0))
127+
}(context.Background())
128+
return nil
129+
}),
130+
).Run()
131+
}

main.go renamed to cmd/jaeger-postgresql/main.go

Lines changed: 3 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@ import (
77
"log/slog"
88
"net"
99
"net/http"
10-
"os"
1110
"time"
1211

1312
"github.com/jackc/pgx/v5/pgxpool"
@@ -16,40 +15,17 @@ import (
1615
"github.com/jaegertracing/jaeger/storage/spanstore"
1716
"github.com/prometheus/client_golang/prometheus/promhttp"
1817
"github.com/robbert229/fxslog"
18+
"github.com/robbert229/jaeger-postgresql/internal/logger"
1919
"github.com/robbert229/jaeger-postgresql/internal/sql"
2020
"github.com/robbert229/jaeger-postgresql/internal/store"
21-
"google.golang.org/grpc"
22-
2321
"go.uber.org/fx"
22+
"google.golang.org/grpc"
2423
)
2524

2625
// ProvideLogger returns a function that provides a logger
2726
func ProvideLogger() any {
2827
return func() (*slog.Logger, error) {
29-
levelFn := func() (slog.Level, error) {
30-
if loglevelFlag == nil {
31-
return slog.LevelWarn, nil
32-
}
33-
34-
switch *loglevelFlag {
35-
case "info":
36-
return slog.LevelInfo, nil
37-
case "warn":
38-
return slog.LevelWarn, nil
39-
case "error":
40-
return slog.LevelError, nil
41-
case "debug":
42-
return slog.LevelDebug, nil
43-
default:
44-
return 0, fmt.Errorf("invalid log level: %s", *loglevelFlag)
45-
}
46-
}
47-
level, err := levelFn()
48-
if err != nil {
49-
return nil, fmt.Errorf("failed to build logger: %w", err)
50-
}
51-
52-
return slog.New(slog.NewJSONHandler(os.Stderr, &slog.HandlerOptions{Level: level})), nil
28+
return logger.New(loglevelFlag)
5329
}
5430
}
5531

internal/logger/logger.go

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
package logger
2+
3+
import (
4+
"fmt"
5+
"log/slog"
6+
"os"
7+
)
8+
9+
// New returns a new logger.
10+
func New(loglevelStr *string) (*slog.Logger, error) {
11+
levelFn := func() (slog.Level, error) {
12+
if loglevelStr == nil {
13+
return slog.LevelWarn, nil
14+
}
15+
16+
switch *loglevelStr {
17+
case "info":
18+
return slog.LevelInfo, nil
19+
case "warn":
20+
return slog.LevelWarn, nil
21+
case "error":
22+
return slog.LevelError, nil
23+
case "debug":
24+
return slog.LevelDebug, nil
25+
default:
26+
return 0, fmt.Errorf("invalid log level: %s", *loglevelStr)
27+
}
28+
}
29+
level, err := levelFn()
30+
if err != nil {
31+
return nil, fmt.Errorf("failed to build logger: %w", err)
32+
}
33+
34+
return slog.New(slog.NewJSONHandler(os.Stderr, &slog.HandlerOptions{Level: level})), nil
35+
}

internal/sql/query.sql

Lines changed: 4 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -119,16 +119,7 @@ VALUES(
119119
)
120120
RETURNING spans.hack_id;
121121

122-
-- -- name: InsertSpanRefs :copyfrom
123-
-- INSERT INTO spanrefs (
124-
-- source_span_id,
125-
-- child_span_id,
126-
-- trace_id,
127-
-- ref_type
128-
-- )
129-
-- VALUES (
130-
-- sqlc.arg(source_span_id),
131-
-- sqlc.arg(child_span_id),
132-
-- sqlc.arg(trace_id),
133-
-- sqlc.arg(ref_type)
134-
-- );
122+
-- name: CleanSpans :execrows
123+
124+
DELETE FROM spans
125+
WHERE spans.start_time < sqlc.arg(prune_before)::TIMESTAMP;

internal/sql/query.sql.go

Lines changed: 14 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

internal/store/reader.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -184,8 +184,7 @@ func (r *Reader) FindTraceIDs(ctx context.Context, query *spanstore.TraceQueryPa
184184
StartTimeMinimum: EncodeTimestamp(query.StartTimeMin),
185185
StartTimeMinimumEnable: query.StartTimeMin.After(time.Time{}),
186186
StartTimeMaximum: EncodeTimestamp(query.StartTimeMax),
187-
// StartTimeMaximumEnable: query.StartTimeMax.After(time.Time{}),
188-
StartTimeMaximumEnable: false, // maintaining feature parity for now.
187+
StartTimeMaximumEnable: query.StartTimeMax.After(time.Time{}),
189188
DurationMinimum: EncodeInterval(query.DurationMin),
190189
DurationMinimumEnable: query.DurationMin > 0*time.Second,
191190
DurationMaximum: EncodeInterval(query.DurationMax),

0 commit comments

Comments
 (0)