Skip to content

Commit e737071

Browse files
authored
Merge pull request #1330 Improve topic reader config validation
2 parents a424af4 + f452b34 commit e737071

File tree

3 files changed

+32
-18
lines changed

3 files changed

+32
-18
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
* Improve config validation before start topic reader
12
* Added metrics over `db.Table().Do()` and `db.Table().DoTx()`
23
* Added method `ydb.ParamsBuilder().Param(name).Any(value)` to add custom `types.Value`
34
* Upgraded dependencies:

internal/topic/topicreaderinternal/reader.go

Lines changed: 5 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,7 @@ var (
2222
errors.New("ydb: first connection attempt not finished"),
2323
))
2424
errReaderClosed = xerrors.Wrap(errors.New("ydb: reader closed"))
25-
errUnexpectedEmptyConsumername = xerrors.Wrap(errors.New("ydb: create ydb reader with empty consumer name. Set one of: consumer name or option WithReaderWithoutConsumer")) //nolint:lll
2625
errSetConsumerAndNoConsumer = xerrors.Wrap(errors.New("ydb: reader has non empty consumer name and set option WithReaderWithoutConsumer. Only one of them must be set")) //nolint:lll
27-
errCantCommitWithoutConsumer = xerrors.Wrap(errors.New("ydb: reader can't commit messages without consumer"))
2826
errCommitSessionFromOtherReader = xerrors.Wrap(errors.New("ydb: commit with session from other reader"))
2927
)
3028

@@ -88,8 +86,11 @@ func NewReader(
8886
) (Reader, error) {
8987
cfg := convertNewParamsToStreamConfig(consumer, readSelectors, opts...)
9088

91-
if err := cfg.Validate(); err != nil {
92-
return Reader{}, err
89+
if errs := cfg.Validate(); len(errs) > 0 {
90+
return Reader{}, xerrors.WithStackTrace(fmt.Errorf(
91+
"ydb: failed to start topic reader, because is contains error in config: %w",
92+
errors.Join(errs...),
93+
))
9394
}
9495

9596
readerID := nextReaderID()
@@ -234,20 +235,6 @@ type ReaderConfig struct {
234235
topicStreamReaderConfig
235236
}
236237

237-
func (c *ReaderConfig) Validate() error {
238-
if c.Consumer != "" && c.ReadWithoutConsumer {
239-
return xerrors.WithStackTrace(errSetConsumerAndNoConsumer)
240-
}
241-
if c.Consumer == "" && !c.ReadWithoutConsumer {
242-
return xerrors.WithStackTrace(errUnexpectedEmptyConsumername)
243-
}
244-
if c.ReadWithoutConsumer && c.CommitMode != CommitModeNone {
245-
return xerrors.WithStackTrace(errCantCommitWithoutConsumer)
246-
}
247-
248-
return nil
249-
}
250-
251238
type PublicReaderOption func(cfg *ReaderConfig)
252239

253240
func WithCredentials(cred credentials.Credentials) PublicReaderOption {

internal/topic/topicreaderinternal/stream_reader_impl.go

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,10 @@ var (
2828
PublicErrCommitSessionToExpiredSession = xerrors.Wrap(errors.New("ydb: commit to expired session"))
2929

3030
errCommitWithNilPartitionSession = xerrors.Wrap(errors.New("ydb: commit with nil partition session"))
31+
errUnexpectedEmptyConsumerName = xerrors.Wrap(errors.New("ydb: create ydb reader with empty consumer name. Set one of: consumer name or option WithReaderWithoutConsumer")) //nolint:lll
32+
errCantCommitWithoutConsumer = xerrors.Wrap(errors.New("ydb: reader can't commit messages without consumer"))
33+
errBufferSize = xerrors.Wrap(errors.New("ydb: buffer of topic reader must be greater than zero, see option topicoptions.WithReaderBufferSizeBytes")) //nolint:lll
34+
errTopicSelectorsEmpty = xerrors.Wrap(errors.New("ydb: topic selector for topic reader is empty, see arguments on topic starts")) //nolint:lll
3135
)
3236

3337
type partitionSessionID = rawtopicreader.PartitionSessionID
@@ -86,6 +90,28 @@ func newTopicStreamReaderConfig() topicStreamReaderConfig {
8690
}
8791
}
8892

93+
func (cfg *topicStreamReaderConfig) Validate() []error {
94+
var validateErrors []error
95+
96+
if cfg.Consumer != "" && cfg.ReadWithoutConsumer {
97+
validateErrors = append(validateErrors, errSetConsumerAndNoConsumer)
98+
}
99+
if cfg.Consumer == "" && !cfg.ReadWithoutConsumer {
100+
validateErrors = append(validateErrors, errUnexpectedEmptyConsumerName)
101+
}
102+
if cfg.ReadWithoutConsumer && cfg.CommitMode != CommitModeNone {
103+
validateErrors = append(validateErrors, errCantCommitWithoutConsumer)
104+
}
105+
if cfg.BufferSizeProtoBytes <= 0 {
106+
validateErrors = append(validateErrors, errBufferSize)
107+
}
108+
if len(cfg.ReadSelectors) == 0 {
109+
validateErrors = append(validateErrors, errTopicSelectorsEmpty)
110+
}
111+
112+
return validateErrors
113+
}
114+
89115
func (cfg *topicStreamReaderConfig) initMessage() *rawtopicreader.InitRequest {
90116
res := &rawtopicreader.InitRequest{
91117
Consumer: cfg.Consumer,

0 commit comments

Comments
 (0)