Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go/test/endtoend/cluster/cluster_process.go
Original file line number Diff line number Diff line change
Expand Up @@ -960,7 +960,7 @@ func (cluster *LocalProcessCluster) ExecOnTablet(ctx context.Context, vttablet *
Keyspace: tablet.Keyspace,
Shard: tablet.Shard,
TabletType: tablet.Type,
}, sql, bindvars, int64(txID), int64(reservedID))
}, sql, bindvars, int64(txID), int64(reservedID), opts)
}

// ExecOnVTGate executes a query on a local cluster VTGate with the provided
Expand Down
27 changes: 18 additions & 9 deletions go/vt/vtcombo/tablet_map.go
Original file line number Diff line number Diff line change
Expand Up @@ -465,9 +465,10 @@ func (itc *internalTabletConn) Execute(
query string,
bindVars map[string]*querypb.BindVariable,
transactionID, reservedID int64,
options *querypb.ExecuteOptions,
) (*sqltypes.Result, error) {
bindVars = sqltypes.CopyBindVariables(bindVars)
reply, err := itc.tablet.qsc.QueryService().Execute(ctx, session, target, query, bindVars, transactionID, reservedID)
reply, err := itc.tablet.qsc.QueryService().Execute(ctx, session, target, query, bindVars, transactionID, reservedID, options)
if err != nil {
return nil, tabletconn.ErrorFromGRPC(vterrors.ToGRPC(err))
}
Expand All @@ -484,10 +485,11 @@ func (itc *internalTabletConn) StreamExecute(
bindVars map[string]*querypb.BindVariable,
transactionID int64,
reservedID int64,
options *querypb.ExecuteOptions,
callback func(*sqltypes.Result) error,
) error {
bindVars = sqltypes.CopyBindVariables(bindVars)
err := itc.tablet.qsc.QueryService().StreamExecute(ctx, session, target, query, bindVars, transactionID, reservedID, callback)
err := itc.tablet.qsc.QueryService().StreamExecute(ctx, session, target, query, bindVars, transactionID, reservedID, options, callback)
return tabletconn.ErrorFromGRPC(vterrors.ToGRPC(err))
}

Expand All @@ -496,8 +498,9 @@ func (itc *internalTabletConn) Begin(
ctx context.Context,
session queryservice.Session,
target *querypb.Target,
options *querypb.ExecuteOptions,
) (queryservice.TransactionState, error) {
state, err := itc.tablet.qsc.QueryService().Begin(ctx, session, target)
state, err := itc.tablet.qsc.QueryService().Begin(ctx, session, target, options)
return state, tabletconn.ErrorFromGRPC(vterrors.ToGRPC(err))
}

Expand Down Expand Up @@ -581,9 +584,10 @@ func (itc *internalTabletConn) BeginExecute(
query string,
bindVars map[string]*querypb.BindVariable,
reserveID int64,
options *querypb.ExecuteOptions,
) (queryservice.TransactionState, *sqltypes.Result, error) {
bindVars = sqltypes.CopyBindVariables(bindVars)
state, result, err := itc.tablet.qsc.QueryService().BeginExecute(ctx, session, target, preQueries, query, bindVars, reserveID)
state, result, err := itc.tablet.qsc.QueryService().BeginExecute(ctx, session, target, preQueries, query, bindVars, reserveID, options)
return state, result, tabletconn.ErrorFromGRPC(vterrors.ToGRPC(err))
}

Expand All @@ -596,10 +600,11 @@ func (itc *internalTabletConn) BeginStreamExecute(
query string,
bindVars map[string]*querypb.BindVariable,
reservedID int64,
options *querypb.ExecuteOptions,
callback func(*sqltypes.Result) error,
) (queryservice.TransactionState, error) {
bindVars = sqltypes.CopyBindVariables(bindVars)
state, err := itc.tablet.qsc.QueryService().BeginStreamExecute(ctx, session, target, preQueries, query, bindVars, reservedID, callback)
state, err := itc.tablet.qsc.QueryService().BeginStreamExecute(ctx, session, target, preQueries, query, bindVars, reservedID, options, callback)
return state, tabletconn.ErrorFromGRPC(vterrors.ToGRPC(err))
}

Expand Down Expand Up @@ -628,9 +633,10 @@ func (itc *internalTabletConn) ReserveBeginExecute(
postBeginQueries []string,
sql string,
bindVariables map[string]*querypb.BindVariable,
options *querypb.ExecuteOptions,
) (queryservice.ReservedTransactionState, *sqltypes.Result, error) {
bindVariables = sqltypes.CopyBindVariables(bindVariables)
state, result, err := itc.tablet.qsc.QueryService().ReserveBeginExecute(ctx, session, target, preQueries, postBeginQueries, sql, bindVariables)
state, result, err := itc.tablet.qsc.QueryService().ReserveBeginExecute(ctx, session, target, preQueries, postBeginQueries, sql, bindVariables, options)
return state, result, tabletconn.ErrorFromGRPC(vterrors.ToGRPC(err))
}

Expand All @@ -643,10 +649,11 @@ func (itc *internalTabletConn) ReserveBeginStreamExecute(
postBeginQueries []string,
sql string,
bindVariables map[string]*querypb.BindVariable,
options *querypb.ExecuteOptions,
callback func(*sqltypes.Result) error,
) (queryservice.ReservedTransactionState, error) {
bindVariables = sqltypes.CopyBindVariables(bindVariables)
state, err := itc.tablet.qsc.QueryService().ReserveBeginStreamExecute(ctx, session, target, preQueries, postBeginQueries, sql, bindVariables, callback)
state, err := itc.tablet.qsc.QueryService().ReserveBeginStreamExecute(ctx, session, target, preQueries, postBeginQueries, sql, bindVariables, options, callback)
return state, tabletconn.ErrorFromGRPC(vterrors.ToGRPC(err))
}

Expand All @@ -659,9 +666,10 @@ func (itc *internalTabletConn) ReserveExecute(
sql string,
bindVariables map[string]*querypb.BindVariable,
transactionID int64,
options *querypb.ExecuteOptions,
) (queryservice.ReservedState, *sqltypes.Result, error) {
bindVariables = sqltypes.CopyBindVariables(bindVariables)
state, result, err := itc.tablet.qsc.QueryService().ReserveExecute(ctx, session, target, preQueries, sql, bindVariables, transactionID)
state, result, err := itc.tablet.qsc.QueryService().ReserveExecute(ctx, session, target, preQueries, sql, bindVariables, transactionID, options)
return state, result, tabletconn.ErrorFromGRPC(vterrors.ToGRPC(err))
}

Expand All @@ -674,10 +682,11 @@ func (itc *internalTabletConn) ReserveStreamExecute(
sql string,
bindVariables map[string]*querypb.BindVariable,
transactionID int64,
options *querypb.ExecuteOptions,
callback func(*sqltypes.Result) error,
) (queryservice.ReservedState, error) {
bindVariables = sqltypes.CopyBindVariables(bindVariables)
state, err := itc.tablet.qsc.QueryService().ReserveStreamExecute(ctx, session, target, preQueries, sql, bindVariables, transactionID, callback)
state, err := itc.tablet.qsc.QueryService().ReserveStreamExecute(ctx, session, target, preQueries, sql, bindVariables, transactionID, options, callback)
return state, tabletconn.ErrorFromGRPC(vterrors.ToGRPC(err))
}

Expand Down
12 changes: 6 additions & 6 deletions go/vt/vtexplain/vtexplain_vttablet.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ func (vte *VTExplain) newTablet(ctx context.Context, env *vtenv.Environment, opt
var _ queryservice.QueryService = (*explainTablet)(nil) // compile-time interface check

// Begin is part of the QueryService interface.
func (t *explainTablet) Begin(ctx context.Context, session queryservice.Session, target *querypb.Target) (queryservice.TransactionState, error) {
func (t *explainTablet) Begin(ctx context.Context, session queryservice.Session, target *querypb.Target, options *querypb.ExecuteOptions) (queryservice.TransactionState, error) {
t.mu.Lock()
t.currentTime = t.vte.batchTime.Wait()
t.tabletQueries = append(t.tabletQueries, &TabletQuery{
Expand All @@ -165,7 +165,7 @@ func (t *explainTablet) Begin(ctx context.Context, session queryservice.Session,

t.mu.Unlock()

return t.tsv.Begin(ctx, session, target)
return t.tsv.Begin(ctx, session, target, options)
}

// Commit is part of the QueryService interface.
Expand All @@ -190,7 +190,7 @@ func (t *explainTablet) Rollback(ctx context.Context, target *querypb.Target, tr
}

// Execute is part of the QueryService interface.
func (t *explainTablet) Execute(ctx context.Context, session queryservice.Session, target *querypb.Target, sql string, bindVariables map[string]*querypb.BindVariable, transactionID, reservedID int64) (*sqltypes.Result, error) {
func (t *explainTablet) Execute(ctx context.Context, session queryservice.Session, target *querypb.Target, sql string, bindVariables map[string]*querypb.BindVariable, transactionID, reservedID int64, options *querypb.ExecuteOptions) (*sqltypes.Result, error) {
t.mu.Lock()
t.currentTime = t.vte.batchTime.Wait()

Expand All @@ -204,7 +204,7 @@ func (t *explainTablet) Execute(ctx context.Context, session queryservice.Sessio
})
t.mu.Unlock()

return t.tsv.Execute(ctx, session, target, sql, bindVariables, transactionID, reservedID)
return t.tsv.Execute(ctx, session, target, sql, bindVariables, transactionID, reservedID, options)
}

// Prepare is part of the QueryService interface.
Expand Down Expand Up @@ -264,7 +264,7 @@ func (t *explainTablet) ReadTransaction(ctx context.Context, target *querypb.Tar
}

// BeginExecute is part of the QueryService interface.
func (t *explainTablet) BeginExecute(ctx context.Context, session queryservice.Session, target *querypb.Target, preQueries []string, sql string, bindVariables map[string]*querypb.BindVariable, reservedID int64) (queryservice.TransactionState, *sqltypes.Result, error) {
func (t *explainTablet) BeginExecute(ctx context.Context, session queryservice.Session, target *querypb.Target, preQueries []string, sql string, bindVariables map[string]*querypb.BindVariable, reservedID int64, options *querypb.ExecuteOptions) (queryservice.TransactionState, *sqltypes.Result, error) {
t.mu.Lock()
t.currentTime = t.vte.batchTime.Wait()
bindVariables = sqltypes.CopyBindVariables(bindVariables)
Expand All @@ -275,7 +275,7 @@ func (t *explainTablet) BeginExecute(ctx context.Context, session queryservice.S
})
t.mu.Unlock()

return t.tsv.BeginExecute(ctx, session, target, preQueries, sql, bindVariables, reservedID)
return t.tsv.BeginExecute(ctx, session, target, preQueries, sql, bindVariables, reservedID, options)
}

// Close is part of the QueryService interface.
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vtgate/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -1012,7 +1012,7 @@ func (e *Executor) ShowVitessReplicationStatus(ctx context.Context, filter *sqlp
replicaSQLRunningField = "Slave_SQL_Running"
secondsBehindSourceField = "Seconds_Behind_Master"
}
results, err := e.txConn.tabletGateway.Execute(ctx, nil, ts.Target, sql, nil, 0, 0)
results, err := e.txConn.tabletGateway.Execute(ctx, nil, ts.Target, sql, nil, 0, 0, nil)
if err != nil || results == nil {
log.Warningf("Could not get replication status from %s: %v", tabletHostPort, err)
} else if row := results.Named().Row(); row != nil {
Expand Down
32 changes: 16 additions & 16 deletions go/vt/vtgate/scatter_conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,6 @@ func (stc *ScatterConn) ExecuteMultiShard(

if opts == nil && fetchLastInsertID {
opts = &querypb.ExecuteOptions{FetchLastInsertId: fetchLastInsertID}
session = econtext.NewSafeSession(&vtgatepb.Session{Options: opts})
}

if autocommit {
Expand Down Expand Up @@ -224,21 +223,21 @@ func (stc *ScatterConn) ExecuteMultiShard(

switch info.actionNeeded {
case nothing:
innerqr, err = qs.Execute(ctx, session, rs.Target, queries[i].Sql, queries[i].BindVariables, info.transactionID, info.reservedID)
innerqr, err = qs.Execute(ctx, session, rs.Target, queries[i].Sql, queries[i].BindVariables, info.transactionID, info.reservedID, opts)
if err != nil {
retryRequest(func() {
// we seem to have lost our connection. it was a reserved connection, let's try to recreate it
info.actionNeeded = reserve
info.ignoreOldSession = true
var state queryservice.ReservedState
state, innerqr, err = qs.ReserveExecute(ctx, session, rs.Target, session.SetPreQueries(), queries[i].Sql, queries[i].BindVariables, 0 /*transactionId*/)
state, innerqr, err = qs.ReserveExecute(ctx, session, rs.Target, session.SetPreQueries(), queries[i].Sql, queries[i].BindVariables, 0 /*transactionId*/, opts)
reservedID = state.ReservedID
alias = state.TabletAlias
})
}
case begin:
var state queryservice.TransactionState
state, innerqr, err = qs.BeginExecute(ctx, session, rs.Target, session.SavePoints(), queries[i].Sql, queries[i].BindVariables, reservedID)
state, innerqr, err = qs.BeginExecute(ctx, session, rs.Target, session.SavePoints(), queries[i].Sql, queries[i].BindVariables, reservedID, opts)
transactionID = state.TransactionID
alias = state.TabletAlias
if err != nil {
Expand All @@ -247,20 +246,20 @@ func (stc *ScatterConn) ExecuteMultiShard(
info.actionNeeded = reserveBegin
info.ignoreOldSession = true
var state queryservice.ReservedTransactionState
state, innerqr, err = qs.ReserveBeginExecute(ctx, session, rs.Target, session.SetPreQueries(), session.SavePoints(), queries[i].Sql, queries[i].BindVariables)
state, innerqr, err = qs.ReserveBeginExecute(ctx, session, rs.Target, session.SetPreQueries(), session.SavePoints(), queries[i].Sql, queries[i].BindVariables, opts)
transactionID = state.TransactionID
reservedID = state.ReservedID
alias = state.TabletAlias
})
}
case reserve:
var state queryservice.ReservedState
state, innerqr, err = qs.ReserveExecute(ctx, session, rs.Target, session.SetPreQueries(), queries[i].Sql, queries[i].BindVariables, transactionID)
state, innerqr, err = qs.ReserveExecute(ctx, session, rs.Target, session.SetPreQueries(), queries[i].Sql, queries[i].BindVariables, transactionID, opts)
reservedID = state.ReservedID
alias = state.TabletAlias
case reserveBegin:
var state queryservice.ReservedTransactionState
state, innerqr, err = qs.ReserveBeginExecute(ctx, session, rs.Target, session.SetPreQueries(), session.SavePoints(), queries[i].Sql, queries[i].BindVariables)
state, innerqr, err = qs.ReserveBeginExecute(ctx, session, rs.Target, session.SetPreQueries(), session.SavePoints(), queries[i].Sql, queries[i].BindVariables, opts)
transactionID = state.TransactionID
reservedID = state.ReservedID
alias = state.TabletAlias
Expand Down Expand Up @@ -425,7 +424,6 @@ func (stc *ScatterConn) StreamExecuteMulti(

if opts == nil && fetchLastInsertID {
opts = &querypb.ExecuteOptions{FetchLastInsertId: fetchLastInsertID}
session = econtext.NewSafeSession(&vtgatepb.Session{Options: opts})
}

if autocommit {
Expand Down Expand Up @@ -456,41 +454,41 @@ func (stc *ScatterConn) StreamExecuteMulti(

switch info.actionNeeded {
case nothing:
err = qs.StreamExecute(ctx, session, rs.Target, query, bindVars[i], transactionID, reservedID, observedCallback)
err = qs.StreamExecute(ctx, session, rs.Target, query, bindVars[i], transactionID, reservedID, opts, observedCallback)
if err != nil {
retryRequest(func() {
// we seem to have lost our connection. it was a reserved connection, let's try to recreate it
info.actionNeeded = reserve
var state queryservice.ReservedState
state, err = qs.ReserveStreamExecute(ctx, session, rs.Target, session.SetPreQueries(), query, bindVars[i], 0 /*transactionId*/, observedCallback)
state, err = qs.ReserveStreamExecute(ctx, session, rs.Target, session.SetPreQueries(), query, bindVars[i], 0 /*transactionId*/, opts, observedCallback)
reservedID = state.ReservedID
alias = state.TabletAlias
})
}
case begin:
var state queryservice.TransactionState
state, err = qs.BeginStreamExecute(ctx, session, rs.Target, session.SavePoints(), query, bindVars[i], reservedID, observedCallback)
state, err = qs.BeginStreamExecute(ctx, session, rs.Target, session.SavePoints(), query, bindVars[i], reservedID, opts, observedCallback)
transactionID = state.TransactionID
alias = state.TabletAlias
if err != nil {
retryRequest(func() {
// we seem to have lost our connection. it was a reserved connection, let's try to recreate it
info.actionNeeded = reserveBegin
var state queryservice.ReservedTransactionState
state, err = qs.ReserveBeginStreamExecute(ctx, session, rs.Target, session.SetPreQueries(), session.SavePoints(), query, bindVars[i], observedCallback)
state, err = qs.ReserveBeginStreamExecute(ctx, session, rs.Target, session.SetPreQueries(), session.SavePoints(), query, bindVars[i], opts, observedCallback)
transactionID = state.TransactionID
reservedID = state.ReservedID
alias = state.TabletAlias
})
}
case reserve:
var state queryservice.ReservedState
state, err = qs.ReserveStreamExecute(ctx, session, rs.Target, session.SetPreQueries(), query, bindVars[i], transactionID, observedCallback)
state, err = qs.ReserveStreamExecute(ctx, session, rs.Target, session.SetPreQueries(), query, bindVars[i], transactionID, opts, observedCallback)
reservedID = state.ReservedID
alias = state.TabletAlias
case reserveBegin:
var state queryservice.ReservedTransactionState
state, err = qs.ReserveBeginStreamExecute(ctx, session, rs.Target, session.SetPreQueries(), session.SavePoints(), query, bindVars[i], observedCallback)
state, err = qs.ReserveBeginStreamExecute(ctx, session, rs.Target, session.SetPreQueries(), session.SavePoints(), query, bindVars[i], opts, observedCallback)
transactionID = state.TransactionID
reservedID = state.ReservedID
alias = state.TabletAlias
Expand Down Expand Up @@ -761,6 +759,7 @@ func (stc *ScatterConn) ExecuteLock(ctx context.Context, rs *srvtopo.ResolvedSha
var (
qr *sqltypes.Result
err error
opts *querypb.ExecuteOptions
alias *topodatapb.TabletAlias
)
allErrors := new(concurrency.AllErrorRecorder)
Expand All @@ -771,6 +770,7 @@ func (stc *ScatterConn) ExecuteLock(ctx context.Context, rs *srvtopo.ResolvedSha
return nil, vterrors.VT13001("session cannot be nil")
}

opts = session.Options
info, err := lockInfo(rs.Target, session, lockFuncType)
// Lock session is created on alphabetic sorted keyspace.
// This error will occur if the existing session target does not match the current target.
Expand All @@ -788,7 +788,7 @@ func (stc *ScatterConn) ExecuteLock(ctx context.Context, rs *srvtopo.ResolvedSha

switch info.actionNeeded {
case nothing:
qr, err = qs.Execute(ctx, session, rs.Target, query.Sql, query.BindVariables, 0 /* transactionID */, reservedID)
qr, err = qs.Execute(ctx, session, rs.Target, query.Sql, query.BindVariables, 0 /* transactionID */, reservedID, opts)
if err != nil && wasConnectionClosed(err) {
// TODO: try to acquire lock again.
session.ResetLock()
Expand All @@ -799,7 +799,7 @@ func (stc *ScatterConn) ExecuteLock(ctx context.Context, rs *srvtopo.ResolvedSha
}
case reserve:
var state queryservice.ReservedState
state, qr, err = qs.ReserveExecute(ctx, session, rs.Target, session.SetPreQueries(), query.Sql, query.BindVariables, 0 /* transactionID */)
state, qr, err = qs.ReserveExecute(ctx, session, rs.Target, session.SetPreQueries(), query.Sql, query.BindVariables, 0 /* transactionID */, opts)
reservedID = state.ReservedID
alias = state.TabletAlias
if err != nil && reservedID != 0 {
Expand Down
10 changes: 5 additions & 5 deletions go/vt/vtgate/tabletgateway_flaky_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ func TestGatewayBufferingWhenPrimarySwitchesServingState(t *testing.T) {
sbc.SetResults([]*sqltypes.Result{sqlResult1})

// run a query that we indeed get the result added to the sandbox connection back
res, err := tg.Execute(ctx, nil, target, "query", nil, 0, 0)
res, err := tg.Execute(ctx, nil, target, "query", nil, 0, 0, nil)
require.NoError(t, err)
require.Equal(t, res, sqlResult1)

Expand All @@ -114,7 +114,7 @@ func TestGatewayBufferingWhenPrimarySwitchesServingState(t *testing.T) {
// execute the query in a go routine since it should be buffered, and check that it eventually succeed
queryChan := make(chan struct{})
go func() {
res, err = tg.Execute(ctx, nil, target, "query", nil, 0, 0)
res, err = tg.Execute(ctx, nil, target, "query", nil, 0, 0, nil)
queryChan <- struct{}{}
}()

Expand Down Expand Up @@ -186,7 +186,7 @@ func TestGatewayBufferingWhileReparenting(t *testing.T) {

// run a query that we indeed get the result added to the sandbox connection back
// this also checks that the query reaches the primary tablet and not the replica
res, err := tg.Execute(ctx, nil, target, "query", nil, 0, 0)
res, err := tg.Execute(ctx, nil, target, "query", nil, 0, 0, nil)
require.NoError(t, err)
require.Equal(t, res, sqlResult1)

Expand Down Expand Up @@ -224,7 +224,7 @@ func TestGatewayBufferingWhileReparenting(t *testing.T) {
// execute the query in a go routine since it should be buffered, and check that it eventually succeed
queryChan := make(chan struct{})
go func() {
res, err = tg.Execute(ctx, nil, target, "query", nil, 0, 0)
res, err = tg.Execute(ctx, nil, target, "query", nil, 0, 0, nil)
queryChan <- struct{}{}
}()

Expand Down Expand Up @@ -332,7 +332,7 @@ func TestInconsistentStateDetectedBuffering(t *testing.T) {
var err error
queryChan := make(chan struct{})
go func() {
res, err = tg.Execute(ctx, nil, target, "query", nil, 0, 0)
res, err = tg.Execute(ctx, nil, target, "query", nil, 0, 0, nil)
queryChan <- struct{}{}
}()

Expand Down
Loading
Loading