Skip to content

Commit 100f8f0

Browse files
authored
feat: add milvus support (#31)
1 parent ffee3d8 commit 100f8f0

File tree

12 files changed

+1191
-7
lines changed

12 files changed

+1191
-7
lines changed

go.mod

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ require (
1111
github.com/go-sql-driver/mysql v1.9.3
1212
github.com/lib/pq v1.10.9
1313
github.com/microsoft/go-mssqldb v1.9.3
14+
github.com/milvus-io/milvus-sdk-go/v2 v2.4.2
1415
github.com/prometheus/client_golang v1.23.0
1516
github.com/prometheus/common v0.65.0
1617
github.com/redis/go-redis/v9 v9.17.1
@@ -28,23 +29,37 @@ require (
2829
filippo.io/edwards25519 v1.1.0 // indirect
2930
github.com/beorn7/perks v1.0.1 // indirect
3031
github.com/cespare/xxhash/v2 v2.3.0 // indirect
32+
github.com/cockroachdb/errors v1.9.1 // indirect
33+
github.com/cockroachdb/logtags v0.0.0-20211118104740-dabe8e521a4f // indirect
34+
github.com/cockroachdb/redact v1.1.3 // indirect
3135
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
3236
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
37+
github.com/getsentry/sentry-go v0.12.0 // indirect
3338
github.com/goccy/go-json v0.10.0 // indirect
39+
github.com/gogo/protobuf v1.3.2 // indirect
3440
github.com/golang-sql/civil v0.0.0-20220223132316-b832511892a9 // indirect
3541
github.com/golang-sql/sqlexp v0.1.0 // indirect
42+
github.com/golang/protobuf v1.5.4 // indirect
3643
github.com/google/flatbuffers v23.1.21+incompatible // indirect
3744
github.com/google/uuid v1.6.0 // indirect
45+
github.com/grpc-ecosystem/go-grpc-middleware v1.3.0 // indirect
3846
github.com/inconshreveable/mousetrap v1.1.0 // indirect
3947
github.com/klauspost/compress v1.18.0 // indirect
4048
github.com/klauspost/cpuid/v2 v2.2.3 // indirect
49+
github.com/kr/pretty v0.3.1 // indirect
50+
github.com/kr/text v0.2.0 // indirect
51+
github.com/milvus-io/milvus-proto/go-api/v2 v2.4.10-0.20240819025435-512e3b98866a // indirect
4152
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
4253
github.com/pierrec/lz4/v4 v4.1.17 // indirect
54+
github.com/pkg/errors v0.9.1 // indirect
4355
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
4456
github.com/prometheus/client_model v0.6.2 // indirect
4557
github.com/prometheus/procfs v0.16.1 // indirect
4658
github.com/rogpeppe/go-internal v1.13.1 // indirect
4759
github.com/spf13/pflag v1.0.9 // indirect
60+
github.com/tidwall/gjson v1.14.4 // indirect
61+
github.com/tidwall/match v1.1.1 // indirect
62+
github.com/tidwall/pretty v1.2.0 // indirect
4863
github.com/yuin/gopher-lua v1.1.1 // indirect
4964
github.com/zeebo/xxh3 v1.0.2 // indirect
5065
go.uber.org/multierr v1.11.0 // indirect

go.sum

Lines changed: 358 additions & 0 deletions
Large diffs are not rendered by default.

internal/cmd/version/version.go

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,6 @@
1717
* under the License.
1818
*/
1919

20-
// Copyright Envoy Gateway Authors
21-
// SPDX-License-Identifier: Apache-2.0
22-
// The full text of the Apache license is available in the LICENSE file at
23-
// the root of the repo.
24-
2520
package version
2621

2722
import (

internal/collector/basic/redis/redis_collector.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ import (
3131
"github.com/redis/go-redis/v9"
3232
"golang.org/x/crypto/ssh"
3333

34-
sshhelper "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/ssh"
34+
sshhelper "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/ssh"
3535
jobtypes "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/job"
3636
consts "hertzbeat.apache.org/hertzbeat-collector-go/internal/constants"
3737
"hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger"

internal/collector/basic/ssh/ssh_collector.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,11 +26,11 @@ import (
2626
"strings"
2727
"time"
2828

29-
sshhelper "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/ssh"
3029
jobtypes "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/job"
3130
consts "hertzbeat.apache.org/hertzbeat-collector-go/internal/constants"
3231
"hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger"
3332
"hertzbeat.apache.org/hertzbeat-collector-go/internal/util/param"
33+
sshhelper "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/ssh"
3434
)
3535

3636
const (

internal/collector/common/collect/dispatch/metrics_collector.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ import (
3434
_ "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/basic"
3535
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/collect/strategy"
3636
jobtypes "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/job"
37+
_ "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/extension/milvus"
3738
"hertzbeat.apache.org/hertzbeat-collector-go/internal/constants"
3839
"hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger"
3940
"hertzbeat.apache.org/hertzbeat-collector-go/internal/util/param"

internal/collector/common/types/job/metrics_types.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ type Metrics struct {
5151
JMX *JMXProtocol `json:"jmx,omitempty"`
5252
Redis *RedisProtocol `json:"redis,omitempty"`
5353
MongoDB *MongoDBProtocol `json:"mongodb,omitempty"`
54+
Milvus *MilvusProtocol `json:"milvus,omitempty"`
5455
}
5556

5657
// Field represents a metric field
@@ -280,6 +281,15 @@ type MongoDBProtocol struct {
280281
Timeout int `json:"timeout"`
281282
}
282283

284+
// MilvusProtocol represents Milvus protocol configuration
285+
type MilvusProtocol struct {
286+
Host string `json:"host"`
287+
Port string `json:"port"`
288+
Username string `json:"username"`
289+
Password string `json:"password"`
290+
Timeout string `json:"timeout"`
291+
}
292+
283293
// GetInterval returns the interval for the metric, using default if not set
284294
func (m *Metrics) GetInterval() time.Duration {
285295
if m.Interval > 0 {
Lines changed: 148 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,148 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package milvus
21+
22+
import (
23+
"context"
24+
"fmt"
25+
"strconv"
26+
"strings"
27+
"time"
28+
29+
"github.com/milvus-io/milvus-sdk-go/v2/client"
30+
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/collect/strategy"
31+
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/job"
32+
"hertzbeat.apache.org/hertzbeat-collector-go/internal/constants"
33+
"hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger"
34+
)
35+
36+
func init() {
37+
strategy.RegisterFactory(ProtocolMilvus, func(logger logger.Logger) strategy.Collector {
38+
return NewMilvusCollector(logger)
39+
})
40+
}
41+
42+
const (
43+
ProtocolMilvus = "milvus"
44+
)
45+
46+
type MilvusCollector struct {
47+
logger logger.Logger
48+
}
49+
50+
func NewMilvusCollector(log logger.Logger) *MilvusCollector {
51+
return &MilvusCollector{
52+
logger: log.WithName("milvus-collector"),
53+
}
54+
}
55+
56+
func (c *MilvusCollector) Protocol() string {
57+
return ProtocolMilvus
58+
}
59+
60+
func (c *MilvusCollector) PreCheck(metrics *job.Metrics) error {
61+
if metrics == nil || metrics.Milvus == nil {
62+
return fmt.Errorf("milvus configuration is missing")
63+
}
64+
return nil
65+
}
66+
67+
func (c *MilvusCollector) Collect(metrics *job.Metrics) *job.CollectRepMetricsData {
68+
start := time.Now()
69+
milvusConfig := metrics.Milvus
70+
71+
// 1. Prepare connection
72+
addr := fmt.Sprintf("%s:%s", milvusConfig.Host, milvusConfig.Port)
73+
74+
timeout := 10 * time.Second
75+
if milvusConfig.Timeout != "" {
76+
if t, err := strconv.Atoi(milvusConfig.Timeout); err == nil {
77+
timeout = time.Duration(t) * time.Millisecond
78+
}
79+
}
80+
81+
ctx, cancel := context.WithTimeout(context.Background(), timeout)
82+
defer cancel()
83+
84+
// 2. Connect
85+
var milvusClient client.Client
86+
var err error
87+
88+
if milvusConfig.Username != "" && milvusConfig.Password != "" {
89+
milvusClient, err = client.NewDefaultGrpcClientWithAuth(ctx, addr, milvusConfig.Username, milvusConfig.Password)
90+
} else {
91+
milvusClient, err = client.NewDefaultGrpcClient(ctx, addr)
92+
}
93+
94+
if err != nil {
95+
return c.createFailResponse(metrics, constants.CollectUnConnectable, fmt.Sprintf("failed to connect: %v", err))
96+
}
97+
defer milvusClient.Close()
98+
99+
// 3. Collect data
100+
// Currently only supports basic availability check and version retrieval
101+
version, err := milvusClient.GetVersion(ctx)
102+
if err != nil {
103+
return c.createFailResponse(metrics, constants.CollectFail, fmt.Sprintf("failed to get version: %v", err))
104+
}
105+
106+
responseTime := time.Since(start).Milliseconds()
107+
108+
// 4. Parse/Format Response
109+
response := c.createSuccessResponse(metrics)
110+
row := job.ValueRow{Columns: make([]string, len(metrics.AliasFields))}
111+
112+
for i, field := range metrics.AliasFields {
113+
switch strings.ToLower(field) {
114+
case strings.ToLower(constants.RESPONSE_TIME):
115+
row.Columns[i] = strconv.FormatInt(responseTime, 10)
116+
case "version":
117+
row.Columns[i] = version
118+
case "host":
119+
row.Columns[i] = milvusConfig.Host
120+
case "port":
121+
row.Columns[i] = milvusConfig.Port
122+
default:
123+
row.Columns[i] = constants.NULL_VALUE
124+
}
125+
}
126+
127+
response.Values = append(response.Values, row)
128+
return response
129+
}
130+
131+
func (c *MilvusCollector) createSuccessResponse(metrics *job.Metrics) *job.CollectRepMetricsData {
132+
return &job.CollectRepMetricsData{
133+
Metrics: metrics.Name,
134+
Time: time.Now().UnixMilli(),
135+
Code: constants.CollectSuccess,
136+
Msg: "success",
137+
Values: make([]job.ValueRow, 0),
138+
}
139+
}
140+
141+
func (c *MilvusCollector) createFailResponse(metrics *job.Metrics, code int, msg string) *job.CollectRepMetricsData {
142+
return &job.CollectRepMetricsData{
143+
Metrics: metrics.Name,
144+
Time: time.Now().UnixMilli(),
145+
Code: code,
146+
Msg: msg,
147+
}
148+
}
Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package milvus
21+
22+
import (
23+
"os"
24+
"testing"
25+
26+
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/job"
27+
loggertypes "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/logger"
28+
"hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger"
29+
)
30+
31+
func TestMilvusCollector_Collect(t *testing.T) {
32+
metrics := &job.Metrics{
33+
Name: "milvus_test",
34+
Milvus: &job.MilvusProtocol{
35+
Host: "localhost",
36+
Port: "19530",
37+
},
38+
AliasFields: []string{"version", "responseTime", "host", "port"},
39+
}
40+
41+
l := logger.DefaultLogger(os.Stdout, loggertypes.LogLevelInfo)
42+
collector := NewMilvusCollector(l)
43+
result := collector.Collect(metrics)
44+
45+
if result.Code != 0 {
46+
t.Logf("Collect failed: %s", result.Msg)
47+
} else {
48+
t.Logf("Collect success: %+v", result.Values)
49+
}
50+
}

0 commit comments

Comments
 (0)