Skip to content

Commit 4422b48

Browse files
authored
Merge pull request #799 from cicoyle/feat-bulk-pubsub-stable
Bulk PubSub is Stable + update examples to always run the latest rc
2 parents d6afe09 + 1112e61 commit 4422b48

File tree

7 files changed

+35
-16
lines changed

7 files changed

+35
-16
lines changed

.github/workflows/validate_examples.yaml

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -69,15 +69,15 @@ jobs:
6969
- name: Run go mod tidy check diff
7070
run: make modtidy check-diff
7171

72-
- name: Determine latest Dapr Runtime version
72+
- name: Determine latest Dapr Runtime version (including prerelease)
7373
run: |
74-
RUNTIME_VERSION=$(curl -s "https://api.github.com/repos/dapr/dapr/releases/latest" | grep '"tag_name"' | cut -d ':' -f2 | tr -d '",v')
74+
RUNTIME_VERSION=$(curl -s "https://api.github.com/repos/dapr/dapr/releases" | grep '"tag_name"' | head -n 1 | cut -d ':' -f2 | tr -d '",v ')
7575
echo "DAPR_RUNTIME_VER=$RUNTIME_VERSION" >> $GITHUB_ENV
7676
echo "Found $RUNTIME_VERSION"
7777
78-
- name: Determine latest Dapr Cli version
78+
- name: Determine latest Dapr Cli version (including prerelease)
7979
run: |
80-
CLI_VERSION=$(curl -s "https://api.github.com/repos/dapr/cli/releases/latest" | grep '"tag_name"' | cut -d ':' -f2 | tr -d '",v')
80+
CLI_VERSION=$(curl -s "https://api.github.com/repos/dapr/cli/releases" | grep '"tag_name"' | head -n 1 | cut -d ':' -f2 | tr -d '",v ')
8181
echo "DAPR_CLI_VER=$CLI_VERSION" >> $GITHUB_ENV
8282
echo "Found $CLI_VERSION"
8383

client/client_test.go

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -383,10 +383,18 @@ func (s *testDaprServer) PublishEvent(ctx context.Context, req *pb.PublishEventR
383383
return &emptypb.Empty{}, nil
384384
}
385385

386+
func (s *testDaprServer) BulkPublishEvent(ctx context.Context, req *pb.BulkPublishRequest) (*pb.BulkPublishResponse, error) {
387+
return s.bulkPublishEvent(req)
388+
}
389+
386390
// BulkPublishEventAlpha1 mocks the BulkPublishEventAlpha1 API.
391+
func (s *testDaprServer) BulkPublishEventAlpha1(ctx context.Context, req *pb.BulkPublishRequest) (*pb.BulkPublishResponse, error) {
392+
return s.bulkPublishEvent(req)
393+
}
394+
387395
// It will fail to publish events that start with "fail".
388396
// It will fail the entire request if an event starts with "failall".
389-
func (s *testDaprServer) BulkPublishEventAlpha1(ctx context.Context, req *pb.BulkPublishRequest) (*pb.BulkPublishResponse, error) {
397+
func (s *testDaprServer) bulkPublishEvent(req *pb.BulkPublishRequest) (*pb.BulkPublishResponse, error) {
390398
failedEntries := make([]*pb.BulkPublishResponseFailedEntry, 0)
391399
for _, entry := range req.GetEntries() {
392400
if bytes.HasPrefix(entry.GetEvent(), []byte("failall")) {

client/pubsub.go

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@ import (
2121
"log"
2222

2323
"github.com/google/uuid"
24+
"google.golang.org/grpc/codes"
25+
"google.golang.org/grpc/status"
2426

2527
pb "github.com/dapr/dapr/pkg/proto/runtime/v1"
2628
)
@@ -169,12 +171,15 @@ func (c *GRPCClient) PublishEvents(ctx context.Context, pubsubName, topicName st
169171
o(request)
170172
}
171173

172-
//nolint:staticcheck // SA1019 Deprecated: use BulkPublishEvent instead.
173-
res, err := c.protoClient.BulkPublishEventAlpha1(ctx, request)
174+
res, err := c.protoClient.BulkPublishEvent(ctx, request)
175+
if err != nil && status.Code(err) == codes.Unimplemented {
176+
//nolint:staticcheck // SA1019 Deprecated: use BulkPublishEvent instead.
177+
res, err = c.protoClient.BulkPublishEventAlpha1(ctx, request)
178+
}
174179
// If there is an error, all events failed to publish.
175180
if err != nil {
176181
return PublishEventsResponse{
177-
Error: fmt.Errorf("error publishing events unto %s topic: %w", topicName, err),
182+
Error: fmt.Errorf("error publishing events onto %s topic: %w", topicName, err),
178183
FailedEvents: events,
179184
}
180185
}

examples/go.mod

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ replace github.com/dapr/go-sdk => ../
66

77
require (
88
github.com/alecthomas/kingpin/v2 v2.4.0
9-
github.com/dapr/durabletask-go v0.10.2-0.20251203182905-2c611fb434fd
9+
github.com/dapr/durabletask-go v0.10.2-0.20260114164104-9ddc9d1ebc1f
1010
github.com/dapr/go-sdk v0.0.0-00010101000000-000000000000
1111
github.com/dapr/kit v0.16.2-0.20251124175541-3ac186dff64d
1212
github.com/go-redis/redis/v8 v8.11.5
@@ -20,7 +20,7 @@ require (
2020
github.com/alecthomas/units v0.0.0-20211218093645-b94a6e3cc137 // indirect
2121
github.com/cenkalti/backoff/v4 v4.3.0 // indirect
2222
github.com/cespare/xxhash/v2 v2.3.0 // indirect
23-
github.com/dapr/dapr v1.16.1-rc.3.0.20260106102127-f224ba75d67c // indirect
23+
github.com/dapr/dapr v1.17.0-rc.1.0.20260119144134-6071c46179eb // indirect
2424
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
2525
github.com/go-chi/chi/v5 v5.2.2 // indirect
2626
github.com/go-logr/logr v1.4.3 // indirect

examples/go.sum

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,10 @@ github.com/cenkalti/backoff/v4 v4.3.0 h1:MyRJ/UdXutAwSAT+s3wNd7MfTIcy71VQueUuFK3
66
github.com/cenkalti/backoff/v4 v4.3.0/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE=
77
github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs=
88
github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
9-
github.com/dapr/dapr v1.16.1-rc.3.0.20260106102127-f224ba75d67c h1:QJiz7j7BE+fW1OiAs4F8xaeEYvoXKKdB+U25QTDpvD0=
10-
github.com/dapr/dapr v1.16.1-rc.3.0.20260106102127-f224ba75d67c/go.mod h1:9QwK6oaEJ+AtG6Y4MN///YZ9RBf8qEEa9VbFkPBzH/c=
11-
github.com/dapr/durabletask-go v0.10.2-0.20251203182905-2c611fb434fd h1:2BWSiHoytO5FXK2WCE02AfGNbC51iNewdFoC8j0FiVE=
12-
github.com/dapr/durabletask-go v0.10.2-0.20251203182905-2c611fb434fd/go.mod h1:0Ts4rXp74JyG19gDWPcwNo5V6NBZzhARzHF5XynmA7Q=
9+
github.com/dapr/dapr v1.17.0-rc.1.0.20260119144134-6071c46179eb h1:5JmhSW6atGnkJsVxNbljZjpV/MtAw3jYk2y+6Qe6OFI=
10+
github.com/dapr/dapr v1.17.0-rc.1.0.20260119144134-6071c46179eb/go.mod h1:A097WTI5M7HVu+Zgjo62q+dFq04Q2ECyq3zMNq9+FJ8=
11+
github.com/dapr/durabletask-go v0.10.2-0.20260114164104-9ddc9d1ebc1f h1:zRnSR4IgzhzKLTwcIV6ETcDwkuWTeGmeuJy7Jrw4Txw=
12+
github.com/dapr/durabletask-go v0.10.2-0.20260114164104-9ddc9d1ebc1f/go.mod h1:0Ts4rXp74JyG19gDWPcwNo5V6NBZzhARzHF5XynmA7Q=
1313
github.com/dapr/kit v0.16.2-0.20251124175541-3ac186dff64d h1:csljij9d1IO6u9nqbg+TuSRmTZ+OXT8G49yh6zie1yI=
1414
github.com/dapr/kit v0.16.2-0.20251124175541-3ac186dff64d/go.mod h1:40ZWs5P6xfYf7O59XgwqZkIyDldTIXlhTQhGop8QoSM=
1515
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=

examples/pubsub/pub/pub.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ func main() {
4545

4646
// Publish multiple events
4747
if res := client.PublishEvents(ctx, pubsubName, topicName, publishEventsData); res.Error != nil {
48-
panic(err)
48+
panic(res.Error)
4949
}
5050

5151
fmt.Println("data published")

service/grpc/topic.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,9 @@ import (
2121
"mime"
2222
"strings"
2323

24+
"google.golang.org/grpc/codes"
2425
"google.golang.org/grpc/metadata"
26+
"google.golang.org/grpc/status"
2527
"google.golang.org/protobuf/types/known/emptypb"
2628

2729
runtimev1pb "github.com/dapr/dapr/pkg/proto/runtime/v1"
@@ -180,6 +182,10 @@ func getCustomMetadataFromContext(ctx context.Context) map[string]string {
180182
return md
181183
}
182184

185+
func (s *Server) OnBulkTopicEvent(ctx context.Context, in *runtimev1pb.TopicEventBulkRequest) (*runtimev1pb.TopicEventBulkResponse, error) {
186+
return nil, status.Error(codes.Unimplemented, "bulk pubsub callback is not supported")
187+
}
188+
183189
func (s *Server) OnBulkTopicEventAlpha1(ctx context.Context, in *runtimev1pb.TopicEventBulkRequest) (*runtimev1pb.TopicEventBulkResponse, error) {
184-
panic("This API callback is not supported.")
190+
return s.OnBulkTopicEvent(ctx, in)
185191
}

0 commit comments

Comments
 (0)