Skip to content
50 changes: 45 additions & 5 deletions pulsaradmin/pkg/admin/broker_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
package admin

import (
"context"

"github.com/apache/pulsar-client-go/pulsaradmin/pkg/utils"
)

Expand All @@ -26,17 +28,32 @@ type BrokerStats interface {
// GetMetrics returns Monitoring metrics
GetMetrics() ([]utils.Metrics, error)

// GetMetricsWithContext returns Monitoring metrics
GetMetricsWithContext(context.Context) ([]utils.Metrics, error)

// GetMBeans requests JSON string server mbean dump
GetMBeans() ([]utils.Metrics, error)

// GetMBeansWithContext requests JSON string server mbean dump
GetMBeansWithContext(context.Context) ([]utils.Metrics, error)

// GetTopics returns JSON string topics stats
GetTopics() (string, error)

// GetTopicsWithContext returns JSON string topics stats
GetTopicsWithContext(context.Context) (string, error)

// GetLoadReport returns load report of broker
GetLoadReport() (*utils.LocalBrokerData, error)

// GetLoadReport returns load report of broker
GetLoadReportWithContext(context.Context) (*utils.LocalBrokerData, error)

// GetAllocatorStats returns stats from broker
GetAllocatorStats(allocatorName string) (*utils.AllocatorStats, error)

// GetAllocatorStatsWithContext returns stats from broker
GetAllocatorStatsWithContext(ctx context.Context, allocatorName string) (*utils.AllocatorStats, error)
}

type brokerStats struct {
Expand All @@ -53,9 +70,13 @@ func (c *pulsarClient) BrokerStats() BrokerStats {
}

func (bs *brokerStats) GetMetrics() ([]utils.Metrics, error) {
return bs.GetMetricsWithContext(context.Background())
}

func (bs *brokerStats) GetMetricsWithContext(ctx context.Context) ([]utils.Metrics, error) {
endpoint := bs.pulsar.endpoint(bs.basePath, "/metrics")
var response []utils.Metrics
err := bs.pulsar.Client.Get(endpoint, &response)
err := bs.pulsar.Client.GetWithContext(ctx, endpoint, &response)
if err != nil {
return nil, err
}
Expand All @@ -64,9 +85,13 @@ func (bs *brokerStats) GetMetrics() ([]utils.Metrics, error) {
}

func (bs *brokerStats) GetMBeans() ([]utils.Metrics, error) {
return bs.GetMBeansWithContext(context.Background())
}

func (bs *brokerStats) GetMBeansWithContext(ctx context.Context) ([]utils.Metrics, error) {
endpoint := bs.pulsar.endpoint(bs.basePath, "/mbeans")
var response []utils.Metrics
err := bs.pulsar.Client.Get(endpoint, &response)
err := bs.pulsar.Client.GetWithContext(ctx, endpoint, &response)
if err != nil {
return nil, err
}
Expand All @@ -75,8 +100,12 @@ func (bs *brokerStats) GetMBeans() ([]utils.Metrics, error) {
}

func (bs *brokerStats) GetTopics() (string, error) {
return bs.GetTopicsWithContext(context.Background())
}

func (bs *brokerStats) GetTopicsWithContext(ctx context.Context) (string, error) {
endpoint := bs.pulsar.endpoint(bs.basePath, "/topics")
buf, err := bs.pulsar.Client.GetWithQueryParams(endpoint, nil, nil, false)
buf, err := bs.pulsar.Client.GetWithQueryParamsWithContext(ctx, endpoint, nil, nil, false)
if err != nil {
return "", err
}
Expand All @@ -85,19 +114,30 @@ func (bs *brokerStats) GetTopics() (string, error) {
}

func (bs *brokerStats) GetLoadReport() (*utils.LocalBrokerData, error) {
return bs.GetLoadReportWithContext(context.Background())
}

func (bs *brokerStats) GetLoadReportWithContext(ctx context.Context) (*utils.LocalBrokerData, error) {
endpoint := bs.pulsar.endpoint(bs.basePath, "/load-report")
response := utils.NewLocalBrokerData()
err := bs.pulsar.Client.Get(endpoint, &response)
err := bs.pulsar.Client.GetWithContext(ctx, endpoint, &response)
if err != nil {
return nil, nil
}
return &response, nil
}

func (bs *brokerStats) GetAllocatorStats(allocatorName string) (*utils.AllocatorStats, error) {
return bs.GetAllocatorStatsWithContext(context.Background(), allocatorName)
}

func (bs *brokerStats) GetAllocatorStatsWithContext(
ctx context.Context,
allocatorName string,
) (*utils.AllocatorStats, error) {
endpoint := bs.pulsar.endpoint(bs.basePath, "/allocator-stats", allocatorName)
var allocatorStats utils.AllocatorStats
err := bs.pulsar.Client.Get(endpoint, &allocatorStats)
err := bs.pulsar.Client.GetWithContext(ctx, endpoint, &allocatorStats)
if err != nil {
return nil, err
}
Expand Down
128 changes: 114 additions & 14 deletions pulsaradmin/pkg/admin/brokers.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package admin

import (
"context"
"fmt"
"strings"

Expand All @@ -27,42 +28,85 @@ import (
// Brokers is admin interface for brokers management
type Brokers interface {

// GetListActiveBrokers Get the list of active brokers in the local cluster.
// GetListActiveBrokers returns the list of active brokers in the local cluster.
GetListActiveBrokers() ([]string, error)

// GetListActiveBrokersWithContext returns the list of active brokers in the local cluster.
GetListActiveBrokersWithContext(context.Context) ([]string, error)

// GetActiveBrokers returns the list of active brokers in the cluster.
GetActiveBrokers(cluster string) ([]string, error)

// GetActiveBrokersWithContext returns the list of active brokers in the cluster.
GetActiveBrokersWithContext(ctx context.Context, cluster string) ([]string, error)

// GetDynamicConfigurationNames returns list of updatable configuration name
GetDynamicConfigurationNames() ([]string, error)

// GetDynamicConfigurationNamesWithContext returns list of updatable configuration name
GetDynamicConfigurationNamesWithContext(context.Context) ([]string, error)

// GetOwnedNamespaces returns the map of owned namespaces and their status from a single broker in the cluster
GetOwnedNamespaces(cluster, brokerURL string) (map[string]utils.NamespaceOwnershipStatus, error)

// GetOwnedNamespaces returns the map of owned namespaces and their status from a single broker in the cluster
GetOwnedNamespacesWithContext(
ctx context.Context,
cluster,
brokerURL string,
) (map[string]utils.NamespaceOwnershipStatus, error)

// UpdateDynamicConfiguration updates dynamic configuration value in to Zk that triggers watch on
// brokers and all brokers can update {@link ServiceConfiguration} value locally
UpdateDynamicConfiguration(configName, configValue string) error

// UpdateDynamicConfigurationWithContext updates dynamic configuration value in to Zk that triggers watch on
// brokers and all brokers can update {@link ServiceConfiguration} value locally
UpdateDynamicConfigurationWithContext(ctx context.Context, configName, configValue string) error

// DeleteDynamicConfiguration deletes dynamic configuration value in to Zk. It will not impact current value
// in broker but next time when broker restarts, it applies value from configuration file only.
DeleteDynamicConfiguration(configName string) error

// DeleteDynamicConfigurationWithContext deletes dynamic configuration value in to Zk. It will not impact current value
// in broker but next time when broker restarts, it applies value from configuration file only.
DeleteDynamicConfigurationWithContext(ctx context.Context, configName string) error

// GetRuntimeConfigurations returns values of runtime configuration
GetRuntimeConfigurations() (map[string]string, error)

// GetRuntimeConfigurationsWithContext returns values of runtime configuration
GetRuntimeConfigurationsWithContext(context.Context) (map[string]string, error)

// GetInternalConfigurationData returns the internal configuration data
GetInternalConfigurationData() (*utils.InternalConfigurationData, error)

// GetInternalConfigurationDataWithContext returns the internal configuration data
GetInternalConfigurationDataWithContext(context.Context) (*utils.InternalConfigurationData, error)

// GetAllDynamicConfigurations returns values of all overridden dynamic-configs
GetAllDynamicConfigurations() (map[string]string, error)

// GetAllDynamicConfigurationsWithContext returns values of all overridden dynamic-configs
GetAllDynamicConfigurationsWithContext(context.Context) (map[string]string, error)

// Deprecated: Use HealthCheckWithTopicVersion instead
HealthCheck() error

// HealthCheckWithTopicVersion run a health check on the broker
// Deprecated: Use HealthCheckWithTopicVersionWithContext instead
HealthCheckWithContext(context.Context) error

// HealthCheckWithTopicVersion runs a health check on the broker
HealthCheckWithTopicVersion(utils.TopicVersion) error

// HealthCheckWithTopicVersionWithContext runs a health check on the broker
HealthCheckWithTopicVersionWithContext(context.Context, utils.TopicVersion) error

// GetLeaderBroker get the information of the leader broker.
GetLeaderBroker() (utils.BrokerInfo, error)

// GetLeaderBrokerWithContext returns the information of the leader broker.
GetLeaderBrokerWithContext(context.Context) (utils.BrokerInfo, error)
}

type broker struct {
Expand All @@ -79,93 +123,144 @@ func (c *pulsarClient) Brokers() Brokers {
}

func (b *broker) GetActiveBrokers(cluster string) ([]string, error) {
return b.GetActiveBrokersWithContext(context.Background(), cluster)
}

func (b *broker) GetActiveBrokersWithContext(ctx context.Context, cluster string) ([]string, error) {
endpoint := b.pulsar.endpoint(b.basePath, cluster)
var res []string
err := b.pulsar.Client.Get(endpoint, &res)
err := b.pulsar.Client.GetWithContext(ctx, endpoint, &res)
if err != nil {
return nil, err
}
return res, nil
}

func (b *broker) GetListActiveBrokers() ([]string, error) {
return b.GetListActiveBrokersWithContext(context.Background())
}

func (b *broker) GetListActiveBrokersWithContext(ctx context.Context) ([]string, error) {
endpoint := b.pulsar.endpoint(b.basePath)
var res []string
err := b.pulsar.Client.Get(endpoint, &res)
err := b.pulsar.Client.GetWithContext(ctx, endpoint, &res)
if err != nil {
return nil, err
}
return res, nil
}

func (b *broker) GetDynamicConfigurationNames() ([]string, error) {
return b.GetDynamicConfigurationNamesWithContext(context.Background())
}

func (b *broker) GetDynamicConfigurationNamesWithContext(ctx context.Context) ([]string, error) {
endpoint := b.pulsar.endpoint(b.basePath, "/configuration/")
var res []string
err := b.pulsar.Client.Get(endpoint, &res)
err := b.pulsar.Client.GetWithContext(ctx, endpoint, &res)
if err != nil {
return nil, err
}
return res, nil
}

func (b *broker) GetOwnedNamespaces(cluster, brokerURL string) (map[string]utils.NamespaceOwnershipStatus, error) {
return b.GetOwnedNamespacesWithContext(context.Background(), cluster, brokerURL)
}

func (b *broker) GetOwnedNamespacesWithContext(
ctx context.Context,
cluster,
brokerURL string,
) (map[string]utils.NamespaceOwnershipStatus, error) {
endpoint := b.pulsar.endpoint(b.basePath, cluster, brokerURL, "ownedNamespaces")
var res map[string]utils.NamespaceOwnershipStatus
err := b.pulsar.Client.Get(endpoint, &res)
err := b.pulsar.Client.GetWithContext(ctx, endpoint, &res)
if err != nil {
return nil, err
}
return res, nil
}

func (b *broker) UpdateDynamicConfiguration(configName, configValue string) error {
return b.UpdateDynamicConfigurationWithContext(context.Background(), configName, configValue)
}

func (b *broker) UpdateDynamicConfigurationWithContext(ctx context.Context, configName, configValue string) error {
value := fmt.Sprintf("/configuration/%s/%s", configName, configValue)
endpoint := b.pulsar.endpointWithFullPath(b.basePath, value)
return b.pulsar.Client.Post(endpoint, nil)
return b.pulsar.Client.PostWithContext(ctx, endpoint, nil)
}

func (b *broker) DeleteDynamicConfiguration(configName string) error {
return b.DeleteDynamicConfigurationWithContext(context.Background(), configName)
}

func (b *broker) DeleteDynamicConfigurationWithContext(ctx context.Context, configName string) error {
endpoint := b.pulsar.endpoint(b.basePath, "/configuration/", configName)
return b.pulsar.Client.Delete(endpoint)
return b.pulsar.Client.DeleteWithContext(ctx, endpoint)
}

func (b *broker) GetRuntimeConfigurations() (map[string]string, error) {
return b.GetRuntimeConfigurationsWithContext(context.Background())
}

func (b *broker) GetRuntimeConfigurationsWithContext(ctx context.Context) (map[string]string, error) {
endpoint := b.pulsar.endpoint(b.basePath, "/configuration/", "runtime")
var res map[string]string
err := b.pulsar.Client.Get(endpoint, &res)
err := b.pulsar.Client.GetWithContext(ctx, endpoint, &res)
if err != nil {
return nil, err
}
return res, nil
}

func (b *broker) GetInternalConfigurationData() (*utils.InternalConfigurationData, error) {
return b.GetInternalConfigurationDataWithContext(context.Background())
}

func (b *broker) GetInternalConfigurationDataWithContext(
ctx context.Context,
) (*utils.InternalConfigurationData, error) {
endpoint := b.pulsar.endpoint(b.basePath, "/internal-configuration")
var res utils.InternalConfigurationData
err := b.pulsar.Client.Get(endpoint, &res)
err := b.pulsar.Client.GetWithContext(ctx, endpoint, &res)
if err != nil {
return nil, err
}
return &res, nil
}

func (b *broker) GetAllDynamicConfigurations() (map[string]string, error) {
return b.GetAllDynamicConfigurationsWithContext(context.Background())
}

func (b *broker) GetAllDynamicConfigurationsWithContext(ctx context.Context) (map[string]string, error) {
endpoint := b.pulsar.endpoint(b.basePath, "/configuration/", "values")
var res map[string]string
err := b.pulsar.Client.Get(endpoint, &res)
err := b.pulsar.Client.GetWithContext(ctx, endpoint, &res)
if err != nil {
return nil, err
}
return res, nil
}

func (b *broker) HealthCheck() error {
return b.HealthCheckWithTopicVersion(utils.TopicVersionV1)
return b.HealthCheckWithContext(context.Background())
}

func (b *broker) HealthCheckWithContext(ctx context.Context) error {
return b.HealthCheckWithTopicVersionWithContext(ctx, utils.TopicVersionV1)
}

func (b *broker) HealthCheckWithTopicVersion(topicVersion utils.TopicVersion) error {
return b.HealthCheckWithTopicVersionWithContext(context.Background(), topicVersion)
}

func (b *broker) HealthCheckWithTopicVersionWithContext(ctx context.Context, topicVersion utils.TopicVersion) error {
endpoint := b.pulsar.endpoint(b.basePath, "/health")

buf, err := b.pulsar.Client.GetWithQueryParams(endpoint, nil, map[string]string{
buf, err := b.pulsar.Client.GetWithQueryParamsWithContext(ctx, endpoint, nil, map[string]string{
"topicVersion": topicVersion.String(),
}, false)
if err != nil {
Expand All @@ -177,10 +272,15 @@ func (b *broker) HealthCheckWithTopicVersion(topicVersion utils.TopicVersion) er
}
return nil
}

func (b *broker) GetLeaderBroker() (utils.BrokerInfo, error) {
return b.GetLeaderBrokerWithContext(context.Background())
}

func (b *broker) GetLeaderBrokerWithContext(ctx context.Context) (utils.BrokerInfo, error) {
endpoint := b.pulsar.endpoint(b.basePath, "/leaderBroker")
var brokerInfo utils.BrokerInfo
err := b.pulsar.Client.Get(endpoint, &brokerInfo)
err := b.pulsar.Client.GetWithContext(ctx, endpoint, &brokerInfo)
if err != nil {
return brokerInfo, err
}
Expand Down
Loading