-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmain.go
More file actions
106 lines (85 loc) · 3.03 KB
/
main.go
File metadata and controls
106 lines (85 loc) · 3.03 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
package main
import (
"context"
"errors"
"fmt"
"strings"
"time"
"cirello.io/pglock"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgxpool"
"github.com/jackc/pgx/v5/stdlib"
)
// main is a reproduction script: it acquires a pglock lock whose heartbeat runs
// on a context we control, lets the tracer cancel that context while the
// heartbeat UPDATE is in flight, then releases the lock and prints the outcome.
func main() {
	const dsn = "postgres://postgres:@localhost:5433/caddy"

	poolConfig, err := pgxpool.ParseConfig(dsn)
	must(err)

	// The heartbeat gets its own cancellable context so that it can be
	// cancelled from outside pglock.
	heartbeatCtx, cancelHeartbeat := context.WithCancel(context.Background())

	// The pgx query tracer serves two purposes:
	//   - it logs queries as they happen;
	//   - it intercepts queries and triggers the heartbeat cancellation at the
	//     right moment.
	poolConfig.ConnConfig.Tracer = tracer{cancel: cancelHeartbeat}

	// Simple protocol disables the statement cache, which makes the commands
	// easier to follow in Wireshark: one TCP packet for the request and one for
	// the response (the latter delayed by toxiproxy).
	poolConfig.ConnConfig.DefaultQueryExecMode = pgx.QueryExecModeSimpleProtocol

	pool, err := pgxpool.NewWithConfig(context.Background(), poolConfig)
	must(err)
	defer pool.Close()

	sqlDB := stdlib.OpenDBFromPool(pool)
	client, err := pglock.UnsafeNew(sqlDB)
	must(err)

	// Start from a fresh lock table. The drop error is deliberately ignored
	// (presumably because the table may not exist yet); creation must succeed.
	_ = client.DropTable()
	must(client.CreateTable())

	// Acquire the lock, handing pglock the heartbeat context we control.
	lock, err := client.AcquireContext(
		context.Background(),
		"test",
		pglock.WithCustomHeartbeatContext(heartbeatCtx),
	)
	must(err)

	// Give the first heartbeat time to run before unlocking.
	time.Sleep(500 * time.Millisecond)

	// Alternative way to reproduce the issue:
	//   - remove the `t.cancel()` call in `TraceQueryStart`;
	//   - tune the sleep above until the Release lands after the heartbeat has
	//     started but while the SQL command response is still travelling.
	//     On the original author's machine, 200ms works.

	// Release the lock; in this scenario the release is expected to fail.
	releaseErr := lock.Close()

	fmt.Println()
	switch {
	case releaseErr == nil:
		fmt.Println("OK: unlocked ok")
	case errors.Is(releaseErr, pglock.ErrLockAlreadyReleased):
		fmt.Println("!!! unlocked already released")
	default:
		fmt.Printf("!!! unlocked other error: %v\n", releaseErr)
	}
}
// tracer is the value main installs as the pgx connection tracer. Its methods
// log each query's start and end, and TraceQueryStart uses cancel to abort the
// heartbeat shortly after an UPDATE statement begins.
type tracer struct {
// cancel cancels the heartbeat context created in main.
cancel context.CancelFunc
}
// TraceQueryStart logs the SQL text and arguments of every query. When the
// statement starts with UPDATE (the heartbeat query in this reproduction), it
// also schedules the heartbeat context to be cancelled 10ms later. The incoming
// ctx is returned unchanged.
func (t tracer) TraceQueryStart(ctx context.Context, conn *pgx.Conn, data pgx.TraceQueryStartData) context.Context {
	fmt.Println("query start:", data.SQL, data.Args)

	isUpdate := strings.HasPrefix(strings.TrimSpace(data.SQL), "UPDATE")
	if !isUpdate {
		return ctx
	}

	// Cancel slightly after the query starts rather than immediately: 10ms is
	// long enough for the command to reach a local postgres and be executed,
	// yet the response is still travelling because toxiproxy adds a 100ms delay.
	cancelHeartbeat := func() {
		fmt.Println("cancelling heartbeat context")
		t.cancel()
	}
	time.AfterFunc(10*time.Millisecond, cancelHeartbeat)

	return ctx
}
// TraceQueryEnd logs the command tag and the error (if any) reported for a
// finished query.
func (t tracer) TraceQueryEnd(ctx context.Context, conn *pgx.Conn, data pgx.TraceQueryEndData) {
	tag, queryErr := data.CommandTag, data.Err
	fmt.Printf("query end: %s %v\n", tag, queryErr)
}
// must panics with err when err is non-nil and does nothing otherwise. It keeps
// this throwaway reproduction script free of repetitive error handling.
func must(err error) {
	if err == nil {
		return
	}
	panic(err)
}