Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
138 changes: 72 additions & 66 deletions epsilon_greedy.go
Original file line number Diff line number Diff line change
@@ -1,34 +1,38 @@
package hostpool

import (
"errors"
"log"
"math/rand"
"sync"
"time"
)

type epsilonHostPoolResponse struct {
standardHostPoolResponse
started time.Time
ended time.Time
HostPoolResponse
started time.Time
ended time.Time
selector *epsilonGreedySelector
}

func (r *epsilonHostPoolResponse) Mark(err error) {
r.Do(func() {
if err == nil {
r.ended = time.Now()
doMark(err, r)
})

r.selector.recordTiming(r)
}
r.HostPoolResponse.Mark(err)
}

type epsilonGreedyHostPool struct {
standardHostPool // TODO - would be nifty if we could embed HostPool and Locker interfaces
type epsilonGreedySelector struct {
Selector
sync.Locker
epsilon float32 // this is our exploration factor
decayDuration time.Duration
EpsilonValueCalculator // embed the epsilonValueCalculator
timer
}

// Construct an Epsilon Greedy HostPool
// Construct an Epsilon Greedy Selector
//
// Epsilon Greedy is an algorithm that allows HostPool not only to track failure state,
// but also to learn about "better" options in terms of speed, and to pick from available hosts
Expand All @@ -42,86 +46,84 @@ type epsilonGreedyHostPool struct {
// To compute the weighting scores, we perform a weighted average of recent response times, over the course of
// `decayDuration`. decayDuration may be set to 0 to use the default value of 5 minutes
// We then use the supplied EpsilonValueCalculator to calculate a score from that weighted average response time.
func NewEpsilonGreedy(hosts []string, decayDuration time.Duration, calc EpsilonValueCalculator) HostPool {
func NewEpsilonGreedy(decayDuration time.Duration, calc EpsilonValueCalculator) Selector {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

s/NewEpsilonGreedy/NewEpsilonGreedySelector


if decayDuration <= 0 {
decayDuration = defaultDecayDuration
}
stdHP := New(hosts).(*standardHostPool)
p := &epsilonGreedyHostPool{
standardHostPool: *stdHP,
ss := &standardSelector{}
s := &epsilonGreedySelector{
Selector: ss,
Locker: ss,
epsilon: float32(initialEpsilon),
decayDuration: decayDuration,
EpsilonValueCalculator: calc,
timer: &realTimer{},
}

return s
}

func (s *epsilonGreedySelector) Init(hosts []string) {
s.Selector.Init(hosts)
// allocate structures
for _, h := range p.hostList {
for _, h := range s.Selector.(*standardSelector).hostList {
h.epsilonCounts = make([]int64, epsilonBuckets)
h.epsilonValues = make([]int64, epsilonBuckets)
}
go p.epsilonGreedyDecay()
return p
}

func (p *epsilonGreedyHostPool) SetEpsilon(newEpsilon float32) {
p.Lock()
defer p.Unlock()
p.epsilon = newEpsilon
go s.epsilonGreedyDecay()
}

func (p *epsilonGreedyHostPool) epsilonGreedyDecay() {
durationPerBucket := p.decayDuration / epsilonBuckets
func (s *epsilonGreedySelector) epsilonGreedyDecay() {
durationPerBucket := s.decayDuration / epsilonBuckets
ticker := time.Tick(durationPerBucket)
for {
<-ticker
p.performEpsilonGreedyDecay()
s.performEpsilonGreedyDecay()
}
}
func (p *epsilonGreedyHostPool) performEpsilonGreedyDecay() {
p.Lock()
for _, h := range p.hostList {
func (s *epsilonGreedySelector) performEpsilonGreedyDecay() {
s.Lock()
for _, h := range s.Selector.(*standardSelector).hostList {
h.epsilonIndex += 1
h.epsilonIndex = h.epsilonIndex % epsilonBuckets
h.epsilonCounts[h.epsilonIndex] = 0
h.epsilonValues[h.epsilonIndex] = 0
}
p.Unlock()
s.Unlock()
}

func (p *epsilonGreedyHostPool) Get() HostPoolResponse {
p.Lock()
defer p.Unlock()
host := p.getEpsilonGreedy()
started := time.Now()
return &epsilonHostPoolResponse{
standardHostPoolResponse: standardHostPoolResponse{host: host, pool: p},
started: started,
func (s *epsilonGreedySelector) SelectNextHost() string {
s.Lock()
host, err := s.getEpsilonGreedy()
s.Unlock()
if err != nil {
host = s.Selector.SelectNextHost()
}
return host
}

func (p *epsilonGreedyHostPool) getEpsilonGreedy() string {
func (s *epsilonGreedySelector) getEpsilonGreedy() (string, error) {
var hostToUse *hostEntry

// this is our exploration phase
if rand.Float32() < p.epsilon {
p.epsilon = p.epsilon * epsilonDecay
if p.epsilon < minEpsilon {
p.epsilon = minEpsilon
if rand.Float32() < s.epsilon {
s.epsilon = s.epsilon * epsilonDecay
if s.epsilon < minEpsilon {
s.epsilon = minEpsilon
}
return p.getRoundRobin()
return "", errors.New("Exploration")
}

// calculate values for each host in the 0..1 range (but not ormalized)
var possibleHosts []*hostEntry
now := time.Now()
var sumValues float64
for _, h := range p.hostList {
for _, h := range s.Selector.(*standardSelector).hostList {
if h.canTryHost(now) {
v := h.getWeightedAverageResponseTime()
if v > 0 {
ev := p.CalcValueFromAvgResponseTime(v)
ev := s.CalcValueFromAvgResponseTime(v)
h.epsilonValue = ev
sumValues += ev
possibleHosts = append(possibleHosts, h)
Expand Down Expand Up @@ -151,36 +153,40 @@ func (p *epsilonGreedyHostPool) getEpsilonGreedy() string {
if len(possibleHosts) != 0 {
log.Println("Failed to randomly choose a host, Dan loses")
}
return p.getRoundRobin()
}

if hostToUse.dead {
hostToUse.willRetryHost(p.maxRetryInterval)
return "", errors.New("No host chosen")
}
return hostToUse.host
return hostToUse.host, nil
}

func (p *epsilonGreedyHostPool) markSuccess(hostR HostPoolResponse) {
// first do the base markSuccess - a little redundant with host lookup but cleaner than repeating logic
p.standardHostPool.markSuccess(hostR)
eHostR, ok := hostR.(*epsilonHostPoolResponse)
if !ok {
log.Printf("Incorrect type in eps markSuccess!") // TODO reflection to print out offending type
return
}
host := eHostR.host
duration := p.between(eHostR.started, eHostR.ended)
func (s *epsilonGreedySelector) recordTiming(eHostR *epsilonHostPoolResponse) {
host := eHostR.Host()
duration := s.between(eHostR.started, eHostR.ended)

p.Lock()
defer p.Unlock()
h, ok := p.hosts[host]
s.Lock()
defer s.Unlock()
h, ok := s.Selector.(*standardSelector).hosts[host]
if !ok {
log.Fatalf("host %s not in HostPool %v", host, p.Hosts())
log.Fatalf("host %s not in HostPool", host)
}
h.epsilonCounts[h.epsilonIndex]++
h.epsilonValues[h.epsilonIndex] += int64(duration.Seconds() * 1000)
}

func (s *epsilonGreedySelector) MakeHostResponse(host string) HostPoolResponse {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure there's any value in maintaining a separate MakeHostResponse, having the function is fine, but making it part of the interface is unnecessary and can be hidden as an implementation detail

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't agree on this point. I really want it to be possible to create a HostPool with multiple Selectors composed together. To do that, I don't think we can rely on hidden implementation details of the Selectors (among other things, extensions shouldn't need to be in the hostpool package). I haven't quite gotten to that point in this pull req, due to some stuff with the implementation of hostEntry that I've been deliberately leaving to a subsequent pull req to keep things from getting too complicated.

MakeHostResponse does the work necessary to update state based on the chosen host, I do think it's a necessary inclusion in the interface. Note here that we call out to the embedded Selector's MakeHostResponse, without privileged access to its implementation. That's the pattern I'd like to encourage for other Selectors going forward

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I could have been more specific. I wasn't suggesting that you rely on or even have access to internals...

It seems like SelectNextHost is the only public interface that a Selector needs to implement, it just needs to be changed to return a HostPoolResponse interface... that method's internal implementation doesn't seem relevant.

But, I'm not quite sure where you're going with selector composition, so please elaborate if I'm just missing something.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So, that definitely sounds more straightforward, and is closer to what I originally wanted to do. The problem I was running into was that we typically want to update some state when we select which host to try. This may be on the HostPool/Selector side (state around retrying a host previously marked dead) or on the HostPoolResponse side (storing the time that the request was initiated for timing purposes). MakeHostResponse was a poorly named way to shoehorn all those state updates together.

All of this is further muddied by the fact that I'm still just working with one hostEntry implementation. I'll try to continue thinking about this on a higher level in terms of what state needs to kept and when it needs to be accessed, maybe I'll come up with something simpler.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm on the same page w/ different selectors needing to maintain separate state, I just don't think that requires two public interface methods, right?

I agree with your assessment that hostEntry has too much responsibility and should be specific to each selector, ideally.

resp := s.Selector.MakeHostResponse(host)
return s.toEpsilonHostPoolResponse(resp)
}

// Convert regular response to one equipped for EG. Doesn't require lock, for now
func (s *epsilonGreedySelector) toEpsilonHostPoolResponse(resp HostPoolResponse) *epsilonHostPoolResponse {
started := time.Now()
return &epsilonHostPoolResponse{
HostPoolResponse: resp,
started: started,
selector: s,
}
}

// --- timer: this just exists for testing

type timer interface {
Expand Down
6 changes: 3 additions & 3 deletions example_test.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
package hostpool

import (
"github.com/bitly/go-hostpool"
"errors"
)

func ExampleNewEpsilonGreedy() {
hp := hostpool.NewEpsilonGreedy([]string{"a", "b"}, 0, &hostpool.LinearEpsilonValueCalculator{})
hp := NewWithSelector([]string{"a", "b"}, NewEpsilonGreedy(0, &LinearEpsilonValueCalculator{}))
hostResponse := hp.Get()
hostname := hostResponse.Host()
err := nil // (make a request with hostname)
err := errors.New("I am your http error from " + hostname) // (make a request with hostname)
hostResponse.Mark(err)
}
Loading