Skip to content

Commit e0a6bcb

Browse files
authored
Merge pull request #1413 from ydb-platform/operation-service
* Added operation service client through `db.Operation()` method (sup…
2 parents a7f790a + a7b81d5 commit e0a6bcb

File tree

12 files changed

+466
-88
lines changed

12 files changed

+466
-88
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
* Removed experimental methods `query.Session.ReadResultSet` and `query.Session.ReadRows`
99
* Removed experimental methods `query.TxActor.ReadResultSet` and `query.TxActor.ReadRows`
1010
* Removed experimental method `query.Client.Stats`
11+
* Added experimental support for operation service client through `db.Operation()` method (supports methods `Get`, `List`, `Cancel` and `Forget`)
1112

1213
## v3.76.6
1314
* Replaced requirements from go1.22 + experimantal flag to go1.23 for experimental range-over interface

driver.go

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ import (
3838
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xsql"
3939
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xsync"
4040
"github.com/ydb-platform/ydb-go-sdk/v3/log"
41+
"github.com/ydb-platform/ydb-go-sdk/v3/operation"
4142
"github.com/ydb-platform/ydb-go-sdk/v3/ratelimiter"
4243
"github.com/ydb-platform/ydb-go-sdk/v3/scheme"
4344
"github.com/ydb-platform/ydb-go-sdk/v3/scripting"
@@ -67,6 +68,8 @@ type Driver struct {
6768
discovery *xsync.Once[*internalDiscovery.Client]
6869
discoveryOptions []discoveryConfig.Option
6970

71+
operation *xsync.Once[*operation.Client]
72+
7073
table *xsync.Once[*internalTable.Client]
7174
tableOptions []tableConfig.Option
7275

@@ -148,8 +151,10 @@ func (d *Driver) Close(ctx context.Context) (finalErr error) {
148151
d.scheme.Close,
149152
d.scripting.Close,
150153
d.table.Close,
154+
d.operation.Close,
151155
d.query.Close,
152156
d.topic.Close,
157+
d.discovery.Close,
153158
d.balancer.Close,
154159
d.pool.Release,
155160
)
@@ -215,6 +220,13 @@ func (d *Driver) Discovery() discovery.Client {
215220
return d.discovery.Must()
216221
}
217222

223+
// Operation returns operation client
224+
//
225+
// Experimental: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#experimental
226+
func (d *Driver) Operation() *operation.Client {
227+
return d.operation.Must()
228+
}
229+
218230
// Scripting returns scripting client
219231
func (d *Driver) Scripting() scripting.Client {
220232
return d.scripting.Must()
@@ -505,6 +517,12 @@ func (d *Driver) connect(ctx context.Context) (err error) {
505517
), nil
506518
})
507519

520+
d.operation = xsync.OnceValue(func() (*operation.Client, error) {
521+
return operation.New(xcontext.ValueOnly(ctx),
522+
d.pool.Get(endpoint.New(d.config.Endpoint())),
523+
), nil
524+
})
525+
508526
d.scripting = xsync.OnceValue(func() (*internalScripting.Client, error) {
509527
return internalScripting.New(xcontext.ValueOnly(ctx),
510528
d.balancer,
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
package options
2+
3+
import "github.com/ydb-platform/ydb-go-genproto/protos/Ydb_Operations"
4+
5+
type (
6+
ListOperationsRequest struct {
7+
Ydb_Operations.ListOperationsRequest
8+
}
9+
List func(r *ListOperationsRequest)
10+
)

internal/query/client.go

Lines changed: 10 additions & 70 deletions
Original file line numberDiff line numberDiff line change
@@ -5,16 +5,13 @@ import (
55
"sync/atomic"
66
"time"
77

8-
"github.com/ydb-platform/ydb-go-genproto/Ydb_Operation_V1"
98
"github.com/ydb-platform/ydb-go-genproto/Ydb_Query_V1"
10-
"github.com/ydb-platform/ydb-go-genproto/protos/Ydb"
119
"github.com/ydb-platform/ydb-go-genproto/protos/Ydb_Operations"
1210
"github.com/ydb-platform/ydb-go-genproto/protos/Ydb_Query"
1311
"google.golang.org/grpc"
1412

1513
"github.com/ydb-platform/ydb-go-sdk/v3/internal/allocator"
1614
"github.com/ydb-platform/ydb-go-sdk/v3/internal/closer"
17-
"github.com/ydb-platform/ydb-go-sdk/v3/internal/conn"
1815
"github.com/ydb-platform/ydb-go-sdk/v3/internal/operation"
1916
"github.com/ydb-platform/ydb-go-sdk/v3/internal/pool"
2017
"github.com/ydb-platform/ydb-go-sdk/v3/internal/query/config"
@@ -49,37 +46,14 @@ type (
4946
InUse atomic.Int32
5047
}
5148
Client struct {
52-
config *config.Config
53-
queryServiceClient Ydb_Query_V1.QueryServiceClient
54-
operationServiceClient Ydb_Operation_V1.OperationServiceClient
55-
pool sessionPool
49+
config *config.Config
50+
queryServiceClient Ydb_Query_V1.QueryServiceClient
51+
pool sessionPool
5652

5753
done chan struct{}
5854
}
5955
)
6056

61-
func fetchScriptResultsWithFallback(ctx context.Context,
62-
queryServiceClient Ydb_Query_V1.QueryServiceClient,
63-
operationServiceClient Ydb_Operation_V1.OperationServiceClient,
64-
opID string, opts ...options.FetchScriptOption,
65-
) (*options.FetchScriptResult, error) {
66-
r, err := fetchScriptResults(ctx, queryServiceClient, opID, opts...)
67-
if err != nil {
68-
if xerrors.IsOperationError(err, Ydb.StatusIds_BAD_REQUEST) {
69-
r, err = scriptOperationStatus(ctx, operationServiceClient, opID)
70-
if err != nil {
71-
return nil, xerrors.WithStackTrace(err)
72-
}
73-
74-
return r, nil
75-
}
76-
77-
return nil, xerrors.WithStackTrace(err)
78-
}
79-
80-
return r, nil
81-
}
82-
8357
func fetchScriptResults(ctx context.Context,
8458
client Ydb_Query_V1.QueryServiceClient,
8559
opID string, opts ...options.FetchScriptOption,
@@ -115,13 +89,9 @@ func fetchScriptResults(ctx context.Context,
11589
}
11690

11791
return &options.FetchScriptResult{
118-
Ready: true,
119-
Status: response.GetStatus().String(),
120-
Data: &options.FetchScriptResultData{
121-
ResultSetIndex: response.GetResultSetIndex(),
122-
ResultSet: MaterializedResultSet(int(response.GetResultSetIndex()), columnNames, columnTypes, rows),
123-
NextToken: response.GetNextFetchToken(),
124-
},
92+
ResultSetIndex: response.GetResultSetIndex(),
93+
ResultSet: MaterializedResultSet(int(response.GetResultSetIndex()), columnNames, columnTypes, rows),
94+
NextToken: response.GetNextFetchToken(),
12595
}, nil
12696
}, retry.WithIdempotent(true))
12797
if err != nil {
@@ -131,40 +101,11 @@ func fetchScriptResults(ctx context.Context,
131101
return r, nil
132102
}
133103

134-
func scriptOperationStatus(
135-
ctx context.Context, client Ydb_Operation_V1.OperationServiceClient, opID string,
136-
) (*options.FetchScriptResult, error) {
137-
status, err := retry.RetryWithResult(ctx, func(ctx context.Context) (*options.FetchScriptResult, error) {
138-
response, err := client.GetOperation(conn.WithoutWrapping(ctx), &Ydb_Operations.GetOperationRequest{
139-
Id: opID,
140-
})
141-
if err != nil {
142-
return nil, xerrors.WithStackTrace(err)
143-
}
144-
145-
var md Ydb_Query.ExecuteScriptMetadata
146-
err = response.GetOperation().GetMetadata().UnmarshalTo(&md)
147-
if err != nil {
148-
return nil, xerrors.WithStackTrace(err)
149-
}
150-
151-
return &options.FetchScriptResult{
152-
Ready: response.GetOperation().GetReady(),
153-
Status: response.GetOperation().GetStatus().String(),
154-
}, nil
155-
})
156-
if err != nil {
157-
return nil, xerrors.WithStackTrace(err)
158-
}
159-
160-
return status, nil
161-
}
162-
163104
func (c *Client) FetchScriptResults(ctx context.Context,
164105
opID string, opts ...options.FetchScriptOption,
165106
) (*options.FetchScriptResult, error) {
166107
r, err := retry.RetryWithResult(ctx, func(ctx context.Context) (*options.FetchScriptResult, error) {
167-
r, err := fetchScriptResultsWithFallback(ctx, c.queryServiceClient, c.operationServiceClient, opID,
108+
r, err := fetchScriptResults(ctx, c.queryServiceClient, opID,
168109
append(opts, func(request *options.FetchScriptResultsRequest) {
169110
request.Trace = c.config.Trace()
170111
})...,
@@ -688,10 +629,9 @@ func New(ctx context.Context, balancer grpc.ClientConnInterface, cfg *config.Con
688629
grpcClient := Ydb_Query_V1.NewQueryServiceClient(balancer)
689630

690631
client := &Client{
691-
config: cfg,
692-
queryServiceClient: grpcClient,
693-
operationServiceClient: Ydb_Operation_V1.NewOperationServiceClient(balancer),
694-
done: make(chan struct{}),
632+
config: cfg,
633+
queryServiceClient: grpcClient,
634+
done: make(chan struct{}),
695635
pool: newPool(ctx, cfg, func(ctx context.Context) (_ *Session, err error) {
696636
var (
697637
createCtx context.Context

internal/query/client_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1263,10 +1263,10 @@ func TestQueryScript(t *testing.T) {
12631263
require.EqualValues(t, "123", op.ID)
12641264
r, err := fetchScriptResults(ctx, service, op.ID)
12651265
require.NoError(t, err)
1266-
require.EqualValues(t, 0, r.Data.ResultSetIndex)
1267-
require.Equal(t, "456", r.Data.NextToken)
1268-
require.NotNil(t, r.Data.ResultSet)
1269-
row, err := r.Data.ResultSet.NextRow(ctx)
1266+
require.EqualValues(t, 0, r.ResultSetIndex)
1267+
require.Equal(t, "456", r.NextToken)
1268+
require.NotNil(t, r.ResultSet)
1269+
row, err := r.ResultSet.NextRow(ctx)
12701270
require.NoError(t, err)
12711271
var (
12721272
a int

internal/query/options/execute_script.go

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -36,16 +36,11 @@ type (
3636
ConsumedUnits float64
3737
Metadata ExecuteScriptOperationMetadata
3838
}
39-
FetchScriptResultData struct {
39+
FetchScriptResult struct {
4040
ResultSetIndex int64
4141
ResultSet result.Set
4242
NextToken string
4343
}
44-
FetchScriptResult struct {
45-
Ready bool
46-
Status string
47-
Data *FetchScriptResultData
48-
}
4944
)
5045

5146
func WithFetchToken(fetchToken string) FetchScriptOption {

0 commit comments

Comments
 (0)