Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go-client/admin/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func defaultReplicaServerPorts() []int {

func timeoutConfig() Config {
return Config{
MetaServers: []string{"0.0.0.0:123456"},
MetaServers: []string{"0.0.0.0:12345"},
Timeout: 500 * time.Millisecond,
}
}
Expand Down
2 changes: 1 addition & 1 deletion go-client/pegasus/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func NewClient(cfg config.Config) Client {

func newClientWithError(cfg config.Config) (Client, error) {
var err error
cfg.MetaServers, err = session.ResolveMetaAddr(cfg.MetaServers)
_, err = session.ResolveMetaAddr(cfg.MetaServers)
if err != nil {
return nil, err
}
Expand Down
10 changes: 7 additions & 3 deletions go-client/session/addr.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,8 @@ import (
"net"
)

// ResolveMetaAddr into a list of TCP4 addresses. Error is returned if the given `addrs` are not either
// a list of valid TCP4 addresses, or a resolvable hostname.
func ResolveMetaAddr(addrs []string) ([]string, error) {
// resolveMetaAddrImpl is the actual implementation of ResolveMetaAddr
func resolveMetaAddrImpl(addrs []string) ([]string, error) {
if len(addrs) == 0 {
return nil, fmt.Errorf("meta server list should not be empty")
}
Expand Down Expand Up @@ -54,3 +53,8 @@ func ResolveMetaAddr(addrs []string) ([]string, error) {

return nil, fmt.Errorf("illegal meta addresses: %s", addrs)
}

// ResolveMetaAddr into a list of TCP4 addresses. Error is returned if the given `addrs` are not either
// a list of valid TCP4 addresses, or a resolvable hostname.
// This is a function variable so it can be mocked in tests.
var ResolveMetaAddr = resolveMetaAddrImpl
24 changes: 5 additions & 19 deletions go-client/session/meta_call.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (

"github.com/apache/incubator-pegasus/go-client/idl/base"
"github.com/apache/incubator-pegasus/go-client/idl/replication"
"github.com/apache/incubator-pegasus/go-client/pegalog"
)

type metaCallFunc func(context.Context, *metaSession) (metaResponse, error)
Expand Down Expand Up @@ -127,28 +126,15 @@ func (c *metaCall) issueSingleMeta(ctx context.Context, curLeader int) bool {
}

addr := forwardAddr.GetAddress()
found := false
c.lock.Lock()
for i := range c.metaIPAddrs {
if addr == c.metaIPAddrs[i] {
found = true
break
}
}
c.lock.Unlock()

if !found {
c.lock.Lock()
c.metaIPAddrs = append(c.metaIPAddrs, addr)
c.metas = append(c.metas, &metaSession{
NodeSession: newNodeSession(addr, NodeTypeMeta, DisableMetrics),
logger: pegalog.GetLogger(),
})
found := addMetaSession(&c.metaIPAddrs, &c.metas, addr)
if found {
curLeader = len(c.metas) - 1
meta = c.metas[curLeader]
meta.logger.Printf("add forward address %s as meta server", addr)
c.lock.Unlock()

}
c.lock.Unlock()
if found {
resp, err = c.callFunc(ctx, meta)
}
}
Expand Down
69 changes: 69 additions & 0 deletions go-client/session/meta_session.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ package session

import (
"context"
"net"
"strings"
"sync"

"github.com/apache/incubator-pegasus/go-client/idl/base"
Expand Down Expand Up @@ -65,6 +67,7 @@ type MetaManager struct {
logger pegalog.Logger

metaIPAddrs []string
metaDomain string
metas []*metaSession
currentLeader int // current leader of meta servers

Expand All @@ -73,6 +76,16 @@ type MetaManager struct {
}

func NewMetaManager(addrs []string, creator NodeSessionCreator) *MetaManager {
metaHostAddr := ""
if len(addrs) == 1 && isDomain(addrs[0]) {
metaHostAddr = addrs[0]
}

addrs, err := ResolveMetaAddr(addrs)
if err != nil {
return nil
}

metas := make([]*metaSession, len(addrs))
metaIPAddrs := make([]string, len(addrs))
for i, addr := range addrs {
Expand All @@ -87,11 +100,30 @@ func NewMetaManager(addrs []string, creator NodeSessionCreator) *MetaManager {
currentLeader: 0,
metas: metas,
metaIPAddrs: metaIPAddrs,
metaDomain: metaHostAddr,
logger: pegalog.GetLogger(),
}
return mm
}

func isDomain(s string) bool {
host := s
if idx := strings.LastIndex(host, ":"); idx != -1 {
host = host[:idx]
}

if ip := net.ParseIP(host); ip != nil {
return false
}

ips, err := net.LookupIP(host)
if err != nil || len(ips) == 0 {
return false
}

return true
}

func (m *MetaManager) call(ctx context.Context, callFunc metaCallFunc) (metaResponse, error) {
call := newMetaCall(m.getCurrentLeader(), m.metas, callFunc, m.metaIPAddrs)
resp, err := call.Run(ctx)
Expand All @@ -101,10 +133,47 @@ func (m *MetaManager) call(ctx context.Context, callFunc metaCallFunc) (metaResp
m.setNewMetas(call.metas)
m.setMetaIPAddrs(call.metaIPAddrs)
call.lock.RUnlock()
} else if m.metaDomain != "" {
if newAddrs, err := ResolveMetaAddr([]string{m.metaDomain}); err == nil {
m.mu.Lock()
m.logger.Printf("resolved meta list %s to %v", m.metaDomain, newAddrs)
anyNew := false
for _, addr := range newAddrs {
if addMetaSession(&m.metaIPAddrs, &m.metas, addr) {
anyNew = true
}
}
if anyNew {
m.currentLeader = len(m.metas) - 1
}
m.mu.Unlock()

}
}
return resp, err
}

func addMetaSession(metaIPAddrs *[]string, metas *[]*metaSession, addr string) bool {
found := false
for _, oldAddr := range *metaIPAddrs {
if oldAddr == addr {
found = true
break
}
}

if !found {
*metaIPAddrs = append(*metaIPAddrs, addr)
*metas = append(*metas, &metaSession{
NodeSession: newNodeSession(addr, NodeTypeMeta, DisableMetrics),
logger: pegalog.GetLogger(),
})
return true
}

return false
}

// QueryConfig queries table configuration from the leader of meta servers. If the leader was changed,
// it retries for other servers until it finds the true leader, unless no leader exists.
// Thread-Safe
Expand Down
77 changes: 77 additions & 0 deletions go-client/session/meta_session_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ package session

import (
"context"
"errors"
"sync"
"testing"
"time"
Expand Down Expand Up @@ -142,3 +143,79 @@ func TestNodeSession_ForwardToPrimaryMeta(t *testing.T) {
assert.Equal(t, resp.Err.Errno, base.ERR_OK.String())
}
}

// TestMetaManager_DNSResolveHost tests that MetaManager can resolve meta IP addresses from a domain name.
func TestMetaManager_DNSResolveHost(t *testing.T) {
defer leaktest.Check(t)()

originalResolveMetaAddr := ResolveMetaAddr
defer func() {
ResolveMetaAddr = originalResolveMetaAddr
}()

// Mock the ResolveMetaAddr function
ResolveMetaAddr = func(hosts []string) ([]string, error) {
return []string{"127.0.0.1:34601", "127.0.0.1:34602", "127.0.0.1:34603"}, nil
}

// Create a MetaManager with a domain name
mm := NewMetaManager([]string{"localhost:34601"}, NewNodeSession)
defer mm.Close()

// Verify initial resolution
assert.Equal(t, []string{"127.0.0.1:34601", "127.0.0.1:34602", "127.0.0.1:34603"}, mm.GetMetaIPAddrs())

_, err := mm.QueryConfig(context.Background(), "temp")
assert.Nil(t, err)
}

// TestMetaManager_DNSMetaAllChanged tests that MetaManager re-resolves DNS and updates meta servers when all meta servers fail.
func TestMetaManager_DNSMetaAllChanged(t *testing.T) {
defer leaktest.Check(t)()

// Save original ResolveMetaAddr function
originalResolveMetaAddr := ResolveMetaAddr
defer func() {
ResolveMetaAddr = originalResolveMetaAddr
}()

// Mock the ResolveMetaAddr function to return addresses(unreachable ones)
ResolveMetaAddr = func(hosts []string) ([]string, error) {
return []string{"172.0.0.1:34601", "172.0.0.2:34601"}, nil
}

// Create a MetaManager with a domain name
mm := NewMetaManager([]string{"localhost:34601"}, NewNodeSession)
defer mm.Close()

// Verify initial resolution
assert.Equal(t, []string{"172.0.0.1:34601", "172.0.0.2:34601"}, mm.GetMetaIPAddrs())

// Create a context with timeout to trigger failure
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

// This should timeout because the initial meta servers are unreachable
_, err := mm.QueryConfig(ctx, "temp")
assert.NotNil(t, err)
assert.True(t, errors.Is(context.DeadlineExceeded, ctx.Err()))

// Mock the ResolveMetaAddr function to return new addresses(reachable ones)
ResolveMetaAddr = func(hosts []string) ([]string, error) {
return []string{"127.0.0.1:34601", "127.0.0.1:34602", "127.0.0.1:34603"}, nil
}

ctx, cancel = context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

// This should also timeout because the initial meta servers are still unreachable, but it will update the meta servers after timeout.
_, err = mm.QueryConfig(ctx, "temp")
assert.NotNil(t, err)
assert.True(t, errors.Is(context.DeadlineExceeded, ctx.Err()))

ctx, cancel = context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
// This should not timeout because meta servers are reachable now.
_, err = mm.QueryConfig(ctx, "temp")
assert.Nil(t, err)
}
Loading