Skip to content

Commit 94c5618

Browse files
authored
Improve Multi perf on SQL state stores with 1 op only (dapr#3300)
Signed-off-by: ItalyPaleAle <[email protected]> Signed-off-by: Alessandro (Ale) Segala <[email protected]>
1 parent 60cd144 commit 94c5618

File tree

8 files changed

+354
-190
lines changed

8 files changed

+354
-190
lines changed

common/component/postgresql/v1/postgresql.go

Lines changed: 29 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -438,29 +438,40 @@ func (p *PostgreSQL) doDelete(parentCtx context.Context, db pginterfaces.DBQueri
438438
}
439439

440440
func (p *PostgreSQL) Multi(parentCtx context.Context, request *state.TransactionalStateRequest) error {
441-
_, err := pgtransactions.ExecuteInTransaction[struct{}](parentCtx, p.logger, p.db, p.metadata.Timeout, func(ctx context.Context, tx pgx.Tx) (res struct{}, err error) {
442-
for _, o := range request.Operations {
443-
switch x := o.(type) {
444-
case state.SetRequest:
445-
err = p.doSet(parentCtx, tx, &x)
441+
if request == nil {
442+
return nil
443+
}
444+
445+
// If there's only 1 operation, skip starting a transaction
446+
switch len(request.Operations) {
447+
case 0:
448+
return nil
449+
case 1:
450+
return p.execMultiOperation(parentCtx, request.Operations[0], p.db)
451+
default:
452+
_, err := pgtransactions.ExecuteInTransaction[struct{}](parentCtx, p.logger, p.db, p.metadata.Timeout, func(ctx context.Context, tx pgx.Tx) (res struct{}, err error) {
453+
for _, op := range request.Operations {
454+
err = p.execMultiOperation(ctx, op, tx)
446455
if err != nil {
447456
return res, err
448457
}
449-
450-
case state.DeleteRequest:
451-
err = p.doDelete(parentCtx, tx, &x)
452-
if err != nil {
453-
return res, err
454-
}
455-
456-
default:
457-
return res, fmt.Errorf("unsupported operation: %s", o.Operation())
458458
}
459-
}
460459

461-
return res, nil
462-
})
463-
return err
460+
return res, nil
461+
})
462+
return err
463+
}
464+
}
465+
466+
func (p *PostgreSQL) execMultiOperation(ctx context.Context, op state.TransactionalStateOperation, db pginterfaces.DBQuerier) error {
467+
switch x := op.(type) {
468+
case state.SetRequest:
469+
return p.doSet(ctx, db, &x)
470+
case state.DeleteRequest:
471+
return p.doDelete(ctx, db, &x)
472+
default:
473+
return fmt.Errorf("unsupported operation: %s", op.Operation())
474+
}
464475
}
465476

466477
func (p *PostgreSQL) CleanupExpired() error {

common/component/postgresql/v1/postgresql_test.go

Lines changed: 72 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -44,16 +44,9 @@ func TestMultiWithNoRequests(t *testing.T) {
4444
m, _ := mockDatabase(t)
4545
defer m.db.Close()
4646

47-
m.db.ExpectBegin()
48-
m.db.ExpectCommit()
49-
// There's also a rollback called after a commit, which is expected and will not have effect
50-
m.db.ExpectRollback()
51-
52-
var operations []state.TransactionalStateOperation
53-
5447
// Act
5548
err := m.pg.Multi(context.Background(), &state.TransactionalStateRequest{
56-
Operations: operations,
49+
Operations: nil,
5750
})
5851

5952
// Assert
@@ -66,24 +59,46 @@ func TestValidSetRequest(t *testing.T) {
6659
defer m.db.Close()
6760

6861
setReq := createSetRequest()
69-
operations := []state.TransactionalStateOperation{setReq}
7062
val, _ := json.Marshal(setReq.Value)
7163

72-
m.db.ExpectBegin()
73-
m.db.ExpectExec("INSERT INTO").
74-
WithArgs(setReq.Key, string(val), false).
75-
WillReturnResult(pgxmock.NewResult("INSERT", 1))
76-
m.db.ExpectCommit()
77-
// There's also a rollback called after a commit, which is expected and will not have effect
78-
m.db.ExpectRollback()
64+
t.Run("single op", func(t *testing.T) {
65+
operations := []state.TransactionalStateOperation{setReq}
7966

80-
// Act
81-
err := m.pg.Multi(context.Background(), &state.TransactionalStateRequest{
82-
Operations: operations,
67+
m.db.ExpectExec("INSERT INTO").
68+
WithArgs(setReq.Key, string(val), false).
69+
WillReturnResult(pgxmock.NewResult("INSERT", 1))
70+
71+
// Act
72+
err := m.pg.Multi(context.Background(), &state.TransactionalStateRequest{
73+
Operations: operations,
74+
})
75+
76+
// Assert
77+
require.NoError(t, err)
8378
})
8479

85-
// Assert
86-
require.NoError(t, err)
80+
t.Run("multiple ops", func(t *testing.T) {
81+
operations := []state.TransactionalStateOperation{setReq, setReq}
82+
83+
m.db.ExpectBegin()
84+
m.db.ExpectExec("INSERT INTO").
85+
WithArgs(setReq.Key, string(val), false).
86+
WillReturnResult(pgxmock.NewResult("INSERT", 1))
87+
m.db.ExpectExec("INSERT INTO").
88+
WithArgs(setReq.Key, string(val), false).
89+
WillReturnResult(pgxmock.NewResult("INSERT", 1))
90+
m.db.ExpectCommit()
91+
// There's also a rollback called after a commit, which is expected and will not have effect
92+
m.db.ExpectRollback()
93+
94+
// Act
95+
err := m.pg.Multi(context.Background(), &state.TransactionalStateRequest{
96+
Operations: operations,
97+
})
98+
99+
// Assert
100+
require.NoError(t, err)
101+
})
87102
}
88103

89104
func TestInvalidMultiSetRequestNoKey(t *testing.T) {
@@ -113,23 +128,45 @@ func TestValidMultiDeleteRequest(t *testing.T) {
113128
defer m.db.Close()
114129

115130
deleteReq := createDeleteRequest()
116-
operations := []state.TransactionalStateOperation{deleteReq}
117131

118-
m.db.ExpectBegin()
119-
m.db.ExpectExec("DELETE FROM").
120-
WithArgs(deleteReq.Key).
121-
WillReturnResult(pgxmock.NewResult("DELETE", 1))
122-
m.db.ExpectCommit()
123-
// There's also a rollback called after a commit, which is expected and will not have effect
124-
m.db.ExpectRollback()
132+
t.Run("single op", func(t *testing.T) {
133+
operations := []state.TransactionalStateOperation{deleteReq}
125134

126-
// Act
127-
err := m.pg.Multi(context.Background(), &state.TransactionalStateRequest{
128-
Operations: operations,
135+
m.db.ExpectExec("DELETE FROM").
136+
WithArgs(deleteReq.Key).
137+
WillReturnResult(pgxmock.NewResult("DELETE", 1))
138+
139+
// Act
140+
err := m.pg.Multi(context.Background(), &state.TransactionalStateRequest{
141+
Operations: operations,
142+
})
143+
144+
// Assert
145+
require.NoError(t, err)
129146
})
130147

131-
// Assert
132-
require.NoError(t, err)
148+
t.Run("multiple ops", func(t *testing.T) {
149+
operations := []state.TransactionalStateOperation{deleteReq, deleteReq}
150+
151+
m.db.ExpectBegin()
152+
m.db.ExpectExec("DELETE FROM").
153+
WithArgs(deleteReq.Key).
154+
WillReturnResult(pgxmock.NewResult("DELETE", 1))
155+
m.db.ExpectExec("DELETE FROM").
156+
WithArgs(deleteReq.Key).
157+
WillReturnResult(pgxmock.NewResult("DELETE", 1))
158+
m.db.ExpectCommit()
159+
// There's also a rollback called after a commit, which is expected and will not have effect
160+
m.db.ExpectRollback()
161+
162+
// Act
163+
err := m.pg.Multi(context.Background(), &state.TransactionalStateRequest{
164+
Operations: operations,
165+
})
166+
167+
// Assert
168+
require.NoError(t, err)
169+
})
133170
}
134171

135172
func TestInvalidMultiDeleteRequestNoKey(t *testing.T) {
@@ -140,7 +177,7 @@ func TestInvalidMultiDeleteRequestNoKey(t *testing.T) {
140177
m.db.ExpectBegin()
141178
m.db.ExpectRollback()
142179

143-
operations := []state.TransactionalStateOperation{state.DeleteRequest{}} // Delete request without key is not valid for Delete operation
180+
operations := []state.TransactionalStateOperation{state.DeleteRequest{}, state.DeleteRequest{}} // Delete request without key is not valid for Delete operation
144181

145182
// Act
146183
err := m.pg.Multi(context.Background(), &state.TransactionalStateRequest{

state/mysql/mysql.go

Lines changed: 33 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -768,40 +768,50 @@ func readRow(row interface{ Scan(dest ...any) error }) (key string, value []byte
768768
return key, value, &etag, expireTime, nil
769769
}
770770

771-
// Multi handles multiple transactions.
772-
// TransactionalStore Interface.
771+
// Multi handles multiple operations in batch.
772+
// Implements the TransactionalStore Interface.
773773
func (m *MySQL) Multi(ctx context.Context, request *state.TransactionalStateRequest) error {
774-
tx, err := m.db.Begin()
775-
if err != nil {
776-
return err
774+
if request == nil {
775+
return nil
777776
}
778-
defer func() {
779-
rollbackErr := tx.Rollback()
780-
if rollbackErr != nil && !errors.Is(rollbackErr, sql.ErrTxDone) {
781-
m.logger.Errorf("Error rolling back transaction: %v", rollbackErr)
782-
}
783-
}()
784777

785-
for _, o := range request.Operations {
786-
switch req := o.(type) {
787-
case state.SetRequest:
788-
err = m.setValue(ctx, tx, &req)
789-
if err != nil {
790-
return err
778+
// If there's only 1 operation, skip starting a transaction
779+
switch len(request.Operations) {
780+
case 0:
781+
return nil
782+
case 1:
783+
return m.execMultiOperation(ctx, request.Operations[0], m.db)
784+
default:
785+
tx, err := m.db.Begin()
786+
if err != nil {
787+
return err
788+
}
789+
defer func() {
790+
rollbackErr := tx.Rollback()
791+
if rollbackErr != nil && !errors.Is(rollbackErr, sql.ErrTxDone) {
792+
m.logger.Errorf("Error rolling back transaction: %v", rollbackErr)
791793
}
794+
}()
792795

793-
case state.DeleteRequest:
794-
err = m.deleteValue(ctx, tx, &req)
796+
for _, op := range request.Operations {
797+
err = m.execMultiOperation(ctx, op, tx)
795798
if err != nil {
796799
return err
797800
}
798-
799-
default:
800-
return fmt.Errorf("unsupported operation: %s", req.Operation())
801801
}
802+
return tx.Commit()
802803
}
804+
}
803805

804-
return tx.Commit()
806+
func (m *MySQL) execMultiOperation(ctx context.Context, op state.TransactionalStateOperation, db querier) error {
807+
switch req := op.(type) {
808+
case state.SetRequest:
809+
return m.setValue(ctx, db, &req)
810+
case state.DeleteRequest:
811+
return m.deleteValue(ctx, db, &req)
812+
default:
813+
return fmt.Errorf("unsupported operation: %s", op.Operation())
814+
}
805815
}
806816

807817
// Close implements io.Closer.

0 commit comments

Comments
 (0)