Skip to content

Commit 90e79ae

Browse files
[Go SDK]: Implement natsio.Read transform for reading from NATS (apache#29410)
1 parent d59e192 commit 90e79ae

File tree

14 files changed

+1020
-4
lines changed

14 files changed

+1020
-4
lines changed

CHANGES.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@
6868
* Adding support for LowCardinality DataType in ClickHouse (Java) ([#29533](https://github.com/apache/beam/pull/29533)).
6969
* Added support for handling bad records to KafkaIO (Java) ([#29546](https://github.com/apache/beam/pull/29546))
7070
* Add support for generating text embeddings in MLTransform for Vertex AI and Hugging Face Hub models.([#29564](https://github.com/apache/beam/pull/29564))
71+
* NATS IO connector added (Go) ([#29000](https://github.com/apache/beam/issues/29000)).
7172

7273
## New Features / Improvements
7374

sdks/go/pkg/beam/io/natsio/common.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,10 @@ type natsFn struct {
3131
}
3232

3333
func (fn *natsFn) Setup() error {
34+
if fn.nc != nil && fn.js != nil {
35+
return nil
36+
}
37+
3438
var opts []nats.Option
3539
if fn.CredsFile != "" {
3640
opts = append(opts, nats.UserCredentials(fn.CredsFile))
Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one or more
2+
// contributor license agreements. See the NOTICE file distributed with
3+
// this work for additional information regarding copyright ownership.
4+
// The ASF licenses this file to You under the Apache License, Version 2.0
5+
// (the "License"); you may not use this file except in compliance with
6+
// the License. You may obtain a copy of the License at
7+
//
8+
// http://www.apache.org/licenses/LICENSE-2.0
9+
//
10+
// Unless required by applicable law or agreed to in writing, software
11+
// distributed under the License is distributed on an "AS IS" BASIS,
12+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
// See the License for the specific language governing permissions and
14+
// limitations under the License.
15+
16+
package natsio
17+
18+
import (
19+
"context"
20+
"errors"
21+
"fmt"
22+
23+
"github.com/nats-io/nats.go/jetstream"
24+
)
25+
26+
type endEstimator struct {
27+
js jetstream.JetStream
28+
stream string
29+
subject string
30+
}
31+
32+
func newEndEstimator(js jetstream.JetStream, stream string, subject string) *endEstimator {
33+
return &endEstimator{
34+
js: js,
35+
stream: stream,
36+
subject: subject,
37+
}
38+
}
39+
40+
func (e *endEstimator) Estimate() int64 {
41+
ctx := context.Background()
42+
end, err := e.getEndSeqNo(ctx)
43+
if err != nil {
44+
panic(err)
45+
}
46+
return end
47+
}
48+
49+
func (e *endEstimator) getEndSeqNo(ctx context.Context) (int64, error) {
50+
str, err := e.js.Stream(ctx, e.stream)
51+
if err != nil {
52+
return -1, fmt.Errorf("error getting stream: %v", err)
53+
}
54+
55+
msg, err := str.GetLastMsgForSubject(ctx, e.subject)
56+
if err != nil {
57+
if isMessageNotFound(err) {
58+
return 1, nil
59+
}
60+
61+
return -1, fmt.Errorf("error getting last message: %v", err)
62+
}
63+
64+
return int64(msg.Sequence) + 1, nil
65+
}
66+
67+
func isMessageNotFound(err error) bool {
68+
var jsErr jetstream.JetStreamError
69+
if errors.As(err, &jsErr) {
70+
apiErr := jsErr.APIError()
71+
if apiErr.ErrorCode == jetstream.JSErrCodeMessageNotFound {
72+
return true
73+
}
74+
}
75+
76+
return false
77+
}
Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one or more
2+
// contributor license agreements. See the NOTICE file distributed with
3+
// this work for additional information regarding copyright ownership.
4+
// The ASF licenses this file to You under the Apache License, Version 2.0
5+
// (the "License"); you may not use this file except in compliance with
6+
// the License. You may obtain a copy of the License at
7+
//
8+
// http://www.apache.org/licenses/LICENSE-2.0
9+
//
10+
// Unless required by applicable law or agreed to in writing, software
11+
// distributed under the License is distributed on an "AS IS" BASIS,
12+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
// See the License for the specific language governing permissions and
14+
// limitations under the License.
15+
16+
package natsio
17+
18+
import (
19+
"context"
20+
"fmt"
21+
"testing"
22+
23+
"github.com/nats-io/nats.go"
24+
)
25+
26+
func Test_endEstimator_Estimate(t *testing.T) {
27+
tests := []struct {
28+
name string
29+
msgs []*nats.Msg
30+
subject string
31+
want int64
32+
}{
33+
{
34+
name: "Estimate end for published messages",
35+
msgs: []*nats.Msg{
36+
{
37+
Subject: "subject.1",
38+
Data: []byte("msg1"),
39+
},
40+
{
41+
Subject: "subject.1",
42+
Data: []byte("msg2"),
43+
},
44+
{
45+
Subject: "subject.2",
46+
Data: []byte("msg3"),
47+
},
48+
},
49+
subject: "subject.1",
50+
want: 3,
51+
},
52+
{
53+
name: "Estimate end for no published messages",
54+
subject: "subject.1",
55+
want: 1,
56+
},
57+
}
58+
for i, tt := range tests {
59+
t.Run(tt.name, func(t *testing.T) {
60+
ctx := context.Background()
61+
srv := newServer(t)
62+
url := srv.ClientURL()
63+
conn := newConn(t, url)
64+
js := newJetStream(t, conn)
65+
66+
stream := fmt.Sprintf("STREAM-%d", i)
67+
subjectFilter := "subject.*"
68+
69+
createStream(ctx, t, js, stream, []string{subjectFilter})
70+
publishMessages(ctx, t, js, tt.msgs)
71+
72+
estimator := newEndEstimator(js, stream, tt.subject)
73+
if got := estimator.Estimate(); got != tt.want {
74+
t.Fatalf("Estimate() = %v, want %v", got, tt.want)
75+
}
76+
})
77+
}
78+
}

sdks/go/pkg/beam/io/natsio/example_test.go

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,27 @@ import (
2222
"github.com/apache/beam/sdks/v2/go/pkg/beam"
2323
"github.com/apache/beam/sdks/v2/go/pkg/beam/io/natsio"
2424
"github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx"
25+
"github.com/apache/beam/sdks/v2/go/pkg/beam/x/debug"
2526
"github.com/nats-io/nats.go"
2627
)
2728

29+
func ExampleRead() {
30+
beam.Init()
31+
32+
p, s := beam.NewPipelineWithRoot()
33+
34+
uri := "nats://localhost:4222"
35+
stream := "EVENTS"
36+
subject := "events.*"
37+
38+
col := natsio.Read(s, uri, stream, subject)
39+
debug.Print(s, col)
40+
41+
if err := beamx.Run(context.Background(), p); err != nil {
42+
log.Fatalf("Failed to execute job: %v", err)
43+
}
44+
}
45+
2846
func ExampleWrite() {
2947
beam.Init()
3048

sdks/go/pkg/beam/io/natsio/helper_test.go

Lines changed: 46 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ package natsio
1818
import (
1919
"context"
2020
"testing"
21+
"time"
2122

2223
"github.com/nats-io/nats-server/v2/server"
2324
"github.com/nats-io/nats-server/v2/test"
@@ -62,8 +63,8 @@ func newJetStream(t *testing.T, conn *nats.Conn) jetstream.JetStream {
6263
}
6364

6465
func createStream(
65-
t *testing.T,
6666
ctx context.Context,
67+
t *testing.T,
6768
js jetstream.JetStream,
6869
stream string,
6970
subjects []string,
@@ -89,8 +90,8 @@ func createStream(
8990
}
9091

9192
func createConsumer(
92-
t *testing.T,
9393
ctx context.Context,
94+
t *testing.T,
9495
js jetstream.JetStream,
9596
stream string,
9697
subjects []string,
@@ -128,3 +129,46 @@ func fetchMessages(t *testing.T, cons jetstream.Consumer, size int) []jetstream.
128129

129130
return result
130131
}
132+
133+
func publishMessages(ctx context.Context, t *testing.T, js jetstream.JetStream, msgs []*nats.Msg) {
134+
t.Helper()
135+
136+
for _, msg := range msgs {
137+
if _, err := js.PublishMsg(ctx, msg); err != nil {
138+
t.Fatalf("Failed to publish message: %v", err)
139+
}
140+
}
141+
}
142+
143+
func messagesWithPublishingTime(
144+
t *testing.T,
145+
pubMsgs []jetstream.Msg,
146+
pubIndices []int,
147+
want []any,
148+
) []any {
149+
t.Helper()
150+
151+
wantWTime := make([]any, len(want))
152+
153+
for i := range want {
154+
pubIdx := pubIndices[i]
155+
pubMsg := pubMsgs[pubIdx]
156+
157+
wantMsg := want[i].(ConsumerMessage)
158+
wantMsg.PublishingTime = messageTimestamp(t, pubMsg)
159+
wantWTime[i] = wantMsg
160+
}
161+
162+
return wantWTime
163+
}
164+
165+
func messageTimestamp(t *testing.T, msg jetstream.Msg) time.Time {
166+
t.Helper()
167+
168+
metadata, err := msg.Metadata()
169+
if err != nil {
170+
t.Fatalf("Failed to retrieve metadata: %v", err)
171+
}
172+
173+
return metadata.Timestamp
174+
}

0 commit comments

Comments
 (0)