Skip to content

Commit a4f324c

Browse files
committed
splitted internal/xsql to internal/{xsql,table/conn,query/conn}
1 parent 21fab94 commit a4f324c

39 files changed

+1543
-1271
lines changed

driver.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,7 @@ type (
9393
topic *xsync.Once[*topicclientinternal.Client]
9494
topicOptions []topicoptions.TopicOption
9595

96-
databaseSQLOptions []xsql.ConnectorOption
96+
databaseSQLOptions []xsql.Option
9797

9898
pool *conn.Pool
9999

dsn.go

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
"github.com/ydb-platform/ydb-go-sdk/v3/credentials"
1111
"github.com/ydb-platform/ydb-go-sdk/v3/internal/bind"
1212
"github.com/ydb-platform/ydb-go-sdk/v3/internal/dsn"
13+
tableSql "github.com/ydb-platform/ydb-go-sdk/v3/internal/table/conn"
1314
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xerrors"
1415
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xsql"
1516
)
@@ -59,29 +60,29 @@ func parseConnectionString(dataSourceName string) (opts []Option, _ error) {
5960
opts = append(opts, WithBalancer(balancers.FromConfig(balancer)))
6061
}
6162
if queryMode := info.Params.Get("go_query_mode"); queryMode != "" {
62-
mode := xsql.QueryModeFromString(queryMode)
63-
if mode == xsql.UnknownQueryMode {
63+
mode := tableSql.QueryModeFromString(queryMode)
64+
if mode == tableSql.UnknownQueryMode {
6465
return nil, xerrors.WithStackTrace(fmt.Errorf("unknown query mode: %s", queryMode))
6566
}
6667
opts = append(opts, withConnectorOptions(xsql.WithDefaultQueryMode(mode)))
6768
} else if queryMode := info.Params.Get("query_mode"); queryMode != "" {
68-
mode := xsql.QueryModeFromString(queryMode)
69-
if mode == xsql.UnknownQueryMode {
69+
mode := tableSql.QueryModeFromString(queryMode)
70+
if mode == tableSql.UnknownQueryMode {
7071
return nil, xerrors.WithStackTrace(fmt.Errorf("unknown query mode: %s", queryMode))
7172
}
7273
opts = append(opts, withConnectorOptions(xsql.WithDefaultQueryMode(mode)))
7374
}
7475
if fakeTx := info.Params.Get("go_fake_tx"); fakeTx != "" {
7576
for _, queryMode := range strings.Split(fakeTx, ",") {
76-
mode := xsql.QueryModeFromString(queryMode)
77-
if mode == xsql.UnknownQueryMode {
77+
mode := tableSql.QueryModeFromString(queryMode)
78+
if mode == tableSql.UnknownQueryMode {
7879
return nil, xerrors.WithStackTrace(fmt.Errorf("unknown query mode: %s", queryMode))
7980
}
8081
opts = append(opts, withConnectorOptions(xsql.WithFakeTx(mode)))
8182
}
8283
}
8384
if info.Params.Has("go_query_bind") {
84-
var binders []xsql.ConnectorOption
85+
var binders []xsql.Option
8586
queryTransformers := strings.Split(info.Params.Get("go_query_bind"), ",")
8687
for _, transformer := range queryTransformers {
8788
switch transformer {
@@ -97,7 +98,7 @@ func parseConnectionString(dataSourceName string) (opts []Option, _ error) {
9798
if err != nil {
9899
return nil, xerrors.WithStackTrace(err)
99100
}
100-
binders = append(binders, xsql.WithTablePathPrefix(prefix))
101+
binders = append(binders, xsql.WithQueryBind(bind.TablePathPrefix(prefix)))
101102
} else {
102103
return nil, xerrors.WithStackTrace(
103104
fmt.Errorf("unknown query rewriter: %s", transformer),

dsn_test.go

Lines changed: 26 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -8,11 +8,12 @@ import (
88

99
"github.com/ydb-platform/ydb-go-sdk/v3/config"
1010
"github.com/ydb-platform/ydb-go-sdk/v3/internal/bind"
11+
"github.com/ydb-platform/ydb-go-sdk/v3/internal/table/conn"
1112
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xsql"
1213
)
1314

1415
func TestParse(t *testing.T) {
15-
newConnector := func(opts ...xsql.ConnectorOption) *xsql.Connector {
16+
newConnector := func(opts ...xsql.Option) *xsql.Connector {
1617
c := &xsql.Connector{}
1718
for _, opt := range opts {
1819
if opt != nil {
@@ -32,7 +33,7 @@ func TestParse(t *testing.T) {
3233
for _, tt := range []struct {
3334
dsn string
3435
opts []config.Option
35-
connectorOpts []xsql.ConnectorOption
36+
connectorOpts []xsql.Option
3637
err error
3738
}{
3839
{
@@ -42,9 +43,9 @@ func TestParse(t *testing.T) {
4243
config.WithEndpoint("localhost:2135"),
4344
config.WithDatabase("/local"),
4445
},
45-
connectorOpts: []xsql.ConnectorOption{
46-
xsql.WithFakeTx(xsql.ScriptingQueryMode),
47-
xsql.WithFakeTx(xsql.SchemeQueryMode),
46+
connectorOpts: []xsql.Option{
47+
xsql.WithFakeTx(conn.ScriptingQueryMode),
48+
xsql.WithFakeTx(conn.SchemeQueryMode),
4849
},
4950
err: nil,
5051
},
@@ -75,8 +76,8 @@ func TestParse(t *testing.T) {
7576
config.WithEndpoint("localhost:2135"),
7677
config.WithDatabase("/local"),
7778
},
78-
connectorOpts: []xsql.ConnectorOption{
79-
xsql.WithDefaultQueryMode(xsql.ScriptingQueryMode),
79+
connectorOpts: []xsql.Option{
80+
xsql.WithDefaultQueryMode(conn.ScriptingQueryMode),
8081
},
8182
err: nil,
8283
},
@@ -87,9 +88,9 @@ func TestParse(t *testing.T) {
8788
config.WithEndpoint("localhost:2135"),
8889
config.WithDatabase("/local"),
8990
},
90-
connectorOpts: []xsql.ConnectorOption{
91-
xsql.WithDefaultQueryMode(xsql.ScriptingQueryMode),
92-
xsql.WithTablePathPrefix("path/to/tables"),
91+
connectorOpts: []xsql.Option{
92+
xsql.WithDefaultQueryMode(conn.ScriptingQueryMode),
93+
xsql.WithQueryBind(bind.TablePathPrefix("path/to/tables")),
9394
},
9495
err: nil,
9596
},
@@ -100,9 +101,9 @@ func TestParse(t *testing.T) {
100101
config.WithEndpoint("localhost:2135"),
101102
config.WithDatabase("/local"),
102103
},
103-
connectorOpts: []xsql.ConnectorOption{
104-
xsql.WithDefaultQueryMode(xsql.ScriptingQueryMode),
105-
xsql.WithTablePathPrefix("path/to/tables"),
104+
connectorOpts: []xsql.Option{
105+
xsql.WithDefaultQueryMode(conn.ScriptingQueryMode),
106+
xsql.WithQueryBind(bind.TablePathPrefix("path/to/tables")),
106107
xsql.WithQueryBind(bind.NumericArgs{}),
107108
},
108109
err: nil,
@@ -114,9 +115,9 @@ func TestParse(t *testing.T) {
114115
config.WithEndpoint("localhost:2135"),
115116
config.WithDatabase("/local"),
116117
},
117-
connectorOpts: []xsql.ConnectorOption{
118-
xsql.WithDefaultQueryMode(xsql.ScriptingQueryMode),
119-
xsql.WithTablePathPrefix("path/to/tables"),
118+
connectorOpts: []xsql.Option{
119+
xsql.WithDefaultQueryMode(conn.ScriptingQueryMode),
120+
xsql.WithQueryBind(bind.TablePathPrefix("path/to/tables")),
120121
xsql.WithQueryBind(bind.PositionalArgs{}),
121122
},
122123
err: nil,
@@ -128,9 +129,9 @@ func TestParse(t *testing.T) {
128129
config.WithEndpoint("localhost:2135"),
129130
config.WithDatabase("/local"),
130131
},
131-
connectorOpts: []xsql.ConnectorOption{
132-
xsql.WithDefaultQueryMode(xsql.ScriptingQueryMode),
133-
xsql.WithTablePathPrefix("path/to/tables"),
132+
connectorOpts: []xsql.Option{
133+
xsql.WithDefaultQueryMode(conn.ScriptingQueryMode),
134+
xsql.WithQueryBind(bind.TablePathPrefix("path/to/tables")),
134135
xsql.WithQueryBind(bind.AutoDeclare{}),
135136
},
136137
err: nil,
@@ -142,9 +143,9 @@ func TestParse(t *testing.T) {
142143
config.WithEndpoint("localhost:2135"),
143144
config.WithDatabase("/local"),
144145
},
145-
connectorOpts: []xsql.ConnectorOption{
146-
xsql.WithDefaultQueryMode(xsql.ScriptingQueryMode),
147-
xsql.WithTablePathPrefix("path/to/tables"),
146+
connectorOpts: []xsql.Option{
147+
xsql.WithDefaultQueryMode(conn.ScriptingQueryMode),
148+
xsql.WithQueryBind(bind.TablePathPrefix("path/to/tables")),
148149
},
149150
err: nil,
150151
},
@@ -155,9 +156,9 @@ func TestParse(t *testing.T) {
155156
config.WithEndpoint("localhost:2135"),
156157
config.WithDatabase("/local"),
157158
},
158-
connectorOpts: []xsql.ConnectorOption{
159-
xsql.WithDefaultQueryMode(xsql.ScriptingQueryMode),
160-
xsql.WithTablePathPrefix("path/to/tables"),
159+
connectorOpts: []xsql.Option{
160+
xsql.WithDefaultQueryMode(conn.ScriptingQueryMode),
161+
xsql.WithQueryBind(bind.TablePathPrefix("path/to/tables")),
161162
xsql.WithQueryBind(bind.PositionalArgs{}),
162163
xsql.WithQueryBind(bind.AutoDeclare{}),
163164
},

internal/query/client.go

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -511,6 +511,38 @@ func (c *Client) DoTx(ctx context.Context, op query.TxOperation, opts ...options
511511
return nil
512512
}
513513

514+
func CreateSession(ctx context.Context, c *Client) (*Session, error) {
515+
s, err := retry.RetryWithResult(ctx, func(ctx context.Context) (*Session, error) {
516+
var (
517+
createCtx context.Context
518+
cancelCreate context.CancelFunc
519+
)
520+
if d := c.config.SessionCreateTimeout(); d > 0 {
521+
createCtx, cancelCreate = xcontext.WithTimeout(ctx, d)
522+
} else {
523+
createCtx, cancelCreate = xcontext.WithCancel(ctx)
524+
}
525+
defer cancelCreate()
526+
527+
s, err := createSession(createCtx, c.client,
528+
session.WithDeleteTimeout(c.config.SessionDeleteTimeout()),
529+
session.WithTrace(c.config.Trace()),
530+
)
531+
if err != nil {
532+
return nil, xerrors.WithStackTrace(err)
533+
}
534+
535+
s.laztTx = c.config.LazyTx()
536+
537+
return s, nil
538+
})
539+
if err != nil {
540+
return nil, xerrors.WithStackTrace(err)
541+
}
542+
543+
return s, nil
544+
}
545+
514546
func New(ctx context.Context, cc grpc.ClientConnInterface, cfg *config.Config) *Client {
515547
onDone := trace.QueryOnNew(cfg.Trace(), &ctx,
516548
stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/v3/internal/query.New"),

internal/query/conn/conn.go

Lines changed: 150 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,150 @@
1+
package conn
2+
3+
import (
4+
"context"
5+
"database/sql/driver"
6+
"sync/atomic"
7+
"time"
8+
9+
"github.com/jonboulle/clockwork"
10+
11+
"github.com/ydb-platform/ydb-go-sdk/v3/internal/bind"
12+
"github.com/ydb-platform/ydb-go-sdk/v3/internal/query"
13+
"github.com/ydb-platform/ydb-go-sdk/v3/internal/stack"
14+
"github.com/ydb-platform/ydb-go-sdk/v3/internal/table/conn/badconn"
15+
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xcontext"
16+
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xerrors"
17+
"github.com/ydb-platform/ydb-go-sdk/v3/retry/budget"
18+
"github.com/ydb-platform/ydb-go-sdk/v3/trace"
19+
)
20+
21+
type (
22+
Parent interface {
23+
Query() *query.Client
24+
}
25+
currentTx interface {
26+
Rollback() error
27+
}
28+
conn struct {
29+
ctx context.Context
30+
parent Parent
31+
trace *trace.DatabaseSQL
32+
traceRetry *trace.Retry
33+
retryBudget budget.Budget
34+
bindings []bind.Bind
35+
session *query.Session
36+
clock clockwork.Clock
37+
onClose []func()
38+
closed atomic.Bool
39+
currentTx
40+
}
41+
)
42+
43+
func (c *conn) ID() string {
44+
return c.session.ID()
45+
}
46+
47+
func (c *conn) IsValid() bool {
48+
//TODO implement me
49+
panic("implement me")
50+
}
51+
52+
func (c *conn) CheckNamedValue(value *driver.NamedValue) error {
53+
//TODO implement me
54+
panic("implement me")
55+
}
56+
57+
func (c *conn) Ping(ctx context.Context) error {
58+
//TODO implement me
59+
panic("implement me")
60+
}
61+
62+
func (c *conn) PrepareContext(ctx context.Context, query string) (driver.Stmt, error) {
63+
//TODO implement me
64+
panic("implement me")
65+
}
66+
67+
func (c *conn) BeginTx(ctx context.Context, opts driver.TxOptions) (driver.Tx, error) {
68+
//TODO implement me
69+
panic("implement me")
70+
}
71+
72+
func (c *conn) ExecContext(ctx context.Context, query string, args []driver.NamedValue) (driver.Result, error) {
73+
//TODO implement me
74+
panic("implement me")
75+
}
76+
77+
func (c *conn) QueryContext(ctx context.Context, query string, args []driver.NamedValue) (driver.Rows, error) {
78+
//TODO implement me
79+
panic("implement me")
80+
}
81+
82+
func (c *conn) Prepare(query string) (driver.Stmt, error) {
83+
//TODO implement me
84+
panic("implement me")
85+
}
86+
87+
func (c *conn) Close() (finalErr error) {
88+
if !c.closed.CompareAndSwap(false, true) {
89+
return badconn.Map(xerrors.WithStackTrace(errConnClosedEarly))
90+
}
91+
92+
defer func() {
93+
for _, onClose := range c.onClose {
94+
onClose()
95+
}
96+
}()
97+
98+
var (
99+
ctx = c.ctx
100+
onDone = trace.DatabaseSQLOnConnClose(
101+
c.trace, &ctx,
102+
stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/v3/internal/query/conn.(*conn).Close"),
103+
)
104+
)
105+
defer func() {
106+
onDone(finalErr)
107+
}()
108+
if c.currentTx != nil {
109+
_ = c.currentTx.Rollback()
110+
}
111+
err := c.session.Close(xcontext.ValueOnly(ctx))
112+
if err != nil {
113+
return badconn.Map(xerrors.WithStackTrace(err))
114+
}
115+
116+
return nil
117+
}
118+
119+
func (c *conn) Begin() (driver.Tx, error) {
120+
//TODO implement me
121+
panic("implement me")
122+
}
123+
124+
func (c *conn) LastUsage() time.Time {
125+
//TODO implement me
126+
panic("implement me")
127+
}
128+
129+
func New(ctx context.Context, parent Parent, opts ...Option) (*conn, error) {
130+
s, err := query.CreateSession(ctx, parent.Query())
131+
if err != nil {
132+
return nil, xerrors.WithStackTrace(err)
133+
}
134+
135+
cc := &conn{
136+
ctx: ctx,
137+
parent: parent,
138+
session: s,
139+
clock: clockwork.NewRealClock(),
140+
trace: &trace.DatabaseSQL{},
141+
}
142+
143+
for _, opt := range opts {
144+
if opt != nil {
145+
opt(cc)
146+
}
147+
}
148+
149+
return cc, nil
150+
}

internal/query/conn/errors.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
package conn
2+
3+
import "errors"
4+
5+
var (
6+
errConnClosedEarly = errors.New("conn closed early")
7+
)

0 commit comments

Comments
 (0)