Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ require (
github.com/go-sql-driver/mysql v1.9.3
github.com/lib/pq v1.10.9
github.com/microsoft/go-mssqldb v1.9.3
github.com/milvus-io/milvus-sdk-go/v2 v2.4.2
github.com/prometheus/client_golang v1.23.0
github.com/prometheus/common v0.65.0
github.com/redis/go-redis/v9 v9.17.1
Expand All @@ -28,23 +29,37 @@ require (
filippo.io/edwards25519 v1.1.0 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/cockroachdb/errors v1.9.1 // indirect
github.com/cockroachdb/logtags v0.0.0-20211118104740-dabe8e521a4f // indirect
github.com/cockroachdb/redact v1.1.3 // indirect
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
github.com/getsentry/sentry-go v0.12.0 // indirect
github.com/goccy/go-json v0.10.0 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang-sql/civil v0.0.0-20220223132316-b832511892a9 // indirect
github.com/golang-sql/sqlexp v0.1.0 // indirect
github.com/golang/protobuf v1.5.4 // indirect
github.com/google/flatbuffers v23.1.21+incompatible // indirect
github.com/google/uuid v1.6.0 // indirect
github.com/grpc-ecosystem/go-grpc-middleware v1.3.0 // indirect
github.com/inconshreveable/mousetrap v1.1.0 // indirect
github.com/klauspost/compress v1.18.0 // indirect
github.com/klauspost/cpuid/v2 v2.2.3 // indirect
github.com/kr/pretty v0.3.1 // indirect
github.com/kr/text v0.2.0 // indirect
github.com/milvus-io/milvus-proto/go-api/v2 v2.4.10-0.20240819025435-512e3b98866a // indirect
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
github.com/pierrec/lz4/v4 v4.1.17 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
github.com/prometheus/client_model v0.6.2 // indirect
github.com/prometheus/procfs v0.16.1 // indirect
github.com/rogpeppe/go-internal v1.13.1 // indirect
github.com/spf13/pflag v1.0.9 // indirect
github.com/tidwall/gjson v1.14.4 // indirect
github.com/tidwall/match v1.1.1 // indirect
github.com/tidwall/pretty v1.2.0 // indirect
github.com/yuin/gopher-lua v1.1.1 // indirect
github.com/zeebo/xxh3 v1.0.2 // indirect
go.uber.org/multierr v1.11.0 // indirect
Expand Down
358 changes: 358 additions & 0 deletions go.sum

Large diffs are not rendered by default.

5 changes: 0 additions & 5 deletions internal/cmd/version/version.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,6 @@
* under the License.
*/

// Copyright Envoy Gateway Authors
// SPDX-License-Identifier: Apache-2.0
// The full text of the Apache license is available in the LICENSE file at
// the root of the repo.

package version

import (
Expand Down
2 changes: 1 addition & 1 deletion internal/collector/basic/redis/redis_collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import (
"github.com/redis/go-redis/v9"
"golang.org/x/crypto/ssh"

sshhelper "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/ssh"
sshhelper "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/ssh"
jobtypes "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/job"
consts "hertzbeat.apache.org/hertzbeat-collector-go/internal/constants"
"hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger"
Expand Down
2 changes: 1 addition & 1 deletion internal/collector/basic/ssh/ssh_collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,11 @@ import (
"strings"
"time"

sshhelper "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/ssh"
jobtypes "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/job"
consts "hertzbeat.apache.org/hertzbeat-collector-go/internal/constants"
"hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger"
"hertzbeat.apache.org/hertzbeat-collector-go/internal/util/param"
sshhelper "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/ssh"
)

const (
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
_ "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/basic"
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/collect/strategy"
jobtypes "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/job"
_ "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/extension/milvus"
"hertzbeat.apache.org/hertzbeat-collector-go/internal/constants"
"hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger"
"hertzbeat.apache.org/hertzbeat-collector-go/internal/util/param"
Expand Down
10 changes: 10 additions & 0 deletions internal/collector/common/types/job/metrics_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ type Metrics struct {
JMX *JMXProtocol `json:"jmx,omitempty"`
Redis *RedisProtocol `json:"redis,omitempty"`
MongoDB *MongoDBProtocol `json:"mongodb,omitempty"`
Milvus *MilvusProtocol `json:"milvus,omitempty"`
}

// Field represents a metric field
Expand Down Expand Up @@ -280,6 +281,15 @@ type MongoDBProtocol struct {
Timeout int `json:"timeout"`
}

// MilvusProtocol represents Milvus protocol configuration
type MilvusProtocol struct {
Host string `json:"host"`
Port string `json:"port"`
Username string `json:"username"`
Password string `json:"password"`
Timeout string `json:"timeout"`
}

// GetInterval returns the interval for the metric, using default if not set
func (m *Metrics) GetInterval() time.Duration {
if m.Interval > 0 {
Expand Down
148 changes: 148 additions & 0 deletions internal/collector/extension/milvus/milvus_collector.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package milvus

import (
"context"
"fmt"
"strconv"
"strings"
"time"

"github.com/milvus-io/milvus-sdk-go/v2/client"
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/collect/strategy"
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/job"
"hertzbeat.apache.org/hertzbeat-collector-go/internal/constants"
"hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger"
)

func init() {
strategy.RegisterFactory(ProtocolMilvus, func(logger logger.Logger) strategy.Collector {
return NewMilvusCollector(logger)
})
}

const (
ProtocolMilvus = "milvus"
)

type MilvusCollector struct {
logger logger.Logger
}

func NewMilvusCollector(log logger.Logger) *MilvusCollector {
return &MilvusCollector{
logger: log.WithName("milvus-collector"),
}
}

func (c *MilvusCollector) Protocol() string {
return ProtocolMilvus
}

func (c *MilvusCollector) PreCheck(metrics *job.Metrics) error {
if metrics == nil || metrics.Milvus == nil {
return fmt.Errorf("milvus configuration is missing")
}
return nil
}

func (c *MilvusCollector) Collect(metrics *job.Metrics) *job.CollectRepMetricsData {
start := time.Now()
milvusConfig := metrics.Milvus

// 1. Prepare connection
addr := fmt.Sprintf("%s:%s", milvusConfig.Host, milvusConfig.Port)

timeout := 10 * time.Second
if milvusConfig.Timeout != "" {
if t, err := strconv.Atoi(milvusConfig.Timeout); err == nil {
timeout = time.Duration(t) * time.Millisecond
}
}

ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()

// 2. Connect
var milvusClient client.Client
var err error

if milvusConfig.Username != "" && milvusConfig.Password != "" {
milvusClient, err = client.NewDefaultGrpcClientWithAuth(ctx, addr, milvusConfig.Username, milvusConfig.Password)
} else {
milvusClient, err = client.NewDefaultGrpcClient(ctx, addr)
}

if err != nil {
return c.createFailResponse(metrics, constants.CollectUnConnectable, fmt.Sprintf("failed to connect: %v", err))
}
defer milvusClient.Close()

// 3. Collect data
// Currently only supports basic availability check and version retrieval
version, err := milvusClient.GetVersion(ctx)
if err != nil {
return c.createFailResponse(metrics, constants.CollectFail, fmt.Sprintf("failed to get version: %v", err))
}

responseTime := time.Since(start).Milliseconds()

// 4. Parse/Format Response
response := c.createSuccessResponse(metrics)
row := job.ValueRow{Columns: make([]string, len(metrics.AliasFields))}

for i, field := range metrics.AliasFields {
switch strings.ToLower(field) {
case strings.ToLower(constants.RESPONSE_TIME):
row.Columns[i] = strconv.FormatInt(responseTime, 10)
case "version":
row.Columns[i] = version
case "host":
row.Columns[i] = milvusConfig.Host
case "port":
row.Columns[i] = milvusConfig.Port
default:
row.Columns[i] = constants.NULL_VALUE
}
}

response.Values = append(response.Values, row)
return response
}

func (c *MilvusCollector) createSuccessResponse(metrics *job.Metrics) *job.CollectRepMetricsData {
return &job.CollectRepMetricsData{
Metrics: metrics.Name,
Time: time.Now().UnixMilli(),
Code: constants.CollectSuccess,
Msg: "success",
Values: make([]job.ValueRow, 0),
}
}

func (c *MilvusCollector) createFailResponse(metrics *job.Metrics, code int, msg string) *job.CollectRepMetricsData {
return &job.CollectRepMetricsData{
Metrics: metrics.Name,
Time: time.Now().UnixMilli(),
Code: code,
Msg: msg,
}
}
50 changes: 50 additions & 0 deletions internal/collector/extension/milvus/milvus_collector_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package milvus

import (
"os"
"testing"

"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/job"
loggertypes "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/logger"
"hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger"
)

func TestMilvusCollector_Collect(t *testing.T) {
metrics := &job.Metrics{
Name: "milvus_test",
Milvus: &job.MilvusProtocol{
Host: "localhost",
Port: "19530",
},
AliasFields: []string{"version", "responseTime", "host", "port"},
}

l := logger.DefaultLogger(os.Stdout, loggertypes.LogLevelInfo)
collector := NewMilvusCollector(l)
result := collector.Collect(metrics)

if result.Code != 0 {
t.Logf("Collect failed: %s", result.Msg)
} else {
t.Logf("Collect success: %+v", result.Values)
}
}
Loading