Skip to content

Commit 5111cdd

Browse files
authored
Move event processing to the admin api (#149)
* Move event processing to the admin api Fix cleanup ugh * Cleanup * Remove timestamps, supervisor already provides it
1 parent 25c7515 commit 5111cdd

File tree

4 files changed

+141
-70
lines changed

4 files changed

+141
-70
lines changed

cmd/event_handler/main.go

Lines changed: 24 additions & 70 deletions
Original file line numberDiff line numberDiff line change
@@ -1,99 +1,53 @@
11
package main
22

33
import (
4-
"context"
5-
"errors"
4+
"bytes"
5+
"encoding/json"
66
"flag"
77
"fmt"
88
"log"
9-
"os"
9+
"net/http"
1010

11+
"github.com/fly-apps/postgres-flex/internal/api"
1112
"github.com/fly-apps/postgres-flex/internal/flypg"
12-
"github.com/jackc/pgx/v5"
1313
)
1414

15-
const eventLogFile = "/data/event.log"
16-
1715
func main() {
18-
ctx := context.Background()
19-
20-
if err := processEvent(ctx); err != nil {
21-
log.Println(err)
22-
os.Exit(1)
23-
}
24-
}
25-
26-
func processEvent(ctx context.Context) error {
2716
event := flag.String("event", "", "event type")
2817
nodeID := flag.Int("node-id", 0, "the node id")
2918
success := flag.String("success", "", "success (1) failure (0)")
3019
details := flag.String("details", "", "details")
3120
flag.Parse()
3221

33-
logFile, err := os.OpenFile(eventLogFile, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0600)
34-
if err != nil {
35-
return fmt.Errorf("failed to open event log: %s", err)
22+
succ := true
23+
if *success == "0" {
24+
succ = false
3625
}
37-
defer func() { _ = logFile.Close() }()
3826

39-
log.SetOutput(logFile)
40-
log.Printf("event: %s, node: %d, success: %s, details: %s\n", *event, *nodeID, *success, *details)
27+
req := api.EventRequest{
28+
Name: *event,
29+
NodeID: *nodeID,
30+
Success: succ,
31+
Details: *details,
32+
}
4133

42-
node, err := flypg.NewNode()
34+
reqBytes, err := json.Marshal(req)
4335
if err != nil {
44-
return fmt.Errorf("failed to initialize node: %s", err)
36+
log.Fatalln(err)
4537
}
4638

47-
switch *event {
48-
case "child_node_disconnect", "child_node_reconnect", "child_node_new_connect":
49-
conn, err := node.RepMgr.NewLocalConnection(ctx)
50-
if err != nil {
51-
return fmt.Errorf("failed to open local connection: %s", err)
52-
}
53-
defer func() { _ = conn.Close(ctx) }()
54-
55-
member, err := node.RepMgr.Member(ctx, conn)
56-
if err != nil {
57-
return fmt.Errorf("failed to resolve member: %s", err)
58-
}
59-
60-
if member.Role != flypg.PrimaryRoleName {
61-
// We should never get here.
62-
log.Println("skipping since we are not the primary")
63-
return nil
64-
}
65-
66-
if err := evaluateClusterState(ctx, conn, node); err != nil {
67-
return fmt.Errorf("failed to evaluate cluster state: %s", err)
68-
}
39+
node, err := flypg.NewNode()
40+
if err != nil {
41+
log.Fatalln(err)
6942
}
7043

71-
return logFile.Sync()
72-
}
73-
74-
func evaluateClusterState(ctx context.Context, conn *pgx.Conn, node *flypg.Node) error {
75-
primary, err := flypg.PerformScreening(ctx, conn, node)
76-
if errors.Is(err, flypg.ErrZombieDiagnosisUndecided) || errors.Is(err, flypg.ErrZombieDiscovered) {
77-
if err := flypg.Quarantine(ctx, node, primary); err != nil {
78-
return fmt.Errorf("failed to quarantine failed primary: %s", err)
79-
}
80-
return fmt.Errorf("primary has been quarantined: %s", err)
81-
} else if err != nil {
82-
return fmt.Errorf("failed to run zombie diagnosis: %s", err)
44+
endpoint := fmt.Sprintf("http://[%s]:5500/commands/events/process", node.PrivateIP)
45+
resp, err := http.Post(endpoint, "application/json", bytes.NewReader(reqBytes))
46+
if err != nil {
47+
log.Fatalln(err)
8348
}
8449

85-
// Clear zombie lock if it exists
86-
if flypg.ZombieLockExists() {
87-
log.Println("Clearing zombie lock and re-enabling read/write")
88-
if err := flypg.RemoveZombieLock(); err != nil {
89-
return fmt.Errorf("failed to remove zombie lock: %s", err)
90-
}
91-
92-
log.Println("Broadcasting readonly state change")
93-
if err := flypg.BroadcastReadonlyChange(ctx, node, false); err != nil {
94-
log.Printf("failed to disable readonly: %s", err)
95-
}
50+
if err := resp.Body.Close(); err != nil {
51+
log.Fatalln(err)
9652
}
97-
98-
return nil
9953
}

cmd/monitor/main.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@ var (
2121
func main() {
2222
ctx := context.Background()
2323

24+
log.SetFlags(0)
25+
2426
node, err := flypg.NewNode()
2527
if err != nil {
2628
panic(fmt.Sprintf("failed to reference node: %s\n", err))

internal/api/handle_event.go

Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,109 @@
1+
package api
2+
3+
import (
4+
"context"
5+
"encoding/json"
6+
"errors"
7+
"fmt"
8+
"log"
9+
"net/http"
10+
11+
"github.com/fly-apps/postgres-flex/internal/flypg"
12+
"github.com/jackc/pgx/v5"
13+
)
14+
15+
type EventRequest struct {
16+
Name string `json:"name"`
17+
NodeID int `json:"nodeID"`
18+
Success bool `json:"success"`
19+
Details string `json:"details"`
20+
}
21+
22+
const (
23+
childNodeDisconnect = "child_node_disconnect"
24+
childNodeReconnect = "child_node_reconnect"
25+
childNodeNewConnect = "child_node_new_connect"
26+
)
27+
28+
func handleEvent(w http.ResponseWriter, r *http.Request) {
29+
var event EventRequest
30+
if err := json.NewDecoder(r.Body).Decode(&event); err != nil {
31+
log.Printf("[ERROR] Failed to decode event request: %s\n", err)
32+
renderErr(w, err)
33+
return
34+
}
35+
defer func() { _ = r.Body.Close() }()
36+
37+
if !event.Success {
38+
errMsg := fmt.Sprintf("[ERROR] Event %q failed: %s", event.Name, event.Details)
39+
log.Println(errMsg)
40+
renderErr(w, errors.New(errMsg))
41+
return
42+
}
43+
44+
if err := processEvent(r.Context(), event); err != nil {
45+
log.Printf("[ERROR] Failed to process event: %s\n", err)
46+
renderErr(w, err)
47+
return
48+
}
49+
}
50+
51+
func processEvent(ctx context.Context, event EventRequest) error {
52+
log.Printf("Processing event: %q \n", event.Name)
53+
node, err := flypg.NewNode()
54+
if err != nil {
55+
return fmt.Errorf("failed to initialize node: %s", err)
56+
}
57+
58+
switch event.Name {
59+
case childNodeDisconnect, childNodeReconnect, childNodeNewConnect:
60+
conn, err := node.RepMgr.NewLocalConnection(ctx)
61+
if err != nil {
62+
return fmt.Errorf("failed to open local connection: %s", err)
63+
}
64+
defer func() { _ = conn.Close(ctx) }()
65+
66+
member, err := node.RepMgr.Member(ctx, conn)
67+
if err != nil {
68+
return fmt.Errorf("failed to resolve member: %s", err)
69+
}
70+
71+
if member.Role != flypg.PrimaryRoleName {
72+
return nil
73+
}
74+
75+
if err := evaluateClusterState(ctx, conn, node); err != nil {
76+
return fmt.Errorf("failed to evaluate cluster state: %s", err)
77+
}
78+
}
79+
80+
return nil
81+
}
82+
83+
// TODO Move this into zombie.go
84+
func evaluateClusterState(ctx context.Context, conn *pgx.Conn, node *flypg.Node) error {
85+
primary, err := flypg.PerformScreening(ctx, conn, node)
86+
if errors.Is(err, flypg.ErrZombieDiagnosisUndecided) || errors.Is(err, flypg.ErrZombieDiscovered) {
87+
if err := flypg.Quarantine(ctx, node, primary); err != nil {
88+
return fmt.Errorf("failed to quarantine failed primary: %s", err)
89+
}
90+
log.Println("[WARN] Primary is going read-only to protect against potential split-brain")
91+
return nil
92+
} else if err != nil {
93+
return fmt.Errorf("failed to run zombie diagnosis: %s", err)
94+
}
95+
96+
// Clear zombie lock if it exists
97+
if flypg.ZombieLockExists() {
98+
log.Println("Quorom has been reached. Disabling read-only mode.")
99+
if err := flypg.RemoveZombieLock(); err != nil {
100+
return fmt.Errorf("failed to remove zombie lock file: %s", err)
101+
}
102+
103+
if err := flypg.BroadcastReadonlyChange(ctx, node, false); err != nil {
104+
log.Printf("failed to disable readonly: %s", err)
105+
}
106+
}
107+
108+
return nil
109+
}

internal/api/handler.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package api
33
import (
44
"context"
55
"fmt"
6+
"log"
67
"net/http"
78
"time"
89

@@ -15,6 +16,8 @@ import (
1516
const Port = 5500
1617

1718
func StartHttpServer() error {
19+
log.SetFlags(0)
20+
1821
r := chi.NewMux()
1922
r.Mount("/flycheck", flycheck.Handler())
2023
r.Mount("/commands", Handler())
@@ -30,6 +33,9 @@ func StartHttpServer() error {
3033

3134
func Handler() http.Handler {
3235
r := chi.NewRouter()
36+
r.Route("/events", func(r chi.Router) {
37+
r.Post("/process", handleEvent)
38+
})
3339

3440
r.Route("/users", func(r chi.Router) {
3541
r.Get("/{name}", handleGetUser)

0 commit comments

Comments
 (0)