Skip to content

Commit b22774a

Browse files
authored
Create naive isolation group matching loadbalancer (#6570)
This is an intentionally naive implementation of a load balancer that assigns isolation groups to specific partitions and routes pollers/tasks to those partitions based on their isolation group. Any time there are multiple options one is picked randomly. The goal of this implementation is to benchmark how significantly isolation-group-based routing improves task latencies and isolation group containment for non-extremely-skewed scenarios. It additionally provides a baseline to see how much a more sophisticated solution (storing the isolation group assignment with the partitions and dynamically rebalancing them) might improve these metrics. This isn't intended to be used in production as-is.
1 parent f1102ac commit b22774a

16 files changed

+841
-297
lines changed

client/clientfactory.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@ type (
7272
metricsClient metrics.Client
7373
dynConfig *dynamicconfig.Collection
7474
numberOfHistoryShards int
75+
allIsolationGroups func() []string
7576
logger log.Logger
7677
}
7778
)
@@ -83,6 +84,7 @@ func NewRPCClientFactory(
8384
metricsClient metrics.Client,
8485
dc *dynamicconfig.Collection,
8586
numberOfHistoryShards int,
87+
allIsolationGroups func() []string,
8688
logger log.Logger,
8789
) Factory {
8890
return &rpcClientFactory{
@@ -91,6 +93,7 @@ func NewRPCClientFactory(
9193
metricsClient: metricsClient,
9294
dynConfig: dc,
9395
numberOfHistoryShards: numberOfHistoryShards,
96+
allIsolationGroups: allIsolationGroups,
9497
logger: logger,
9598
}
9699
}
@@ -155,10 +158,12 @@ func (cf *rpcClientFactory) NewMatchingClientWithTimeout(
155158
defaultLoadBalancer := matching.NewLoadBalancer(partitionConfigProvider)
156159
roundRobinLoadBalancer := matching.NewRoundRobinLoadBalancer(partitionConfigProvider)
157160
weightedLoadBalancer := matching.NewWeightedLoadBalancer(roundRobinLoadBalancer, partitionConfigProvider, cf.logger)
161+
igLoadBalancer := matching.NewIsolationLoadBalancer(weightedLoadBalancer, partitionConfigProvider, cf.allIsolationGroups)
158162
loadBalancers := map[string]matching.LoadBalancer{
159163
"random": defaultLoadBalancer,
160164
"round-robin": roundRobinLoadBalancer,
161165
"weighted": weightedLoadBalancer,
166+
"isolation": igLoadBalancer,
162167
}
163168
client := matching.NewClient(
164169
rawClient,

client/matching/client.go

Lines changed: 10 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -60,10 +60,8 @@ func (c *clientImpl) AddActivityTask(
6060
opts ...yarpc.CallOption,
6161
) (*types.AddActivityTaskResponse, error) {
6262
partition := c.loadBalancer.PickWritePartition(
63-
request.GetDomainUUID(),
64-
*request.GetTaskList(),
6563
persistence.TaskListTypeActivity,
66-
request.GetForwardedFrom(),
64+
request,
6765
)
6866
originalTaskListName := request.TaskList.GetName()
6967
request.TaskList.Name = partition
@@ -91,10 +89,8 @@ func (c *clientImpl) AddDecisionTask(
9189
opts ...yarpc.CallOption,
9290
) (*types.AddDecisionTaskResponse, error) {
9391
partition := c.loadBalancer.PickWritePartition(
94-
request.GetDomainUUID(),
95-
*request.GetTaskList(),
9692
persistence.TaskListTypeDecision,
97-
request.GetForwardedFrom(),
93+
request,
9894
)
9995
originalTaskListName := request.TaskList.GetName()
10096
request.TaskList.Name = partition
@@ -122,10 +118,9 @@ func (c *clientImpl) PollForActivityTask(
122118
opts ...yarpc.CallOption,
123119
) (*types.MatchingPollForActivityTaskResponse, error) {
124120
partition := c.loadBalancer.PickReadPartition(
125-
request.GetDomainUUID(),
126-
*request.PollRequest.GetTaskList(),
127121
persistence.TaskListTypeActivity,
128-
request.GetForwardedFrom(),
122+
request,
123+
request.GetIsolationGroup(),
129124
)
130125
originalTaskListName := request.PollRequest.GetTaskList().GetName()
131126
request.PollRequest.TaskList.Name = partition
@@ -145,10 +140,8 @@ func (c *clientImpl) PollForActivityTask(
145140
resp.PartitionConfig,
146141
)
147142
c.loadBalancer.UpdateWeight(
148-
request.GetDomainUUID(),
149-
*request.PollRequest.GetTaskList(),
150143
persistence.TaskListTypeActivity,
151-
request.GetForwardedFrom(),
144+
request,
152145
partition,
153146
resp.LoadBalancerHints,
154147
)
@@ -163,10 +156,9 @@ func (c *clientImpl) PollForDecisionTask(
163156
opts ...yarpc.CallOption,
164157
) (*types.MatchingPollForDecisionTaskResponse, error) {
165158
partition := c.loadBalancer.PickReadPartition(
166-
request.GetDomainUUID(),
167-
*request.PollRequest.GetTaskList(),
168159
persistence.TaskListTypeDecision,
169-
request.GetForwardedFrom(),
160+
request,
161+
request.GetIsolationGroup(),
170162
)
171163
originalTaskListName := request.PollRequest.GetTaskList().GetName()
172164
request.PollRequest.TaskList.Name = partition
@@ -186,10 +178,8 @@ func (c *clientImpl) PollForDecisionTask(
186178
resp.PartitionConfig,
187179
)
188180
c.loadBalancer.UpdateWeight(
189-
request.GetDomainUUID(),
190-
*request.PollRequest.GetTaskList(),
191181
persistence.TaskListTypeDecision,
192-
request.GetForwardedFrom(),
182+
request,
193183
partition,
194184
resp.LoadBalancerHints,
195185
)
@@ -204,10 +194,9 @@ func (c *clientImpl) QueryWorkflow(
204194
opts ...yarpc.CallOption,
205195
) (*types.QueryWorkflowResponse, error) {
206196
partition := c.loadBalancer.PickReadPartition(
207-
request.GetDomainUUID(),
208-
*request.GetTaskList(),
209197
persistence.TaskListTypeDecision,
210-
request.GetForwardedFrom(),
198+
request,
199+
"",
211200
)
212201
request.TaskList.Name = partition
213202
peer, err := c.peerResolver.FromTaskList(request.TaskList.GetName())

client/matching/client_test.go

Lines changed: 17 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -160,7 +160,7 @@ func TestClient_withResponse(t *testing.T) {
160160
return c.AddActivityTask(context.Background(), testAddActivityTaskRequest())
161161
},
162162
mock: func(p *MockPeerResolver, balancer *MockLoadBalancer, c *MockClient, mp *MockPartitionConfigProvider) {
163-
balancer.EXPECT().PickWritePartition(_testDomainUUID, types.TaskList{Name: _testTaskList}, persistence.TaskListTypeActivity, "").Return(_testPartition)
163+
balancer.EXPECT().PickWritePartition(persistence.TaskListTypeActivity, testAddActivityTaskRequest()).Return(_testPartition)
164164
p.EXPECT().FromTaskList(_testPartition).Return("peer0", nil)
165165
c.EXPECT().AddActivityTask(gomock.Any(), gomock.Any(), []yarpc.CallOption{yarpc.WithShardKey("peer0")}).Return(&types.AddActivityTaskResponse{}, nil)
166166
mp.EXPECT().UpdatePartitionConfig(_testDomainUUID, types.TaskList{Name: _testTaskList}, persistence.TaskListTypeActivity, nil)
@@ -173,7 +173,7 @@ func TestClient_withResponse(t *testing.T) {
173173
return c.AddActivityTask(context.Background(), testAddActivityTaskRequest())
174174
},
175175
mock: func(p *MockPeerResolver, balancer *MockLoadBalancer, c *MockClient, mp *MockPartitionConfigProvider) {
176-
balancer.EXPECT().PickWritePartition(_testDomainUUID, types.TaskList{Name: _testTaskList}, persistence.TaskListTypeActivity, "").Return(_testPartition)
176+
balancer.EXPECT().PickWritePartition(persistence.TaskListTypeActivity, testAddActivityTaskRequest()).Return(_testPartition)
177177
p.EXPECT().FromTaskList(_testPartition).Return("peer0", assert.AnError)
178178
},
179179
wantError: true,
@@ -184,7 +184,7 @@ func TestClient_withResponse(t *testing.T) {
184184
return c.AddActivityTask(context.Background(), testAddActivityTaskRequest())
185185
},
186186
mock: func(p *MockPeerResolver, balancer *MockLoadBalancer, c *MockClient, mp *MockPartitionConfigProvider) {
187-
balancer.EXPECT().PickWritePartition(_testDomainUUID, types.TaskList{Name: _testTaskList}, persistence.TaskListTypeActivity, "").Return(_testPartition)
187+
balancer.EXPECT().PickWritePartition(persistence.TaskListTypeActivity, testAddActivityTaskRequest()).Return(_testPartition)
188188
p.EXPECT().FromTaskList(_testPartition).Return("peer0", nil)
189189
c.EXPECT().AddActivityTask(gomock.Any(), gomock.Any(), []yarpc.CallOption{yarpc.WithShardKey("peer0")}).Return(nil, assert.AnError)
190190
},
@@ -196,7 +196,7 @@ func TestClient_withResponse(t *testing.T) {
196196
return c.AddDecisionTask(context.Background(), testAddDecisionTaskRequest())
197197
},
198198
mock: func(p *MockPeerResolver, balancer *MockLoadBalancer, c *MockClient, mp *MockPartitionConfigProvider) {
199-
balancer.EXPECT().PickWritePartition(_testDomainUUID, types.TaskList{Name: _testTaskList}, persistence.TaskListTypeDecision, "").Return(_testPartition)
199+
balancer.EXPECT().PickWritePartition(persistence.TaskListTypeDecision, testAddDecisionTaskRequest()).Return(_testPartition)
200200
p.EXPECT().FromTaskList(_testPartition).Return("peer0", nil)
201201
c.EXPECT().AddDecisionTask(gomock.Any(), gomock.Any(), []yarpc.CallOption{yarpc.WithShardKey("peer0")}).Return(&types.AddDecisionTaskResponse{}, nil)
202202
mp.EXPECT().UpdatePartitionConfig(_testDomainUUID, types.TaskList{Name: _testTaskList}, persistence.TaskListTypeDecision, nil)
@@ -209,7 +209,7 @@ func TestClient_withResponse(t *testing.T) {
209209
return c.AddDecisionTask(context.Background(), testAddDecisionTaskRequest())
210210
},
211211
mock: func(p *MockPeerResolver, balancer *MockLoadBalancer, c *MockClient, mp *MockPartitionConfigProvider) {
212-
balancer.EXPECT().PickWritePartition(_testDomainUUID, types.TaskList{Name: _testTaskList}, persistence.TaskListTypeDecision, "").Return(_testPartition)
212+
balancer.EXPECT().PickWritePartition(persistence.TaskListTypeDecision, testAddDecisionTaskRequest()).Return(_testPartition)
213213
p.EXPECT().FromTaskList(_testPartition).Return("peer0", assert.AnError)
214214
},
215215
wantError: true,
@@ -220,7 +220,7 @@ func TestClient_withResponse(t *testing.T) {
220220
return c.AddDecisionTask(context.Background(), testAddDecisionTaskRequest())
221221
},
222222
mock: func(p *MockPeerResolver, balancer *MockLoadBalancer, c *MockClient, mp *MockPartitionConfigProvider) {
223-
balancer.EXPECT().PickWritePartition(_testDomainUUID, types.TaskList{Name: _testTaskList}, persistence.TaskListTypeDecision, "").Return(_testPartition)
223+
balancer.EXPECT().PickWritePartition(persistence.TaskListTypeDecision, testAddDecisionTaskRequest()).Return(_testPartition)
224224
p.EXPECT().FromTaskList(_testPartition).Return("peer0", nil)
225225
c.EXPECT().AddDecisionTask(gomock.Any(), gomock.Any(), []yarpc.CallOption{yarpc.WithShardKey("peer0")}).Return(nil, assert.AnError)
226226
},
@@ -232,11 +232,11 @@ func TestClient_withResponse(t *testing.T) {
232232
return c.PollForActivityTask(context.Background(), testMatchingPollForActivityTaskRequest())
233233
},
234234
mock: func(p *MockPeerResolver, balancer *MockLoadBalancer, c *MockClient, mp *MockPartitionConfigProvider) {
235-
balancer.EXPECT().PickReadPartition(_testDomainUUID, types.TaskList{Name: _testTaskList}, persistence.TaskListTypeActivity, "").Return(_testPartition)
235+
balancer.EXPECT().PickReadPartition(persistence.TaskListTypeActivity, testMatchingPollForActivityTaskRequest(), "").Return(_testPartition)
236236
p.EXPECT().FromTaskList(_testPartition).Return("peer0", nil)
237237
c.EXPECT().PollForActivityTask(gomock.Any(), gomock.Any(), []yarpc.CallOption{yarpc.WithShardKey("peer0")}).Return(&types.MatchingPollForActivityTaskResponse{}, nil)
238238
mp.EXPECT().UpdatePartitionConfig(_testDomainUUID, types.TaskList{Name: _testTaskList}, persistence.TaskListTypeActivity, nil)
239-
balancer.EXPECT().UpdateWeight(_testDomainUUID, types.TaskList{Name: _testTaskList}, persistence.TaskListTypeActivity, "", _testPartition, nil)
239+
balancer.EXPECT().UpdateWeight(persistence.TaskListTypeActivity, testMatchingPollForActivityTaskRequest(), _testPartition, nil)
240240
},
241241
want: &types.MatchingPollForActivityTaskResponse{},
242242
},
@@ -246,7 +246,7 @@ func TestClient_withResponse(t *testing.T) {
246246
return c.PollForActivityTask(context.Background(), testMatchingPollForActivityTaskRequest())
247247
},
248248
mock: func(p *MockPeerResolver, balancer *MockLoadBalancer, c *MockClient, mp *MockPartitionConfigProvider) {
249-
balancer.EXPECT().PickReadPartition(_testDomainUUID, types.TaskList{Name: _testTaskList}, persistence.TaskListTypeActivity, "").Return(_testPartition)
249+
balancer.EXPECT().PickReadPartition(persistence.TaskListTypeActivity, testMatchingPollForActivityTaskRequest(), "").Return(_testPartition)
250250
p.EXPECT().FromTaskList(_testPartition).Return("peer0", assert.AnError)
251251
},
252252
want: nil,
@@ -258,7 +258,7 @@ func TestClient_withResponse(t *testing.T) {
258258
return c.PollForActivityTask(context.Background(), testMatchingPollForActivityTaskRequest())
259259
},
260260
mock: func(p *MockPeerResolver, balancer *MockLoadBalancer, c *MockClient, mp *MockPartitionConfigProvider) {
261-
balancer.EXPECT().PickReadPartition(_testDomainUUID, types.TaskList{Name: _testTaskList}, persistence.TaskListTypeActivity, "").Return(_testPartition)
261+
balancer.EXPECT().PickReadPartition(persistence.TaskListTypeActivity, testMatchingPollForActivityTaskRequest(), "").Return(_testPartition)
262262
p.EXPECT().FromTaskList(_testPartition).Return("peer0", nil)
263263
c.EXPECT().PollForActivityTask(gomock.Any(), gomock.Any(), []yarpc.CallOption{yarpc.WithShardKey("peer0")}).Return(nil, assert.AnError)
264264
},
@@ -271,11 +271,11 @@ func TestClient_withResponse(t *testing.T) {
271271
return c.PollForDecisionTask(context.Background(), testMatchingPollForDecisionTaskRequest())
272272
},
273273
mock: func(p *MockPeerResolver, balancer *MockLoadBalancer, c *MockClient, mp *MockPartitionConfigProvider) {
274-
balancer.EXPECT().PickReadPartition(_testDomainUUID, types.TaskList{Name: _testTaskList}, persistence.TaskListTypeDecision, "").Return(_testPartition)
274+
balancer.EXPECT().PickReadPartition(persistence.TaskListTypeDecision, testMatchingPollForDecisionTaskRequest(), "").Return(_testPartition)
275275
p.EXPECT().FromTaskList(_testPartition).Return("peer0", nil)
276276
c.EXPECT().PollForDecisionTask(gomock.Any(), gomock.Any(), []yarpc.CallOption{yarpc.WithShardKey("peer0")}).Return(&types.MatchingPollForDecisionTaskResponse{}, nil)
277277
mp.EXPECT().UpdatePartitionConfig(_testDomainUUID, types.TaskList{Name: _testTaskList}, persistence.TaskListTypeDecision, nil)
278-
balancer.EXPECT().UpdateWeight(_testDomainUUID, types.TaskList{Name: _testTaskList}, persistence.TaskListTypeDecision, "", _testPartition, nil)
278+
balancer.EXPECT().UpdateWeight(persistence.TaskListTypeDecision, testMatchingPollForDecisionTaskRequest(), _testPartition, nil)
279279
},
280280
want: &types.MatchingPollForDecisionTaskResponse{},
281281
},
@@ -285,7 +285,7 @@ func TestClient_withResponse(t *testing.T) {
285285
return c.PollForDecisionTask(context.Background(), testMatchingPollForDecisionTaskRequest())
286286
},
287287
mock: func(p *MockPeerResolver, balancer *MockLoadBalancer, c *MockClient, mp *MockPartitionConfigProvider) {
288-
balancer.EXPECT().PickReadPartition(_testDomainUUID, types.TaskList{Name: _testTaskList}, persistence.TaskListTypeDecision, "").Return(_testPartition)
288+
balancer.EXPECT().PickReadPartition(persistence.TaskListTypeDecision, testMatchingPollForDecisionTaskRequest(), "").Return(_testPartition)
289289
p.EXPECT().FromTaskList(_testPartition).Return("peer0", assert.AnError)
290290
},
291291
want: nil,
@@ -297,7 +297,7 @@ func TestClient_withResponse(t *testing.T) {
297297
return c.PollForDecisionTask(context.Background(), testMatchingPollForDecisionTaskRequest())
298298
},
299299
mock: func(p *MockPeerResolver, balancer *MockLoadBalancer, c *MockClient, mp *MockPartitionConfigProvider) {
300-
balancer.EXPECT().PickReadPartition(_testDomainUUID, types.TaskList{Name: _testTaskList}, persistence.TaskListTypeDecision, "").Return(_testPartition)
300+
balancer.EXPECT().PickReadPartition(persistence.TaskListTypeDecision, testMatchingPollForDecisionTaskRequest(), "").Return(_testPartition)
301301
p.EXPECT().FromTaskList(_testPartition).Return("peer0", nil)
302302
c.EXPECT().PollForDecisionTask(gomock.Any(), gomock.Any(), []yarpc.CallOption{yarpc.WithShardKey("peer0")}).Return(nil, assert.AnError)
303303
},
@@ -310,7 +310,7 @@ func TestClient_withResponse(t *testing.T) {
310310
return c.QueryWorkflow(context.Background(), testMatchingQueryWorkflowRequest())
311311
},
312312
mock: func(p *MockPeerResolver, balancer *MockLoadBalancer, c *MockClient, mp *MockPartitionConfigProvider) {
313-
balancer.EXPECT().PickReadPartition(_testDomainUUID, types.TaskList{Name: _testTaskList}, persistence.TaskListTypeDecision, "").Return(_testPartition)
313+
balancer.EXPECT().PickReadPartition(persistence.TaskListTypeDecision, testMatchingQueryWorkflowRequest(), "").Return(_testPartition)
314314
p.EXPECT().FromTaskList(_testPartition).Return("peer0", nil)
315315
c.EXPECT().QueryWorkflow(gomock.Any(), gomock.Any(), []yarpc.CallOption{yarpc.WithShardKey("peer0")}).Return(&types.QueryWorkflowResponse{}, nil)
316316
},
@@ -322,7 +322,7 @@ func TestClient_withResponse(t *testing.T) {
322322
return c.QueryWorkflow(context.Background(), testMatchingQueryWorkflowRequest())
323323
},
324324
mock: func(p *MockPeerResolver, balancer *MockLoadBalancer, c *MockClient, mp *MockPartitionConfigProvider) {
325-
balancer.EXPECT().PickReadPartition(_testDomainUUID, types.TaskList{Name: _testTaskList}, persistence.TaskListTypeDecision, "").Return(_testPartition)
325+
balancer.EXPECT().PickReadPartition(persistence.TaskListTypeDecision, testMatchingQueryWorkflowRequest(), "").Return(_testPartition)
326326
p.EXPECT().FromTaskList(_testPartition).Return("peer0", assert.AnError)
327327
},
328328
want: nil,
@@ -334,7 +334,7 @@ func TestClient_withResponse(t *testing.T) {
334334
return c.QueryWorkflow(context.Background(), testMatchingQueryWorkflowRequest())
335335
},
336336
mock: func(p *MockPeerResolver, balancer *MockLoadBalancer, c *MockClient, mp *MockPartitionConfigProvider) {
337-
balancer.EXPECT().PickReadPartition(_testDomainUUID, types.TaskList{Name: _testTaskList}, persistence.TaskListTypeDecision, "").Return(_testPartition)
337+
balancer.EXPECT().PickReadPartition(persistence.TaskListTypeDecision, testMatchingQueryWorkflowRequest(), "").Return(_testPartition)
338338
p.EXPECT().FromTaskList(_testPartition).Return("peer0", nil)
339339
c.EXPECT().QueryWorkflow(gomock.Any(), gomock.Any(), []yarpc.CallOption{yarpc.WithShardKey("peer0")}).Return(nil, assert.AnError)
340340
},

0 commit comments

Comments
 (0)