Skip to content

Commit efa1e18

Browse files
committed
Add victoriametrics database driver
1 parent b3cc4f3 commit efa1e18

File tree

7 files changed

+364
-7
lines changed

7 files changed

+364
-7
lines changed

database/victoria/README.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
# VictoriaMetrics
2+
3+
This driver enables golang-migrate to work with [VictoriaMetrics](https://victoriametrics.com/), a high-performance time series database.
4+
5+
## Usage
Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
-- This is just a placeholder as VictoriaMetrics doesn't have a way to revert imports
2+
-- Instead, you would typically filter by time range or labels to exclude the data
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
{"metric":{"__name__":"up","job":"migrate_test"},"values":[1],"timestamps":[1596698684000]}
2+
{"metric":{"__name__":"cpu_usage","instance":"server1","job":"migrate_test"},"values":[0.45,0.52,0.48],"timestamps":[1596698684000,1596698694000,1596698704000]}
3+
{"metric":{"__name__":"memory_usage","instance":"server1","job":"migrate_test"},"values":[0.25,0.28,0.22],"timestamps":[1596698684000,1596698694000,1596698704000]}

database/victoria/export.go

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
package victoria
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"io"
7+
"net/http"
8+
"net/url"
9+
)
10+
11+
// Export reads data from VictoriaMetrics based on label filters and time range
12+
func (v *Victoria) Export(ctx context.Context, w io.Writer) error {
13+
if !v.isOpen {
14+
return ErrClosed
15+
}
16+
17+
// Build query parameters
18+
query := url.Values{}
19+
if v.config.LabelFilter != "" {
20+
query.Set("match[]", v.config.LabelFilter)
21+
}
22+
if v.config.StartTime != "" {
23+
query.Set("start", v.config.StartTime)
24+
}
25+
if v.config.EndTime != "" {
26+
query.Set("end", v.config.EndTime)
27+
}
28+
29+
// Create request with context
30+
req, err := http.NewRequestWithContext(ctx, http.MethodGet, v.exportURL+"?"+query.Encode(), nil)
31+
if err != nil {
32+
return fmt.Errorf("failed to create export request: %w", err)
33+
}
34+
35+
// Set headers
36+
req.Header.Set("Accept", "application/json")
37+
req.Header.Set("Accept-Encoding", "gzip")
38+
39+
// Execute request
40+
resp, err := v.client.Do(req)
41+
if err != nil {
42+
return fmt.Errorf("failed to execute export request: %w", err)
43+
}
44+
defer resp.Body.Close()
45+
46+
// Check response status
47+
if resp.StatusCode != http.StatusOK {
48+
bodyBytes, _ := io.ReadAll(resp.Body)
49+
return fmt.Errorf("export failed with status %d: %s", resp.StatusCode, string(bodyBytes))
50+
}
51+
52+
// Copy response body to writer
53+
_, err = io.Copy(w, resp.Body)
54+
if err != nil {
55+
return fmt.Errorf("failed to read export data: %w", err)
56+
}
57+
58+
return nil
59+
}

database/victoria/export_test.go

Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
package victoria
2+
3+
import (
4+
"bytes"
5+
"context"
6+
"net/http"
7+
"net/http/httptest"
8+
"strings"
9+
"testing"
10+
11+
"github.com/stretchr/testify/assert"
12+
)
13+
14+
func TestVictoriaExport(t *testing.T) {
15+
// Sample export data
16+
exportData := `{"metric":{"__name__":"up","job":"test"},"values":[1],"timestamps":[1596698684000]}
17+
{"metric":{"__name__":"cpu_usage","instance":"server1"},"values":[0.45],"timestamps":[1596698684000]}`
18+
19+
// Setup test server
20+
testServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
21+
if r.URL.Path == "/api/v1/export" {
22+
// Check query parameters
23+
query := r.URL.Query()
24+
assert.Equal(t, "{__name__=\"up\"}", query.Get("match[]"))
25+
assert.Equal(t, "2020-01-01T00:00:00Z", query.Get("start"))
26+
assert.Equal(t, "2020-01-02T00:00:00Z", query.Get("end"))
27+
28+
// Return export data
29+
w.WriteHeader(http.StatusOK)
30+
w.Write([]byte(exportData))
31+
} else {
32+
w.WriteHeader(http.StatusNotFound)
33+
}
34+
}))
35+
defer testServer.Close()
36+
37+
// Parse the URL for our test server
38+
serverURL := strings.TrimPrefix(testServer.URL, "http://")
39+
40+
// Create driver
41+
d := &Victoria{}
42+
dsn := "victoria://" + serverURL + "?label_filter={__name__=\"up\"}&start=2020-01-01T00:00:00Z&end=2020-01-02T00:00:00Z"
43+
// No need to store the returned driver since we're testing the receiver methods directly
44+
_, err := d.Open(dsn)
45+
assert.NoError(t, err)
46+
47+
// Test export
48+
var buf bytes.Buffer
49+
err = d.Export(context.Background(), &buf)
50+
assert.NoError(t, err)
51+
assert.Equal(t, exportData, buf.String())
52+
53+
// Test export with closed connection
54+
d.Close()
55+
err = d.Export(context.Background(), &buf)
56+
assert.Error(t, err)
57+
assert.Equal(t, ErrClosed, err)
58+
}
59+
60+
func TestVictoriaExportError(t *testing.T) {
61+
// Setup test server that returns an error
62+
testServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
63+
w.WriteHeader(http.StatusInternalServerError)
64+
w.Write([]byte("Internal server error"))
65+
}))
66+
defer testServer.Close()
67+
68+
// Parse the URL for our test server
69+
serverURL := strings.TrimPrefix(testServer.URL, "http://")
70+
71+
// Create driver
72+
d := &Victoria{}
73+
dsn := "victoria://" + serverURL
74+
_, err := d.Open(dsn)
75+
assert.NoError(t, err)
76+
77+
// Test export
78+
var buf bytes.Buffer
79+
err = d.Export(context.Background(), &buf)
80+
assert.Error(t, err)
81+
assert.Contains(t, err.Error(), "status 500")
82+
}

database/victoria/victoria.go

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ package victoria
33
import (
44
"bufio"
55
"bytes"
6-
"context"
76
"errors"
87
"fmt"
98
"io"
@@ -15,6 +14,12 @@ import (
1514
"github.com/golang-migrate/migrate/v4/database"
1615
)
1716

17+
// Define error constants if they don't exist in the database package
18+
var (
19+
ErrLocked = errors.New("database is locked")
20+
ErrClosed = errors.New("database connection is closed")
21+
)
22+
1823
func init() {
1924
database.Register("victoria", &Victoria{})
2025
}
@@ -110,7 +115,7 @@ func (v *Victoria) Close() error {
110115
// Lock acquires a database lock (no-op for VictoriaMetrics)
111116
func (v *Victoria) Lock() error {
112117
if !v.isOpen {
113-
return database.ErrLocked
118+
return ErrLocked
114119
}
115120
v.isLocked = true
116121
return nil
@@ -119,7 +124,7 @@ func (v *Victoria) Lock() error {
119124
// Unlock releases a database lock (no-op for VictoriaMetrics)
120125
func (v *Victoria) Unlock() error {
121126
if !v.isOpen {
122-
return database.ErrLocked
127+
return ErrLocked
123128
}
124129
v.isLocked = false
125130
return nil
@@ -128,16 +133,16 @@ func (v *Victoria) Unlock() error {
128133
// Run executes a migration by importing data into VictoriaMetrics
129134
func (v *Victoria) Run(migration io.Reader) error {
130135
if !v.isOpen {
131-
return database.ErrClosed
136+
return ErrClosed
132137
}
133138

134139
if !v.isLocked {
135-
return database.ErrLocked
140+
return ErrLocked
136141
}
137142

138143
// Buffer to collect migration data
139144
var migrationBuffer bytes.Buffer
140-
145+
141146
// Read migration content
142147
scanner := bufio.NewScanner(migration)
143148
scanner.Buffer(make([]byte, 4*1024*1024), 4*1024*1024) // 4MB buffer
@@ -199,4 +204,3 @@ func (v *Victoria) Drop() error {
199204

200205
// Ensure Victoria implements the database.Driver interface
201206
var _ database.Driver = (*Victoria)(nil)
202-
var _ database.Locker = (*Victoria)(nil)

0 commit comments

Comments
 (0)