Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions internal/collector/basic/database/jdbc_collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import (
"golang.org/x/crypto/ssh"

jobtypes "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/job"
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/job/protocol"
"hertzbeat.apache.org/hertzbeat-collector-go/internal/constants"
"hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger"
"hertzbeat.apache.org/hertzbeat-collector-go/internal/util/param"
Expand Down Expand Up @@ -184,7 +185,7 @@ type sshTunnelHelper struct {
// startSSHTunnel starts an SSH tunnel
// It connects to the SSH bastion, listens on a local random port,
// and forwards traffic to the target database.
func startSSHTunnel(config *jobtypes.SSHTunnel, remoteHost, remotePort string, timeout time.Duration, log logger.Logger) (*sshTunnelHelper, string, string, error) {
func startSSHTunnel(config *protocol.SSHTunnel, remoteHost, remotePort string, timeout time.Duration, log logger.Logger) (*sshTunnelHelper, string, string, error) {
sshHost := config.Host
sshPort, err := strconv.Atoi(config.Port)
if err != nil {
Expand Down Expand Up @@ -314,7 +315,7 @@ func NewJDBCCollector(logger logger.Logger) *JDBCCollector {
}

// extractJDBCConfig extracts JDBC configuration from interface{} type
func extractJDBCConfig(jdbcInterface interface{}) (*jobtypes.JDBCProtocol, error) {
func extractJDBCConfig(jdbcInterface interface{}) (*protocol.JDBCProtocol, error) {
replacer := param.NewReplacer()
return replacer.ExtractJDBCConfig(jdbcInterface)
}
Expand Down Expand Up @@ -396,7 +397,7 @@ func (jc *JDBCCollector) PreCheck(metrics *jobtypes.Metrics) error {
}

// checkTunnelParam validates SSH tunnel configuration
func (jc *JDBCCollector) checkTunnelParam(config *jobtypes.SSHTunnel) error {
func (jc *JDBCCollector) checkTunnelParam(config *protocol.SSHTunnel) error {
if config == nil {
return nil
}
Expand Down Expand Up @@ -602,7 +603,7 @@ func validateURL(rawURL string, platform string) error {
}

// constructDatabaseURL constructs the database connection URL/DSN
func (jc *JDBCCollector) constructDatabaseURL(jdbc *jobtypes.JDBCProtocol, host, port string) (string, error) {
func (jc *JDBCCollector) constructDatabaseURL(jdbc *protocol.JDBCProtocol, host, port string) (string, error) {
// 1. If user provided a full URL, use it (already validated in PreCheck)
if jdbc.URL != "" {
// Validate again to prevent bypassing PreCheck
Expand Down
9 changes: 5 additions & 4 deletions internal/collector/basic/http/http_collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (

"github.com/prometheus/common/expfmt"
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/job"
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/job/protocol"
"hertzbeat.apache.org/hertzbeat-collector-go/internal/constants"
"hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger"
)
Expand Down Expand Up @@ -209,7 +210,7 @@ func (hc *HTTPCollector) Collect(metrics *job.Metrics) *job.CollectRepMetricsDat
}

// handleDigestAuth generates a new request with the Digest Authorization header
func (hc *HTTPCollector) handleDigestAuth(originalReq *http.Request, authHeader string, authConfig *job.Authorization) (*http.Request, error) {
func (hc *HTTPCollector) handleDigestAuth(originalReq *http.Request, authHeader string, authConfig *protocol.Authorization) (*http.Request, error) {
params := hc.parseAuthHeader(authHeader)
realm := params["realm"]
nonce := params["nonce"]
Expand Down Expand Up @@ -298,7 +299,7 @@ func (hc *HTTPCollector) generateCnonce() string {
return hex.EncodeToString(b)
}

func (hc *HTTPCollector) createRequest(config *job.HTTPProtocol, targetURL string) (*http.Request, error) {
func (hc *HTTPCollector) createRequest(config *protocol.HTTPProtocol, targetURL string) (*http.Request, error) {
method := strings.ToUpper(config.Method)
if method == "" {
method = "GET"
Expand Down Expand Up @@ -391,7 +392,7 @@ func (hc *HTTPCollector) parsePrometheus(body []byte, metrics *job.Metrics) (*jo
return response, nil
}

func (hc *HTTPCollector) parseWebsite(body []byte, statusCode int, responseTime int64, metrics *job.Metrics, httpConfig *job.HTTPProtocol) (*job.CollectRepMetricsData, error) {
func (hc *HTTPCollector) parseWebsite(body []byte, statusCode int, responseTime int64, metrics *job.Metrics, httpConfig *protocol.HTTPProtocol) (*job.CollectRepMetricsData, error) {
response := hc.createSuccessResponse(metrics)
row := job.ValueRow{Columns: make([]string, len(metrics.AliasFields))}
keyword := httpConfig.Keyword
Expand Down Expand Up @@ -430,7 +431,7 @@ func (hc *HTTPCollector) parseHeader(header http.Header, metrics *job.Metrics) (
return response, nil
}

func (hc *HTTPCollector) parseJsonPath(body []byte, metrics *job.Metrics, responseTime int64, httpConfig *job.HTTPProtocol) (*job.CollectRepMetricsData, error) {
func (hc *HTTPCollector) parseJsonPath(body []byte, metrics *job.Metrics, responseTime int64, httpConfig *protocol.HTTPProtocol) (*job.CollectRepMetricsData, error) {
response := hc.createSuccessResponse(metrics)
var data interface{}
decoder := json.NewDecoder(bytes.NewReader(body))
Expand Down
9 changes: 5 additions & 4 deletions internal/collector/basic/redis/redis_collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (

sshhelper "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/ssh"
jobtypes "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/job"
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/job/protocol"
consts "hertzbeat.apache.org/hertzbeat-collector-go/internal/constants"
"hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger"
)
Expand Down Expand Up @@ -135,7 +136,7 @@ func (rc *RedisCollector) Collect(metrics *jobtypes.Metrics) *jobtypes.CollectRe
return response
}

func (rc *RedisCollector) collectSingle(ctx context.Context, metrics *jobtypes.Metrics, config *jobtypes.RedisProtocol, dialer func(context.Context, string, string) (net.Conn, error), timeout time.Duration, response *jobtypes.CollectRepMetricsData, startTime time.Time) {
func (rc *RedisCollector) collectSingle(ctx context.Context, metrics *jobtypes.Metrics, config *protocol.RedisProtocol, dialer func(context.Context, string, string) (net.Conn, error), timeout time.Duration, response *jobtypes.CollectRepMetricsData, startTime time.Time) {
opts := &redis.Options{
Addr: fmt.Sprintf("%s:%s", config.Host, config.Port),
Username: config.Username,
Expand Down Expand Up @@ -166,7 +167,7 @@ func (rc *RedisCollector) collectSingle(ctx context.Context, metrics *jobtypes.M
rc.addValueRow(response, metrics.AliasFields, parseMap)
}

func (rc *RedisCollector) collectCluster(ctx context.Context, metrics *jobtypes.Metrics, config *jobtypes.RedisProtocol, dialer func(context.Context, string, string) (net.Conn, error), timeout time.Duration, response *jobtypes.CollectRepMetricsData, startTime time.Time) {
func (rc *RedisCollector) collectCluster(ctx context.Context, metrics *jobtypes.Metrics, config *protocol.RedisProtocol, dialer func(context.Context, string, string) (net.Conn, error), timeout time.Duration, response *jobtypes.CollectRepMetricsData, startTime time.Time) {
opts := &redis.ClusterOptions{
Addrs: []string{fmt.Sprintf("%s:%s", config.Host, config.Port)},
Username: config.Username,
Expand Down Expand Up @@ -293,13 +294,13 @@ func (rc *RedisCollector) parseInfo(info string) map[string]string {
}

// createRedisDialer creates a dialer function that supports SSH tunneling
func (rc *RedisCollector) createRedisDialer(sshTunnel *jobtypes.SSHTunnel) (func(context.Context, string, string) (net.Conn, error), error) {
func (rc *RedisCollector) createRedisDialer(sshTunnel *protocol.SSHTunnel) (func(context.Context, string, string) (net.Conn, error), error) {
if sshTunnel == nil || sshTunnel.Enable != "true" {
return nil, nil
}

// Create SSH config
sshConfig := &jobtypes.SSHProtocol{
sshConfig := &protocol.SSHProtocol{
Host: sshTunnel.Host,
Port: sshTunnel.Port,
Username: sshTunnel.Username,
Expand Down
11 changes: 6 additions & 5 deletions internal/collector/basic/redis/redis_collector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/alicebob/miniredis/v2"
"github.com/stretchr/testify/assert"
jobtypes "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/job"
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/job/protocol"
loggertype "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/logger"
consts "hertzbeat.apache.org/hertzbeat-collector-go/internal/constants"
"hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger"
Expand Down Expand Up @@ -56,23 +57,23 @@ func TestRedisCollector_PreCheck(t *testing.T) {
assert.Contains(t, err.Error(), "redis configuration is required")

// Test missing host
metrics.Redis = &jobtypes.RedisProtocol{
metrics.Redis = &protocol.RedisProtocol{
Port: "6379",
}
err = collector.PreCheck(metrics)
assert.Error(t, err)
assert.Contains(t, err.Error(), "redis host is required")

// Test missing port
metrics.Redis = &jobtypes.RedisProtocol{
metrics.Redis = &protocol.RedisProtocol{
Host: "localhost",
}
err = collector.PreCheck(metrics)
assert.Error(t, err)
assert.Contains(t, err.Error(), "redis port is required")

// Test valid configuration
metrics.Redis = &jobtypes.RedisProtocol{
metrics.Redis = &protocol.RedisProtocol{
Host: "localhost",
Port: "6379",
}
Expand All @@ -98,7 +99,7 @@ func TestRedisCollector_Collect(t *testing.T) {

metrics := &jobtypes.Metrics{
Name: "redis_test",
Redis: &jobtypes.RedisProtocol{
Redis: &protocol.RedisProtocol{
Host: host,
Port: port,
},
Expand Down Expand Up @@ -145,7 +146,7 @@ func TestRedisCollector_Collect_Auth(t *testing.T) {
// Test with correct password
metrics := &jobtypes.Metrics{
Name: "redis_auth_test",
Redis: &jobtypes.RedisProtocol{
Redis: &protocol.RedisProtocol{
Host: host,
Port: port,
Password: "password123",
Expand Down
5 changes: 3 additions & 2 deletions internal/collector/basic/ssh/ssh_collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"time"

jobtypes "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/job"
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/job/protocol"
consts "hertzbeat.apache.org/hertzbeat-collector-go/internal/constants"
"hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger"
"hertzbeat.apache.org/hertzbeat-collector-go/internal/util/param"
Expand Down Expand Up @@ -59,7 +60,7 @@ func NewSSHCollector(logger logger.Logger) *SSHCollector {

// extractSSHConfig extracts SSH configuration from interface{} type
// This function uses the parameter replacer for consistent configuration extraction
func extractSSHConfig(sshInterface interface{}) (*jobtypes.SSHProtocol, error) {
func extractSSHConfig(sshInterface interface{}) (*protocol.SSHProtocol, error) {
replacer := param.NewReplacer()
return replacer.ExtractSSHConfig(sshInterface)
}
Expand Down Expand Up @@ -167,7 +168,7 @@ func (sshc *SSHCollector) getTimeout(timeoutStr string) time.Duration {
}

// executeSSHScript executes the SSH script and returns the result
func (sshc *SSHCollector) executeSSHScript(config *jobtypes.SSHProtocol, timeout time.Duration) (string, error) {
func (sshc *SSHCollector) executeSSHScript(config *protocol.SSHProtocol, timeout time.Duration) (string, error) {
// Create SSH client configuration using helper function
clientConfig, err := sshhelper.CreateSSHClientConfig(config, sshc.logger)
if err != nil {
Expand Down
74 changes: 29 additions & 45 deletions internal/collector/common/types/job/metrics_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ package job
import (
"fmt"
"time"

"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/job/protocol"
)

// Metrics represents a metric configuration
Expand All @@ -44,14 +46,14 @@ type Metrics struct {
HasSubTask bool `json:"hasSubTask"`

// Protocol specific fields
HTTP *HTTPProtocol `json:"http,omitempty"`
SSH interface{} `json:"ssh,omitempty"` // Can be SSHProtocol
JDBC interface{} `json:"jdbc,omitempty"` // Can be JDBCProtocol or map[string]interface{}
SNMP *SNMPProtocol `json:"snmp,omitempty"`
JMX *JMXProtocol `json:"jmx,omitempty"`
Redis *RedisProtocol `json:"redis,omitempty"`
MongoDB *MongoDBProtocol `json:"mongodb,omitempty"`
Milvus *MilvusProtocol `json:"milvus,omitempty"`
HTTP *protocol.HTTPProtocol `json:"http,omitempty"`
SSH *protocol.SSHProtocol `json:"ssh,omitempty"`
JDBC *protocol.JDBCProtocol `json:"jdbc,omitempty"`
SNMP *protocol.SNMPProtocol `json:"snmp,omitempty"`
JMX *protocol.JMXProtocol `json:"jmx,omitempty"`
Redis *protocol.RedisProtocol `json:"redis,omitempty"`
MongoDB *protocol.MongoDBProtocol `json:"mongodb,omitempty"`
Milvus *protocol.MilvusProtocol `json:"milvus,omitempty"`
}

// Field represents a metric field
Expand Down Expand Up @@ -216,18 +218,18 @@ type SSHProtocol struct {

// JDBCProtocol represents JDBC protocol configuration
type JDBCProtocol struct {
Host string `json:"host"`
Port string `json:"port"`
Platform string `json:"platform"`
Username string `json:"username"`
Password string `json:"password"`
Database string `json:"database"`
Timeout string `json:"timeout"`
QueryType string `json:"queryType"`
SQL string `json:"sql"`
URL string `json:"url"`
ReuseConnection string `json:"reuseConnection"`
SSHTunnel *SSHTunnel `json:"sshTunnel,omitempty"`
Host string `json:"host"`
Port string `json:"port"`
Platform string `json:"platform"`
Username string `json:"username"`
Password string `json:"password"`
Database string `json:"database"`
Timeout string `json:"timeout"`
QueryType string `json:"queryType"`
SQL string `json:"sql"`
URL string `json:"url"`
ReuseConnection string `json:"reuseConnection"`
SSHTunnel *protocol.SSHTunnel `json:"sshTunnel,omitempty"`
}

// SNMPProtocol represents SNMP protocol configuration
Expand Down Expand Up @@ -260,13 +262,13 @@ type JMXProtocol struct {

// RedisProtocol represents Redis protocol configuration
type RedisProtocol struct {
Host string `json:"host"`
Port string `json:"port"`
Username string `json:"username"`
Password string `json:"password"`
Pattern string `json:"pattern"`
Timeout string `json:"timeout"`
SSHTunnel *SSHTunnel `json:"sshTunnel,omitempty"`
Host string `json:"host"`
Port string `json:"port"`
Username string `json:"username"`
Password string `json:"password"`
Pattern string `json:"pattern"`
Timeout string `json:"timeout"`
SSHTunnel *protocol.SSHTunnel `json:"sshTunnel,omitempty"`
}

// MongoDBProtocol represents MongoDB protocol configuration
Expand All @@ -281,15 +283,6 @@ type MongoDBProtocol struct {
Timeout int `json:"timeout"`
}

// MilvusProtocol represents Milvus protocol configuration
type MilvusProtocol struct {
Host string `json:"host"`
Port string `json:"port"`
Username string `json:"username"`
Password string `json:"password"`
Timeout string `json:"timeout"`
}

// GetInterval returns the interval for the metric, using default if not set
func (m *Metrics) GetInterval() time.Duration {
if m.Interval > 0 {
Expand Down Expand Up @@ -413,15 +406,6 @@ type CollectRepMetricsData struct {
Metadata map[string]string `json:"metadata,omitempty"`
}

// SSHTunnel represents SSH tunnel configuration
type SSHTunnel struct {
Enable string `json:"enable"`
Host string `json:"host"`
Port string `json:"port"`
Username string `json:"username"`
Password string `json:"password"`
}

// CollectResponseEventListener defines the interface for handling collect response events
type CollectResponseEventListener interface {
Response(metricsData []CollectRepMetricsData)
Expand Down
71 changes: 71 additions & 0 deletions internal/collector/common/types/job/protocol/http_protocol.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package protocol

import (
"hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger"
)

// HTTPProtocol represents HTTP protocol configuration
type HTTPProtocol struct {
URL string `json:"url"`
Method string `json:"method"`
Headers map[string]string `json:"headers"`
Params map[string]string `json:"params"`
Body string `json:"body"`
ParseScript string `json:"parseScript"`
ParseType string `json:"parseType"`
Keyword string `json:"keyword"`
Timeout string `json:"timeout"`
SSL string `json:"ssl"`
Authorization *Authorization `json:"authorization"`

logger logger.Logger
}

type HTTPProtocolOption func(protocol *HTTPProtocol)

func NewHTTPProtocol(url, method string, logger logger.Logger, opts ...HTTPProtocolOption) *HTTPProtocol {
p := &HTTPProtocol{
URL: url,
Method: method,
logger: logger,
}
for _, opt := range opts {
opt(p)
}
return p
}

func (p *HTTPProtocol) IsInvalid() error {
if p.URL == "" {
p.logger.Error(ErrorInvalidURL, "http protocol url is empty")
return ErrorInvalidURL
}
return nil
}

// Authorization represents HTTP authorization configuration
type Authorization struct {
Type string `json:"type"`
BasicAuthUsername string `json:"basicAuthUsername"`
BasicAuthPassword string `json:"basicAuthPassword"`
DigestAuthUsername string `json:"digestAuthUsername"`
DigestAuthPassword string `json:"digestAuthPassword"`
BearerTokenToken string `json:"bearerTokenToken"`
}
Loading