Skip to content

Commit b3cc4f3

Browse files
committed
Add victoriametrics database driver
1 parent 990fb51 commit b3cc4f3

File tree

1 file changed

+202
-0
lines changed

1 file changed

+202
-0
lines changed

database/victoria/victoria.go

Lines changed: 202 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,202 @@
1+
package victoria
2+
3+
import (
4+
"bufio"
5+
"bytes"
6+
"context"
7+
"errors"
8+
"fmt"
9+
"io"
10+
"net/http"
11+
"net/url"
12+
"strings"
13+
"time"
14+
15+
"github.com/golang-migrate/migrate/v4/database"
16+
)
17+
18+
func init() {
19+
database.Register("victoria", &Victoria{})
20+
}
21+
22+
// Victoria implements the database.Driver interface for VictoriaMetrics time series database
23+
type Victoria struct {
24+
client *http.Client
25+
url string
26+
isLocked bool
27+
isOpen bool
28+
config *Config
29+
importURL string
30+
exportURL string
31+
}
32+
33+
// Config holds the configuration parameters for VictoriaMetrics connection
34+
type Config struct {
35+
URL string
36+
LabelFilter string
37+
StartTime string
38+
EndTime string
39+
Timeout time.Duration
40+
}
41+
42+
// Open initializes the VictoriaMetrics driver
43+
func (v *Victoria) Open(dsn string) (database.Driver, error) {
44+
if v.client == nil {
45+
v.client = &http.Client{
46+
Timeout: 30 * time.Second,
47+
}
48+
}
49+
50+
config, err := parseConfig(dsn)
51+
if err != nil {
52+
return nil, err
53+
}
54+
55+
v.config = config
56+
v.url = config.URL
57+
v.importURL = config.URL + "/api/v1/import"
58+
v.exportURL = config.URL + "/api/v1/export"
59+
v.isOpen = true
60+
61+
return v, nil
62+
}
63+
64+
// parseConfig parses the DSN into a Config struct
65+
func parseConfig(dsn string) (*Config, error) {
66+
u, err := url.Parse(dsn)
67+
if err != nil {
68+
return nil, fmt.Errorf("invalid VictoriaMetrics DSN: %w", err)
69+
}
70+
71+
if u.Scheme != "victoria" {
72+
return nil, fmt.Errorf("invalid scheme for VictoriaMetrics: %s", u.Scheme)
73+
}
74+
75+
// Construct the base URL with scheme, host, and port
76+
baseURL := "http://" + u.Host
77+
if u.User != nil {
78+
// Handle authentication if provided
79+
password, _ := u.User.Password()
80+
baseURL = "http://" + u.User.Username() + ":" + password + "@" + u.Host
81+
}
82+
83+
// Extract query parameters
84+
timeout := 30 * time.Second
85+
if timeoutStr := u.Query().Get("timeout"); timeoutStr != "" {
86+
timeoutVal, err := time.ParseDuration(timeoutStr)
87+
if err == nil && timeoutVal > 0 {
88+
timeout = timeoutVal
89+
}
90+
}
91+
92+
return &Config{
93+
URL: baseURL,
94+
LabelFilter: u.Query().Get("label_filter"),
95+
StartTime: u.Query().Get("start"),
96+
EndTime: u.Query().Get("end"),
97+
Timeout: timeout,
98+
}, nil
99+
}
100+
101+
// Close closes the connection to VictoriaMetrics
102+
func (v *Victoria) Close() error {
103+
v.isOpen = false
104+
if v.client != nil {
105+
v.client.CloseIdleConnections()
106+
}
107+
return nil
108+
}
109+
110+
// Lock acquires a database lock (no-op for VictoriaMetrics)
111+
func (v *Victoria) Lock() error {
112+
if !v.isOpen {
113+
return database.ErrLocked
114+
}
115+
v.isLocked = true
116+
return nil
117+
}
118+
119+
// Unlock releases a database lock (no-op for VictoriaMetrics)
120+
func (v *Victoria) Unlock() error {
121+
if !v.isOpen {
122+
return database.ErrLocked
123+
}
124+
v.isLocked = false
125+
return nil
126+
}
127+
128+
// Run executes a migration by importing data into VictoriaMetrics
129+
func (v *Victoria) Run(migration io.Reader) error {
130+
if !v.isOpen {
131+
return database.ErrClosed
132+
}
133+
134+
if !v.isLocked {
135+
return database.ErrLocked
136+
}
137+
138+
// Buffer to collect migration data
139+
var migrationBuffer bytes.Buffer
140+
141+
// Read migration content
142+
scanner := bufio.NewScanner(migration)
143+
scanner.Buffer(make([]byte, 4*1024*1024), 4*1024*1024) // 4MB buffer
144+
145+
for scanner.Scan() {
146+
line := scanner.Text()
147+
line = strings.TrimSpace(line)
148+
if line == "" || strings.HasPrefix(line, "--") {
149+
continue // Skip empty lines and comments
150+
}
151+
migrationBuffer.WriteString(line)
152+
migrationBuffer.WriteString("\n")
153+
}
154+
155+
if err := scanner.Err(); err != nil {
156+
return fmt.Errorf("error reading migration: %w", err)
157+
}
158+
159+
// If we have content to import
160+
if migrationBuffer.Len() > 0 {
161+
// Send data to VictoriaMetrics
162+
req, err := http.NewRequest(http.MethodPost, v.importURL, &migrationBuffer)
163+
if err != nil {
164+
return fmt.Errorf("failed to create import request: %w", err)
165+
}
166+
req.Header.Set("Content-Type", "application/json")
167+
168+
resp, err := v.client.Do(req)
169+
if err != nil {
170+
return fmt.Errorf("failed to import data: %w", err)
171+
}
172+
defer resp.Body.Close()
173+
174+
if resp.StatusCode != http.StatusNoContent && resp.StatusCode != http.StatusOK {
175+
bodyBytes, _ := io.ReadAll(resp.Body)
176+
return fmt.Errorf("import failed with status %d: %s", resp.StatusCode, string(bodyBytes))
177+
}
178+
}
179+
180+
return nil
181+
}
182+
183+
// SetVersion sets the migration version (no-op for VictoriaMetrics)
184+
func (v *Victoria) SetVersion(version int, dirty bool) error {
185+
// VictoriaMetrics doesn't have schema version tracking
186+
return nil
187+
}
188+
189+
// Version returns the current migration version (no-op for VictoriaMetrics)
190+
func (v *Victoria) Version() (int, bool, error) {
191+
// VictoriaMetrics doesn't support version tracking
192+
return -1, false, nil
193+
}
194+
195+
// Drop clears all data (not supported in VictoriaMetrics)
196+
func (v *Victoria) Drop() error {
197+
return errors.New("drop operation is not supported in VictoriaMetrics")
198+
}
199+
200+
// Ensure Victoria implements the database.Driver interface
201+
var _ database.Driver = (*Victoria)(nil)
202+
var _ database.Locker = (*Victoria)(nil)

0 commit comments

Comments
 (0)