@@ -11,6 +11,11 @@ package base
1111import (
1212 "context"
1313 "errors"
14+ "fmt"
15+ "net/url"
16+ "os"
17+ "runtime"
18+ "strings"
1419
1520 sgbucket "github.com/couchbase/sg-bucket"
1621 "github.com/couchbaselabs/rosmar"
@@ -20,90 +25,145 @@ var _ BootstrapConnection = &RosmarCluster{}
2025
2126// RosmarCluster implements BootstrapConnection and is used for connecting to a rosmar cluster
2227type RosmarCluster struct {
23- serverURL string
28+ serverURL string
29+ bucketDirectory string
2430}
2531
2632// NewRosmarCluster creates a from a given URL
27- func NewRosmarCluster (serverURL string ) * RosmarCluster {
28- return & RosmarCluster {
33+ func NewRosmarCluster (serverURL string ) ( * RosmarCluster , error ) {
34+ cluster := & RosmarCluster {
2935 serverURL : serverURL ,
3036 }
37+ if serverURL != rosmar .InMemoryURL {
38+ u , err := url .Parse (serverURL )
39+ if err != nil {
40+ return nil , err
41+ }
42+ directory := u .Path
43+ if runtime .GOOS == "windows" {
44+ directory = strings .TrimPrefix (directory , "/" )
45+ }
46+ err = os .MkdirAll (directory , 0700 )
47+ if err != nil {
48+ return nil , fmt .Errorf ("could not create or access directory to open rosmar cluster %q: %w" , serverURL , err )
49+ }
50+ cluster .bucketDirectory = directory
51+ }
52+ return cluster , nil
3153}
3254
3355// GetConfigBuckets returns all the buckets registered in rosmar.
34- func (c * RosmarCluster ) GetConfigBuckets () ([]string , error ) {
56+ func (c * RosmarCluster ) GetConfigBuckets (ctx context.Context ) ([]string , error ) {
57+ // If the cluster is a serialized rosmar cluster, we need to open each bucket to add to rosmar.bucketRegistry.
58+ if c .bucketDirectory != "" {
59+ d , err := os .ReadDir (c .bucketDirectory )
60+ if err != nil {
61+ return nil , err
62+ }
63+ for _ , bucketName := range d {
64+ bucket , err := c .openBucket (bucketName .Name ())
65+ if err != nil {
66+ return nil , fmt .Errorf ("could not open bucket %s from %s :%w" , bucketName , c .serverURL , err )
67+ }
68+ defer bucket .Close (ctx )
69+
70+ }
71+ }
3572 return rosmar .GetBucketNames (), nil
3673}
3774
75+ // openBucket opens a rosmar bucket with the given name.
76+ func (c * RosmarCluster ) openBucket (bucketName string ) (* rosmar.Bucket , error ) {
77+ // OpenBucketIn is required to open a bucket from a serialized rosmar implementation.
78+ return rosmar .OpenBucketIn (c .serverURL , bucketName , rosmar .CreateOrOpen )
79+ }
80+
81+ // getDefaultDataStore returns the default datastore for the specified bucket. Returns a bucket close function and an
82+ // error.
83+ func (c * RosmarCluster ) getDefaultDataStore (ctx context.Context , bucketName string ) (sgbucket.DataStore , func (ctx context.Context ), error ) {
84+ bucket , err := rosmar .OpenBucketIn (c .serverURL , bucketName , rosmar .CreateOrOpen )
85+ if err != nil {
86+ return nil , nil , err
87+ }
88+ closeFn := func (ctx context.Context ) { bucket .Close (ctx ) }
89+
90+ ds , err := bucket .NamedDataStore (DefaultScopeAndCollectionName ())
91+ if err != nil {
92+ AssertfCtx (ctx , "Unexpected error getting default collection for bucket %q: %v" , bucketName , err )
93+ closeFn (ctx )
94+ return nil , nil , err
95+ }
96+ return ds , closeFn , nil
97+ }
98+
3899// GetMetadataDocument returns a metadata document from the default collection for the specified bucket.
39100func (c * RosmarCluster ) GetMetadataDocument (ctx context.Context , location , docID string , valuePtr any ) (cas uint64 , err error ) {
40- bucket , err := rosmar . OpenBucket ( c . serverURL , location , rosmar . CreateOrOpen )
101+ ds , closer , err := c . getDefaultDataStore ( ctx , location )
41102 if err != nil {
42103 return 0 , err
43104 }
44- defer bucket . Close (ctx )
105+ defer closer (ctx )
45106
46- return bucket . DefaultDataStore () .Get (docID , valuePtr )
107+ return ds .Get (docID , valuePtr )
47108}
48109
49110// InsertMetadataDocument inserts a metadata document, and fails if it already exists.
50111func (c * RosmarCluster ) InsertMetadataDocument (ctx context.Context , location , key string , value any ) (newCAS uint64 , err error ) {
51- bucket , err := rosmar . OpenBucket ( c . serverURL , location , rosmar . CreateOrOpen )
112+ ds , closer , err := c . getDefaultDataStore ( ctx , location )
52113 if err != nil {
53114 return 0 , err
54115 }
55- defer bucket . Close (ctx )
116+ defer closer (ctx )
56117
57- return bucket . DefaultDataStore () .WriteCas (key , 0 , 0 , value , 0 )
118+ return ds .WriteCas (key , 0 , 0 , value , 0 )
58119}
59120
60121// WriteMetadataDocument writes a metadata document, and fails on CAS mismatch
61122func (c * RosmarCluster ) WriteMetadataDocument (ctx context.Context , location , docID string , cas uint64 , value any ) (newCAS uint64 , err error ) {
62- bucket , err := rosmar . OpenBucket ( c . serverURL , location , rosmar . CreateOrOpen )
123+ ds , closer , err := c . getDefaultDataStore ( ctx , location )
63124 if err != nil {
64125 return 0 , err
65126 }
66- defer bucket .Close (ctx )
67-
68- return bucket .DefaultDataStore ().WriteCas (docID , 0 , cas , value , 0 )
127+ defer closer (ctx )
128+ return ds .WriteCas (docID , 0 , cas , value , 0 )
69129}
70130
71131// TouchMetadataDocument sets the specified property in a bootstrap metadata document for a given bucket and key. Used to
72132// trigger CAS update on the document, to block any racing updates. Does not retry on CAS failure.
73133
74134func (c * RosmarCluster ) TouchMetadataDocument (ctx context.Context , location , docID string , property , value string , cas uint64 ) (newCAS uint64 , err error ) {
75- bucket , err := rosmar . OpenBucket ( c . serverURL , location , rosmar . CreateOrOpen )
135+ ds , closer , err := c . getDefaultDataStore ( ctx , location )
76136 if err != nil {
77137 return 0 , err
78138 }
79- defer bucket . Close (ctx )
139+ defer closer (ctx )
80140
81141 // FIXME to not touch the whole document?
82- return bucket . DefaultDataStore () .Touch (docID , 0 )
142+ return ds .Touch (docID , 0 )
83143}
84144
85145// DeleteMetadataDocument deletes an existing bootstrap metadata document for a given bucket and key.
86146func (c * RosmarCluster ) DeleteMetadataDocument (ctx context.Context , location , key string , cas uint64 ) error {
87- bucket , err := rosmar . OpenBucket ( c . serverURL , location , rosmar . CreateOrOpen )
147+ ds , closer , err := c . getDefaultDataStore ( ctx , location )
88148 if err != nil {
89149 return err
90150 }
91- defer bucket . Close (ctx )
151+ defer closer (ctx )
92152
93- _ , err = bucket . DefaultDataStore () .Remove (key , cas )
153+ _ , err = ds .Remove (key , cas )
94154 return err
95155}
96156
97157// UpdateMetadataDocument updates a given document and retries on CAS mismatch.
98158func (c * RosmarCluster ) UpdateMetadataDocument (ctx context.Context , location , docID string , updateCallback func (bucketConfig []byte , rawBucketConfigCas uint64 ) (newConfig []byte , err error )) (newCAS uint64 , err error ) {
99- bucket , err := rosmar . OpenBucket ( c . serverURL , location , rosmar . CreateOrOpen )
159+ ds , closer , err := c . getDefaultDataStore ( ctx , location )
100160 if err != nil {
101161 return 0 , err
102162 }
103- defer bucket . Close (ctx )
163+ defer closer (ctx )
104164 for {
105165 var bucketValue []byte
106- cas , err := bucket . DefaultDataStore () .Get (docID , & bucketValue )
166+ cas , err := ds .Get (docID , & bucketValue )
107167 if err != nil {
108168 return 0 , err
109169 }
@@ -113,7 +173,7 @@ func (c *RosmarCluster) UpdateMetadataDocument(ctx context.Context, location, do
113173 }
114174 // handle delete when updateCallback returns nil
115175 if newConfig == nil {
116- removeCasOut , err := bucket . DefaultDataStore () .Remove (docID , cas )
176+ removeCasOut , err := ds .Remove (docID , cas )
117177 if err != nil {
118178 // retry on cas failure
119179 if errors .As (err , & sgbucket.CasMismatchErr {}) {
@@ -124,7 +184,7 @@ func (c *RosmarCluster) UpdateMetadataDocument(ctx context.Context, location, do
124184 return removeCasOut , nil
125185 }
126186
127- replaceCfgCasOut , err := bucket . DefaultDataStore () .WriteCas (docID , 0 , cas , newConfig , 0 )
187+ replaceCfgCasOut , err := ds .WriteCas (docID , 0 , cas , newConfig , 0 )
128188 if err != nil {
129189 if errors .As (err , & sgbucket.CasMismatchErr {}) {
130190 // retry on cas failure
@@ -140,25 +200,25 @@ func (c *RosmarCluster) UpdateMetadataDocument(ctx context.Context, location, do
140200
141201// KeyExists checks whether a key exists in the default collection for the specified bucket
142202func (c * RosmarCluster ) KeyExists (ctx context.Context , location , docID string ) (exists bool , err error ) {
143- bucket , err := rosmar . OpenBucket ( c . serverURL , location , rosmar . CreateOrOpen )
203+ ds , closer , err := c . getDefaultDataStore ( ctx , location )
144204 if err != nil {
145205 return false , err
146206 }
147- defer bucket . Close (ctx )
207+ defer closer (ctx )
148208
149- return bucket . DefaultDataStore () .Exists (docID )
209+ return ds .Exists (docID )
150210}
151211
152212// GetDocument fetches a document from the default collection. Does not use configPersistence - callers
153213// requiring configPersistence handling should use GetMetadataDocument.
154214func (c * RosmarCluster ) GetDocument (ctx context.Context , bucketName , docID string , rv any ) (exists bool , err error ) {
155- bucket , err := rosmar . OpenBucket ( c . serverURL , bucketName , rosmar . CreateOrOpen )
215+ ds , closer , err := c . getDefaultDataStore ( ctx , bucketName )
156216 if err != nil {
157217 return false , err
158218 }
159- defer bucket . Close (ctx )
219+ defer closer (ctx )
160220
161- _ , err = bucket . DefaultDataStore () .Get (docID , rv )
221+ _ , err = ds .Get (docID , rv )
162222 if IsDocNotFoundError (err ) {
163223 return false , nil
164224 }
@@ -171,3 +231,12 @@ func (c *RosmarCluster) Close() {
171231}
172232
173233func (c * RosmarCluster ) SetConnectionStringServerless () error { return nil }
234+
235+ // AsRosmarBucket returns a bucket as a rosmar.Bucket, or an error if it is not one.
236+ func AsRosmarBucket (bucket Bucket ) (* rosmar.Bucket , error ) {
237+ baseBucket := GetBaseBucket (bucket )
238+ if b , ok := baseBucket .(* rosmar.Bucket ); ok {
239+ return b , nil
240+ }
241+ return nil , fmt .Errorf ("bucket is not a rosmar bucket (type %T)" , baseBucket )
242+ }
0 commit comments