diff --git a/go-client/admin/client_test.go b/go-client/admin/client_test.go index d768c025a3..9fc52de1dd 100644 --- a/go-client/admin/client_test.go +++ b/go-client/admin/client_test.go @@ -51,7 +51,7 @@ func defaultReplicaServerPorts() []int { func timeoutConfig() Config { return Config{ - MetaServers: []string{"0.0.0.0:123456"}, + MetaServers: []string{"0.0.0.0:12345"}, Timeout: 500 * time.Millisecond, } } diff --git a/go-client/pegasus/client.go b/go-client/pegasus/client.go index e14b282680..d1b5b2419e 100644 --- a/go-client/pegasus/client.go +++ b/go-client/pegasus/client.go @@ -64,7 +64,7 @@ func NewClient(cfg config.Config) Client { func newClientWithError(cfg config.Config) (Client, error) { var err error - cfg.MetaServers, err = session.ResolveMetaAddr(cfg.MetaServers) + _, err = session.ResolveMetaAddr(cfg.MetaServers) if err != nil { return nil, err } diff --git a/go-client/session/addr.go b/go-client/session/addr.go index c4edc1a739..7fe665ff74 100644 --- a/go-client/session/addr.go +++ b/go-client/session/addr.go @@ -24,9 +24,8 @@ import ( "net" ) -// ResolveMetaAddr into a list of TCP4 addresses. Error is returned if the given `addrs` are not either -// a list of valid TCP4 addresses, or a resolvable hostname. -func ResolveMetaAddr(addrs []string) ([]string, error) { +// resolveMetaAddrImpl is the actual implementation of ResolveMetaAddr +func resolveMetaAddrImpl(addrs []string) ([]string, error) { if len(addrs) == 0 { return nil, fmt.Errorf("meta server list should not be empty") } @@ -54,3 +53,8 @@ func ResolveMetaAddr(addrs []string) ([]string, error) { return nil, fmt.Errorf("illegal meta addresses: %s", addrs) } + +// ResolveMetaAddr into a list of TCP4 addresses. Error is returned if the given `addrs` are not either +// a list of valid TCP4 addresses, or a resolvable hostname. +// This is a function variable so it can be mocked in tests. +var ResolveMetaAddr = resolveMetaAddrImpl diff --git a/go-client/session/meta_call.go b/go-client/session/meta_call.go index f86b9face0..af1df0e495 100644 --- a/go-client/session/meta_call.go +++ b/go-client/session/meta_call.go @@ -27,7 +27,6 @@ import ( "github.com/apache/incubator-pegasus/go-client/idl/base" "github.com/apache/incubator-pegasus/go-client/idl/replication" - "github.com/apache/incubator-pegasus/go-client/pegalog" ) type metaCallFunc func(context.Context, *metaSession) (metaResponse, error) @@ -127,28 +126,15 @@ func (c *metaCall) issueSingleMeta(ctx context.Context, curLeader int) bool { } addr := forwardAddr.GetAddress() - found := false c.lock.Lock() - for i := range c.metaIPAddrs { - if addr == c.metaIPAddrs[i] { - found = true - break - } - } - c.lock.Unlock() - - if !found { - c.lock.Lock() - c.metaIPAddrs = append(c.metaIPAddrs, addr) - c.metas = append(c.metas, &metaSession{ - NodeSession: newNodeSession(addr, NodeTypeMeta, DisableMetrics), - logger: pegalog.GetLogger(), - }) + found := addMetaSession(&c.metaIPAddrs, &c.metas, addr) + if found { curLeader = len(c.metas) - 1 meta = c.metas[curLeader] meta.logger.Printf("add forward address %s as meta server", addr) - c.lock.Unlock() - + } + c.lock.Unlock() + if found { resp, err = c.callFunc(ctx, meta) } } diff --git a/go-client/session/meta_session.go b/go-client/session/meta_session.go index 252839f1e9..5225daefcc 100644 --- a/go-client/session/meta_session.go +++ b/go-client/session/meta_session.go @@ -21,6 +21,8 @@ package session import ( "context" + "net" + "strings" "sync" "github.com/apache/incubator-pegasus/go-client/idl/base" @@ -65,6 +67,7 @@ type MetaManager struct { logger pegalog.Logger metaIPAddrs []string + metaDomain string metas []*metaSession currentLeader int // current leader of meta servers @@ -73,6 +76,16 @@ type MetaManager struct { } func NewMetaManager(addrs []string, creator NodeSessionCreator) *MetaManager { + metaHostAddr := "" + if len(addrs) == 1 && isDomain(addrs[0]) { + metaHostAddr = addrs[0] + } + + addrs, err := ResolveMetaAddr(addrs) + if err != nil { + return nil + } + metas := make([]*metaSession, len(addrs)) metaIPAddrs := make([]string, len(addrs)) for i, addr := range addrs { @@ -87,11 +100,30 @@ func NewMetaManager(addrs []string, creator NodeSessionCreator) *MetaManager { currentLeader: 0, metas: metas, metaIPAddrs: metaIPAddrs, + metaDomain: metaHostAddr, logger: pegalog.GetLogger(), } return mm } +func isDomain(s string) bool { + host := s + if idx := strings.LastIndex(host, ":"); idx != -1 { + host = host[:idx] + } + + if ip := net.ParseIP(host); ip != nil { + return false + } + + ips, err := net.LookupIP(host) + if err != nil || len(ips) == 0 { + return false + } + + return true +} + func (m *MetaManager) call(ctx context.Context, callFunc metaCallFunc) (metaResponse, error) { call := newMetaCall(m.getCurrentLeader(), m.metas, callFunc, m.metaIPAddrs) resp, err := call.Run(ctx) @@ -101,10 +133,47 @@ func (m *MetaManager) call(ctx context.Context, callFunc metaCallFunc) (metaResp m.setNewMetas(call.metas) m.setMetaIPAddrs(call.metaIPAddrs) call.lock.RUnlock() + } else if m.metaDomain != "" { + if newAddrs, err := ResolveMetaAddr([]string{m.metaDomain}); err == nil { + m.mu.Lock() + m.logger.Printf("resolved meta list %s to %v", m.metaDomain, newAddrs) + anyNew := false + for _, addr := range newAddrs { + if addMetaSession(&m.metaIPAddrs, &m.metas, addr) { + anyNew = true + } + } + if anyNew { + m.currentLeader = len(m.metas) - 1 + } + m.mu.Unlock() + + } } return resp, err } +func addMetaSession(metaIPAddrs *[]string, metas *[]*metaSession, addr string) bool { + found := false + for _, oldAddr := range *metaIPAddrs { + if oldAddr == addr { + found = true + break + } + } + + if !found { + *metaIPAddrs = append(*metaIPAddrs, addr) + *metas = append(*metas, &metaSession{ + NodeSession: newNodeSession(addr, NodeTypeMeta, DisableMetrics), + logger: pegalog.GetLogger(), + }) + return true + } + + return false +} + // QueryConfig queries table configuration from the leader of meta servers. If the leader was changed, // it retries for other servers until it finds the true leader, unless no leader exists. // Thread-Safe diff --git a/go-client/session/meta_session_test.go b/go-client/session/meta_session_test.go index 5014a4680e..455bdde8dd 100644 --- a/go-client/session/meta_session_test.go +++ b/go-client/session/meta_session_test.go @@ -21,6 +21,7 @@ package session import ( "context" + "errors" "sync" "testing" "time" @@ -142,3 +143,79 @@ func TestNodeSession_ForwardToPrimaryMeta(t *testing.T) { assert.Equal(t, resp.Err.Errno, base.ERR_OK.String()) } } + +// TestMetaManager_DNSResolveHost tests that MetaManager can resolve meta IP addresses from a domain name. +func TestMetaManager_DNSResolveHost(t *testing.T) { + defer leaktest.Check(t)() + + originalResolveMetaAddr := ResolveMetaAddr + defer func() { + ResolveMetaAddr = originalResolveMetaAddr + }() + + // Mock the ResolveMetaAddr function + ResolveMetaAddr = func(hosts []string) ([]string, error) { + return []string{"127.0.0.1:34601", "127.0.0.1:34602", "127.0.0.1:34603"}, nil + } + + // Create a MetaManager with a domain name + mm := NewMetaManager([]string{"localhost:34601"}, NewNodeSession) + defer mm.Close() + + // Verify initial resolution + assert.Equal(t, []string{"127.0.0.1:34601", "127.0.0.1:34602", "127.0.0.1:34603"}, mm.GetMetaIPAddrs()) + + _, err := mm.QueryConfig(context.Background(), "temp") + assert.Nil(t, err) +} + +// TestMetaManager_DNSMetaAllChanged tests that MetaManager re-resolves DNS and updates meta servers when all meta servers fail. +func TestMetaManager_DNSMetaAllChanged(t *testing.T) { + defer leaktest.Check(t)() + + // Save original ResolveMetaAddr function + originalResolveMetaAddr := ResolveMetaAddr + defer func() { + ResolveMetaAddr = originalResolveMetaAddr + }() + + // Mock the ResolveMetaAddr function to return addresses(unreachable ones) + ResolveMetaAddr = func(hosts []string) ([]string, error) { + return []string{"172.0.0.1:34601", "172.0.0.2:34601"}, nil + } + + // Create a MetaManager with a domain name + mm := NewMetaManager([]string{"localhost:34601"}, NewNodeSession) + defer mm.Close() + + // Verify initial resolution + assert.Equal(t, []string{"172.0.0.1:34601", "172.0.0.2:34601"}, mm.GetMetaIPAddrs()) + + // Create a context with timeout to trigger failure + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + // This should timeout because the initial meta servers are unreachable + _, err := mm.QueryConfig(ctx, "temp") + assert.NotNil(t, err) + assert.True(t, errors.Is(context.DeadlineExceeded, ctx.Err())) + + // Mock the ResolveMetaAddr function to return new addresses(reachable ones) + ResolveMetaAddr = func(hosts []string) ([]string, error) { + return []string{"127.0.0.1:34601", "127.0.0.1:34602", "127.0.0.1:34603"}, nil + } + + ctx, cancel = context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + // This should also timeout because the initial meta servers are still unreachable, but it will update the meta servers after timeout. + _, err = mm.QueryConfig(ctx, "temp") + assert.NotNil(t, err) + assert.True(t, errors.Is(context.DeadlineExceeded, ctx.Err())) + + ctx, cancel = context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + // This should not timeout because meta servers are reachable now. + _, err = mm.QueryConfig(ctx, "temp") + assert.Nil(t, err) +}