Skip to content

Commit 50bf249

Browse files
committed
*: fix the bug loadbalance does not take effect in streaming fetch statements #687
[summary] loadbalance = 1, select /*+ streaming */ need load balance to the replica node. [test case] src/backend/txn_test.go [patch codecov] src/proxy/execute.go 87.8%
1 parent b6006f3 commit 50bf249

File tree

2 files changed

+34
-4
lines changed

2 files changed

+34
-4
lines changed

src/backend/txn_test.go

Lines changed: 31 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -284,13 +284,13 @@ func TestTxnExecuteReplicaError(t *testing.T) {
284284
func TestTxnExecuteStreamFetch(t *testing.T) {
285285
defer leaktest.Check(t)()
286286
log := xlog.NewStdLog(xlog.Level(xlog.PANIC))
287-
fakedb, txnMgr, backends, addrs, cleanup := MockTxnMgr(log, 2)
287+
fakedb, txnMgr, backends, addrs, cleanup := MockTxnMgrWithReplica(log, 2)
288288
defer cleanup()
289289

290290
querys := []xcontext.QueryTuple{
291-
xcontext.QueryTuple{Query: "select * from node1", Backend: addrs[0]},
292-
xcontext.QueryTuple{Query: "select * from node2", Backend: addrs[1]},
293-
xcontext.QueryTuple{Query: "select * from node3", Backend: addrs[1]},
291+
{Query: "select * from node1", Backend: addrs[0]},
292+
{Query: "select * from node2", Backend: addrs[1]},
293+
{Query: "select * from node3", Backend: addrs[1]},
294294
}
295295

296296
result11 := &sqltypes.Result{
@@ -360,6 +360,33 @@ func TestTxnExecuteStreamFetch(t *testing.T) {
360360
assert.Equal(t, want, got)
361361
}
362362

363+
// loadbalance=1.
364+
{
365+
fakedb.AddQueryStream(querys[0].Query, result11)
366+
fakedb.AddQueryStream(querys[1].Query, result12)
367+
fakedb.AddQueryStream(querys[2].Query, result12)
368+
369+
txn, err := txnMgr.CreateTxn(backends)
370+
assert.Nil(t, err)
371+
defer txn.Finish()
372+
txn.SetIsExecOnRep(true)
373+
374+
rctx := &xcontext.RequestContext{
375+
Querys: querys,
376+
}
377+
378+
callbackQr := &sqltypes.Result{}
379+
err = txn.ExecuteStreamFetch(rctx, func(qr *sqltypes.Result) error {
380+
callbackQr.AppendResult(qr)
381+
return nil
382+
}, 1024*1024)
383+
assert.Nil(t, err)
384+
385+
want := len(result11.Rows) + 2*len(result12.Rows)
386+
got := len(callbackQr.Rows)
387+
assert.Equal(t, want, got)
388+
}
389+
363390
// execute error.
364391
{
365392
fakedb.AddQueryError(querys[0].Query, errors.New("mock.stream.query.error"))

src/proxy/execute.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -166,6 +166,7 @@ func (spanner *Spanner) executeWithTimeout(session *driver.Session, database str
166166
// ExecuteStreamFetch used to execute a stream fetch query.
167167
func (spanner *Spanner) ExecuteStreamFetch(session *driver.Session, database string, query string, node sqlparser.Statement, callback func(qr *sqltypes.Result) error) error {
168168
log := spanner.log
169+
conf := spanner.conf
169170
router := spanner.router
170171
scatter := spanner.scatter
171172
sessions := spanner.sessions
@@ -178,6 +179,8 @@ func (spanner *Spanner) ExecuteStreamFetch(session *driver.Session, database str
178179
}
179180
defer txn.Finish()
180181

182+
txn.SetIsExecOnRep(conf.Proxy.LoadBalance != 0)
183+
181184
// binding.
182185
sessions.TxnBinding(session, txn, node, query)
183186
defer sessions.TxnUnBinding(session)

0 commit comments

Comments
 (0)