Skip to content

Commit d71cf3f

Browse files
committed
feat: crunchybridge integration
1 parent 90cdda8 commit d71cf3f

File tree

16 files changed

+1479
-504
lines changed

16 files changed

+1479
-504
lines changed

cmd/worker/main.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -66,12 +66,12 @@ func main() {
6666
// Register task handlers
6767
mux := asynq.NewServeMux()
6868

69-
// Logical restore workflow tasks (local execution)
70-
mux.HandleFunc(tasks.TypeTriggerLogicalRestore, func(ctx context.Context, t *asynq.Task) error {
71-
return workers.HandleTriggerLogicalRestore(ctx, t, asynqClient, db, cfg, log)
69+
// Restore workflow tasks
70+
mux.HandleFunc(tasks.TypeTriggerRestore, func(ctx context.Context, t *asynq.Task) error {
71+
return workers.HandleTriggerRestore(ctx, t, asynqClient, db, cfg, log)
7272
})
73-
mux.HandleFunc(tasks.TypeLogicalRestoreWaitComplete, func(ctx context.Context, t *asynq.Task) error {
74-
return workers.HandleLogicalRestoreWaitComplete(ctx, t, asynqClient, db, log)
73+
mux.HandleFunc(tasks.TypeRestoreWaitComplete, func(ctx context.Context, t *asynq.Task) error {
74+
return workers.HandleRestoreWaitComplete(ctx, t, asynqClient, db, log)
7575
})
7676

7777
// Start refresh scheduler goroutine (checks every hour for instances needing refresh)

internal/models/models.go

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,11 +31,16 @@ type Config struct {
3131
// Authentication configuration
3232
JWTSecret string `json:"-" gorm:"type:varchar(64);not null"` // Auto-generated on first setup (64 hex chars)
3333

34-
// Database source configuration
35-
ConnectionString string `json:"connection_string" gorm:"type:text"` // PostgreSQL connection string
34+
// Database source configuration (mutually exclusive: use either ConnectionString OR Crunchy Bridge)
35+
ConnectionString string `json:"connection_string" gorm:"type:text"` // PostgreSQL connection string for logical restore
3636
PostgresVersion string `json:"postgres_version"`
3737
SchemaOnly bool `json:"schema_only" gorm:"not null;default:true"` // If true, only restore schema (no data)
3838

39+
// Crunchy Bridge integration (alternative to ConnectionString)
40+
CrunchyBridgeAPIKey string `json:"crunchy_bridge_api_key" gorm:"type:text"` // Crunchy Bridge API key
41+
CrunchyBridgeClusterName string `json:"crunchy_bridge_cluster_name" gorm:"type:text"` // Cluster name
42+
CrunchyBridgeDatabaseName string `json:"crunchy_bridge_database_name" gorm:"type:text"` // Database name
43+
3944
// PostgreSQL configuration for branches
4045
BranchPostgresqlConf string `json:"branch_postgresql_conf" gorm:"type:text"`
4146

Lines changed: 240 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,240 @@
1+
package providers
2+
3+
import (
4+
"bytes"
5+
"encoding/json"
6+
"fmt"
7+
"io"
8+
"net/http"
9+
"net/url"
10+
"strings"
11+
"time"
12+
)
13+
14+
const CrunchyBridgeAPIBaseURL = "https://api.crunchybridge.com"
15+
16+
// CrunchyBridge API docs:
17+
// - https://docs.crunchybridge.com/api/cluster
18+
// - https://docs.crunchybridge.com/api/cluster-backup
19+
20+
type CrunchyBridgeClient struct {
21+
APIKey string
22+
BaseURL string
23+
client *http.Client
24+
}
25+
26+
func NewCrunchyBridgeClient(apiKey string) *CrunchyBridgeClient {
27+
return &CrunchyBridgeClient{
28+
APIKey: apiKey,
29+
BaseURL: CrunchyBridgeAPIBaseURL,
30+
client: &http.Client{
31+
Timeout: 30 * time.Second,
32+
},
33+
}
34+
}
35+
36+
type Cluster struct {
37+
ID string `json:"id"`
38+
Name string `json:"name"`
39+
Host string `json:"host"`
40+
ProviderID string `json:"provider_id"`
41+
RegionID string `json:"region_id"`
42+
MajorVersion int `json:"major_version"`
43+
IsHA bool `json:"is_ha"`
44+
TeamID string `json:"team_id"`
45+
State string `json:"state"`
46+
}
47+
48+
type Backup struct {
49+
Name string `json:"name"`
50+
Type string `json:"type"`
51+
StartedAt time.Time `json:"started_at"`
52+
FinishedAt time.Time `json:"finished_at"`
53+
LSNStart string `json:"lsn_start"`
54+
LSNStop string `json:"lsn_stop"`
55+
SizeBytes int64 `json:"size_bytes"`
56+
}
57+
58+
type BackupToken struct {
59+
AWS *AWSConfig `json:"aws,omitempty"`
60+
Azure *AzureConfig `json:"azure,omitempty"`
61+
GCP *GCPConfig `json:"gcp,omitempty"`
62+
RepoPath string `json:"repo_path"`
63+
Type string `json:"type"`
64+
Stanza string `json:"stanza"`
65+
}
66+
67+
type AWSConfig struct {
68+
S3Bucket string `json:"s3_bucket"`
69+
S3Key string `json:"s3_key"`
70+
S3KeySecret string `json:"s3_key_secret"`
71+
S3Region string `json:"s3_region"`
72+
S3Token string `json:"s3_token"`
73+
}
74+
75+
type AzureConfig struct {
76+
StorageAccount string `json:"storage_account"`
77+
StorageKey string `json:"storage_key"`
78+
Container string `json:"container"`
79+
}
80+
81+
type GCPConfig struct {
82+
Bucket string `json:"bucket"`
83+
ServiceAccountKey string `json:"service_account_key"`
84+
}
85+
86+
type PostgresRole struct {
87+
ClusterID string `json:"cluster_id"`
88+
Name string `json:"name"`
89+
Flavor string `json:"flavor"`
90+
Password string `json:"password"`
91+
URI string `json:"uri"`
92+
TeamID string `json:"team_id"`
93+
}
94+
95+
func (c *CrunchyBridgeClient) FindClusterByName(name string) (*Cluster, error) {
96+
clusters, err := c.listClusters()
97+
if err != nil {
98+
return nil, err
99+
}
100+
101+
for _, cluster := range clusters {
102+
if cluster.Name == name {
103+
return &cluster, nil
104+
}
105+
}
106+
107+
return nil, fmt.Errorf("cluster with name '%s' not found", name)
108+
}
109+
110+
func (c *CrunchyBridgeClient) listClusters() ([]Cluster, error) {
111+
var allClusters []Cluster
112+
cursor := ""
113+
114+
for {
115+
params := url.Values{}
116+
params.Set("limit", "200") // Max allowed
117+
if cursor != "" {
118+
params.Set("cursor", cursor)
119+
}
120+
121+
url := fmt.Sprintf("%s/clusters?%s", c.BaseURL, params.Encode())
122+
123+
resp, err := c.makeRequest("GET", url, nil)
124+
if err != nil {
125+
return nil, fmt.Errorf("failed to list clusters: %w", err)
126+
}
127+
128+
var response struct {
129+
Clusters []Cluster `json:"clusters"`
130+
HasMore bool `json:"has_more"`
131+
NextCursor string `json:"next_cursor"`
132+
}
133+
134+
if err := json.Unmarshal(resp, &response); err != nil {
135+
return nil, fmt.Errorf("failed to parse clusters response: %w", err)
136+
}
137+
138+
allClusters = append(allClusters, response.Clusters...)
139+
140+
if !response.HasMore {
141+
break
142+
}
143+
cursor = response.NextCursor
144+
}
145+
146+
return allClusters, nil
147+
}
148+
149+
func (c *CrunchyBridgeClient) CreateBackupToken(clusterID string) (*BackupToken, error) {
150+
url := fmt.Sprintf("%s/clusters/%s/backup-tokens", c.BaseURL, clusterID)
151+
152+
resp, err := c.makeRequest("POST", url, nil)
153+
if err != nil {
154+
return nil, fmt.Errorf("failed to create backup token: %w", err)
155+
}
156+
157+
var token BackupToken
158+
if err := json.Unmarshal(resp, &token); err != nil {
159+
return nil, fmt.Errorf("failed to parse backup token response: %w", err)
160+
}
161+
162+
return &token, nil
163+
}
164+
165+
// makeRequest performs HTTP request with authentication
166+
func (c *CrunchyBridgeClient) makeRequest(method, url string, body []byte) ([]byte, error) {
167+
var reqBody io.Reader
168+
if body != nil {
169+
reqBody = bytes.NewReader(body)
170+
}
171+
172+
req, err := http.NewRequest(method, url, reqBody)
173+
if err != nil {
174+
return nil, fmt.Errorf("failed to create request: %w", err)
175+
}
176+
177+
req.Header.Set("Authorization", "Bearer "+c.APIKey)
178+
req.Header.Set("Content-Type", "application/json")
179+
req.Header.Set("Accept", "application/json")
180+
req.Header.Set("User-Agent", "branchd.dev")
181+
182+
resp, err := c.client.Do(req)
183+
if err != nil {
184+
return nil, fmt.Errorf("request failed: %w", err)
185+
}
186+
defer resp.Body.Close()
187+
188+
responseBody, err := io.ReadAll(resp.Body)
189+
if err != nil {
190+
return nil, fmt.Errorf("failed to read response: %w", err)
191+
}
192+
193+
if resp.StatusCode < 200 || resp.StatusCode >= 300 {
194+
return nil, fmt.Errorf("API request failed with status %d: %s", resp.StatusCode, string(responseBody))
195+
}
196+
197+
return responseBody, nil
198+
}
199+
200+
func (t *BackupToken) GeneratePgBackRestConfig(stanzaName, pgDataPath string) string {
201+
var config strings.Builder
202+
203+
config.WriteString("[global]\n")
204+
config.WriteString("log-path=/var/log/pgbackrest\n")
205+
config.WriteString("spool-path=/var/spool/pgbackrest\n")
206+
config.WriteString("lock-path=/tmp\n")
207+
config.WriteString("\n")
208+
209+
config.WriteString(fmt.Sprintf("[%s]\n", stanzaName))
210+
config.WriteString(fmt.Sprintf("pg1-path=%s\n", pgDataPath))
211+
config.WriteString(fmt.Sprintf("repo1-path=%s\n", t.RepoPath))
212+
213+
switch t.Type {
214+
case "s3":
215+
if t.AWS != nil {
216+
config.WriteString("repo1-type=s3\n")
217+
config.WriteString(fmt.Sprintf("repo1-s3-bucket=%s\n", t.AWS.S3Bucket))
218+
config.WriteString(fmt.Sprintf("repo1-s3-key=%s\n", t.AWS.S3Key))
219+
config.WriteString(fmt.Sprintf("repo1-s3-key-secret=%s\n", t.AWS.S3KeySecret))
220+
config.WriteString(fmt.Sprintf("repo1-s3-region=%s\n", t.AWS.S3Region))
221+
config.WriteString("repo1-s3-endpoint=s3.amazonaws.com\n") // CrunchyBridge S3 endpoint
222+
config.WriteString(fmt.Sprintf("repo1-s3-token=%s\n", t.AWS.S3Token)) // STS session token
223+
}
224+
case "azure":
225+
if t.Azure != nil {
226+
config.WriteString("repo1-type=azure\n")
227+
config.WriteString(fmt.Sprintf("repo1-azure-account=%s\n", t.Azure.StorageAccount))
228+
config.WriteString(fmt.Sprintf("repo1-azure-key=%s\n", t.Azure.StorageKey))
229+
config.WriteString(fmt.Sprintf("repo1-azure-container=%s\n", t.Azure.Container))
230+
}
231+
case "gcs", "gcp":
232+
if t.GCP != nil {
233+
config.WriteString("repo1-type=gcs\n")
234+
config.WriteString(fmt.Sprintf("repo1-gcs-bucket=%s\n", t.GCP.Bucket))
235+
config.WriteString(fmt.Sprintf("repo1-gcs-key=%s\n", t.GCP.ServiceAccountKey))
236+
}
237+
}
238+
239+
return config.String()
240+
}

internal/server/restore_handlers.go

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -131,13 +131,20 @@ func (s *Server) triggerRestore(c *gin.Context) {
131131
return
132132
}
133133

134-
// Validate that connection string is set
135-
if config.ConnectionString == "" {
136-
c.JSON(http.StatusBadRequest, gin.H{"error": "Connection string not configured"})
134+
// Validate that a restore source is configured (either connection string or Crunchy Bridge)
135+
hasConnectionString := config.ConnectionString != ""
136+
hasCrunchyBridge := config.CrunchyBridgeAPIKey != ""
137+
138+
if !hasConnectionString && !hasCrunchyBridge {
139+
c.JSON(http.StatusBadRequest, gin.H{"error": "No restore source configured (need either connection string or Crunchy Bridge credentials)"})
137140
return
138141
}
139142

140-
s.logger.Info().Str("config_id", config.ID).Msg("Manually triggering restore")
143+
s.logger.Info().
144+
Str("config_id", config.ID).
145+
Bool("has_connection_string", hasConnectionString).
146+
Bool("has_crunchy_bridge", hasCrunchyBridge).
147+
Msg("Manually triggering restore")
141148

142149
// Create a new restore record with UTC datetime-based name (e.g., restore_20251017143202)
143150
restore := models.Restore{
@@ -153,7 +160,7 @@ func (s *Server) triggerRestore(c *gin.Context) {
153160
}
154161

155162
// Enqueue restore task
156-
restoreTask, err := tasks.NewTriggerLogicalRestoreTask(restore.ID)
163+
restoreTask, err := tasks.NewTriggerRestoreTask(restore.ID)
157164
if err != nil {
158165
s.logger.Error().Err(err).Str("restore_id", restore.ID).Msg("Failed to create restore task")
159166
c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to start restore"})

internal/tasks/tasks.go

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -9,36 +9,35 @@ import (
99

1010
// Task type constants
1111
const (
12-
// Database restore tasks (used for local execution)
13-
TypeTriggerLogicalRestore = "logical_restore:trigger"
14-
TypeLogicalRestoreWaitComplete = "logical_restore:wait_complete"
12+
TypeTriggerRestore = "restore:trigger"
13+
TypeRestoreWaitComplete = "restore:wait_complete"
1514
)
1615

1716
// TaskPayload is the common payload for all tasks
1817
type TaskPayload struct {
1918
RestoreID string `json:"database_id,omitempty"`
2019
}
2120

22-
// NewTriggerLogicalRestoreTask creates a task to execute pg_dump/restore
23-
func NewTriggerLogicalRestoreTask(restoreID string) (*asynq.Task, error) {
21+
// NewTriggerRestoreTask creates a task to trigger a database restore
22+
func NewTriggerRestoreTask(restoreID string) (*asynq.Task, error) {
2423
payload, err := json.Marshal(TaskPayload{
2524
RestoreID: restoreID,
2625
})
2726
if err != nil {
2827
return nil, fmt.Errorf("failed to marshal payload: %w", err)
2928
}
30-
return asynq.NewTask(TypeTriggerLogicalRestore, payload), nil
29+
return asynq.NewTask(TypeTriggerRestore, payload), nil
3130
}
3231

33-
// NewTriggerLogicalRestoreWaitCompleteTask creates a task to wait for pg_dump/restore completion
34-
func NewTriggerLogicalRestoreWaitCompleteTask(restoreID string) (*asynq.Task, error) {
32+
// NewTriggerRestoreWaitCompleteTask creates a task to wait for restore completion
33+
func NewTriggerRestoreWaitCompleteTask(restoreID string) (*asynq.Task, error) {
3534
payload, err := json.Marshal(TaskPayload{
3635
RestoreID: restoreID,
3736
})
3837
if err != nil {
3938
return nil, fmt.Errorf("failed to marshal payload: %w", err)
4039
}
41-
return asynq.NewTask(TypeLogicalRestoreWaitComplete, payload), nil
40+
return asynq.NewTask(TypeRestoreWaitComplete, payload), nil
4241
}
4342

4443
// ParseTaskPayload parses task payload from Asynq task

0 commit comments

Comments
 (0)