Skip to content
This repository was archived by the owner on Jan 12, 2026. It is now read-only.

Commit 7c195d8

Browse files
committed
feat(RNDPOSTGRES-59): support of transaction streaming
1 parent 7137f04 commit 7c195d8

File tree

15 files changed

+1011
-220
lines changed

15 files changed

+1011
-220
lines changed

cmd/postgres/driver.go

Lines changed: 74 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -3,45 +3,43 @@ package main
33
import (
44
"context"
55

6-
"github.com/jackc/pgx/v5/pgconn"
6+
trmpgx "github.com/avito-tech/go-transaction-manager/drivers/pgxv5/v2"
7+
"github.com/avito-tech/go-transaction-manager/trm/v2/manager"
78
"go.uber.org/zap"
89

910
"github.com/stroppy-io/stroppy-core/pkg/logger"
1011
"github.com/stroppy-io/stroppy-core/pkg/plugins/driver"
1112
stroppy "github.com/stroppy-io/stroppy-core/pkg/proto"
13+
"github.com/stroppy-io/stroppy-core/pkg/utils/errchan"
1214

1315
"github.com/stroppy-io/stroppy-postgres/internal/pool"
1416
"github.com/stroppy-io/stroppy-postgres/internal/queries"
1517
)
1618

17-
type Connection interface {
18-
// Exec executes the given SQL statement with the provided arguments in the context of the Executor.
19-
//
20-
// Parameters:
21-
// - ctx: The context.Context object.
22-
// - sql: The SQL statement to execute.
23-
// - arguments: The arguments to be passed to the SQL statement.
24-
//
25-
// Returns:
26-
// - pgconn.CommandTag: The command tag returned by the execution.
27-
// - error: An error if the execution fails.
28-
Exec(ctx context.Context, sql string, arguments ...interface{}) (pgconn.CommandTag, error)
29-
Close()
30-
}
31-
3219
type QueryBuilder interface {
3320
Build(
3421
ctx context.Context,
3522
logger *zap.Logger,
36-
buildQueriesContext *stroppy.BuildQueriesContext,
37-
) (*stroppy.DriverQueriesList, error)
23+
buildQueriesContext *stroppy.UnitBuildContext,
24+
) (*stroppy.DriverTransactionList, error)
25+
BuildStream(
26+
ctx context.Context,
27+
logger *zap.Logger,
28+
buildQueriesContext *stroppy.UnitBuildContext,
29+
channel errchan.Chan[stroppy.DriverTransaction],
30+
)
3831
ValueToPgxValue(value *stroppy.Value) (any, error)
3932
}
4033

4134
type Driver struct {
42-
logger *zap.Logger
43-
connPool Connection
44-
builder QueryBuilder
35+
logger *zap.Logger
36+
pgxPool interface {
37+
Executor
38+
Close()
39+
}
40+
txManager *manager.Manager
41+
txExecutor *TxExecutor
42+
builder QueryBuilder
4543
}
4644

4745
func NewDriver() driver.Plugin { //nolint: ireturn // allow
@@ -55,56 +53,89 @@ func NewDriver() driver.Plugin { //nolint: ireturn // allow
5553
func (d *Driver) Initialize(ctx context.Context, runContext *stroppy.StepContext) error {
5654
connPool, err := pool.NewPool(
5755
ctx,
58-
runContext.GetConfig().GetDriver(),
56+
runContext.GetGlobalConfig().GetRun().GetDriver(),
5957
d.logger.Named(pool.LoggerName),
6058
)
6159
if err != nil {
6260
return err
6361
}
6462

65-
d.connPool = connPool
63+
d.pgxPool = connPool
6664

6765
d.builder, err = queries.NewQueryBuilder(runContext)
6866
if err != nil {
6967
return err
7068
}
7169

70+
d.txManager = manager.Must(trmpgx.NewDefaultFactory(connPool))
71+
d.txExecutor = NewTxExecutor(connPool)
72+
7273
return nil
7374
}
7475

75-
func (d *Driver) BuildQueries(
76+
func (d *Driver) BuildTransactionsFromUnit(
7677
ctx context.Context,
77-
buildQueriesContext *stroppy.BuildQueriesContext,
78-
) (*stroppy.DriverQueriesList, error) {
79-
return d.builder.Build(ctx, d.logger, buildQueriesContext)
78+
buildUnitContext *stroppy.UnitBuildContext,
79+
) (*stroppy.DriverTransactionList, error) {
80+
return d.builder.Build(ctx, d.logger, buildUnitContext)
8081
}
8182

82-
func (d *Driver) RunQuery(ctx context.Context, query *stroppy.DriverQuery) error {
83-
d.logger.Debug(
84-
"run query",
85-
zap.String("name", query.GetName()),
86-
zap.String("sql", query.GetRequest()),
87-
zap.Any("args", query.GetParams()),
88-
)
83+
func (d *Driver) BuildTransactionsFromUnitStream(
84+
ctx context.Context,
85+
buildUnitContext *stroppy.UnitBuildContext,
86+
) (errchan.Chan[stroppy.DriverTransaction], error) {
87+
channel := make(errchan.Chan[stroppy.DriverTransaction])
88+
go func() {
89+
d.builder.BuildStream(ctx, d.logger, buildUnitContext, channel)
90+
}()
91+
92+
return channel, nil
93+
}
8994

90-
values := make([]any, len(query.GetParams()))
95+
func (d *Driver) RunTransaction(
96+
ctx context.Context,
97+
transaction *stroppy.DriverTransaction,
98+
) error {
99+
if transaction.GetIsolationLevel() == stroppy.TxIsolationLevel_TX_ISOLATION_LEVEL_UNSPECIFIED {
100+
return d.runTransactionInternal(ctx, transaction, d.pgxPool)
101+
}
102+
103+
return d.txManager.DoWithSettings(
104+
ctx,
105+
NewStroppyIsolationSettings(transaction),
106+
func(ctx context.Context) error {
107+
return d.runTransactionInternal(ctx, transaction, d.txExecutor)
108+
})
109+
}
110+
111+
func (d *Driver) runTransactionInternal(
112+
ctx context.Context,
113+
transaction *stroppy.DriverTransaction,
114+
executor Executor,
115+
) error {
116+
for _, query := range transaction.GetQueries() {
117+
values := make([]any, len(query.GetParams()))
118+
119+
for i, v := range query.GetParams() {
120+
val, err := d.builder.ValueToPgxValue(v)
121+
if err != nil {
122+
return err
123+
}
124+
125+
values[i] = val
126+
}
91127

92-
for i, v := range query.GetParams() {
93-
val, err := d.builder.ValueToPgxValue(v)
128+
_, err := executor.Exec(ctx, query.GetRequest(), values...)
94129
if err != nil {
95130
return err
96131
}
97-
98-
values[i] = val
99132
}
100133

101-
_, err := d.connPool.Exec(ctx, query.GetRequest(), values...)
102-
103-
return err
134+
return nil
104135
}
105136

106137
func (d *Driver) Teardown(_ context.Context) error {
107-
d.connPool.Close()
138+
d.pgxPool.Close()
108139

109140
return nil
110141
}

cmd/postgres/driver_test.go

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,8 @@ type testDriver struct {
1818
func newTestDriver(mockPool pgxmock.PgxPoolIface) *testDriver {
1919
return &testDriver{
2020
Driver: &Driver{
21-
logger: logger.Global(),
22-
connPool: mockPool,
21+
logger: logger.Global(),
22+
pgxPool: mockPool,
2323
},
2424
}
2525
}
@@ -32,15 +32,19 @@ func TestDriver_RunQuery(t *testing.T) {
3232
drv := newTestDriver(mock)
3333

3434
ctx := context.Background()
35-
query := &stroppy.DriverQuery{
36-
Name: "test_query",
37-
Request: "SELECT 1",
38-
Params: nil,
35+
query := &stroppy.DriverTransaction{
36+
Queries: []*stroppy.DriverQuery{
37+
{
38+
Name: "test_query",
39+
Request: "SELECT 1",
40+
Params: nil,
41+
},
42+
},
3943
}
4044

4145
mock.ExpectExec("SELECT 1").WillReturnResult(pgxmock.NewResult("SELECT", 1))
4246

43-
err = drv.RunQuery(ctx, query)
47+
err = drv.RunTransaction(ctx, query)
4448
require.NoError(t, err)
4549

4650
require.NoError(t, mock.ExpectationsWereMet())

cmd/postgres/execute.go

Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
package main
2+
3+
import (
4+
"context"
5+
6+
trmpgx "github.com/avito-tech/go-transaction-manager/drivers/pgxv5/v2"
7+
"github.com/jackc/pgx/v5/pgconn"
8+
)
9+
10+
type Executor interface {
11+
// Exec executes the given SQL statement with the provided arguments in the context of the Executor.
12+
//
13+
// Parameters:
14+
// - ctx: The context.Context object.
15+
// - sql: The SQL statement to execute.
16+
// - arguments: The arguments to be passed to the SQL statement.
17+
//
18+
// Returns:
19+
// - pgconn.CommandTag: The command tag returned by the execution.
20+
// - error: An error if the execution fails.
21+
Exec(ctx context.Context, sql string, arguments ...interface{}) (pgconn.CommandTag, error)
22+
}
23+
24+
type ctxGetter interface {
25+
// DefaultTrOrDB returns the default transaction or the provided transaction
26+
// from the context, if it exists.
27+
//
28+
// Parameters:
29+
// - ctx: The context.Context object.
30+
// - db: The transaction to use if it exists in the context.
31+
//
32+
// Returns:
33+
// - trmpgx.Tr: The transaction to use.
34+
DefaultTrOrDB(ctx context.Context, db trmpgx.Tr) trmpgx.Tr
35+
}
36+
37+
var _ Executor = (*TxExecutor)(nil)
38+
39+
type TxExecutor struct {
40+
defaultTr trmpgx.Tr
41+
ctxGetter ctxGetter
42+
}
43+
44+
// NewTxExecutor creates a new TxExecutor with the given defaultTr and options.
45+
//
46+
// Parameters:
47+
// - defaultTr: The default transaction to use.
48+
//
49+
// Returns:
50+
// - *TxExecutor: The newly created TxExecutor.
51+
func NewTxExecutor(defaultTr trmpgx.Tr) *TxExecutor {
52+
executor := &TxExecutor{
53+
defaultTr: defaultTr,
54+
ctxGetter: trmpgx.DefaultCtxGetter,
55+
}
56+
57+
return executor
58+
}
59+
60+
// tr returns the transaction to use based on the provided context.
61+
//
62+
// It first calls the ctxGetter's DefaultTrOrDB method to get the transaction from the context,
63+
// or the default transaction if it doesn't exist. If the returned transaction is nil,
64+
// it returns the default transaction. Otherwise, it returns the obtained transaction.
65+
//
66+
// Parameters:
67+
// - ctx: The context.Context object.
68+
//
69+
// Returns:
70+
// - trmpgx.Tr: The transaction to use.
71+
func (e *TxExecutor) tr(ctx context.Context) trmpgx.Tr { //nolint:ireturn // lib
72+
tr := e.ctxGetter.DefaultTrOrDB(ctx, e.defaultTr)
73+
if tr == nil {
74+
return e.defaultTr
75+
}
76+
77+
return tr
78+
}
79+
80+
// Exec executes the given SQL statement with the provided arguments in the context of the TxExecutor.
81+
//
82+
// Parameters:
83+
// - ctx: The context.Context object.
84+
// - sql: The SQL statement to execute.
85+
// - arguments: The arguments to be passed to the SQL statement.
86+
//
87+
// Returns:
88+
// - pgconn.CommandTag: The command tag returned by the execution.
89+
// - error: An error if the execution fails.
90+
func (e *TxExecutor) Exec(
91+
ctx context.Context,
92+
sql string,
93+
arguments ...interface{},
94+
) (pgconn.CommandTag, error) {
95+
tag, err := e.tr(ctx).Exec(ctx, sql, arguments...)
96+
if err != nil {
97+
return pgconn.CommandTag{}, err
98+
}
99+
100+
return tag, nil
101+
}

cmd/postgres/isolation.go

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
package main
2+
3+
import (
4+
"errors"
5+
6+
trmpgx "github.com/avito-tech/go-transaction-manager/drivers/pgxv5/v2"
7+
"github.com/avito-tech/go-transaction-manager/trm/v2/settings"
8+
"github.com/jackc/pgx/v5"
9+
10+
stroppy "github.com/stroppy-io/stroppy-core/pkg/proto"
11+
)
12+
13+
func NewSettings(level pgx.TxIsoLevel, opts ...settings.Opt) *trmpgx.Settings {
14+
setts := trmpgx.MustSettings(settings.Must(opts...),
15+
trmpgx.WithTxOptions(pgx.TxOptions{
16+
IsoLevel: level,
17+
}),
18+
)
19+
20+
return &setts
21+
}
22+
23+
func ReadUncommittedSettings(opts ...settings.Opt) *trmpgx.Settings {
24+
return NewSettings(pgx.ReadUncommitted, opts...)
25+
}
26+
27+
func ReadCommittedSettings(opts ...settings.Opt) *trmpgx.Settings {
28+
return NewSettings(pgx.ReadCommitted, opts...)
29+
}
30+
31+
func RepeatableReadSettings(opts ...settings.Opt) *trmpgx.Settings {
32+
return NewSettings(pgx.RepeatableRead, opts...)
33+
}
34+
35+
func SerializableSettings(opts ...settings.Opt) *trmpgx.Settings {
36+
return NewSettings(pgx.Serializable, opts...)
37+
}
38+
39+
var ErrUnsupportedIsolationLevel = errors.New("unsupported isolation level")
40+
41+
func NewStroppyIsolationSettings(transaction *stroppy.DriverTransaction, opts ...settings.Opt) *trmpgx.Settings {
42+
switch transaction.GetIsolationLevel() {
43+
case stroppy.TxIsolationLevel_TX_ISOLATION_LEVEL_READ_UNCOMMITTED:
44+
return ReadUncommittedSettings(opts...)
45+
case stroppy.TxIsolationLevel_TX_ISOLATION_LEVEL_READ_COMMITTED:
46+
return ReadCommittedSettings(opts...)
47+
case stroppy.TxIsolationLevel_TX_ISOLATION_LEVEL_REPEATABLE_READ:
48+
return RepeatableReadSettings(opts...)
49+
case stroppy.TxIsolationLevel_TX_ISOLATION_LEVEL_SERIALIZABLE:
50+
return SerializableSettings(opts...)
51+
default:
52+
panic(ErrUnsupportedIsolationLevel)
53+
}
54+
}

go.mod

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,14 +3,16 @@ module github.com/stroppy-io/stroppy-postgres
33
go 1.24.3
44

55
require (
6+
github.com/avito-tech/go-transaction-manager/drivers/pgxv5/v2 v2.0.1
7+
github.com/avito-tech/go-transaction-manager/trm/v2 v2.0.1
68
github.com/google/uuid v1.6.0
79
github.com/jackc/pgx-shopspring-decimal v0.0.0-20220624020537-1d36b5a1853e
810
github.com/jackc/pgx/v5 v5.7.5
911
github.com/orcaman/concurrent-map/v2 v2.0.1
1012
github.com/pashagolub/pgxmock/v4 v4.8.0
1113
github.com/shopspring/decimal v1.4.0
1214
github.com/stretchr/testify v1.10.0
13-
github.com/stroppy-io/stroppy-core v0.0.3
15+
github.com/stroppy-io/stroppy-core v0.0.7
1416
go.uber.org/zap v1.27.0
1517
google.golang.org/protobuf v1.36.7
1618
)

0 commit comments

Comments
 (0)