diff --git a/internal/collector/basic/database/jdbc_collector.go b/internal/collector/basic/database/jdbc_collector.go index f91d79d..841a0f7 100644 --- a/internal/collector/basic/database/jdbc_collector.go +++ b/internal/collector/basic/database/jdbc_collector.go @@ -40,6 +40,7 @@ import ( "golang.org/x/crypto/ssh" jobtypes "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/job" + "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/job/protocol" "hertzbeat.apache.org/hertzbeat-collector-go/internal/constants" "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger" "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/param" @@ -184,7 +185,7 @@ type sshTunnelHelper struct { // startSSHTunnel starts an SSH tunnel // It connects to the SSH bastion, listens on a local random port, // and forwards traffic to the target database. -func startSSHTunnel(config *jobtypes.SSHTunnel, remoteHost, remotePort string, timeout time.Duration, log logger.Logger) (*sshTunnelHelper, string, string, error) { +func startSSHTunnel(config *protocol.SSHTunnel, remoteHost, remotePort string, timeout time.Duration, log logger.Logger) (*sshTunnelHelper, string, string, error) { sshHost := config.Host sshPort, err := strconv.Atoi(config.Port) if err != nil { @@ -314,7 +315,7 @@ func NewJDBCCollector(logger logger.Logger) *JDBCCollector { } // extractJDBCConfig extracts JDBC configuration from interface{} type -func extractJDBCConfig(jdbcInterface interface{}) (*jobtypes.JDBCProtocol, error) { +func extractJDBCConfig(jdbcInterface interface{}) (*protocol.JDBCProtocol, error) { replacer := param.NewReplacer() return replacer.ExtractJDBCConfig(jdbcInterface) } @@ -396,7 +397,7 @@ func (jc *JDBCCollector) PreCheck(metrics *jobtypes.Metrics) error { } // checkTunnelParam validates SSH tunnel configuration -func (jc *JDBCCollector) checkTunnelParam(config *jobtypes.SSHTunnel) error { +func (jc *JDBCCollector) checkTunnelParam(config *protocol.SSHTunnel) error { if config == nil { return nil } @@ -602,7 +603,7 @@ func validateURL(rawURL string, platform string) error { } // constructDatabaseURL constructs the database connection URL/DSN -func (jc *JDBCCollector) constructDatabaseURL(jdbc *jobtypes.JDBCProtocol, host, port string) (string, error) { +func (jc *JDBCCollector) constructDatabaseURL(jdbc *protocol.JDBCProtocol, host, port string) (string, error) { // 1. If user provided a full URL, use it (already validated in PreCheck) if jdbc.URL != "" { // Validate again to prevent bypassing PreCheck diff --git a/internal/collector/basic/http/http_collector.go b/internal/collector/basic/http/http_collector.go index 1c065f0..846b6b6 100644 --- a/internal/collector/basic/http/http_collector.go +++ b/internal/collector/basic/http/http_collector.go @@ -35,6 +35,7 @@ import ( "github.com/prometheus/common/expfmt" "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/job" + "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/job/protocol" "hertzbeat.apache.org/hertzbeat-collector-go/internal/constants" "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger" ) @@ -209,7 +210,7 @@ func (hc *HTTPCollector) Collect(metrics *job.Metrics) *job.CollectRepMetricsDat } // handleDigestAuth generates a new request with the Digest Authorization header -func (hc *HTTPCollector) handleDigestAuth(originalReq *http.Request, authHeader string, authConfig *job.Authorization) (*http.Request, error) { +func (hc *HTTPCollector) handleDigestAuth(originalReq *http.Request, authHeader string, authConfig *protocol.Authorization) (*http.Request, error) { params := hc.parseAuthHeader(authHeader) realm := params["realm"] nonce := params["nonce"] @@ -298,7 +299,7 @@ func (hc *HTTPCollector) generateCnonce() string { return hex.EncodeToString(b) } -func (hc *HTTPCollector) createRequest(config *job.HTTPProtocol, targetURL string) (*http.Request, error) { +func (hc *HTTPCollector) createRequest(config *protocol.HTTPProtocol, targetURL string) (*http.Request, error) { method := strings.ToUpper(config.Method) if method == "" { method = "GET" @@ -391,7 +392,7 @@ func (hc *HTTPCollector) parsePrometheus(body []byte, metrics *job.Metrics) (*jo return response, nil } -func (hc *HTTPCollector) parseWebsite(body []byte, statusCode int, responseTime int64, metrics *job.Metrics, httpConfig *job.HTTPProtocol) (*job.CollectRepMetricsData, error) { +func (hc *HTTPCollector) parseWebsite(body []byte, statusCode int, responseTime int64, metrics *job.Metrics, httpConfig *protocol.HTTPProtocol) (*job.CollectRepMetricsData, error) { response := hc.createSuccessResponse(metrics) row := job.ValueRow{Columns: make([]string, len(metrics.AliasFields))} keyword := httpConfig.Keyword @@ -430,7 +431,7 @@ func (hc *HTTPCollector) parseHeader(header http.Header, metrics *job.Metrics) ( return response, nil } -func (hc *HTTPCollector) parseJsonPath(body []byte, metrics *job.Metrics, responseTime int64, httpConfig *job.HTTPProtocol) (*job.CollectRepMetricsData, error) { +func (hc *HTTPCollector) parseJsonPath(body []byte, metrics *job.Metrics, responseTime int64, httpConfig *protocol.HTTPProtocol) (*job.CollectRepMetricsData, error) { response := hc.createSuccessResponse(metrics) var data interface{} decoder := json.NewDecoder(bytes.NewReader(body)) diff --git a/internal/collector/basic/redis/redis_collector.go b/internal/collector/basic/redis/redis_collector.go index cf9cc83..12553e4 100644 --- a/internal/collector/basic/redis/redis_collector.go +++ b/internal/collector/basic/redis/redis_collector.go @@ -33,6 +33,7 @@ import ( sshhelper "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/ssh" jobtypes "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/job" + "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/job/protocol" consts "hertzbeat.apache.org/hertzbeat-collector-go/internal/constants" "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger" ) @@ -135,7 +136,7 @@ func (rc *RedisCollector) Collect(metrics *jobtypes.Metrics) *jobtypes.CollectRe return response } -func (rc *RedisCollector) collectSingle(ctx context.Context, metrics *jobtypes.Metrics, config *jobtypes.RedisProtocol, dialer func(context.Context, string, string) (net.Conn, error), timeout time.Duration, response *jobtypes.CollectRepMetricsData, startTime time.Time) { +func (rc *RedisCollector) collectSingle(ctx context.Context, metrics *jobtypes.Metrics, config *protocol.RedisProtocol, dialer func(context.Context, string, string) (net.Conn, error), timeout time.Duration, response *jobtypes.CollectRepMetricsData, startTime time.Time) { opts := &redis.Options{ Addr: fmt.Sprintf("%s:%s", config.Host, config.Port), Username: config.Username, @@ -166,7 +167,7 @@ func (rc *RedisCollector) collectSingle(ctx context.Context, metrics *jobtypes.M rc.addValueRow(response, metrics.AliasFields, parseMap) } -func (rc *RedisCollector) collectCluster(ctx context.Context, metrics *jobtypes.Metrics, config *jobtypes.RedisProtocol, dialer func(context.Context, string, string) (net.Conn, error), timeout time.Duration, response *jobtypes.CollectRepMetricsData, startTime time.Time) { +func (rc *RedisCollector) collectCluster(ctx context.Context, metrics *jobtypes.Metrics, config *protocol.RedisProtocol, dialer func(context.Context, string, string) (net.Conn, error), timeout time.Duration, response *jobtypes.CollectRepMetricsData, startTime time.Time) { opts := &redis.ClusterOptions{ Addrs: []string{fmt.Sprintf("%s:%s", config.Host, config.Port)}, Username: config.Username, @@ -293,13 +294,13 @@ func (rc *RedisCollector) parseInfo(info string) map[string]string { } // createRedisDialer creates a dialer function that supports SSH tunneling -func (rc *RedisCollector) createRedisDialer(sshTunnel *jobtypes.SSHTunnel) (func(context.Context, string, string) (net.Conn, error), error) { +func (rc *RedisCollector) createRedisDialer(sshTunnel *protocol.SSHTunnel) (func(context.Context, string, string) (net.Conn, error), error) { if sshTunnel == nil || sshTunnel.Enable != "true" { return nil, nil } // Create SSH config - sshConfig := &jobtypes.SSHProtocol{ + sshConfig := &protocol.SSHProtocol{ Host: sshTunnel.Host, Port: sshTunnel.Port, Username: sshTunnel.Username, diff --git a/internal/collector/basic/redis/redis_collector_test.go b/internal/collector/basic/redis/redis_collector_test.go index 21c0720..56f7b2c 100644 --- a/internal/collector/basic/redis/redis_collector_test.go +++ b/internal/collector/basic/redis/redis_collector_test.go @@ -27,6 +27,7 @@ import ( "github.com/alicebob/miniredis/v2" "github.com/stretchr/testify/assert" jobtypes "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/job" + "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/job/protocol" loggertype "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/logger" consts "hertzbeat.apache.org/hertzbeat-collector-go/internal/constants" "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger" @@ -56,7 +57,7 @@ func TestRedisCollector_PreCheck(t *testing.T) { assert.Contains(t, err.Error(), "redis configuration is required") // Test missing host - metrics.Redis = &jobtypes.RedisProtocol{ + metrics.Redis = &protocol.RedisProtocol{ Port: "6379", } err = collector.PreCheck(metrics) @@ -64,7 +65,7 @@ func TestRedisCollector_PreCheck(t *testing.T) { assert.Contains(t, err.Error(), "redis host is required") // Test missing port - metrics.Redis = &jobtypes.RedisProtocol{ + metrics.Redis = &protocol.RedisProtocol{ Host: "localhost", } err = collector.PreCheck(metrics) @@ -72,7 +73,7 @@ func TestRedisCollector_PreCheck(t *testing.T) { assert.Contains(t, err.Error(), "redis port is required") // Test valid configuration - metrics.Redis = &jobtypes.RedisProtocol{ + metrics.Redis = &protocol.RedisProtocol{ Host: "localhost", Port: "6379", } @@ -98,7 +99,7 @@ func TestRedisCollector_Collect(t *testing.T) { metrics := &jobtypes.Metrics{ Name: "redis_test", - Redis: &jobtypes.RedisProtocol{ + Redis: &protocol.RedisProtocol{ Host: host, Port: port, }, @@ -145,7 +146,7 @@ func TestRedisCollector_Collect_Auth(t *testing.T) { // Test with correct password metrics := &jobtypes.Metrics{ Name: "redis_auth_test", - Redis: &jobtypes.RedisProtocol{ + Redis: &protocol.RedisProtocol{ Host: host, Port: port, Password: "password123", diff --git a/internal/collector/basic/ssh/ssh_collector.go b/internal/collector/basic/ssh/ssh_collector.go index a6667c0..20e1bab 100644 --- a/internal/collector/basic/ssh/ssh_collector.go +++ b/internal/collector/basic/ssh/ssh_collector.go @@ -27,6 +27,7 @@ import ( "time" jobtypes "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/job" + "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/job/protocol" consts "hertzbeat.apache.org/hertzbeat-collector-go/internal/constants" "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger" "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/param" @@ -59,7 +60,7 @@ func NewSSHCollector(logger logger.Logger) *SSHCollector { // extractSSHConfig extracts SSH configuration from interface{} type // This function uses the parameter replacer for consistent configuration extraction -func extractSSHConfig(sshInterface interface{}) (*jobtypes.SSHProtocol, error) { +func extractSSHConfig(sshInterface interface{}) (*protocol.SSHProtocol, error) { replacer := param.NewReplacer() return replacer.ExtractSSHConfig(sshInterface) } @@ -167,7 +168,7 @@ func (sshc *SSHCollector) getTimeout(timeoutStr string) time.Duration { } // executeSSHScript executes the SSH script and returns the result -func (sshc *SSHCollector) executeSSHScript(config *jobtypes.SSHProtocol, timeout time.Duration) (string, error) { +func (sshc *SSHCollector) executeSSHScript(config *protocol.SSHProtocol, timeout time.Duration) (string, error) { // Create SSH client configuration using helper function clientConfig, err := sshhelper.CreateSSHClientConfig(config, sshc.logger) if err != nil { diff --git a/internal/collector/common/types/job/metrics_types.go b/internal/collector/common/types/job/metrics_types.go index b76c083..ea591f3 100644 --- a/internal/collector/common/types/job/metrics_types.go +++ b/internal/collector/common/types/job/metrics_types.go @@ -20,6 +20,8 @@ package job import ( "fmt" "time" + + "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/job/protocol" ) // Metrics represents a metric configuration @@ -44,14 +46,14 @@ type Metrics struct { HasSubTask bool `json:"hasSubTask"` // Protocol specific fields - HTTP *HTTPProtocol `json:"http,omitempty"` - SSH interface{} `json:"ssh,omitempty"` // Can be SSHProtocol - JDBC interface{} `json:"jdbc,omitempty"` // Can be JDBCProtocol or map[string]interface{} - SNMP *SNMPProtocol `json:"snmp,omitempty"` - JMX *JMXProtocol `json:"jmx,omitempty"` - Redis *RedisProtocol `json:"redis,omitempty"` - MongoDB *MongoDBProtocol `json:"mongodb,omitempty"` - Milvus *MilvusProtocol `json:"milvus,omitempty"` + HTTP *protocol.HTTPProtocol `json:"http,omitempty"` + SSH *protocol.SSHProtocol `json:"ssh,omitempty"` + JDBC *protocol.JDBCProtocol `json:"jdbc,omitempty"` + SNMP *protocol.SNMPProtocol `json:"snmp,omitempty"` + JMX *protocol.JMXProtocol `json:"jmx,omitempty"` + Redis *protocol.RedisProtocol `json:"redis,omitempty"` + MongoDB *protocol.MongoDBProtocol `json:"mongodb,omitempty"` + Milvus *protocol.MilvusProtocol `json:"milvus,omitempty"` } // Field represents a metric field @@ -216,18 +218,18 @@ type SSHProtocol struct { // JDBCProtocol represents JDBC protocol configuration type JDBCProtocol struct { - Host string `json:"host"` - Port string `json:"port"` - Platform string `json:"platform"` - Username string `json:"username"` - Password string `json:"password"` - Database string `json:"database"` - Timeout string `json:"timeout"` - QueryType string `json:"queryType"` - SQL string `json:"sql"` - URL string `json:"url"` - ReuseConnection string `json:"reuseConnection"` - SSHTunnel *SSHTunnel `json:"sshTunnel,omitempty"` + Host string `json:"host"` + Port string `json:"port"` + Platform string `json:"platform"` + Username string `json:"username"` + Password string `json:"password"` + Database string `json:"database"` + Timeout string `json:"timeout"` + QueryType string `json:"queryType"` + SQL string `json:"sql"` + URL string `json:"url"` + ReuseConnection string `json:"reuseConnection"` + SSHTunnel *protocol.SSHTunnel `json:"sshTunnel,omitempty"` } // SNMPProtocol represents SNMP protocol configuration @@ -260,13 +262,13 @@ type JMXProtocol struct { // RedisProtocol represents Redis protocol configuration type RedisProtocol struct { - Host string `json:"host"` - Port string `json:"port"` - Username string `json:"username"` - Password string `json:"password"` - Pattern string `json:"pattern"` - Timeout string `json:"timeout"` - SSHTunnel *SSHTunnel `json:"sshTunnel,omitempty"` + Host string `json:"host"` + Port string `json:"port"` + Username string `json:"username"` + Password string `json:"password"` + Pattern string `json:"pattern"` + Timeout string `json:"timeout"` + SSHTunnel *protocol.SSHTunnel `json:"sshTunnel,omitempty"` } // MongoDBProtocol represents MongoDB protocol configuration @@ -281,15 +283,6 @@ type MongoDBProtocol struct { Timeout int `json:"timeout"` } -// MilvusProtocol represents Milvus protocol configuration -type MilvusProtocol struct { - Host string `json:"host"` - Port string `json:"port"` - Username string `json:"username"` - Password string `json:"password"` - Timeout string `json:"timeout"` -} - // GetInterval returns the interval for the metric, using default if not set func (m *Metrics) GetInterval() time.Duration { if m.Interval > 0 { @@ -413,15 +406,6 @@ type CollectRepMetricsData struct { Metadata map[string]string `json:"metadata,omitempty"` } -// SSHTunnel represents SSH tunnel configuration -type SSHTunnel struct { - Enable string `json:"enable"` - Host string `json:"host"` - Port string `json:"port"` - Username string `json:"username"` - Password string `json:"password"` -} - // CollectResponseEventListener defines the interface for handling collect response events type CollectResponseEventListener interface { Response(metricsData []CollectRepMetricsData) diff --git a/internal/collector/common/types/job/protocol/http_protocol.go b/internal/collector/common/types/job/protocol/http_protocol.go new file mode 100644 index 0000000..494f4ea --- /dev/null +++ b/internal/collector/common/types/job/protocol/http_protocol.go @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package protocol + +import ( + "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger" +) + +// HTTPProtocol represents HTTP protocol configuration +type HTTPProtocol struct { + URL string `json:"url"` + Method string `json:"method"` + Headers map[string]string `json:"headers"` + Params map[string]string `json:"params"` + Body string `json:"body"` + ParseScript string `json:"parseScript"` + ParseType string `json:"parseType"` + Keyword string `json:"keyword"` + Timeout string `json:"timeout"` + SSL string `json:"ssl"` + Authorization *Authorization `json:"authorization"` + + logger logger.Logger +} + +type HTTPProtocolOption func(protocol *HTTPProtocol) + +func NewHTTPProtocol(url, method string, logger logger.Logger, opts ...HTTPProtocolOption) *HTTPProtocol { + p := &HTTPProtocol{ + URL: url, + Method: method, + logger: logger, + } + for _, opt := range opts { + opt(p) + } + return p +} + +func (p *HTTPProtocol) IsInvalid() error { + if p.URL == "" { + p.logger.Error(ErrorInvalidURL, "http protocol url is empty") + return ErrorInvalidURL + } + return nil +} + +// Authorization represents HTTP authorization configuration +type Authorization struct { + Type string `json:"type"` + BasicAuthUsername string `json:"basicAuthUsername"` + BasicAuthPassword string `json:"basicAuthPassword"` + DigestAuthUsername string `json:"digestAuthUsername"` + DigestAuthPassword string `json:"digestAuthPassword"` + BearerTokenToken string `json:"bearerTokenToken"` +} diff --git a/internal/collector/common/types/job/protocol/jdbc_protocol.go b/internal/collector/common/types/job/protocol/jdbc_protocol.go new file mode 100644 index 0000000..f81d3cd --- /dev/null +++ b/internal/collector/common/types/job/protocol/jdbc_protocol.go @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package protocol + +import ( + "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger" +) + +// JDBCProtocol represents JDBC protocol configuration +type JDBCProtocol struct { + Host string `json:"host"` + Port string `json:"port"` + Platform string `json:"platform"` + Username string `json:"username"` + Password string `json:"password"` + Database string `json:"database"` + Timeout string `json:"timeout"` + QueryType string `json:"queryType"` + SQL string `json:"sql"` + URL string `json:"url"` + ReuseConnection string `json:"reuseConnection"` + SSHTunnel *SSHTunnel `json:"sshTunnel,omitempty"` + + logger logger.Logger +} + +type JDBCProtocolOption func(protocol *JDBCProtocol) + +func NewJDBCProtocol(host, port, platform string, logger logger.Logger, opts ...JDBCProtocolOption) *JDBCProtocol { + p := &JDBCProtocol{ + Host: host, + Port: port, + Platform: platform, + logger: logger, + } + for _, opt := range opts { + opt(p) + } + return p +} + +func (p *JDBCProtocol) IsInvalid() error { + if p.Host == "" { + p.logger.Error(ErrorInvalidHost, "jdbc protocol host is empty") + return ErrorInvalidHost + } + if p.Port == "" { + p.logger.Error(ErrorInvalidPort, "jdbc protocol port is empty") + return ErrorInvalidPort + } + return nil +} diff --git a/internal/collector/common/types/job/protocol/jmx_protocol.go b/internal/collector/common/types/job/protocol/jmx_protocol.go new file mode 100644 index 0000000..9666d03 --- /dev/null +++ b/internal/collector/common/types/job/protocol/jmx_protocol.go @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package protocol + +import ( + "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger" +) + +// JMXProtocol represents JMX protocol configuration +type JMXProtocol struct { + Host string `json:"host"` + Port int `json:"port"` + Username string `json:"username"` + Password string `json:"password"` + Protocol string `json:"protocol"` + URL string `json:"url"` + Timeout int `json:"timeout"` + + logger logger.Logger +} + +type JMXProtocolOption func(protocol *JMXProtocol) + +func NewJMXProtocol(host string, port int, logger logger.Logger, opts ...JMXProtocolOption) *JMXProtocol { + p := &JMXProtocol{ + Host: host, + Port: port, + logger: logger, + } + for _, opt := range opts { + opt(p) + } + return p +} + +func (p *JMXProtocol) IsInvalid() error { + if p.Host == "" { + p.logger.Error(ErrorInvalidHost, "jmx protocol host is empty") + return ErrorInvalidHost + } + if p.Port == 0 { + p.logger.Error(ErrorInvalidPort, "jmx protocol port is empty") + return ErrorInvalidPort + } + return nil +} diff --git a/internal/collector/common/types/job/protocol/milvus_protocol.go b/internal/collector/common/types/job/protocol/milvus_protocol.go new file mode 100644 index 0000000..eb70dcd --- /dev/null +++ b/internal/collector/common/types/job/protocol/milvus_protocol.go @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package protocol + +import ( + "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger" +) + +// MilvusProtocol represents Milvus protocol configuration +type MilvusProtocol struct { + Host string `json:"host"` + Port string `json:"port"` + Username string `json:"username"` + Password string `json:"password"` + Timeout string `json:"timeout"` + + logger logger.Logger +} + +type MilvusProtocolOption func(protocol *MilvusProtocol) + +func NewMilvusProtocol(host, port string, logger logger.Logger, opts ...MilvusProtocolOption) *MilvusProtocol { + + p := &MilvusProtocol{ + Host: host, + Port: port, + logger: logger, + } + for _, opt := range opts { + opt(p) + } + return p +} + +func (p *MilvusProtocol) IsInvalid() error { + if p.Host == "" { + p.logger.Error(ErrorInvalidHost, "milvus protocol host is empty") + return ErrorInvalidHost + } + if p.Port == "" { + p.logger.Error(ErrorInvalidPort, "milvus protocol port is empty") + return ErrorInvalidPort + } + return nil +} diff --git a/internal/collector/common/types/job/protocol/mongodb_protocol.go b/internal/collector/common/types/job/protocol/mongodb_protocol.go new file mode 100644 index 0000000..30a249b --- /dev/null +++ b/internal/collector/common/types/job/protocol/mongodb_protocol.go @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package protocol + +import ( + "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger" +) + +// MongoDBProtocol represents MongoDB protocol configuration +type MongoDBProtocol struct { + Host string `json:"host"` + Port int `json:"port"` + Username string `json:"username"` + Password string `json:"password"` + Database string `json:"database"` + AuthDatabase string `json:"authDatabase"` + Command string `json:"command"` + Timeout int `json:"timeout"` + + logger logger.Logger +} + +type MongoDBProtocolOption func(protocol *MongoDBProtocol) + +func NewMongoDBProtocol(host string, port int, logger logger.Logger, opts ...MongoDBProtocolOption) *MongoDBProtocol { + p := &MongoDBProtocol{ + Host: host, + Port: port, + logger: logger, + } + for _, opt := range opts { + opt(p) + } + return p +} + +func (p *MongoDBProtocol) IsInvalid() error { + if p.Host == "" { + p.logger.Error(ErrorInvalidHost, "mongodb protocol host is empty") + return ErrorInvalidHost + } + if p.Port == 0 { + p.logger.Error(ErrorInvalidPort, "mongodb protocol port is empty") + return ErrorInvalidPort + } + return nil +} diff --git a/internal/collector/common/types/job/protocol/redis_protocol.go b/internal/collector/common/types/job/protocol/redis_protocol.go new file mode 100644 index 0000000..e5f6fce --- /dev/null +++ b/internal/collector/common/types/job/protocol/redis_protocol.go @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package protocol + +import ( + "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger" +) + +// RedisProtocol represents Redis protocol configuration +type RedisProtocol struct { + Host string `json:"host"` + Port string `json:"port"` + Username string `json:"username"` + Password string `json:"password"` + Pattern string `json:"pattern"` + Timeout string `json:"timeout"` + SSHTunnel *SSHTunnel `json:"sshTunnel,omitempty"` + + logger logger.Logger +} + +type RedisProtocolOption func(protocol *RedisProtocol) + +func NewRedisProtocol(host, port string, logger logger.Logger, opts ...RedisProtocolOption) *RedisProtocol { + p := &RedisProtocol{ + Host: host, + Port: port, + logger: logger, + } + for _, opt := range opts { + opt(p) + } + return p +} + +func (p *RedisProtocol) IsInvalid() error { + if p.Host == "" { + p.logger.Error(ErrorInvalidHost, "redis protocol host is empty") + return ErrorInvalidHost + } + if p.Port == "" { + p.logger.Error(ErrorInvalidPort, "redis protocol port is empty") + return ErrorInvalidPort + } + return nil +} diff --git a/internal/collector/common/types/job/protocol/snmp_protocol.go b/internal/collector/common/types/job/protocol/snmp_protocol.go new file mode 100644 index 0000000..b2622e6 --- /dev/null +++ b/internal/collector/common/types/job/protocol/snmp_protocol.go @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package protocol + +import ( + "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger" +) + +// SNMPProtocol represents SNMP protocol configuration +type SNMPProtocol struct { + Host string `json:"host"` + Port int `json:"port"` + Version string `json:"version"` + Community string `json:"community"` + Username string `json:"username"` + AuthType string `json:"authType"` + AuthPasswd string `json:"authPasswd"` + PrivType string `json:"privType"` + PrivPasswd string `json:"privPasswd"` + ContextName string `json:"contextName"` + Timeout int `json:"timeout"` + Operation string `json:"operation"` + OIDs string `json:"oids"` + + logger logger.Logger +} + +type SNMPProtocolOption func(protocol *SNMPProtocol) + +func NewSNMPProtocol(host string, port int, logger logger.Logger, opts ...SNMPProtocolOption) *SNMPProtocol { + p := &SNMPProtocol{ + Host: host, + Port: port, + logger: logger, + } + for _, opt := range opts { + opt(p) + } + return p +} + +// todo add more check. +func (p *SNMPProtocol) IsInvalid() error { + if p.Host == "" { + p.logger.Error(ErrorInvalidHost, "snmp protocol host is empty") + return ErrorInvalidHost + } + if p.Port == 0 { + p.logger.Error(ErrorInvalidPort, "snmp protocol port is empty") + return ErrorInvalidPort + } + return nil +} diff --git a/internal/collector/common/types/job/protocol/ssh_protocol.go b/internal/collector/common/types/job/protocol/ssh_protocol.go index 1a1f108..fba5088 100644 --- a/internal/collector/common/types/job/protocol/ssh_protocol.go +++ b/internal/collector/common/types/job/protocol/ssh_protocol.go @@ -1,75 +1,71 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package protocol -type SshProtocol struct { - Host string - Port string - Timeout string - Username string - Password string - PrivateKey string - PrivateKeyPassphrase string - ReuseConnection string - Script string - ParseType string - ProxyHost string - ProxyPort string - ProxyUsername string - ProxyPassword string - UseProxy string - ProxyPrivateKey string -} +import ( + "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger" +) -type SshProtocolConfigOptFunc func(option *SshProtocol) +// SSHProtocol represents SSH protocol configuration +type SSHProtocol struct { + Host string `json:"host"` + Port string `json:"port"` + Username string `json:"username"` + Password string `json:"password"` + PrivateKey string `json:"privateKey"` + PrivateKeyPassphrase string `json:"privateKeyPassphrase"` + Script string `json:"script"` + ParseType string `json:"parseType"` + ParseScript string `json:"parseScript"` + Timeout string `json:"timeout"` + ReuseConnection string `json:"reuseConnection"` + UseProxy string `json:"useProxy"` + ProxyHost string `json:"proxyHost"` + ProxyPort string `json:"proxyPort"` + ProxyUsername string `json:"proxyUsername"` + ProxyPassword string `json:"proxyPassword"` + ProxyPrivateKey string `json:"proxyPrivateKey"` -func NewSshProtocol(host, port string, opts ...SshProtocolConfigOptFunc) *SshProtocol { + logger logger.Logger +} - option := &SshProtocol{ - Host: host, - Port: port, - } +type SSHProtocolOption func(protocol *SSHProtocol) - for _, opt := range opts { - opt(option) +func NewSSHProtocol(host, port string, logger logger.Logger, opts ...SSHProtocolOption) *SSHProtocol { + p := &SSHProtocol{ + Host: host, + Port: port, + logger: logger, } - - return &SshProtocol{ - Host: host, - Port: port, - Timeout: option.Timeout, - Username: option.Username, - Password: option.Password, - PrivateKey: option.PrivateKey, - PrivateKeyPassphrase: option.PrivateKeyPassphrase, - ReuseConnection: option.ReuseConnection, - Script: option.Script, - ParseType: option.ParseType, - ProxyHost: option.ProxyHost, - ProxyPort: option.ProxyPort, - ProxyUsername: option.ProxyUsername, - ProxyPassword: option.ProxyPassword, - UseProxy: option.UseProxy, - ProxyPrivateKey: option.ProxyPrivateKey, + for _, opt := range opts { + opt(p) } + return p } -func (sp *SshProtocol) IsInvalid() error { - +func (p *SSHProtocol) IsInvalid() error { + if p.Host == "" { + p.logger.Error(ErrorInvalidHost, "ssh protocol host is empty") + return ErrorInvalidHost + } + if p.Port == "" { + p.logger.Error(ErrorInvalidPort, "ssh protocol port is empty") + return ErrorInvalidPort + } return nil } diff --git a/internal/collector/common/types/job/protocol/ssh_tunnel.go b/internal/collector/common/types/job/protocol/ssh_tunnel.go new file mode 100644 index 0000000..eb0e7f7 --- /dev/null +++ b/internal/collector/common/types/job/protocol/ssh_tunnel.go @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package protocol + +import ( + "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger" +) + +// SSHTunnel represents SSH tunnel configuration +type SSHTunnel struct { + Enable string `json:"enable"` + Host string `json:"host"` + Port string `json:"port"` + Username string `json:"username"` + Password string `json:"password"` + + logger logger.Logger +} + +type SSHTunnelOption func(tunnel *SSHTunnel) + +func NewSSHTunnel(host, port string, logger logger.Logger, opts ...SSHTunnelOption) *SSHTunnel { + t := &SSHTunnel{ + Host: host, + Port: port, + logger: logger.WithName("sshTunel-config"), + } + for _, opt := range opts { + opt(t) + } + return t +} + +func (t *SSHTunnel) IsInvalid() error { + if t.Host == "" { + t.logger.Error(ErrorInvalidHost, "ssh tunnel host is empty") + return ErrorInvalidHost + } + if t.Port == "" { + t.logger.Error(ErrorInvalidPort, "ssh tunnel port is empty") + return ErrorInvalidPort + } + return nil +} diff --git a/internal/collector/extension/milvus/milvus_collector_test.go b/internal/collector/extension/milvus/milvus_collector_test.go index 0491efa..137d4c3 100644 --- a/internal/collector/extension/milvus/milvus_collector_test.go +++ b/internal/collector/extension/milvus/milvus_collector_test.go @@ -20,31 +20,32 @@ package milvus import ( - "os" - "testing" + "os" + "testing" - "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/job" - loggertypes "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/logger" - "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger" + "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/job" + "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/job/protocol" + loggertypes "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/logger" + "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger" ) func TestMilvusCollector_Collect(t *testing.T) { - metrics := &job.Metrics{ - Name: "milvus_test", - Milvus: &job.MilvusProtocol{ - Host: "localhost", - Port: "19530", - }, - AliasFields: []string{"version", "responseTime", "host", "port"}, - } + metrics := &job.Metrics{ + Name: "milvus_test", + Milvus: &protocol.MilvusProtocol{ + Host: "localhost", + Port: "19530", + }, + AliasFields: []string{"version", "responseTime", "host", "port"}, + } - l := logger.DefaultLogger(os.Stdout, loggertypes.LogLevelInfo) - collector := NewMilvusCollector(l) - result := collector.Collect(metrics) + l := logger.DefaultLogger(os.Stdout, loggertypes.LogLevelInfo) + collector := NewMilvusCollector(l) + result := collector.Collect(metrics) - if result.Code != 0 { - t.Logf("Collect failed: %s", result.Msg) - } else { - t.Logf("Collect success: %+v", result.Values) - } + if result.Code != 0 { + t.Logf("Collect failed: %s", result.Msg) + } else { + t.Logf("Collect success: %+v", result.Values) + } } diff --git a/internal/transport/grpc_client.go b/internal/transport/grpc_client.go index d5d82cd..9040ea6 100644 --- a/internal/transport/grpc_client.go +++ b/internal/transport/grpc_client.go @@ -18,62 +18,63 @@ package transport import ( - "context" - "log" - "sync" - "time" - - "google.golang.org/grpc" - "google.golang.org/grpc/connectivity" - pb "hertzbeat.apache.org/hertzbeat-collector-go/api" + "context" + "log" + "sync" + "time" + + "google.golang.org/grpc" + "google.golang.org/grpc/connectivity" + + pb "hertzbeat.apache.org/hertzbeat-collector-go/api" ) // ResponseFuture represents a future response for sync calls type ResponseFuture struct { - response chan *pb.Message - error chan error + response chan *pb.Message + error chan error } func NewResponseFuture() *ResponseFuture { - return &ResponseFuture{ - response: make(chan *pb.Message, 1), - error: make(chan error, 1), - } + return &ResponseFuture{ + response: make(chan *pb.Message, 1), + error: make(chan error, 1), + } } func (f *ResponseFuture) Wait(timeout time.Duration) (*pb.Message, error) { - select { - case resp := <-f.response: - return resp, nil - case err := <-f.error: - return nil, err - case <-time.After(timeout): - return nil, context.DeadlineExceeded - } + select { + case resp := <-f.response: + return resp, nil + case err := <-f.error: + return nil, err + case <-time.After(timeout): + return nil, context.DeadlineExceeded + } } func (f *ResponseFuture) PutResponse(resp *pb.Message) { - f.response <- resp + f.response <- resp } func (f *ResponseFuture) PutError(err error) { - f.error <- err + f.error <- err } // EventType represents connection event types type EventType int const ( - EventConnected EventType = iota - EventDisconnected - EventConnectFailed + EventConnected EventType = iota + EventDisconnected + EventConnectFailed ) // Event represents a connection event type Event struct { - Type EventType - Address string - Error error + Type EventType + Address string + Error error } // EventHandler handles connection events @@ -81,228 +82,228 @@ type EventHandler func(event Event) // GrpcClient implements TransportClient using gRPC. type GrpcClient struct { - conn *grpc.ClientConn - client pb.ClusterMsgServiceClient - addr string - started bool - mu sync.RWMutex - registry *ProcessorRegistry - responseTable map[string]*ResponseFuture - eventHandler EventHandler - cancel context.CancelFunc + conn *grpc.ClientConn + client pb.ClusterMsgServiceClient + addr string + started bool + mu sync.RWMutex + registry *ProcessorRegistry + responseTable map[string]*ResponseFuture + eventHandler EventHandler + cancel context.CancelFunc } func NewGrpcClient(addr string) *GrpcClient { - return &GrpcClient{ - addr: addr, - registry: NewProcessorRegistry(), - responseTable: make(map[string]*ResponseFuture), - eventHandler: defaultEventHandler, - } + return &GrpcClient{ + addr: addr, + registry: NewProcessorRegistry(), + responseTable: make(map[string]*ResponseFuture), + eventHandler: defaultEventHandler, + } } func defaultEventHandler(event Event) { - // Default event handler - log.Printf("Connection event: Type=%d, Address=%s, Error=%v", event.Type, event.Address, event.Error) + // Default event handler + log.Printf("Connection event: Type=%d, Address=%s, Error=%v", event.Type, event.Address, event.Error) } func (c *GrpcClient) SetEventHandler(handler EventHandler) { - c.mu.Lock() - defer c.mu.Unlock() - c.eventHandler = handler + c.mu.Lock() + defer c.mu.Unlock() + c.eventHandler = handler } func (c *GrpcClient) triggerEvent(eventType EventType, err error) { - if c.eventHandler != nil { - c.eventHandler(Event{ - Type: eventType, - Address: c.addr, - Error: err, - }) - } + if c.eventHandler != nil { + c.eventHandler(Event{ + Type: eventType, + Address: c.addr, + Error: err, + }) + } } func (c *GrpcClient) Start() error { - c.mu.Lock() - defer c.mu.Unlock() - if c.started { - return nil - } - - _, cancel := context.WithCancel(context.Background()) - c.cancel = cancel - - conn, err := grpc.Dial(c.addr, grpc.WithInsecure(), grpc.WithBlock()) - if err != nil { - c.triggerEvent(EventConnectFailed, err) - return err - } - c.conn = conn - c.client = pb.NewClusterMsgServiceClient(conn) - c.started = true - - c.triggerEvent(EventConnected, nil) - - go c.heartbeatLoop() - go c.connectionMonitor() - go c.streamMsgLoop() - return nil + c.mu.Lock() + defer c.mu.Unlock() + if c.started { + return nil + } + + _, cancel := context.WithCancel(context.Background()) + c.cancel = cancel + + conn, err := grpc.Dial(c.addr, grpc.WithInsecure(), grpc.WithBlock()) + if err != nil { + c.triggerEvent(EventConnectFailed, err) + return err + } + c.conn = conn + c.client = pb.NewClusterMsgServiceClient(conn) + c.started = true + + c.triggerEvent(EventConnected, nil) + + go c.heartbeatLoop() + go c.connectionMonitor() + go c.streamMsgLoop() + return nil } func (c *GrpcClient) Shutdown() error { - c.mu.Lock() - defer c.mu.Unlock() - - if c.cancel != nil { - c.cancel() - } - - if c.conn != nil { - _ = c.conn.Close() - } - c.started = false - c.triggerEvent(EventDisconnected, nil) - return nil + c.mu.Lock() + defer c.mu.Unlock() + + if c.cancel != nil { + c.cancel() + } + + if c.conn != nil { + _ = c.conn.Close() + } + c.started = false + c.triggerEvent(EventDisconnected, nil) + return nil } func (c *GrpcClient) IsStarted() bool { - c.mu.RLock() - defer c.mu.RUnlock() - return c.started + c.mu.RLock() + defer c.mu.RUnlock() + return c.started } // processor: func(msg interface{}) (resp interface{}, err error) func (c *GrpcClient) RegisterProcessor(msgType int32, processor ProcessorFunc) { - c.registry.Register(msgType, processor) + c.registry.Register(msgType, processor) } func (c *GrpcClient) SendMsg(msg interface{}) error { - pbMsg, ok := msg.(*pb.Message) - if !ok { - return nil - } - ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) - defer cancel() - _, err := c.client.SendMsg(ctx, pbMsg) - return err + pbMsg, ok := msg.(*pb.Message) + if !ok { + return nil + } + ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) + defer cancel() + _, err := c.client.SendMsg(ctx, pbMsg) + return err } func (c *GrpcClient) SendMsgSync(msg interface{}, timeoutMillis int) (interface{}, error) { - pbMsg, ok := msg.(*pb.Message) - if !ok { - return nil, nil - } - - // Use the existing identity as correlation ID - // If empty, generate a new one - if pbMsg.Identity == "" { - pbMsg.Identity = generateCorrelationID() - } - - // Create response future for this request - future := NewResponseFuture() - c.responseTable[pbMsg.Identity] = future - defer delete(c.responseTable, pbMsg.Identity) - - // Send message - ctx, cancel := context.WithTimeout(context.Background(), time.Duration(timeoutMillis)*time.Millisecond) - defer cancel() - - resp, err := c.client.SendMsg(ctx, pbMsg) - if err != nil { - future.PutError(err) - return nil, err - } - - // Check if this is a response to our request - if resp != nil && resp.Identity == pbMsg.Identity { - future.PutResponse(resp) - return resp, nil - } - - // If no immediate response, wait for async response - return future.Wait(time.Duration(timeoutMillis) * time.Millisecond) + pbMsg, ok := msg.(*pb.Message) + if !ok { + return nil, nil + } + + // Use the existing identity as correlation ID + // If empty, generate a new one + if pbMsg.Identity == "" { + pbMsg.Identity = generateCorrelationID() + } + + // Create response future for this request + future := NewResponseFuture() + c.responseTable[pbMsg.Identity] = future + defer delete(c.responseTable, pbMsg.Identity) + + // Send message + ctx, cancel := context.WithTimeout(context.Background(), time.Duration(timeoutMillis)*time.Millisecond) + defer cancel() + + resp, err := c.client.SendMsg(ctx, pbMsg) + if err != nil { + future.PutError(err) + return nil, err + } + + // Check if this is a response to our request + if resp != nil && resp.Identity == pbMsg.Identity { + future.PutResponse(resp) + return resp, nil + } + + // If no immediate response, wait for async response + return future.Wait(time.Duration(timeoutMillis) * time.Millisecond) } func generateCorrelationID() string { - return time.Now().Format("20060102150405.999999999") + "-" + randomString(8) + return time.Now().Format("20060102150405.999999999") + "-" + randomString(8) } func randomString(length int) string { - const charset = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789" - b := make([]byte, length) - for i := range b { - b[i] = charset[time.Now().Nanosecond()%len(charset)] - } - return string(b) + const charset = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789" + b := make([]byte, length) + for i := range b { + b[i] = charset[time.Now().Nanosecond()%len(charset)] + } + return string(b) } func (c *GrpcClient) connectionMonitor() { - for c.IsStarted() { - time.Sleep(5 * time.Second) - if c.conn != nil && c.conn.GetState() != connectivity.Ready { - c.triggerEvent(EventDisconnected, nil) - log.Println("gRPC connection lost, attempting to reconnect...") - _ = c.Shutdown() - if err := c.Start(); err != nil { - c.triggerEvent(EventConnectFailed, err) - log.Printf("Failed to reconnect: %v", err) - } - } - } + for c.IsStarted() { + time.Sleep(5 * time.Second) + if c.conn != nil && c.conn.GetState() != connectivity.Ready { + c.triggerEvent(EventDisconnected, nil) + log.Println("gRPC connection lost, attempting to reconnect...") + _ = c.Shutdown() + if err := c.Start(); err != nil { + c.triggerEvent(EventConnectFailed, err) + log.Printf("Failed to reconnect: %v", err) + } + } + } } func (c *GrpcClient) heartbeatLoop() { - for c.IsStarted() { - // 发送心跳消息 - heartbeat := &pb.Message{ - Type: pb.MessageType_HEARTBEAT, - Direction: pb.Direction_REQUEST, - Identity: "collector-go", // 可根据实际配置 - } - _, _ = c.SendMsgSync(heartbeat, 2000) - time.Sleep(10 * time.Second) - } + for c.IsStarted() { + // 发送心跳消息 + heartbeat := &pb.Message{ + Type: pb.MessageType_HEARTBEAT, + Direction: pb.Direction_REQUEST, + Identity: "collector-go", // 可根据实际配置 + } + _, _ = c.SendMsgSync(heartbeat, 2000) + time.Sleep(10 * time.Second) + } } // StreamMsg 双向流式通信(可用于实时推送/心跳/任务下发等) func (c *GrpcClient) streamMsgLoop() { - ctx := context.Background() - stream, err := c.client.StreamMsg(ctx) - if err != nil { - log.Printf("streamMsgLoop error: %v", err) - return - } - - // Start receiving messages - go func() { - for c.IsStarted() { - in, err := stream.Recv() - if err != nil { - log.Printf("streamMsgLoop recv error: %v", err) - return - } - - // Process the received message - c.processReceivedMessage(in) - } - }() - - // Keep the stream open - <-ctx.Done() + ctx := context.Background() + stream, err := c.client.StreamMsg(ctx) + if err != nil { + log.Printf("streamMsgLoop error: %v", err) + return + } + + // Start receiving messages + go func() { + for c.IsStarted() { + in, err := stream.Recv() + if err != nil { + log.Printf("streamMsgLoop recv error: %v", err) + return + } + + // Process the received message + c.processReceivedMessage(in) + } + }() + + // Keep the stream open + <-ctx.Done() } func (c *GrpcClient) processReceivedMessage(msg *pb.Message) { - // Check if this is a response to a sync request - if msg.Direction == pb.Direction_RESPONSE { - if future, ok := c.responseTable[msg.Identity]; ok { - future.PutResponse(msg) - return - } - } - - // If not a sync response, distribute to registered processors - if fn, ok := c.registry.Get(int32(msg.Type)); ok { - go fn(msg) - } + // Check if this is a response to a sync request + if msg.Direction == pb.Direction_RESPONSE { + if future, ok := c.responseTable[msg.Identity]; ok { + future.PutResponse(msg) + return + } + } + + // If not a sync response, distribute to registered processors + if fn, ok := c.registry.Get(int32(msg.Type)); ok { + go fn(msg) + } } diff --git a/internal/util/param/param_replacer.go b/internal/util/param/param_replacer.go index 0aeec7d..984e8cd 100644 --- a/internal/util/param/param_replacer.go +++ b/internal/util/param/param_replacer.go @@ -27,6 +27,7 @@ import ( "strings" jobtypes "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/job" + "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/job/protocol" loggertype "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/logger" "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/crypto" "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger" @@ -131,16 +132,12 @@ func (r *Replacer) createParamMapSimple(configmap []jobtypes.Configmap) map[stri func (r *Replacer) ReplaceMetricsParams(metrics *jobtypes.Metrics, paramMap map[string]string) error { // 1. JDBC if metrics.JDBC != nil { - if err := r.replaceProtocolParams(&metrics.JDBC, paramMap); err != nil { - return fmt.Errorf("failed to replace JDBC params: %w", err) - } + r.replaceJDBCParams(metrics.JDBC, paramMap) } // 2. SSH if metrics.SSH != nil { - if err := r.replaceProtocolParams(&metrics.SSH, paramMap); err != nil { - return fmt.Errorf("failed to replace SSH params: %w", err) - } + r.replaceSSHParams(metrics.SSH, paramMap) } // 3. HTTP @@ -162,7 +159,7 @@ func (r *Replacer) ReplaceMetricsParams(metrics *jobtypes.Metrics, paramMap map[ } // replaceHTTPParams specific replacement logic for HTTPProtocol struct -func (r *Replacer) replaceHTTPParams(http *jobtypes.HTTPProtocol, paramMap map[string]string) { +func (r *Replacer) replaceHTTPParams(http *protocol.HTTPProtocol, paramMap map[string]string) { http.URL = r.replaceParamPlaceholders(http.URL, paramMap) http.Method = r.replaceParamPlaceholders(http.Method, paramMap) http.Body = r.replaceParamPlaceholders(http.Body, paramMap) @@ -210,7 +207,7 @@ func (r *Replacer) replaceHTTPParams(http *jobtypes.HTTPProtocol, paramMap map[s } // replaceRedisParams specific replacement logic for RedisProtocol struct -func (r *Replacer) replaceRedisParams(redis *jobtypes.RedisProtocol, paramMap map[string]string) { +func (r *Replacer) replaceRedisParams(redis *protocol.RedisProtocol, paramMap map[string]string) { redis.Host = r.replaceParamPlaceholders(redis.Host, paramMap) redis.Port = r.replaceParamPlaceholders(redis.Port, paramMap) redis.Username = r.replaceParamPlaceholders(redis.Username, paramMap) @@ -225,44 +222,46 @@ func (r *Replacer) replaceRedisParams(redis *jobtypes.RedisProtocol, paramMap ma } } -// replaceProtocolParams replaces parameters in any protocol configuration defined as interface{} -func (r *Replacer) replaceProtocolParams(protocolInterface *interface{}, paramMap map[string]string) error { - if *protocolInterface == nil { - return nil - } - - // Convert protocol interface{} to map for manipulation - protocolMap, ok := (*protocolInterface).(map[string]interface{}) - if !ok { - return nil - } - - return r.replaceParamsInMap(protocolMap, paramMap) +// replaceSSHParams specific replacement logic for SSHProtocol struct +func (r *Replacer) replaceSSHParams(ssh *protocol.SSHProtocol, paramMap map[string]string) { + ssh.Host = r.replaceParamPlaceholders(ssh.Host, paramMap) + ssh.Port = r.replaceParamPlaceholders(ssh.Port, paramMap) + ssh.Username = r.replaceParamPlaceholders(ssh.Username, paramMap) + ssh.Password = r.replaceParamPlaceholders(ssh.Password, paramMap) + ssh.PrivateKey = r.replaceParamPlaceholders(ssh.PrivateKey, paramMap) + ssh.PrivateKeyPassphrase = r.replaceParamPlaceholders(ssh.PrivateKeyPassphrase, paramMap) + ssh.Script = r.replaceParamPlaceholders(ssh.Script, paramMap) + ssh.ParseType = r.replaceParamPlaceholders(ssh.ParseType, paramMap) + ssh.ParseScript = r.replaceParamPlaceholders(ssh.ParseScript, paramMap) + ssh.Timeout = r.replaceParamPlaceholders(ssh.Timeout, paramMap) + ssh.ReuseConnection = r.replaceParamPlaceholders(ssh.ReuseConnection, paramMap) + ssh.UseProxy = r.replaceParamPlaceholders(ssh.UseProxy, paramMap) + ssh.ProxyHost = r.replaceParamPlaceholders(ssh.ProxyHost, paramMap) + ssh.ProxyPort = r.replaceParamPlaceholders(ssh.ProxyPort, paramMap) + ssh.ProxyUsername = r.replaceParamPlaceholders(ssh.ProxyUsername, paramMap) + ssh.ProxyPassword = r.replaceParamPlaceholders(ssh.ProxyPassword, paramMap) + ssh.ProxyPrivateKey = r.replaceParamPlaceholders(ssh.ProxyPrivateKey, paramMap) } -// replaceParamsInMap recursively replaces parameters in a map structure -func (r *Replacer) replaceParamsInMap(data map[string]interface{}, paramMap map[string]string) error { - for key, value := range data { - switch v := value.(type) { - case string: - data[key] = r.replaceParamPlaceholders(v, paramMap) - case map[string]interface{}: - if err := r.replaceParamsInMap(v, paramMap); err != nil { - return fmt.Errorf("failed to replace params in nested map %s: %w", key, err) - } - case []interface{}: - for i, item := range v { - if itemMap, ok := item.(map[string]interface{}); ok { - if err := r.replaceParamsInMap(itemMap, paramMap); err != nil { - return fmt.Errorf("failed to replace params in array item %d: %w", i, err) - } - } else if itemStr, ok := item.(string); ok { - v[i] = r.replaceParamPlaceholders(itemStr, paramMap) - } - } - } +// replaceJDBCParams specific replacement logic for JDBCProtocol struct +func (r *Replacer) replaceJDBCParams(jdbc *protocol.JDBCProtocol, paramMap map[string]string) { + jdbc.Host = r.replaceParamPlaceholders(jdbc.Host, paramMap) + jdbc.Port = r.replaceParamPlaceholders(jdbc.Port, paramMap) + jdbc.Platform = r.replaceParamPlaceholders(jdbc.Platform, paramMap) + jdbc.Username = r.replaceParamPlaceholders(jdbc.Username, paramMap) + jdbc.Password = r.replaceParamPlaceholders(jdbc.Password, paramMap) + jdbc.Database = r.replaceParamPlaceholders(jdbc.Database, paramMap) + jdbc.Timeout = r.replaceParamPlaceholders(jdbc.Timeout, paramMap) + jdbc.QueryType = r.replaceParamPlaceholders(jdbc.QueryType, paramMap) + jdbc.SQL = r.replaceParamPlaceholders(jdbc.SQL, paramMap) + jdbc.URL = r.replaceParamPlaceholders(jdbc.URL, paramMap) + jdbc.ReuseConnection = r.replaceParamPlaceholders(jdbc.ReuseConnection, paramMap) + if jdbc.SSHTunnel != nil { + jdbc.SSHTunnel.Host = r.replaceParamPlaceholders(jdbc.SSHTunnel.Host, paramMap) + jdbc.SSHTunnel.Port = r.replaceParamPlaceholders(jdbc.SSHTunnel.Port, paramMap) + jdbc.SSHTunnel.Username = r.replaceParamPlaceholders(jdbc.SSHTunnel.Username, paramMap) + jdbc.SSHTunnel.Password = r.replaceParamPlaceholders(jdbc.SSHTunnel.Password, paramMap) } - return nil } // replaceBasicMetricsParams replaces parameters in basic metrics fields @@ -322,14 +321,14 @@ func (r *Replacer) ExtractProtocolConfig(protocolInterface interface{}, targetSt } // ExtractJDBCConfig extracts and processes JDBC configuration -func (r *Replacer) ExtractJDBCConfig(jdbcInterface interface{}) (*jobtypes.JDBCProtocol, error) { +func (r *Replacer) ExtractJDBCConfig(jdbcInterface interface{}) (*protocol.JDBCProtocol, error) { if jdbcInterface == nil { return nil, nil } - if jdbcConfig, ok := jdbcInterface.(*jobtypes.JDBCProtocol); ok { + if jdbcConfig, ok := jdbcInterface.(*protocol.JDBCProtocol); ok { return jdbcConfig, nil } - var jdbcConfig jobtypes.JDBCProtocol + var jdbcConfig protocol.JDBCProtocol if err := r.ExtractProtocolConfig(jdbcInterface, &jdbcConfig); err != nil { return nil, fmt.Errorf("failed to extract JDBC config: %w", err) } @@ -337,14 +336,14 @@ func (r *Replacer) ExtractJDBCConfig(jdbcInterface interface{}) (*jobtypes.JDBCP } // ExtractHTTPConfig extracts and processes HTTP configuration -func (r *Replacer) ExtractHTTPConfig(httpInterface interface{}) (*jobtypes.HTTPProtocol, error) { +func (r *Replacer) ExtractHTTPConfig(httpInterface interface{}) (*protocol.HTTPProtocol, error) { if httpInterface == nil { return nil, nil } - if httpConfig, ok := httpInterface.(*jobtypes.HTTPProtocol); ok { + if httpConfig, ok := httpInterface.(*protocol.HTTPProtocol); ok { return httpConfig, nil } - var httpConfig jobtypes.HTTPProtocol + var httpConfig protocol.HTTPProtocol if err := r.ExtractProtocolConfig(httpInterface, &httpConfig); err != nil { return nil, fmt.Errorf("failed to extract HTTP config: %w", err) } @@ -352,14 +351,14 @@ func (r *Replacer) ExtractHTTPConfig(httpInterface interface{}) (*jobtypes.HTTPP } // ExtractSSHConfig extracts and processes SSH configuration -func (r *Replacer) ExtractSSHConfig(sshInterface interface{}) (*jobtypes.SSHProtocol, error) { +func (r *Replacer) ExtractSSHConfig(sshInterface interface{}) (*protocol.SSHProtocol, error) { if sshInterface == nil { return nil, nil } - if sshConfig, ok := sshInterface.(*jobtypes.SSHProtocol); ok { + if sshConfig, ok := sshInterface.(*protocol.SSHProtocol); ok { return sshConfig, nil } - var sshConfig jobtypes.SSHProtocol + var sshConfig protocol.SSHProtocol if err := r.ExtractProtocolConfig(sshInterface, &sshConfig); err != nil { return nil, fmt.Errorf("failed to extract SSH config: %w", err) } diff --git a/internal/util/ssh/ssh_helper.go b/internal/util/ssh/ssh_helper.go index fbecb76..77b7d21 100644 --- a/internal/util/ssh/ssh_helper.go +++ b/internal/util/ssh/ssh_helper.go @@ -29,12 +29,12 @@ import ( "golang.org/x/crypto/ssh" - jobtypes "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/job" + "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/job/protocol" "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger" ) // CreateSSHClientConfig creates SSH client configuration based on the protocol config -func CreateSSHClientConfig(config *jobtypes.SSHProtocol, logger logger.Logger) (*ssh.ClientConfig, error) { +func CreateSSHClientConfig(config *protocol.SSHProtocol, logger logger.Logger) (*ssh.ClientConfig, error) { clientConfig := &ssh.ClientConfig{ User: config.Username, HostKeyCallback: ssh.InsecureIgnoreHostKey(), // Note: In production, you should verify host keys @@ -100,7 +100,7 @@ func ParsePrivateKey(privateKeyStr string) (ssh.Signer, error) { } // createProxyClientConfig creates SSH client configuration for proxy server -func createProxyClientConfig(config *jobtypes.SSHProtocol, logger logger.Logger) (*ssh.ClientConfig, error) { +func createProxyClientConfig(config *protocol.SSHProtocol, logger logger.Logger) (*ssh.ClientConfig, error) { clientConfig := &ssh.ClientConfig{ User: config.ProxyUsername, HostKeyCallback: ssh.InsecureIgnoreHostKey(), @@ -127,7 +127,7 @@ func createProxyClientConfig(config *jobtypes.SSHProtocol, logger logger.Logger) } // dialViaProxy establishes SSH connection through a proxy server -func dialViaProxy(ctx context.Context, config *jobtypes.SSHProtocol, targetConfig *ssh.ClientConfig, logger logger.Logger) (*ssh.Client, error) { +func dialViaProxy(ctx context.Context, config *protocol.SSHProtocol, targetConfig *ssh.ClientConfig, logger logger.Logger) (*ssh.Client, error) { // Create proxy client configuration proxyConfig, err := createProxyClientConfig(config, logger) if err != nil { @@ -201,7 +201,7 @@ func DialWithContext(ctx context.Context, network, addr string, config *ssh.Clie } // DialWithProxy dials SSH connection through proxy with context support -func DialWithProxy(ctx context.Context, sshConfig *jobtypes.SSHProtocol, clientConfig *ssh.ClientConfig, logger logger.Logger) (*ssh.Client, error) { +func DialWithProxy(ctx context.Context, sshConfig *protocol.SSHProtocol, clientConfig *ssh.ClientConfig, logger logger.Logger) (*ssh.Client, error) { // TODO: implement Reuse connection logic // Check if proxy is enabled and configured diff --git a/internal/util/ssh/ssh_helper_test.go b/internal/util/ssh/ssh_helper_test.go index 12e7bfc..4cbc6fd 100644 --- a/internal/util/ssh/ssh_helper_test.go +++ b/internal/util/ssh/ssh_helper_test.go @@ -32,7 +32,7 @@ import ( "github.com/stretchr/testify/require" "golang.org/x/crypto/ssh" - jobtypes "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/job" + "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/job/protocol" loggertype "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/logger" "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger" ) @@ -65,7 +65,7 @@ func TestCreateSSHClientConfig_PasswordAuth(t *testing.T) { testLogger := createTestLogger() // Test configuration with password authentication - config := &jobtypes.SSHProtocol{ + config := &protocol.SSHProtocol{ Username: "testuser", Password: "testpassword", } @@ -86,7 +86,7 @@ func TestCreateSSHClientConfig_PrivateKeyAuth(t *testing.T) { require.NoError(t, err) // Test configuration with private key authentication - config := &jobtypes.SSHProtocol{ + config := &protocol.SSHProtocol{ Username: "testuser", PrivateKey: privateKeyStr, } @@ -103,7 +103,7 @@ func TestCreateSSHClientConfig_NoAuthMethod(t *testing.T) { testLogger := createTestLogger() // Test configuration with no authentication method - config := &jobtypes.SSHProtocol{ + config := &protocol.SSHProtocol{ Username: "testuser", } @@ -153,7 +153,7 @@ func TestCreateProxyClientConfig_PasswordAuth(t *testing.T) { testLogger := createTestLogger() // Test configuration with proxy password authentication - config := &jobtypes.SSHProtocol{ + config := &protocol.SSHProtocol{ ProxyUsername: "proxyuser", ProxyPassword: "proxypassword", } @@ -174,7 +174,7 @@ func TestCreateProxyClientConfig_PrivateKeyAuth(t *testing.T) { require.NoError(t, err) // Test configuration with proxy private key authentication - config := &jobtypes.SSHProtocol{ + config := &protocol.SSHProtocol{ ProxyUsername: "proxyuser", ProxyPrivateKey: privateKeyStr, } @@ -191,7 +191,7 @@ func TestCreateProxyClientConfig_NoAuthMethod(t *testing.T) { testLogger := createTestLogger() // Test configuration with no proxy authentication method - config := &jobtypes.SSHProtocol{ + config := &protocol.SSHProtocol{ ProxyUsername: "proxyuser", } @@ -225,14 +225,14 @@ func TestDialWithProxy_Disabled(t *testing.T) { testLogger := createTestLogger() // Test with proxy disabled - should fall back to direct connection - sshConfig := &jobtypes.SSHProtocol{ - Host: "localhost", - Port: "22", - Username: "testuser", - Password: "testpassword", - UseProxy: "false", // Proxy disabled - ProxyHost: "", - ProxyPort: "", + sshConfig := &protocol.SSHProtocol{ + Host: "localhost", + Port: "22", + Username: "testuser", + Password: "testpassword", + UseProxy: "false", // Proxy disabled + ProxyHost: "", + ProxyPort: "", } clientConfig := &ssh.ClientConfig{ @@ -254,14 +254,14 @@ func TestDialWithProxy_EnabledButNotConfigured(t *testing.T) { testLogger := createTestLogger() // Test with proxy enabled but not properly configured - sshConfig := &jobtypes.SSHProtocol{ - Host: "localhost", - Port: "22", - Username: "testuser", - Password: "testpassword", - UseProxy: "true", // Proxy enabled - ProxyHost: "", // But not configured - ProxyPort: "", + sshConfig := &protocol.SSHProtocol{ + Host: "localhost", + Port: "22", + Username: "testuser", + Password: "testpassword", + UseProxy: "true", // Proxy enabled + ProxyHost: "", // But not configured + ProxyPort: "", } clientConfig := &ssh.ClientConfig{ @@ -284,7 +284,7 @@ func TestDialWithProxy_EnabledAndConfigured(t *testing.T) { testLogger := createTestLogger() // Test with proxy enabled and configured but missing credentials - sshConfig := &jobtypes.SSHProtocol{ + sshConfig := &protocol.SSHProtocol{ Host: "localhost", Port: "22", Username: "testuser", @@ -313,7 +313,7 @@ func TestDialWithProxy_EnabledAndFullyConfigured(t *testing.T) { testLogger := createTestLogger() // Test with proxy enabled and fully configured but missing proxy auth - sshConfig := &jobtypes.SSHProtocol{ + sshConfig := &protocol.SSHProtocol{ Host: "localhost", Port: "22", Username: "testuser", @@ -337,4 +337,4 @@ func TestDialWithProxy_EnabledAndFullyConfigured(t *testing.T) { // Should fail due to missing proxy authentication assert.Error(t, err) assert.Contains(t, err.Error(), "either proxy password or proxy private key is required when using proxy") -} \ No newline at end of file +}