From 0ef057a3da461480bb7ee71a7476d20df446b687 Mon Sep 17 00:00:00 2001 From: Dan Frank Date: Tue, 30 Apr 2013 15:54:43 -0400 Subject: [PATCH 1/7] rm needless lock on responses rm duplicate mark check in tests --- epsilon_greedy.go | 6 ++---- hostpool.go | 7 ------- hostpool_test.go | 2 -- 3 files changed, 2 insertions(+), 13 deletions(-) diff --git a/epsilon_greedy.go b/epsilon_greedy.go index f43fae7..ec2995a 100644 --- a/epsilon_greedy.go +++ b/epsilon_greedy.go @@ -13,10 +13,8 @@ type epsilonHostPoolResponse struct { } func (r *epsilonHostPoolResponse) Mark(err error) { - r.Do(func() { - r.ended = time.Now() - doMark(err, r) - }) + r.ended = time.Now() + r.standardHostPoolResponse.Mark(err) } diff --git a/hostpool.go b/hostpool.go index 25ca1fb..db30d41 100644 --- a/hostpool.go +++ b/hostpool.go @@ -28,7 +28,6 @@ type HostPoolResponse interface { type standardHostPoolResponse struct { host string - sync.Once pool HostPool } @@ -94,12 +93,6 @@ func (r *standardHostPoolResponse) hostPool() HostPool { } func (r *standardHostPoolResponse) Mark(err error) { - r.Do(func() { - doMark(err, r) - }) -} - -func doMark(err error, r HostPoolResponse) { if err == nil { r.hostPool().markSuccess(r) } else { diff --git a/hostpool_test.go b/hostpool_test.go index 352c8ea..76a4d4c 100644 --- a/hostpool_test.go +++ b/hostpool_test.go @@ -32,8 +32,6 @@ func TestHostPool(t *testing.T) { respC.Mark(nil) // get again, and verify that it's still c assert.Equal(t, p.Get().Host(), "c") - // now try to mark b as success; should fail because already marked - respB.Mark(nil) assert.Equal(t, p.Get().Host(), "c") // would be b if it were not dead // now restore a respA = &standardHostPoolResponse{host: "a", pool: p} From adb2d1761ea16f826e45fb36555643d47e9aa9b7 Mon Sep 17 00:00:00 2001 From: Dan Frank Date: Tue, 30 Apr 2013 16:09:39 -0400 Subject: [PATCH 2/7] begin refactoring away from markSuccess --- epsilon_greedy.go | 27 ++++++++++++--------------- hostpool.go | 1 - 2 files changed, 12 insertions(+), 16 deletions(-) diff --git a/epsilon_greedy.go b/epsilon_greedy.go index ec2995a..d93e270 100644 --- a/epsilon_greedy.go +++ b/epsilon_greedy.go @@ -7,15 +7,18 @@ import ( ) type epsilonHostPoolResponse struct { - standardHostPoolResponse + HostPoolResponse started time.Time ended time.Time + pool *epsilonGreedyHostPool } func (r *epsilonHostPoolResponse) Mark(err error) { - r.ended = time.Now() - r.standardHostPoolResponse.Mark(err) - + if err == nil { + r.ended = time.Now() + r.pool.recordTiming(r) + } + r.HostPoolResponse.Mark(err) } type epsilonGreedyHostPool struct { @@ -94,8 +97,9 @@ func (p *epsilonGreedyHostPool) Get() HostPoolResponse { host := p.getEpsilonGreedy() started := time.Now() return &epsilonHostPoolResponse{ - standardHostPoolResponse: standardHostPoolResponse{host: host, pool: p}, - started: started, + HostPoolResponse: &standardHostPoolResponse{host: host, pool: p}, + started: started, + pool: p, } } @@ -158,15 +162,8 @@ func (p *epsilonGreedyHostPool) getEpsilonGreedy() string { return hostToUse.host } -func (p *epsilonGreedyHostPool) markSuccess(hostR HostPoolResponse) { - // first do the base markSuccess - a little redundant with host lookup but cleaner than repeating logic - p.standardHostPool.markSuccess(hostR) - eHostR, ok := hostR.(*epsilonHostPoolResponse) - if !ok { - log.Printf("Incorrect type in eps markSuccess!") // TODO reflection to print out offending type - return - } - host := eHostR.host +func (p *epsilonGreedyHostPool) recordTiming(eHostR *epsilonHostPoolResponse) { + host := eHostR.Host() duration := p.between(eHostR.started, eHostR.ended) p.Lock() diff --git a/hostpool.go b/hostpool.go index db30d41..dbef6b3 100644 --- a/hostpool.go +++ b/hostpool.go @@ -23,7 +23,6 @@ func Version() string { type HostPoolResponse interface { Host() string Mark(error) - hostPool() HostPool } type standardHostPoolResponse struct { From 5b6ec7b8060ffe1e32efb4dc0db6b66099edadea Mon Sep 17 00:00:00 2001 From: Dan Frank Date: Tue, 30 Apr 2013 17:07:25 -0400 Subject: [PATCH 3/7] rm markSuccess and ilk from interface, bind response and hp types --- epsilon_greedy.go | 2 +- example_test.go | 6 +++--- hostpool.go | 17 +++++------------ hostpool_test.go | 2 +- 4 files changed, 10 insertions(+), 17 deletions(-) diff --git a/epsilon_greedy.go b/epsilon_greedy.go index d93e270..a065c78 100644 --- a/epsilon_greedy.go +++ b/epsilon_greedy.go @@ -97,7 +97,7 @@ func (p *epsilonGreedyHostPool) Get() HostPoolResponse { host := p.getEpsilonGreedy() started := time.Now() return &epsilonHostPoolResponse{ - HostPoolResponse: &standardHostPoolResponse{host: host, pool: p}, + HostPoolResponse: &standardHostPoolResponse{host: host, pool: &p.standardHostPool}, started: started, pool: p, } diff --git a/example_test.go b/example_test.go index 88d0e55..d2c168b 100644 --- a/example_test.go +++ b/example_test.go @@ -1,13 +1,13 @@ package hostpool import ( - "github.com/bitly/go-hostpool" + "errors" ) func ExampleNewEpsilonGreedy() { - hp := hostpool.NewEpsilonGreedy([]string{"a", "b"}, 0, &hostpool.LinearEpsilonValueCalculator{}) + hp := NewEpsilonGreedy([]string{"a", "b"}, 0, &LinearEpsilonValueCalculator{}) hostResponse := hp.Get() hostname := hostResponse.Host() - err := nil // (make a request with hostname) + err := errors.New("I am your http error from " + hostname) // (make a request with hostname) hostResponse.Mark(err) } diff --git a/hostpool.go b/hostpool.go index dbef6b3..af22bad 100644 --- a/hostpool.go +++ b/hostpool.go @@ -27,7 +27,7 @@ type HostPoolResponse interface { type standardHostPoolResponse struct { host string - pool HostPool + pool *standardHostPool } // --- HostPool structs and interfaces ---- @@ -37,9 +37,6 @@ type standardHostPoolResponse struct { // get the list of all Hosts, and use ResetAll to reset state. type HostPool interface { Get() HostPoolResponse - // keep the marks separate so we can override independently - markSuccess(HostPoolResponse) - markFailed(HostPoolResponse) ResetAll() Hosts() []string @@ -87,15 +84,11 @@ func (r *standardHostPoolResponse) Host() string { return r.host } -func (r *standardHostPoolResponse) hostPool() HostPool { - return r.pool -} - func (r *standardHostPoolResponse) Mark(err error) { if err == nil { - r.hostPool().markSuccess(r) + r.pool.markSuccess(r) } else { - r.hostPool().markFailed(r) + r.pool.markFailed(r) } } @@ -147,7 +140,7 @@ func (p *standardHostPool) doResetAll() { } } -func (p *standardHostPool) markSuccess(hostR HostPoolResponse) { +func (p *standardHostPool) markSuccess(hostR *standardHostPoolResponse) { host := hostR.Host() p.Lock() defer p.Unlock() @@ -159,7 +152,7 @@ func (p *standardHostPool) markSuccess(hostR HostPoolResponse) { h.dead = false } -func (p *standardHostPool) markFailed(hostR HostPoolResponse) { +func (p *standardHostPool) markFailed(hostR *standardHostPoolResponse) { host := hostR.Host() p.Lock() defer p.Unlock() diff --git a/hostpool_test.go b/hostpool_test.go index 76a4d4c..0a3cf32 100644 --- a/hostpool_test.go +++ b/hostpool_test.go @@ -17,7 +17,7 @@ func TestHostPool(t *testing.T) { dummyErr := errors.New("Dummy Error") - p := New([]string{"a", "b", "c"}) + p := New([]string{"a", "b", "c"}).(*standardHostPool) assert.Equal(t, p.Get().Host(), "a") assert.Equal(t, p.Get().Host(), "b") assert.Equal(t, p.Get().Host(), "c") From 22283bf1ec771ada46cde1a844063dcda7926590 Mon Sep 17 00:00:00 2001 From: Dan Frank Date: Tue, 30 Apr 2013 22:32:30 -0400 Subject: [PATCH 4/7] further force epsilon greedy to embed more generic HostPool; still some type assertions needed --- epsilon_greedy.go | 47 +++++++++++++++++++++++++++++------------------ hostpool.go | 6 ++++++ 2 files changed, 35 insertions(+), 18 deletions(-) diff --git a/epsilon_greedy.go b/epsilon_greedy.go index a065c78..4d58c25 100644 --- a/epsilon_greedy.go +++ b/epsilon_greedy.go @@ -1,8 +1,10 @@ package hostpool import ( + "errors" "log" "math/rand" + "sync" "time" ) @@ -22,7 +24,8 @@ func (r *epsilonHostPoolResponse) Mark(err error) { } type epsilonGreedyHostPool struct { - standardHostPool // TODO - would be nifty if we could embed HostPool and Locker interfaces + HostPool + sync.Locker epsilon float32 // this is our exploration factor decayDuration time.Duration EpsilonValueCalculator // embed the epsilonValueCalculator @@ -50,7 +53,8 @@ func NewEpsilonGreedy(hosts []string, decayDuration time.Duration, calc EpsilonV } stdHP := New(hosts).(*standardHostPool) p := &epsilonGreedyHostPool{ - standardHostPool: *stdHP, + HostPool: stdHP, + Locker: stdHP, epsilon: float32(initialEpsilon), decayDuration: decayDuration, EpsilonValueCalculator: calc, @@ -58,7 +62,7 @@ func NewEpsilonGreedy(hosts []string, decayDuration time.Duration, calc EpsilonV } // allocate structures - for _, h := range p.hostList { + for _, h := range stdHP.hostList { h.epsilonCounts = make([]int64, epsilonBuckets) h.epsilonValues = make([]int64, epsilonBuckets) } @@ -82,7 +86,7 @@ func (p *epsilonGreedyHostPool) epsilonGreedyDecay() { } func (p *epsilonGreedyHostPool) performEpsilonGreedyDecay() { p.Lock() - for _, h := range p.hostList { + for _, h := range p.HostPool.(*standardHostPool).hostList { h.epsilonIndex += 1 h.epsilonIndex = h.epsilonIndex % epsilonBuckets h.epsilonCounts[h.epsilonIndex] = 0 @@ -93,17 +97,15 @@ func (p *epsilonGreedyHostPool) performEpsilonGreedyDecay() { func (p *epsilonGreedyHostPool) Get() HostPoolResponse { p.Lock() - defer p.Unlock() - host := p.getEpsilonGreedy() - started := time.Now() - return &epsilonHostPoolResponse{ - HostPoolResponse: &standardHostPoolResponse{host: host, pool: &p.standardHostPool}, - started: started, - pool: p, + host, err := p.getEpsilonGreedy() + p.Unlock() + if err != nil { + host = p.HostPool.Get().Host() } + return p.responseForHostName(host) } -func (p *epsilonGreedyHostPool) getEpsilonGreedy() string { +func (p *epsilonGreedyHostPool) getEpsilonGreedy() (string, error) { var hostToUse *hostEntry // this is our exploration phase @@ -112,14 +114,14 @@ func (p *epsilonGreedyHostPool) getEpsilonGreedy() string { if p.epsilon < minEpsilon { p.epsilon = minEpsilon } - return p.getRoundRobin() + return "", errors.New("Exploration") } // calculate values for each host in the 0..1 range (but not ormalized) var possibleHosts []*hostEntry now := time.Now() var sumValues float64 - for _, h := range p.hostList { + for _, h := range p.HostPool.(*standardHostPool).hostList { if h.canTryHost(now) { v := h.getWeightedAverageResponseTime() if v > 0 { @@ -153,13 +155,13 @@ func (p *epsilonGreedyHostPool) getEpsilonGreedy() string { if len(possibleHosts) != 0 { log.Println("Failed to randomly choose a host, Dan loses") } - return p.getRoundRobin() + return "", errors.New("No host chosen") } if hostToUse.dead { - hostToUse.willRetryHost(p.maxRetryInterval) + hostToUse.willRetryHost(p.HostPool.(*standardHostPool).maxRetryInterval) } - return hostToUse.host + return hostToUse.host, nil } func (p *epsilonGreedyHostPool) recordTiming(eHostR *epsilonHostPoolResponse) { @@ -168,7 +170,7 @@ func (p *epsilonGreedyHostPool) recordTiming(eHostR *epsilonHostPoolResponse) { p.Lock() defer p.Unlock() - h, ok := p.hosts[host] + h, ok := p.HostPool.(*standardHostPool).hosts[host] if !ok { log.Fatalf("host %s not in HostPool %v", host, p.Hosts()) } @@ -176,6 +178,15 @@ func (p *epsilonGreedyHostPool) recordTiming(eHostR *epsilonHostPoolResponse) { h.epsilonValues[h.epsilonIndex] += int64(duration.Seconds() * 1000) } +func (p *epsilonGreedyHostPool) responseForHostName(host string) HostPoolResponse { + started := time.Now() + return &epsilonHostPoolResponse{ + HostPoolResponse: p.HostPool.responseForHostName(host), + started: started, + pool: p, + } +} + // --- timer: this just exists for testing type timer interface { diff --git a/hostpool.go b/hostpool.go index af22bad..05dd0dd 100644 --- a/hostpool.go +++ b/hostpool.go @@ -40,6 +40,8 @@ type HostPool interface { ResetAll() Hosts() []string + + responseForHostName(string) HostPoolResponse } type standardHostPool struct { @@ -175,3 +177,7 @@ func (p *standardHostPool) Hosts() []string { } return hosts } + +func (p *standardHostPool) responseForHostName(host string) HostPoolResponse { + return &standardHostPoolResponse{host: host, pool: p} +} From 291ae4626e614ef36302acfcdb83f6e858c100e7 Mon Sep 17 00:00:00 2001 From: Dan Frank Date: Wed, 1 May 2013 12:25:08 -0400 Subject: [PATCH 5/7] change to selectHost to (sort of) remove knowledge of h.dead from epsilon greedy --- epsilon_greedy.go | 18 ++++++++++-------- hostpool.go | 25 +++++++++++++++---------- 2 files changed, 25 insertions(+), 18 deletions(-) diff --git a/epsilon_greedy.go b/epsilon_greedy.go index 4d58c25..334c3c4 100644 --- a/epsilon_greedy.go +++ b/epsilon_greedy.go @@ -100,9 +100,9 @@ func (p *epsilonGreedyHostPool) Get() HostPoolResponse { host, err := p.getEpsilonGreedy() p.Unlock() if err != nil { - host = p.HostPool.Get().Host() + return p.toEpsilonHostPootResponse(p.HostPool.Get()) } - return p.responseForHostName(host) + return p.selectHost(host) } func (p *epsilonGreedyHostPool) getEpsilonGreedy() (string, error) { @@ -157,10 +157,6 @@ func (p *epsilonGreedyHostPool) getEpsilonGreedy() (string, error) { } return "", errors.New("No host chosen") } - - if hostToUse.dead { - hostToUse.willRetryHost(p.HostPool.(*standardHostPool).maxRetryInterval) - } return hostToUse.host, nil } @@ -178,10 +174,16 @@ func (p *epsilonGreedyHostPool) recordTiming(eHostR *epsilonHostPoolResponse) { h.epsilonValues[h.epsilonIndex] += int64(duration.Seconds() * 1000) } -func (p *epsilonGreedyHostPool) responseForHostName(host string) HostPoolResponse { +func (p *epsilonGreedyHostPool) selectHost(host string) HostPoolResponse { + resp := p.HostPool.selectHost(host) + return p.toEpsilonHostPootResponse(resp) +} + +// Convert regular response to one equipped for EG. Doesn't require lock, for now +func (p *epsilonGreedyHostPool) toEpsilonHostPootResponse(resp HostPoolResponse) *epsilonHostPoolResponse { started := time.Now() return &epsilonHostPoolResponse{ - HostPoolResponse: p.HostPool.responseForHostName(host), + HostPoolResponse: resp, started: started, pool: p, } diff --git a/hostpool.go b/hostpool.go index 05dd0dd..3320fc4 100644 --- a/hostpool.go +++ b/hostpool.go @@ -41,7 +41,7 @@ type HostPool interface { ResetAll() Hosts() []string - responseForHostName(string) HostPoolResponse + selectHost(string) HostPoolResponse } type standardHostPool struct { @@ -97,9 +97,9 @@ func (r *standardHostPoolResponse) Mark(err error) { // return an entry from the HostPool func (p *standardHostPool) Get() HostPoolResponse { p.Lock() - defer p.Unlock() host := p.getRoundRobin() - return &standardHostPoolResponse{host: host, pool: p} + p.Unlock() + return p.selectHost(host) } func (p *standardHostPool) getRoundRobin() string { @@ -110,12 +110,7 @@ func (p *standardHostPool) getRoundRobin() string { currentIndex := (i + p.nextHostIndex) % hostCount h := p.hostList[currentIndex] - if !h.dead { - p.nextHostIndex = currentIndex + 1 - return h.host - } - if h.nextRetry.Before(now) { - h.willRetryHost(p.maxRetryInterval) + if h.canTryHost(now) { p.nextHostIndex = currentIndex + 1 return h.host } @@ -178,6 +173,16 @@ func (p *standardHostPool) Hosts() []string { return hosts } -func (p *standardHostPool) responseForHostName(host string) HostPoolResponse { +func (p *standardHostPool) selectHost(host string) HostPoolResponse { + p.Lock() + defer p.Unlock() + h, ok := p.hosts[host] + if !ok { + log.Fatalf("host %s not in HostPool %v", host, p.Hosts()) + } + now := time.Now() + if h.dead && h.nextRetry.Before(now) { + h.willRetryHost(p.maxRetryInterval) + } return &standardHostPoolResponse{host: host, pool: p} } From 25b6b21c282f2522fd58a11591a04fe92173f1bc Mon Sep 17 00:00:00 2001 From: Dan Frank Date: Tue, 3 Sep 2013 15:50:00 -0400 Subject: [PATCH 6/7] refactor to public and package level methods --- epsilon_greedy.go | 10 +++++----- example_test.go | 2 +- hostpool.go | 16 +++++++++++----- hostpool_test.go | 28 ++++++++++++++-------------- 4 files changed, 31 insertions(+), 25 deletions(-) diff --git a/epsilon_greedy.go b/epsilon_greedy.go index 334c3c4..ff6165c 100644 --- a/epsilon_greedy.go +++ b/epsilon_greedy.go @@ -95,14 +95,14 @@ func (p *epsilonGreedyHostPool) performEpsilonGreedyDecay() { p.Unlock() } -func (p *epsilonGreedyHostPool) Get() HostPoolResponse { +func (p *epsilonGreedyHostPool) ChooseNextHost() string { p.Lock() host, err := p.getEpsilonGreedy() p.Unlock() if err != nil { - return p.toEpsilonHostPootResponse(p.HostPool.Get()) + host = p.HostPool.ChooseNextHost() } - return p.selectHost(host) + return host } func (p *epsilonGreedyHostPool) getEpsilonGreedy() (string, error) { @@ -174,8 +174,8 @@ func (p *epsilonGreedyHostPool) recordTiming(eHostR *epsilonHostPoolResponse) { h.epsilonValues[h.epsilonIndex] += int64(duration.Seconds() * 1000) } -func (p *epsilonGreedyHostPool) selectHost(host string) HostPoolResponse { - resp := p.HostPool.selectHost(host) +func (p *epsilonGreedyHostPool) DeliverHostResponse(host string) HostPoolResponse { + resp := p.HostPool.DeliverHostResponse(host) return p.toEpsilonHostPootResponse(resp) } diff --git a/example_test.go b/example_test.go index d2c168b..f081525 100644 --- a/example_test.go +++ b/example_test.go @@ -6,7 +6,7 @@ import ( func ExampleNewEpsilonGreedy() { hp := NewEpsilonGreedy([]string{"a", "b"}, 0, &LinearEpsilonValueCalculator{}) - hostResponse := hp.Get() + hostResponse := Get(hp) hostname := hostResponse.Host() err := errors.New("I am your http error from " + hostname) // (make a request with hostname) hostResponse.Mark(err) diff --git a/hostpool.go b/hostpool.go index 3320fc4..7341b64 100644 --- a/hostpool.go +++ b/hostpool.go @@ -36,12 +36,13 @@ type standardHostPoolResponse struct { // allow you to Get a HostPoolResponse (which includes a hostname to use), // get the list of all Hosts, and use ResetAll to reset state. type HostPool interface { - Get() HostPoolResponse + // Get() HostPoolResponse ResetAll() Hosts() []string - selectHost(string) HostPoolResponse + ChooseNextHost() string + DeliverHostResponse(string) HostPoolResponse } type standardHostPool struct { @@ -95,11 +96,16 @@ func (r *standardHostPoolResponse) Mark(err error) { } // return an entry from the HostPool -func (p *standardHostPool) Get() HostPoolResponse { +func Get(p HostPool) HostPoolResponse { + host := p.ChooseNextHost() + return p.DeliverHostResponse(host) +} + +func (p *standardHostPool) ChooseNextHost() string { p.Lock() host := p.getRoundRobin() p.Unlock() - return p.selectHost(host) + return host } func (p *standardHostPool) getRoundRobin() string { @@ -173,7 +179,7 @@ func (p *standardHostPool) Hosts() []string { return hosts } -func (p *standardHostPool) selectHost(host string) HostPoolResponse { +func (p *standardHostPool) DeliverHostResponse(host string) HostPoolResponse { p.Lock() defer p.Unlock() h, ok := p.hosts[host] diff --git a/hostpool_test.go b/hostpool_test.go index 0a3cf32..ba380ae 100644 --- a/hostpool_test.go +++ b/hostpool_test.go @@ -18,33 +18,33 @@ func TestHostPool(t *testing.T) { dummyErr := errors.New("Dummy Error") p := New([]string{"a", "b", "c"}).(*standardHostPool) - assert.Equal(t, p.Get().Host(), "a") - assert.Equal(t, p.Get().Host(), "b") - assert.Equal(t, p.Get().Host(), "c") - respA := p.Get() + assert.Equal(t, Get(p).Host(), "a") + assert.Equal(t, Get(p).Host(), "b") + assert.Equal(t, Get(p).Host(), "c") + respA := Get(p) assert.Equal(t, respA.Host(), "a") respA.Mark(dummyErr) - respB := p.Get() + respB := Get(p) respB.Mark(dummyErr) - respC := p.Get() + respC := Get(p) assert.Equal(t, respC.Host(), "c") respC.Mark(nil) // get again, and verify that it's still c - assert.Equal(t, p.Get().Host(), "c") - assert.Equal(t, p.Get().Host(), "c") // would be b if it were not dead + assert.Equal(t, Get(p).Host(), "c") + assert.Equal(t, Get(p).Host(), "c") // would be b if it were not dead // now restore a respA = &standardHostPoolResponse{host: "a", pool: p} respA.Mark(nil) - assert.Equal(t, p.Get().Host(), "a") - assert.Equal(t, p.Get().Host(), "c") + assert.Equal(t, Get(p).Host(), "a") + assert.Equal(t, Get(p).Host(), "c") // ensure that we get *something* back when all hosts fail for _, host := range []string{"a", "b", "c"} { response := &standardHostPoolResponse{host: host, pool: p} response.Mark(dummyErr) } - resp := p.Get() + resp := Get(p) assert.NotEqual(t, resp, nil) } @@ -79,7 +79,7 @@ func TestEpsilonGreedy(t *testing.T) { if i != 0 && i%100 == 0 { p.performEpsilonGreedyDecay() } - hostR := p.Get() + hostR := Get(p) host := hostR.Host() hitCounts[host]++ timing := timings[host] @@ -103,7 +103,7 @@ func TestEpsilonGreedy(t *testing.T) { if i != 0 && i%100 == 0 { p.performEpsilonGreedyDecay() } - hostR := p.Get() + hostR := Get(p) host := hostR.Host() hitCounts[host]++ timing := timings[host] @@ -136,7 +136,7 @@ func BenchmarkEpsilonGreedy(b *testing.B) { if i != 0 && i%100 == 0 { p.performEpsilonGreedyDecay() } - hostR := p.Get() + hostR := Get(p) p.timer = &mockTimer{t: int(timings[i])} hostR.Mark(nil) } From d94a633b9e116923d4998c671a68d0bcd7103d40 Mon Sep 17 00:00:00 2001 From: Dan Frank Date: Wed, 4 Sep 2013 18:03:10 -0400 Subject: [PATCH 7/7] refactor to selector interface --- epsilon_greedy.go | 100 ++++++++++++++++---------------- example_test.go | 4 +- hostpool.go | 142 ++++++---------------------------------------- hostpool_test.go | 52 ++++++++--------- selector.go | 118 ++++++++++++++++++++++++++++++++++++++ 5 files changed, 211 insertions(+), 205 deletions(-) create mode 100644 selector.go diff --git a/epsilon_greedy.go b/epsilon_greedy.go index ff6165c..10e60cf 100644 --- a/epsilon_greedy.go +++ b/epsilon_greedy.go @@ -10,21 +10,21 @@ import ( type epsilonHostPoolResponse struct { HostPoolResponse - started time.Time - ended time.Time - pool *epsilonGreedyHostPool + started time.Time + ended time.Time + selector *epsilonGreedySelector } func (r *epsilonHostPoolResponse) Mark(err error) { if err == nil { r.ended = time.Now() - r.pool.recordTiming(r) + r.selector.recordTiming(r) } r.HostPoolResponse.Mark(err) } -type epsilonGreedyHostPool struct { - HostPool +type epsilonGreedySelector struct { + Selector sync.Locker epsilon float32 // this is our exploration factor decayDuration time.Duration @@ -32,7 +32,7 @@ type epsilonGreedyHostPool struct { timer } -// Construct an Epsilon Greedy HostPool +// Construct an Epsilon Greedy Selector // // Epsilon Greedy is an algorithm that allows HostPool not only to track failure state, // but also to learn about "better" options in terms of speed, and to pick from available hosts @@ -46,73 +46,71 @@ type epsilonGreedyHostPool struct { // To compute the weighting scores, we perform a weighted average of recent response times, over the course of // `decayDuration`. decayDuration may be set to 0 to use the default value of 5 minutes // We then use the supplied EpsilonValueCalculator to calculate a score from that weighted average response time. -func NewEpsilonGreedy(hosts []string, decayDuration time.Duration, calc EpsilonValueCalculator) HostPool { +func NewEpsilonGreedy(decayDuration time.Duration, calc EpsilonValueCalculator) Selector { if decayDuration <= 0 { decayDuration = defaultDecayDuration } - stdHP := New(hosts).(*standardHostPool) - p := &epsilonGreedyHostPool{ - HostPool: stdHP, - Locker: stdHP, + ss := &standardSelector{} + s := &epsilonGreedySelector{ + Selector: ss, + Locker: ss, epsilon: float32(initialEpsilon), decayDuration: decayDuration, EpsilonValueCalculator: calc, timer: &realTimer{}, } + return s +} + +func (s *epsilonGreedySelector) Init(hosts []string) { + s.Selector.Init(hosts) // allocate structures - for _, h := range stdHP.hostList { + for _, h := range s.Selector.(*standardSelector).hostList { h.epsilonCounts = make([]int64, epsilonBuckets) h.epsilonValues = make([]int64, epsilonBuckets) } - go p.epsilonGreedyDecay() - return p -} - -func (p *epsilonGreedyHostPool) SetEpsilon(newEpsilon float32) { - p.Lock() - defer p.Unlock() - p.epsilon = newEpsilon + go s.epsilonGreedyDecay() } -func (p *epsilonGreedyHostPool) epsilonGreedyDecay() { - durationPerBucket := p.decayDuration / epsilonBuckets +func (s *epsilonGreedySelector) epsilonGreedyDecay() { + durationPerBucket := s.decayDuration / epsilonBuckets ticker := time.Tick(durationPerBucket) for { <-ticker - p.performEpsilonGreedyDecay() + s.performEpsilonGreedyDecay() } } -func (p *epsilonGreedyHostPool) performEpsilonGreedyDecay() { - p.Lock() - for _, h := range p.HostPool.(*standardHostPool).hostList { +func (s *epsilonGreedySelector) performEpsilonGreedyDecay() { + s.Lock() + for _, h := range s.Selector.(*standardSelector).hostList { h.epsilonIndex += 1 h.epsilonIndex = h.epsilonIndex % epsilonBuckets h.epsilonCounts[h.epsilonIndex] = 0 h.epsilonValues[h.epsilonIndex] = 0 } - p.Unlock() + s.Unlock() } -func (p *epsilonGreedyHostPool) ChooseNextHost() string { - p.Lock() - host, err := p.getEpsilonGreedy() - p.Unlock() +func (s *epsilonGreedySelector) SelectNextHost() string { + s.Lock() + host, err := s.getEpsilonGreedy() + s.Unlock() if err != nil { - host = p.HostPool.ChooseNextHost() + host = s.Selector.SelectNextHost() } return host } -func (p *epsilonGreedyHostPool) getEpsilonGreedy() (string, error) { +func (s *epsilonGreedySelector) getEpsilonGreedy() (string, error) { var hostToUse *hostEntry // this is our exploration phase - if rand.Float32() < p.epsilon { - p.epsilon = p.epsilon * epsilonDecay - if p.epsilon < minEpsilon { - p.epsilon = minEpsilon + if rand.Float32() < s.epsilon { + s.epsilon = s.epsilon * epsilonDecay + if s.epsilon < minEpsilon { + s.epsilon = minEpsilon } return "", errors.New("Exploration") } @@ -121,11 +119,11 @@ func (p *epsilonGreedyHostPool) getEpsilonGreedy() (string, error) { var possibleHosts []*hostEntry now := time.Now() var sumValues float64 - for _, h := range p.HostPool.(*standardHostPool).hostList { + for _, h := range s.Selector.(*standardSelector).hostList { if h.canTryHost(now) { v := h.getWeightedAverageResponseTime() if v > 0 { - ev := p.CalcValueFromAvgResponseTime(v) + ev := s.CalcValueFromAvgResponseTime(v) h.epsilonValue = ev sumValues += ev possibleHosts = append(possibleHosts, h) @@ -160,32 +158,32 @@ func (p *epsilonGreedyHostPool) getEpsilonGreedy() (string, error) { return hostToUse.host, nil } -func (p *epsilonGreedyHostPool) recordTiming(eHostR *epsilonHostPoolResponse) { +func (s *epsilonGreedySelector) recordTiming(eHostR *epsilonHostPoolResponse) { host := eHostR.Host() - duration := p.between(eHostR.started, eHostR.ended) + duration := s.between(eHostR.started, eHostR.ended) - p.Lock() - defer p.Unlock() - h, ok := p.HostPool.(*standardHostPool).hosts[host] + s.Lock() + defer s.Unlock() + h, ok := s.Selector.(*standardSelector).hosts[host] if !ok { - log.Fatalf("host %s not in HostPool %v", host, p.Hosts()) + log.Fatalf("host %s not in HostPool", host) } h.epsilonCounts[h.epsilonIndex]++ h.epsilonValues[h.epsilonIndex] += int64(duration.Seconds() * 1000) } -func (p *epsilonGreedyHostPool) DeliverHostResponse(host string) HostPoolResponse { - resp := p.HostPool.DeliverHostResponse(host) - return p.toEpsilonHostPootResponse(resp) +func (s *epsilonGreedySelector) MakeHostResponse(host string) HostPoolResponse { + resp := s.Selector.MakeHostResponse(host) + return s.toEpsilonHostPoolResponse(resp) } // Convert regular response to one equipped for EG. Doesn't require lock, for now -func (p *epsilonGreedyHostPool) toEpsilonHostPootResponse(resp HostPoolResponse) *epsilonHostPoolResponse { +func (s *epsilonGreedySelector) toEpsilonHostPoolResponse(resp HostPoolResponse) *epsilonHostPoolResponse { started := time.Now() return &epsilonHostPoolResponse{ HostPoolResponse: resp, started: started, - pool: p, + selector: s, } } diff --git a/example_test.go b/example_test.go index f081525..54843be 100644 --- a/example_test.go +++ b/example_test.go @@ -5,8 +5,8 @@ import ( ) func ExampleNewEpsilonGreedy() { - hp := NewEpsilonGreedy([]string{"a", "b"}, 0, &LinearEpsilonValueCalculator{}) - hostResponse := Get(hp) + hp := NewWithSelector([]string{"a", "b"}, NewEpsilonGreedy(0, &LinearEpsilonValueCalculator{})) + hostResponse := hp.Get() hostname := hostResponse.Host() err := errors.New("I am your http error from " + hostname) // (make a request with hostname) hostResponse.Mark(err) diff --git a/hostpool.go b/hostpool.go index 7341b64..5e5638f 100644 --- a/hostpool.go +++ b/hostpool.go @@ -4,8 +4,6 @@ package hostpool import ( - "log" - "sync" "time" ) @@ -27,7 +25,7 @@ type HostPoolResponse interface { type standardHostPoolResponse struct { host string - pool *standardHostPool + ss *standardSelector } // --- HostPool structs and interfaces ---- @@ -36,22 +34,14 @@ type standardHostPoolResponse struct { // allow you to Get a HostPoolResponse (which includes a hostname to use), // get the list of all Hosts, and use ResetAll to reset state. type HostPool interface { - // Get() HostPoolResponse - + Get() HostPoolResponse ResetAll() Hosts() []string - - ChooseNextHost() string - DeliverHostResponse(string) HostPoolResponse } type standardHostPool struct { - sync.RWMutex - hosts map[string]*hostEntry - hostList []*hostEntry - initialRetryDelay time.Duration - maxRetryInterval time.Duration - nextHostIndex int + hosts []string + Selector } // ------ constants ------------------- @@ -64,23 +54,15 @@ const defaultDecayDuration = time.Duration(5) * time.Minute // Construct a basic HostPool using the hostnames provided func New(hosts []string) HostPool { - p := &standardHostPool{ - hosts: make(map[string]*hostEntry, len(hosts)), - hostList: make([]*hostEntry, len(hosts)), - initialRetryDelay: time.Duration(30) * time.Second, - maxRetryInterval: time.Duration(900) * time.Second, - } + return NewWithSelector(hosts, &standardSelector{}) +} - for i, h := range hosts { - e := &hostEntry{ - host: h, - retryDelay: p.initialRetryDelay, - } - p.hosts[h] = e - p.hostList[i] = e +func NewWithSelector(hosts []string, s Selector) HostPool { + s.Init(hosts) + return &standardHostPool{ + hosts, + s, } - - return p } func (r *standardHostPoolResponse) Host() string { @@ -88,107 +70,15 @@ func (r *standardHostPoolResponse) Host() string { } func (r *standardHostPoolResponse) Mark(err error) { - if err == nil { - r.pool.markSuccess(r) - } else { - r.pool.markFailed(r) - } + r.ss.MarkHost(r.host, err) } // return an entry from the HostPool -func Get(p HostPool) HostPoolResponse { - host := p.ChooseNextHost() - return p.DeliverHostResponse(host) -} - -func (p *standardHostPool) ChooseNextHost() string { - p.Lock() - host := p.getRoundRobin() - p.Unlock() - return host -} - -func (p *standardHostPool) getRoundRobin() string { - now := time.Now() - hostCount := len(p.hostList) - for i := range p.hostList { - // iterate via sequenece from where we last iterated - currentIndex := (i + p.nextHostIndex) % hostCount - - h := p.hostList[currentIndex] - if h.canTryHost(now) { - p.nextHostIndex = currentIndex + 1 - return h.host - } - } - - // all hosts are down. re-add them - p.doResetAll() - p.nextHostIndex = 0 - return p.hostList[0].host -} - -func (p *standardHostPool) ResetAll() { - p.Lock() - defer p.Unlock() - p.doResetAll() -} - -// this actually performs the logic to reset, -// and should only be called when the lock has -// already been acquired -func (p *standardHostPool) doResetAll() { - for _, h := range p.hosts { - h.dead = false - } -} - -func (p *standardHostPool) markSuccess(hostR *standardHostPoolResponse) { - host := hostR.Host() - p.Lock() - defer p.Unlock() - - h, ok := p.hosts[host] - if !ok { - log.Fatalf("host %s not in HostPool %v", host, p.Hosts()) - } - h.dead = false +func (p *standardHostPool) Get() HostPoolResponse { + host := p.SelectNextHost() + return p.MakeHostResponse(host) } -func (p *standardHostPool) markFailed(hostR *standardHostPoolResponse) { - host := hostR.Host() - p.Lock() - defer p.Unlock() - h, ok := p.hosts[host] - if !ok { - log.Fatalf("host %s not in HostPool %v", host, p.Hosts()) - } - if !h.dead { - h.dead = true - h.retryCount = 0 - h.retryDelay = p.initialRetryDelay - h.nextRetry = time.Now().Add(h.retryDelay) - } - -} func (p *standardHostPool) Hosts() []string { - hosts := make([]string, len(p.hosts)) - for host, _ := range p.hosts { - hosts = append(hosts, host) - } - return hosts -} - -func (p *standardHostPool) DeliverHostResponse(host string) HostPoolResponse { - p.Lock() - defer p.Unlock() - h, ok := p.hosts[host] - if !ok { - log.Fatalf("host %s not in HostPool %v", host, p.Hosts()) - } - now := time.Now() - if h.dead && h.nextRetry.Before(now) { - h.willRetryHost(p.maxRetryInterval) - } - return &standardHostPoolResponse{host: host, pool: p} + return p.hosts } diff --git a/hostpool_test.go b/hostpool_test.go index ba380ae..f51d04b 100644 --- a/hostpool_test.go +++ b/hostpool_test.go @@ -18,33 +18,33 @@ func TestHostPool(t *testing.T) { dummyErr := errors.New("Dummy Error") p := New([]string{"a", "b", "c"}).(*standardHostPool) - assert.Equal(t, Get(p).Host(), "a") - assert.Equal(t, Get(p).Host(), "b") - assert.Equal(t, Get(p).Host(), "c") - respA := Get(p) + assert.Equal(t, p.Get().Host(), "a") + assert.Equal(t, p.Get().Host(), "b") + assert.Equal(t, p.Get().Host(), "c") + respA := p.Get() assert.Equal(t, respA.Host(), "a") respA.Mark(dummyErr) - respB := Get(p) + respB := p.Get() respB.Mark(dummyErr) - respC := Get(p) + respC := p.Get() assert.Equal(t, respC.Host(), "c") respC.Mark(nil) // get again, and verify that it's still c - assert.Equal(t, Get(p).Host(), "c") - assert.Equal(t, Get(p).Host(), "c") // would be b if it were not dead + assert.Equal(t, p.Get().Host(), "c") + assert.Equal(t, p.Get().Host(), "c") // would be b if it were not dead // now restore a - respA = &standardHostPoolResponse{host: "a", pool: p} + respA = &standardHostPoolResponse{host: "a", ss: p.Selector.(*standardSelector)} respA.Mark(nil) - assert.Equal(t, Get(p).Host(), "a") - assert.Equal(t, Get(p).Host(), "c") + assert.Equal(t, p.Get().Host(), "a") + assert.Equal(t, p.Get().Host(), "c") // ensure that we get *something* back when all hosts fail for _, host := range []string{"a", "b", "c"} { - response := &standardHostPoolResponse{host: host, pool: p} + response := &standardHostPoolResponse{host: host, ss: p.Selector.(*standardSelector)} response.Mark(dummyErr) } - resp := Get(p) + resp := p.Get() assert.NotEqual(t, resp, nil) } @@ -57,13 +57,13 @@ func (t *mockTimer) between(start time.Time, end time.Time) time.Duration { } func TestEpsilonGreedy(t *testing.T) { - log.SetOutput(ioutil.Discard) - defer log.SetOutput(os.Stdout) + // log.SetOutput(ioutil.Discard) + // defer log.SetOutput(os.Stdout) rand.Seed(10) iterations := 12000 - p := NewEpsilonGreedy([]string{"a", "b"}, 0, &LinearEpsilonValueCalculator{}).(*epsilonGreedyHostPool) + p := NewWithSelector([]string{"a", "b"}, NewEpsilonGreedy(0, &LinearEpsilonValueCalculator{})).(*standardHostPool) timings := make(map[string]int64) timings["a"] = 200 @@ -77,13 +77,13 @@ func TestEpsilonGreedy(t *testing.T) { for i := 0; i < iterations; i += 1 { if i != 0 && i%100 == 0 { - p.performEpsilonGreedyDecay() + p.Selector.(*epsilonGreedySelector).performEpsilonGreedyDecay() } - hostR := Get(p) + hostR := p.Get() host := hostR.Host() hitCounts[host]++ timing := timings[host] - p.timer = &mockTimer{t: int(timing)} + p.Selector.(*epsilonGreedySelector).timer = &mockTimer{t: int(timing)} hostR.Mark(nil) } @@ -101,13 +101,13 @@ func TestEpsilonGreedy(t *testing.T) { for i := 0; i < iterations; i += 1 { if i != 0 && i%100 == 0 { - p.performEpsilonGreedyDecay() + p.Selector.(*epsilonGreedySelector).performEpsilonGreedyDecay() } - hostR := Get(p) + hostR := p.Get() host := hostR.Host() hitCounts[host]++ timing := timings[host] - p.timer = &mockTimer{t: int(timing)} + p.Selector.(*epsilonGreedySelector).timer = &mockTimer{t: int(timing)} hostR.Mark(nil) } @@ -129,15 +129,15 @@ func BenchmarkEpsilonGreedy(b *testing.B) { } // Make the hostpool with a few hosts - p := NewEpsilonGreedy([]string{"a", "b"}, 0, &LinearEpsilonValueCalculator{}).(*epsilonGreedyHostPool) + p := NewWithSelector([]string{"a", "b"}, NewEpsilonGreedy(0, &LinearEpsilonValueCalculator{})).(*standardHostPool) b.StartTimer() for i := 0; i < b.N; i++ { if i != 0 && i%100 == 0 { - p.performEpsilonGreedyDecay() + p.Selector.(*epsilonGreedySelector).performEpsilonGreedyDecay() } - hostR := Get(p) - p.timer = &mockTimer{t: int(timings[i])} + hostR := p.Get() + p.Selector.(*epsilonGreedySelector).timer = &mockTimer{t: int(timings[i])} hostR.Mark(nil) } } diff --git a/selector.go b/selector.go new file mode 100644 index 0000000..ad22b3e --- /dev/null +++ b/selector.go @@ -0,0 +1,118 @@ +package hostpool + +import ( + "log" + "sync" + "time" +) + +type Selector interface { + Init([]string) + SelectNextHost() string + MakeHostResponse(string) HostPoolResponse + MarkHost(string, error) + ResetAll() +} + +type standardSelector struct { + sync.RWMutex + hosts map[string]*hostEntry + hostList []*hostEntry + initialRetryDelay time.Duration + maxRetryInterval time.Duration + nextHostIndex int +} + +func (s *standardSelector) Init(hosts []string) { + s.hosts = make(map[string]*hostEntry, len(hosts)) + s.hostList = make([]*hostEntry, len(hosts)) + s.initialRetryDelay = time.Duration(30) * time.Second + s.maxRetryInterval = time.Duration(900) * time.Second + + for i, h := range hosts { + e := &hostEntry{ + host: h, + retryDelay: s.initialRetryDelay, + } + s.hosts[h] = e + s.hostList[i] = e + } +} + +func (s *standardSelector) SelectNextHost() string { + s.Lock() + host := s.getRoundRobin() + s.Unlock() + return host +} + +func (s *standardSelector) getRoundRobin() string { + now := time.Now() + hostCount := len(s.hostList) + for i := range s.hostList { + // iterate via sequenece from where we last iterated + currentIndex := (i + s.nextHostIndex) % hostCount + + h := s.hostList[currentIndex] + if h.canTryHost(now) { + s.nextHostIndex = currentIndex + 1 + return h.host + } + } + + // all hosts are down. re-add them + s.doResetAll() + s.nextHostIndex = 0 + return s.hostList[0].host +} + +func (s *standardSelector) MakeHostResponse(host string) HostPoolResponse { + s.Lock() + defer s.Unlock() + h, ok := s.hosts[host] + if !ok { + log.Fatalf("host %s not in HostPool", host) + } + now := time.Now() + if h.dead && h.nextRetry.Before(now) { + h.willRetryHost(s.maxRetryInterval) + } + return &standardHostPoolResponse{host: host, ss: s} +} + +func (s *standardSelector) MarkHost(host string, err error) { + s.Lock() + defer s.Unlock() + + h, ok := s.hosts[host] + if !ok { + log.Fatalf("host %s not in HostPool", host) + } + if err == nil { + // success - mark host alive + h.dead = false + } else { + // failure - mark host dead + if !h.dead { + h.dead = true + h.retryCount = 0 + h.retryDelay = s.initialRetryDelay + h.nextRetry = time.Now().Add(h.retryDelay) + } + } +} + +func (s *standardSelector) ResetAll() { + s.Lock() + defer s.Unlock() + s.doResetAll() +} + +// this actually performs the logic to reset, +// and should only be called when the lock has +// already been acquired +func (s *standardSelector) doResetAll() { + for _, h := range s.hosts { + h.dead = false + } +}