Skip to content

Commit 4ef65c7

Browse files
committed
Merge branch '3318-RavenDB-state-store-new' of github.com-n-personal:nmalocic/components-contrib into 3318-RavenDB-state-store-new
2 parents 7576537 + bd5858b commit 4ef65c7

File tree

9 files changed

+285
-13
lines changed

9 files changed

+285
-13
lines changed

.github/pull_request_template.md

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,4 +14,7 @@ Please make sure you've completed the relevant tasks for this PR, out of the fol
1414

1515
* [ ] Code compiles correctly
1616
* [ ] Created/updated tests
17-
* [ ] Extended the documentation / Created issue in the https://github.com/dapr/docs/ repo: dapr/docs#_[issue number]_
17+
* [ ] Extended the documentation
18+
* [ ] Created the dapr/docs PR: <insert PR link here>
19+
20+
**Note:** We expect contributors to open a corresponding documentation PR in the [dapr/docs](https://github.com/dapr/docs/) repository. As the implementer, you are the best person to document your work! Implementation PRs will not be merged until the documentation PR is opened and ready for review.

common/proto/state/sqlserver/test.pb.go

Lines changed: 162 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
syntax = "proto3";
2+
3+
option go_package = "github.com/dapr/components-contrib/common/proto/state/sqlserver";
4+
5+
import "google/protobuf/timestamp.proto";
6+
7+
message TestEvent {
8+
int32 eventId = 1;
9+
google.protobuf.Timestamp timestamp = 2;
10+
}

pubsub/pulsar/metadata.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ type pulsarMetadata struct {
3939
ReceiverQueueSize int `mapstructure:"receiverQueueSize"`
4040
SubscriptionType string `mapstructure:"subscribeType"`
4141
SubscriptionInitialPosition string `mapstructure:"subscribeInitialPosition"`
42+
ReplicateSubscriptionState bool `mapstructure:"replicateSubscriptionState"`
4243
SubscriptionMode string `mapstructure:"subscribeMode"`
4344
Token string `mapstructure:"token"`
4445
oauth2.ClientCredentialsMetadata `mapstructure:",squash"`

pubsub/pulsar/metadata.yaml

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -202,6 +202,17 @@ metadata:
202202
url:
203203
title: "Pulsar SubscriptionInitialPosition"
204204
url: "https://pkg.go.dev/github.com/apache/pulsar-client-go/pulsar#SubscriptionInitialPosition"
205+
- name: replicateSubscriptionState
206+
type: bool
207+
description: |
208+
Enable replication of subscription state across geo-replicated Pulsar clusters.
209+
When enabled, subscription state (such as cursor positions and acknowledgments) will be replicated to other clusters in a geo-replicated setup.
210+
This is useful for maintaining subscription consistency during cluster failovers.
211+
default: 'false'
212+
example: '"true", "false"'
213+
url:
214+
title: "Pulsar Geo-Replication"
215+
url: "https://pulsar.apache.org/docs/administration-geo/"
205216
- name: subscribeMode
206217
type: string
207218
description: |
@@ -210,4 +221,4 @@ metadata:
210221
example: '"durable"'
211222
url:
212223
title: "Pulsar SubscriptionMode"
213-
url: "https://pkg.go.dev/github.com/apache/pulsar-client-go/pulsar#SubscriptionMode"
224+
url: "https://pkg.go.dev/github.com/apache/pulsar-client-go/pulsar#SubscriptionMode"

pubsub/pulsar/pulsar.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -509,6 +509,7 @@ func (p *Pulsar) Subscribe(ctx context.Context, req pubsub.SubscribeRequest, han
509509
MessageChannel: channel,
510510
NackRedeliveryDelay: p.metadata.RedeliveryDelay,
511511
ReceiverQueueSize: p.metadata.ReceiverQueueSize,
512+
ReplicateSubscriptionState: p.metadata.ReplicateSubscriptionState,
512513
}
513514

514515
// Handle KeySharedPolicy for key_shared subscription type

pubsub/pulsar/pulsar_test.go

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -605,6 +605,48 @@ func TestEncryptionKeys(t *testing.T) {
605605
})
606606
}
607607

608+
func TestParsePulsarMetadataReplicateSubscriptionState(t *testing.T) {
609+
tt := []struct {
610+
name string
611+
replicateSubscriptionState string
612+
expected bool
613+
}{
614+
{
615+
name: "test replicateSubscriptionState true",
616+
replicateSubscriptionState: "true",
617+
expected: true,
618+
},
619+
{
620+
name: "test replicateSubscriptionState false",
621+
replicateSubscriptionState: "false",
622+
expected: false,
623+
},
624+
{
625+
name: "test replicateSubscriptionState empty (defaults to false)",
626+
replicateSubscriptionState: "",
627+
expected: false,
628+
},
629+
}
630+
631+
for _, tc := range tt {
632+
t.Run(tc.name, func(t *testing.T) {
633+
m := pubsub.Metadata{}
634+
m.Properties = map[string]string{
635+
"host": "a",
636+
}
637+
638+
if tc.replicateSubscriptionState != "" {
639+
m.Properties["replicateSubscriptionState"] = tc.replicateSubscriptionState
640+
}
641+
642+
meta, err := parsePulsarMetadata(m)
643+
644+
require.NoError(t, err)
645+
assert.Equal(t, tc.expected, meta.ReplicateSubscriptionState)
646+
})
647+
}
648+
}
649+
608650
func TestSanitiseURL(t *testing.T) {
609651
tests := []struct {
610652
name string

state/sqlserver/sqlserver.go

Lines changed: 25 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ package sqlserver
1616
import (
1717
"context"
1818
"database/sql"
19+
"encoding/base64"
1920
"encoding/hex"
2021
"encoding/json"
2122
"errors"
@@ -287,8 +288,15 @@ func (s *SQLServer) Get(ctx context.Context, req *state.GetRequest) (*state.GetR
287288
}
288289
}
289290

291+
bytes, err := base64.StdEncoding.DecodeString(data)
292+
if err != nil {
293+
s.logger.
294+
WithFields(map[string]any{"error": err}).
295+
Debug("error decoding base64 data. Fallback to []byte")
296+
bytes = []byte(data)
297+
}
290298
return &state.GetResponse{
291-
Data: []byte(data),
299+
Data: bytes,
292300
ETag: ptr.Of(etag),
293301
Metadata: metadata,
294302
}, nil
@@ -305,16 +313,23 @@ type dbExecutor interface {
305313
}
306314

307315
func (s *SQLServer) executeSet(ctx context.Context, db dbExecutor, req *state.SetRequest) error {
308-
var err error
309-
var bytes []byte
310-
bytes, err = utils.Marshal(req.Value, json.Marshal)
311-
if err != nil {
312-
return err
316+
var reqValue string
317+
318+
bytes, ok := req.Value.([]byte)
319+
if !ok {
320+
bt, err := json.Marshal(req.Value)
321+
if err != nil {
322+
return err
323+
}
324+
reqValue = string(bt)
325+
} else {
326+
reqValue = base64.StdEncoding.EncodeToString(bytes)
313327
}
328+
314329
etag := sql.Named(rowVersionColumnName, nil)
315330
if req.HasETag() {
316331
var b []byte
317-
b, err = hex.DecodeString(*req.ETag)
332+
b, err := hex.DecodeString(*req.ETag)
318333
if err != nil {
319334
return state.NewETagError(state.ETagInvalid, err)
320335
}
@@ -327,13 +342,14 @@ func (s *SQLServer) executeSet(ctx context.Context, db dbExecutor, req *state.Se
327342
}
328343

329344
var res sql.Result
345+
var err error
330346
if req.Options.Concurrency == state.FirstWrite {
331347
res, err = db.ExecContext(ctx, s.upsertCommand, sql.Named(keyColumnName, req.Key),
332-
sql.Named("Data", string(bytes)), etag,
348+
sql.Named("Data", reqValue), etag,
333349
sql.Named("FirstWrite", 1), sql.Named("TTL", ttl))
334350
} else {
335351
res, err = db.ExecContext(ctx, s.upsertCommand, sql.Named(keyColumnName, req.Key),
336-
sql.Named("Data", string(bytes)), etag,
352+
sql.Named("Data", reqValue), etag,
337353
sql.Named("FirstWrite", 0), sql.Named("TTL", ttl))
338354
}
339355

0 commit comments

Comments
 (0)