Skip to content

Commit 2f1e034

Browse files
committed
init write impl
1 parent f538d0e commit 2f1e034

File tree

3 files changed

+30
-67
lines changed

3 files changed

+30
-67
lines changed

examples/topic/topicwriter/topic_writer_transaction.go

Lines changed: 12 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -3,17 +3,17 @@ package topicwriter
33
import (
44
"context"
55
"fmt"
6-
"io"
76
"strings"
87

98
"github.com/ydb-platform/ydb-go-sdk/v3"
109
"github.com/ydb-platform/ydb-go-sdk/v3/query"
10+
"github.com/ydb-platform/ydb-go-sdk/v3/topic/topicreader"
1111
"github.com/ydb-platform/ydb-go-sdk/v3/topic/topicwriter"
1212
)
1313

14-
func SendMessageWithinTransaction(ctx context.Context, db *ydb.Driver, writer *topicwriter.Writer, id int64) error {
14+
func TableAndTopicWithinTransaction(ctx context.Context, db *ydb.Driver, writer *topicwriter.Writer, id int64) error {
1515
return db.Query().DoTx(ctx, func(ctx context.Context, t query.TxActor) error {
16-
row, err := t.ReadRow(ctx, "SELECT val FROM table WHERE id=$id", query.WithParameters(
16+
row, err := t.QueryRow(ctx, "SELECT val FROM table WHERE id=$id", query.WithParameters(
1717
ydb.ParamsBuilder().
1818
Param("$id").Int64(id).
1919
Build()))
@@ -36,68 +36,18 @@ func SendMessageWithinTransaction(ctx context.Context, db *ydb.Driver, writer *t
3636
})
3737
}
3838

39-
func SendWithRecreateWriter(
40-
ctx context.Context,
41-
db *ydb.Driver,
42-
writerFabric func(ctx context.Context, db *ydb.Driver) (*topicwriter.Writer, error),
43-
ids <-chan int64,
44-
) error {
45-
// second loop
46-
var lastData int64
47-
processed := true
48-
return db.Query().Do(ctx, func(ctx context.Context, s query.Session) error {
49-
writer, err := writerFabric(ctx, db)
39+
func CopyMessagesBetweenTopics(ctx context.Context, db *ydb.Driver, reader *topicreader.Reader, writer *topicwriter.Writer) error {
40+
return db.Query().DoTx(ctx, func(ctx context.Context, tx query.TxActor) error {
41+
batch, err := reader.PopMessagesBatchTx(ctx, tx)
5042
if err != nil {
5143
return err
5244
}
53-
defer func() { _ = writer.Close(ctx) }()
5445

55-
for {
56-
var id int64
57-
if processed {
58-
select {
59-
case <-ctx.Done():
60-
return ctx.Err()
61-
case val, ok := <-ids:
62-
if !ok {
63-
return io.EOF
64-
}
65-
lastData = val
66-
processed = false
67-
}
68-
}
69-
70-
id = lastData
71-
t, err := s.Begin(ctx, nil)
72-
if err != nil {
73-
return err
74-
}
75-
76-
row, err := t.ReadRow(ctx, "SELECT val FROM table WHERE id=$id", query.WithParameters(
77-
ydb.ParamsBuilder().
78-
Param("$id").Int64(id).
79-
Build()))
80-
if err != nil {
81-
return err
82-
}
83-
84-
var val int64
85-
if err = row.Scan(&val); err != nil {
86-
return err
87-
}
88-
89-
err = writer.WriteWithTx(ctx, t, topicwriter.Message{
90-
Data: strings.NewReader(fmt.Sprintf("val: %v processed", val)),
91-
})
92-
if err != nil {
93-
return err
94-
}
95-
96-
err = t.CommitTx(ctx)
97-
if err != nil {
98-
return err
99-
}
100-
processed = true
46+
sendMessages := make([]topicwriter.Message, len(batch.Messages))
47+
for i, mess := range batch.Messages {
48+
sendMessages[i] = topicwriter.Message{Data: mess}
10149
}
102-
})
50+
51+
return writer.WriteWithTx(ctx, tx, sendMessages...)
52+
}, query.WithIdempotent())
10353
}

internal/topic/topicwriterinternal/writer.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77

88
"github.com/ydb-platform/ydb-go-sdk/v3/credentials"
99
"github.com/ydb-platform/ydb-go-sdk/v3/internal/grpcwrapper/rawtopic/rawtopicwriter"
10+
"github.com/ydb-platform/ydb-go-sdk/v3/internal/tx"
1011
)
1112

1213
//go:generate mockgen -destination raw_topic_writer_stream_mock_test.go --typed -package topicwriterinternal -write_package_comment=false github.com/ydb-platform/ydb-go-sdk/v3/internal/topic/topicwriterinternal RawTopicWriterStream
@@ -48,6 +49,11 @@ func (w *Writer) Write(ctx context.Context, messages ...PublicMessage) error {
4849
return w.streamWriter.Write(ctx, messages)
4950
}
5051

52+
func (w *Writer) WriteWithTx(ctx context.Context, tx tx.Transaction, messages ...PublicMessage) error {
53+
//TODO implement me
54+
panic("implement me")
55+
}
56+
5157
func (w *Writer) WaitInit(ctx context.Context) (info InitialInfo, err error) {
5258
return w.streamWriter.WaitInit(ctx)
5359
}

topic/topicwriter/topicwriter.go

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,10 @@ package topicwriter
22

33
import (
44
"context"
5+
"errors"
6+
57
"github.com/ydb-platform/ydb-go-sdk/v3/internal/tx"
8+
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xerrors"
69

710
"github.com/ydb-platform/ydb-go-sdk/v3/internal/topic/topicwriterinternal"
811
)
@@ -11,7 +14,10 @@ type (
1114
Message = topicwriterinternal.PublicMessage
1215
)
1316

14-
var ErrQueueLimitExceed = topicwriterinternal.PublicErrQueueIsFull
17+
var (
18+
ErrQueueLimitExceed = topicwriterinternal.PublicErrQueueIsFull
19+
errBadTxId = xerrors.Wrap(errors.New("ydb: bad transaction type, supported transactions from query service only"))
20+
)
1521

1622
// Writer represent write session to topic
1723
// It handles connection problems, reconnect to server when need and resend buffered messages
@@ -45,11 +51,12 @@ func (w *Writer) Write(ctx context.Context, messages ...Message) error {
4551
}
4652

4753
func (w *Writer) WriteWithTx(ctx context.Context, transaction tx.Identifier, messages ...Message) error {
48-
panic("not impl")
49-
}
54+
internalTx, ok := transaction.(tx.Transaction)
55+
if !ok {
56+
return xerrors.WithStackTrace(errBadTxId)
57+
}
5058

51-
func (w *Writer) WriteOpts(ctx context.Context, messages []Message, opts ...WriteOption) error {
52-
panic("not impl")
59+
return w.inner.WriteWithTx(ctx, internalTx, messages...)
5360
}
5461

5562
// WaitInit waits until the reader is initialized

0 commit comments

Comments
 (0)