-
Notifications
You must be signed in to change notification settings - Fork 143
Expand file tree
/
Copy pathdatabase.go
More file actions
2567 lines (2205 loc) · 91 KB
/
database.go
File metadata and controls
2567 lines (2205 loc) · 91 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
// Copyright 2012-Present Couchbase, Inc.
//
// Use of this software is governed by the Business Source License included
// in the file licenses/BSL-Couchbase.txt. As of the Change Date specified
// in that file, in accordance with the Business Source License, use of this
// software will be governed by the Apache License, Version 2.0, included in
// the file licenses/APL2.txt.
package db
import (
"context"
"errors"
"fmt"
"net/http"
"regexp"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/couchbase/cbgt"
sgbucket "github.com/couchbase/sg-bucket"
"github.com/couchbase/sync_gateway/auth"
"github.com/couchbase/sync_gateway/base"
"github.com/couchbase/sync_gateway/channels"
pkgerrors "github.com/pkg/errors"
)
// Database run states (DatabaseContext.State): the service-level lifecycle of a database.
const (
	DBOffline uint32 = iota
	DBStarting
	DBOnline
	DBStopping
	DBResyncing
)

// RunStateString maps each database run state to its human-readable name.
var RunStateString = []string{
	DBOffline:   "Offline",
	DBStarting:  "Starting",
	DBOnline:    "Online",
	DBStopping:  "Stopping",
	DBResyncing: "Resyncing",
}

// Database compaction states (DatabaseContext.CompactState).
const (
	DBCompactNotRunning uint32 = iota
	DBCompactRunning
)

const (
	// DefaultRevsLimitNoConflicts is the default max revision tree depth when conflicts are disallowed.
	DefaultRevsLimitNoConflicts = 50
	// DefaultRevsLimitConflicts is the default max revision tree depth when conflicts are allowed.
	DefaultRevsLimitConflicts = 100
	// DefaultPurgeInterval represents a time duration of 30 days to be
	// used as default metadata purge interval when the server’s purge
	// interval (either bucket specific or cluster wide) is not available.
	DefaultPurgeInterval = 30 * 24 * time.Hour
	// DefaultSGReplicateEnabled is the default for whether this node can be assigned sg-replicate replications.
	DefaultSGReplicateEnabled = true
	// DefaultSGReplicateWebsocketPingInterval is the default BLIP websocket ping interval for active replicators.
	DefaultSGReplicateWebsocketPingInterval = time.Minute * 5
	// DefaultCompactInterval is the default interval between automatic compaction runs.
	DefaultCompactInterval = 24 * time.Hour
)

// Default values for delta sync
var (
	DefaultDeltaSyncEnabled   = false
	DefaultDeltaSyncRevMaxAge = uint32(60 * 60 * 24) // 24 hours in seconds
)

// DefaultQueryPaginationLimit is the limit used for query pagination when
// DatabaseContextOptions.QueryPaginationLimit is not set.
var (
	DefaultQueryPaginationLimit = 5000
)

var (
	BypassReleasedSequenceWait atomic.Bool // Used to optimize single-node testing, see DisableSequenceWaitOnDbRestart
)

// Bounds for the configurable compaction interval, expressed in days.
const (
	CompactIntervalMinDays = float32(0.04) // ~1 Hour in days
	CompactIntervalMaxDays = float32(60)   // 60 Days in days
)

// BGTCompletionMaxWait is the maximum amount of time to wait for
// completion of all background tasks and background managers before the server is stopped.
const BGTCompletionMaxWait = 30 * time.Second
// DatabaseContext is the basic description of a database, shared between all Database
// objects on the same database. This object is thread-safe so it can be shared between
// HTTP handlers.
type DatabaseContext struct {
	Name                         string                         // Database name
	UUID                         string                         // UUID for this database instance. Used by cbgt and sgr
	MetadataStore                base.DataStore                 // Storage for database metadata (anything that isn't an end-user's/customer's documents)
	Bucket                       base.Bucket                    // Storage
	BucketSpec                   base.BucketSpec                // The BucketSpec
	BucketLock                   sync.RWMutex                   // Control Access to the underlying bucket object
	mutationListener             changeListener                 // Caching feed listener
	ImportListener               *importListener                // Import feed listener
	sequences                    *sequenceAllocator             // Source of new sequence numbers
	StartTime                    time.Time                      // Timestamp when context was instantiated
	RevsLimit                    uint32                         // Max depth a document's revision tree can grow to
	autoImport                   bool                           // Add sync data to new untracked couchbase server docs? (Xattr mode specific)
	revisionCache                RevisionCache                  // Cache of recently-accessed doc revisions
	channelCache                 ChannelCache                   // Per-channel cache (see ChannelCache)
	changeCache                  changeCache                    // Cache of recently-access channels
	EventMgr                     *EventManager                  // Manages notification events
	AllowEmptyPassword           bool                           // Allow empty passwords? Defaults to false
	Options                      DatabaseContextOptions         // Database Context Options
	AccessLock                   sync.RWMutex                   // Allows DB offline to block until synchronous calls have completed
	State                        uint32                         // The runtime state of the DB from a service perspective
	ResyncManager                *BackgroundManager             // Background manager for resync
	TombstoneCompactionManager   *BackgroundManager             // Background manager for tombstone compaction
	AttachmentCompactionManager  *BackgroundManager             // Background manager for attachment compaction
	ExitChanges                  chan struct{}                  // Active _changes feeds on the DB will close when this channel is closed
	OIDCProviders                auth.OIDCProviderMap           // OIDC clients
	LocalJWTProviders            auth.LocalJWTProviderMap       // Local JWT auth providers
	ServerUUID                   string                         // UUID of the server, if available
	DbStats                      *base.DbStats                  // stats that correspond to this database context
	CompactState                 uint32                         // Status of database compaction
	terminator                   chan bool                      // Signal termination of background goroutines
	CancelContext                context.Context                // Cancelled when the database is closed - used to notify associated processes (e.g. blipContext)
	cancelContextFunc            context.CancelFunc             // Cancel function for cancelContext
	backgroundTasks              []BackgroundTask               // List of background tasks that are initiated.
	activeChannels               *channels.ActiveChannels       // Tracks active replications by channel
	CfgSG                        cbgt.Cfg                       // Sync Gateway cluster shared config
	SGReplicateMgr               *sgReplicateManager            // Manages interactions with sg-replicate replications
	Heartbeater                  base.Heartbeater               // Node heartbeater for SG cluster awareness
	ServeInsecureAttachmentTypes bool                           // Attachment content type will bypass the content-disposition handling, default false
	NoX509HTTPClient             *http.Client                   // A HTTP Client from gocb to use the management endpoints
	ServerContextHasStarted      chan struct{}                  // Closed via PostStartup once the server has fully started
	UserFunctionTimeout          time.Duration                  // Default timeout for N1QL & JavaScript queries. (Applies to REST and BLIP requests.)
	Scopes                       map[string]Scope               // A map keyed by scope name containing a set of scopes/collections. Nil if running with only _default._default
	CollectionByID               map[uint32]*DatabaseCollection // A map keyed by collection ID to Collection
	CollectionNames              map[string]map[string]struct{} // Map of scope, collection names
	MetadataKeys                 *base.MetadataKeys             // Factory to generate metadata document keys
	RequireResync                base.ScopeAndCollectionNames   // Collections requiring resync before database can go online
	CORS                         *auth.CORSConfig               // CORS configuration
	EnableMou                    bool                           // Write _mou xattr when performing metadata-only update. Set based on bucket capability on connect
	WasInitializedSynchronously  bool                           // true if the database was initialized synchronously
}
// Scope is a set of database collections within a single named scope, keyed by collection name.
type Scope struct {
	Collections map[string]*DatabaseCollection
}
// DatabaseContextOptions are the options used to create a DatabaseContext.
type DatabaseContextOptions struct {
	CacheOptions                  *CacheOptions            // Channel/change cache options; nil falls back to DefaultCacheOptions
	RevisionCacheOptions          *RevisionCacheOptions    // Revision cache sizing options
	OldRevExpirySeconds           uint32                   // Expiry for old revision backups
	AdminInterface                *string                  // Admin API interface address
	UnsupportedOptions            *UnsupportedOptions      // Options not supported for external use
	OIDCOptions                   *auth.OIDCOptions        // OIDC provider configuration
	LocalJWTConfig                auth.LocalJWTConfig      // Local JWT provider configuration
	ImportOptions                 ImportOptions            // Options for import of docs not written by Sync Gateway
	EnableXattr                   bool                     // Use xattr for _sync
	LocalDocExpirySecs            uint32                   // The _local doc expiry time in seconds
	SecureCookieOverride          bool                     // Pass-through DBConfig.SecureCookieOverride
	SessionCookieName             string                   // Pass-through DbConfig.SessionCookieName
	SessionCookieHttpOnly         bool                     // Pass-through DbConfig.SessionCookieHTTPOnly
	UserFunctions                 *UserFunctions           // JS/N1QL functions clients can call
	AllowConflicts                *bool                    // False forbids creating conflicts
	SendWWWAuthenticateHeader     *bool                    // False disables setting of 'WWW-Authenticate' header
	DisablePasswordAuthentication bool                     // True enforces OIDC/guest only
	UseViews                      bool                     // Force use of views
	DeltaSyncOptions              DeltaSyncOptions         // Delta Sync Options
	CompactInterval               uint32                   // Interval in seconds between compaction is automatically ran - 0 means don't run
	SGReplicateOptions            SGReplicateOptions       // sg-replicate configuration for this node
	SlowQueryWarningThreshold     time.Duration            // Threshold above which slow queries are logged
	QueryPaginationLimit          int                      // Limit used for pagination of queries. If not set defaults to DefaultQueryPaginationLimit
	UserXattrKey                  string                   // Key of user xattr that will be accessible from the Sync Function. If empty the feature will be disabled.
	ClientPartitionWindow         time.Duration            // Partition window for client sequence tracking
	BcryptCost                    int                      // bcrypt cost for password hashing
	GroupID                       string                   // Config group ID, used for cbgt/cfg key prefixes
	JavascriptTimeout             time.Duration            // Max time the JS functions run for (ie. sync fn, import filter)
	Serverless                    bool                     // If running in serverless mode
	Scopes                        ScopesOptions            // Per-scope/collection options; must be non-empty
	MetadataStore                 base.DataStore           // If set, use this location/connection for SG metadata storage - if not set, metadata is stored using the same location/connection as the bucket used for data storage.
	MetadataID                    string                   // MetadataID used for metadata storage
	BlipStatsReportingInterval    int64                    // interval to report blip stats in milliseconds
	ChangesRequestPlus            bool                     // Sets the default value for request_plus, for non-continuous changes feeds
	ConfigPrincipals              *ConfigPrincipals        // Users/roles/guest defined in the database config
	PurgeInterval                 *time.Duration           // Add a custom purge interval, as a testing seam. If nil, this parameter is filled in by Couchbase Server, with a fallback to a default value SG has.
	LoggingConfig                 *base.DbLogConfig        // Per-database log configuration
	MaxConcurrentChangesBatches   *int                     // Maximum number of changes batches to process concurrently per replication
	MaxConcurrentRevs             *int                     // Maximum number of revs to process concurrently per replication
	NumIndexReplicas              uint                     // Number of replicas for GSI indexes
	ImportVersion                 uint64                   // Version included in import DCP checkpoints, incremented when collections added to db
}
// ConfigPrincipals holds the users, roles and guest principal defined in the database config.
type ConfigPrincipals struct {
	Users map[string]*auth.PrincipalConfig
	Roles map[string]*auth.PrincipalConfig
	Guest *auth.PrincipalConfig
}

// ScopesOptions maps scope name to the options for that scope.
type ScopesOptions map[string]ScopeOptions

// ScopeOptions holds per-collection options for a single scope, keyed by collection name.
type ScopeOptions struct {
	Collections map[string]CollectionOptions
}

// CollectionOptions holds the configurable behaviour of a single collection.
type CollectionOptions struct {
	Sync         *string               // Collection sync function
	ImportFilter *ImportFilterFunction // Opt-in filter for document import
}
// SGReplicateOptions holds sg-replicate settings for this node.
type SGReplicateOptions struct {
	Enabled               bool          // Whether this node can be assigned sg-replicate replications
	WebsocketPingInterval time.Duration // BLIP Websocket Ping interval (for active replicators)
}

// OidcTestProviderOptions configures the built-in OIDC test provider.
type OidcTestProviderOptions struct {
	Enabled bool `json:"enabled,omitempty"` // Whether the oidc_test_provider endpoints should be exposed on the public API
}

// UserViewsOptions configures pass-through view queries on the public API.
type UserViewsOptions struct {
	Enabled *bool `json:"enabled,omitempty"` // Whether pass-through view query is supported through public API
}

// DeltaSyncOptions holds delta sync settings.
type DeltaSyncOptions struct {
	Enabled          bool   // Whether delta sync is enabled (EE only)
	RevMaxAgeSeconds uint32 // The number of seconds deltas for old revs are available for
}

// APIEndpoints holds toggles for optional REST API endpoints.
type APIEndpoints struct {
	// This setting is only needed for testing purposes. In the Couchbase Lite unit tests that run in "integration mode"
	// against a running Sync Gateway, the tests need to be able to flush the data in between tests to start with a clean DB.
	EnableCouchbaseBucketFlush bool `json:"enable_couchbase_bucket_flush,omitempty"` // Whether Couchbase buckets can be flushed via Admin REST API
}
// UnsupportedOptions are config options that are not supported for external use
// (testing seams and internal toggles).
type UnsupportedOptions struct {
	UserViews                      *UserViewsOptions        `json:"user_views,omitempty"`                          // Config settings for user views
	OidcTestProvider               *OidcTestProviderOptions `json:"oidc_test_provider,omitempty"`                  // Config settings for OIDC Provider
	APIEndpoints                   *APIEndpoints            `json:"api_endpoints,omitempty"`                       // Config settings for API endpoints
	WarningThresholds              *WarningThresholds       `json:"warning_thresholds,omitempty"`                  // Warning thresholds related to _sync size
	DisableCleanSkippedQuery       bool                     `json:"disable_clean_skipped_query,omitempty"`         // Clean skipped sequence processing bypasses final check (deprecated: CBG-2672)
	OidcTlsSkipVerify              bool                     `json:"oidc_tls_skip_verify,omitempty"`                // Config option to enable self-signed certs for OIDC testing.
	SgrTlsSkipVerify               bool                     `json:"sgr_tls_skip_verify,omitempty"`                 // Config option to enable self-signed certs for SG-Replicate testing.
	RemoteConfigTlsSkipVerify      bool                     `json:"remote_config_tls_skip_verify,omitempty"`       // Config option to enable self signed certificates for external JavaScript load.
	GuestReadOnly                  bool                     `json:"guest_read_only,omitempty"`                     // Config option to restrict GUEST document access to read-only
	ForceAPIForbiddenErrors        bool                     `json:"force_api_forbidden_errors,omitempty"`          // Config option to force the REST API to return forbidden errors
	ConnectedClient                bool                     `json:"connected_client,omitempty"`                    // Enables BLIP connected-client APIs
	UseQueryBasedResyncManager     bool                     `json:"use_query_resync_manager,omitempty"`            // Config option to use Query based resync manager to perform Resync op
	DCPReadBuffer                  int                      `json:"dcp_read_buffer,omitempty"`                     // Enables user to set their own DCP read buffer
	KVBufferSize                   int                      `json:"kv_buffer,omitempty"`                           // Enables user to set their own KV pool buffer
	BlipSendDocsWithChannelRemoval bool                     `json:"blip_send_docs_with_channel_removal,omitempty"` // Enables sending docs with channel removals using channel filters
}
// WarningThresholds are the per-document/per-user thresholds above which warnings are logged.
type WarningThresholds struct {
	XattrSize       *uint32 `json:"xattr_size_bytes,omitempty"`               // Number of bytes to be used as a threshold for xattr size limit warnings
	ChannelsPerDoc  *uint32 `json:"channels_per_doc,omitempty"`               // Number of channels per document to be used as a threshold for channel count warnings
	GrantsPerDoc    *uint32 `json:"access_and_role_grants_per_doc,omitempty"` // Number of access and role grants per document to be used as a threshold for grant count warnings
	ChannelsPerUser *uint32 `json:"channels_per_user,omitempty"`              // Number of channels per user to be used as a threshold for channel count warnings
	ChannelNameSize *uint32 `json:"channel_name_size,omitempty"`              // Number of channel name characters to be used as a threshold for channel name warnings
}

// ImportOptions are options associated with the import of documents not written by Sync Gateway.
type ImportOptions struct {
	BackupOldRev     bool   // Create temporary backup of old revision body when available
	ImportPartitions uint16 // Number of partitions for import
}
// Database represents a simulated CouchDB database. A new instance is created for each HTTP request,
// so this struct does not have to be thread-safe.
type Database struct {
	*DatabaseContext           // Shared, long-lived database state
	user             auth.User // Per-request user (presumably the authenticated principal — confirm at call sites)
}
// validDatabaseNameRegexp matches legal database names, per the CouchDB naming rules:
// http://wiki.apache.org/couchdb/HTTP_database_API#Naming_and_Addressing
// Compiled once at package init rather than on every validation call (which also
// removes the previously-ignored MatchString compile error).
var validDatabaseNameRegexp = regexp.MustCompile(`^[a-z][-a-z0-9_$()+/]*$`)

// ValidateDatabaseName returns an HTTP 400 error if dbName is not a legal
// database name, or nil if it is valid.
func ValidateDatabaseName(dbName string) error {
	if !validDatabaseNameRegexp.MatchString(dbName) {
		return base.HTTPErrorf(http.StatusBadRequest,
			"Illegal database name: %s", dbName)
	}
	return nil
}
// getNewDatabaseSleeperFunc returns the retry sleeper used while establishing
// a database connection.
func getNewDatabaseSleeperFunc() base.RetrySleeper {
	const (
		maxNumRetries           = 13 // approx 40 seconds total retry duration
		initialRetrySleepTimeMS = 5
	)
	return base.CreateDoublingSleeperFunc(maxNumRetries, initialRetrySleepTimeMS)
}
// connectToBucketErrorHandling takes the given spec and error and returns a formatted error,
// along with whether it was a fatal error.
func connectToBucketErrorHandling(ctx context.Context, spec base.BucketSpec, gotErr error) (fatalError bool, err error) {
	if gotErr == nil {
		return false, nil
	}
	if errors.Is(gotErr, base.ErrAuthError) {
		username, _, _ := spec.Auth.GetCredentials()
		base.WarnfCtx(ctx, "Unable to authenticate as user %q: %v", base.UD(username), gotErr)
		// auth errors will be wrapped with HTTPError further up the stack where appropriate. Return the raw error that can still be checked.
		return false, gotErr
	}
	// Fatal errors get an additional log message, but are otherwise still transformed below.
	isFatal := errors.Is(gotErr, base.ErrFatalBucketConnection)
	if isFatal {
		base.WarnfCtx(ctx, "Fatal error connecting to bucket: %v", gotErr)
	}
	// Remaining errors are appended to the end of a more helpful error message.
	return isFatal, base.HTTPErrorf(http.StatusBadGateway,
		" Unable to connect to Couchbase Server (connection refused). Please ensure it is running and reachable at the configured host and port. Detailed error: %s", gotErr)
}
// OpenBucketFn is the signature of a function that opens a bucket from a spec,
// with a fail-fast flag (e.g. ConnectToBucket).
type OpenBucketFn func(context.Context, base.BucketSpec, bool) (base.Bucket, error)
// ConnectToBucket opens a Couchbase connection and return a specific bucket. If failFast is set,
// fail immediately if the bucket doesn't exist, otherwise retry waiting for bucket to exist.
func ConnectToBucket(ctx context.Context, spec base.BucketSpec, failFast bool) (base.Bucket, error) {
	if failFast {
		bucket, getErr := base.GetBucket(ctx, spec)
		_, handledErr := connectToBucketErrorHandling(ctx, spec, getErr)
		return bucket, handledErr
	}
	// Retry with a doubling backoff until connected, or until a fatal error is seen.
	worker := func() (bool, error, interface{}) {
		bucket, getErr := base.GetBucket(ctx, spec)
		fatal, handledErr := connectToBucketErrorHandling(ctx, spec, getErr)
		retry := handledErr != nil && !fatal
		return retry, handledErr, bucket
	}
	description := fmt.Sprintf("Attempt to connect to bucket : %v", spec.BucketName)
	err, result := base.RetryLoop(ctx, description, worker, getNewDatabaseSleeperFunc())
	if err != nil {
		return nil, err
	}
	return result.(base.Bucket), nil
}
// getServerUUID returns the Couchbase Server cluster UUID, retrying on failure.
// If the bucket is not backed by Couchbase Server (e.g. walrus), it returns an
// empty string and no error.
func getServerUUID(ctx context.Context, bucket base.Bucket) (string, error) {
	gocbV2Bucket, err := base.AsGocbV2Bucket(bucket)
	if err != nil {
		// Not a gocb bucket (e.g. walrus) - no server UUID available.
		return "", nil
	}
	// start a retry loop to get server ID
	worker := func() (bool, error, interface{}) {
		uuid, err := base.GetServerUUID(ctx, gocbV2Bucket)
		return err != nil, err, uuid
	}
	err, uuid := base.RetryLoop(ctx, "Getting ServerUUID", worker, getNewDatabaseSleeperFunc())
	if err != nil {
		// Check the error before the type assertion - on error the returned value
		// may not hold a string, and asserting it would panic.
		return "", err
	}
	serverUUID, ok := uuid.(string)
	if !ok {
		return "", fmt.Errorf("getServerUUID: unexpected value type %T from retry loop", uuid)
	}
	return serverUUID, nil
}
// NewDatabaseContext creates a new DatabaseContext on a bucket. The bucket will be closed
// when this context closes.
func NewDatabaseContext(ctx context.Context, dbName string, bucket base.Bucket, autoImport bool, options DatabaseContextOptions) (dbc *DatabaseContext, returnedError error) {
	// Cleanup functions are accumulated as resources are acquired, and are only run
	// (in registration order) when an error aborts construction partway through.
	cleanupFunctions := make([]func(), 0)
	defer func() {
		if returnedError != nil {
			for _, cleanupFunc := range cleanupFunctions {
				cleanupFunc()
			}
		}
	}()
	if err := ValidateDatabaseName(dbName); err != nil {
		return nil, err
	}
	// add db info to ctx before having a DatabaseContext (cannot call AddDatabaseLogContext),
	// in order to pass it to RegisterImportPindexImpl
	ctx = base.DatabaseLogCtx(ctx, dbName, options.LoggingConfig)
	if err := base.RequireNoBucketTTL(ctx, bucket); err != nil {
		return nil, err
	}
	serverUUID, err := getServerUUID(ctx, bucket)
	if err != nil {
		return nil, err
	}
	dbStats, statsError := initDatabaseStats(ctx, dbName, autoImport, options)
	if statsError != nil {
		return nil, statsError
	}
	// options.MetadataStore is always passed via rest._getOrAddDatabase...
	// but in db package tests this is unlikely to be set. In this case we'll use the existing bucket connection to store metadata.
	metadataStore := options.MetadataStore
	if metadataStore == nil {
		base.DebugfCtx(ctx, base.KeyAll, "MetadataStore was nil - falling back to use existing bucket connection %q for database %q", bucket.GetName(), dbName)
		metadataStore = bucket.DefaultDataStore()
	}
	// Register the cbgt pindex type for the configGroup
	RegisterImportPindexImpl(ctx, options.GroupID)
	dbContext := &DatabaseContext{
		Name:                dbName,
		UUID:                cbgt.NewUUID(),
		MetadataStore:       metadataStore,
		Bucket:              bucket,
		StartTime:           time.Now(),
		autoImport:          autoImport,
		Options:             options,
		DbStats:             dbStats,
		CollectionByID:      make(map[uint32]*DatabaseCollection),
		ServerUUID:          serverUUID,
		UserFunctionTimeout: defaultUserFunctionTimeout,
	}
	// set up cancellable context based on the background context (context lifecycle for the database
	// must be distinct from the request context associated with the db create/update). Used to trigger
	// teardown of connected replications on database close.
	dbContext.CancelContext, dbContext.cancelContextFunc = context.WithCancel(context.Background())
	cleanupFunctions = append(cleanupFunctions, func() {
		dbContext.cancelContextFunc()
	})
	// Check if server version supports multi-xattr operations, required for mou handling
	dbContext.EnableMou = bucket.IsSupported(sgbucket.BucketStoreFeatureMultiXattrSubdocOperations)
	// Initialize metadata ID and keys
	metaKeys := base.NewMetadataKeys(options.MetadataID)
	dbContext.MetadataKeys = metaKeys
	cleanupFunctions = append(cleanupFunctions, func() {
		base.SyncGatewayStats.ClearDBStats(dbName)
	})
	// Conflict-allowing databases keep a deeper revision tree by default.
	if dbContext.AllowConflicts() {
		dbContext.RevsLimit = DefaultRevsLimitConflicts
	} else {
		dbContext.RevsLimit = DefaultRevsLimitNoConflicts
	}
	dbContext.terminator = make(chan bool)
	dbContext.EventMgr = NewEventManager(dbContext.terminator)
	dbContext.sequences, err = newSequenceAllocator(ctx, metadataStore, dbContext.DbStats.Database(), metaKeys)
	if err != nil {
		return nil, err
	}
	cleanupFunctions = append(cleanupFunctions, func() {
		dbContext.sequences.Stop(ctx)
	})
	// Initialize the active channel counter
	dbContext.activeChannels = channels.NewActiveChannels(dbContext.DbStats.Cache().NumActiveChannels)
	cacheOptions := options.CacheOptions
	if cacheOptions == nil {
		defaultOpts := DefaultCacheOptions()
		cacheOptions = &defaultOpts
	}
	channelCache, err := NewChannelCacheForContext(ctx, cacheOptions.ChannelCacheOptions, dbContext)
	if err != nil {
		return nil, err
	}
	dbContext.channelCache = channelCache
	// Initialize sg cluster config. Required even if import and sgreplicate are disabled
	// on this node, to support replication REST API calls
	if base.IsEnterpriseEdition() {
		sgCfg, err := base.NewCfgSG(ctx, metadataStore, metaKeys.SGCfgPrefix(dbContext.Options.GroupID))
		if err != nil {
			return nil, err
		}
		dbContext.CfgSG = sgCfg
	} else {
		// CE uses an in-memory cfg instead of the metadata store.
		sgCfg, err := base.NewCbgtCfgMem()
		if err != nil {
			return nil, err
		}
		dbContext.CfgSG = sgCfg
	}
	// Initialize the tap Listener for notify handling
	dbContext.mutationListener.Init(bucket.GetName(), options.GroupID, dbContext.MetadataKeys)
	if len(options.Scopes) == 0 {
		return nil, fmt.Errorf("Setting scopes to be zero is invalid")
	}
	dbContext.Scopes = make(map[string]Scope, len(options.Scopes))
	dbContext.CollectionNames = make(map[string]map[string]struct{}, len(options.Scopes))
	// if any sync functions for any collection, we recommend running a resync
	syncFunctionsChanged := false
	// Create new backing store map to map from collection ID's to their associated rev cache backing stores for rev cache document loads
	collectionIDToRevCacheBackingStore := make(map[uint32]RevisionCacheBackingStore)
	for scopeName, scope := range options.Scopes {
		dbContext.Scopes[scopeName] = Scope{
			Collections: make(map[string]*DatabaseCollection, len(scope.Collections)),
		}
		collectionNameMap := make(map[string]struct{}, len(scope.Collections))
		for collName, collOpts := range scope.Collections {
			// intentional shadow - we want each collection to have its own context inside this loop body
			ctx := ctx
			if !base.IsDefaultCollection(scopeName, collName) {
				ctx = base.CollectionLogCtx(ctx, scopeName, collName)
			}
			dataStore, err := bucket.NamedDataStore(base.ScopeAndCollectionName{Scope: scopeName, Collection: collName})
			if err != nil {
				return nil, err
			}
			stats, err := dbContext.DbStats.CollectionStat(scopeName, collName)
			if err != nil {
				return nil, err
			}
			dbCollection, err := newDatabaseCollection(ctx, dbContext, dataStore, stats)
			if err != nil {
				return nil, err
			}
			if collOpts.Sync != nil {
				fnChanged, err := dbCollection.UpdateSyncFun(ctx, *collOpts.Sync)
				if err != nil {
					return nil, err
				}
				if fnChanged {
					syncFunctionsChanged = true
				}
			} else {
				defaultSyncFunction := channels.GetDefaultSyncFunction(scopeName, collName)
				base.InfofCtx(ctx, base.KeyAll, "Using default sync function %q for database %s.%s.%s", defaultSyncFunction, base.MD(dbName), base.MD(scopeName), base.MD(collName))
			}
			if collOpts.ImportFilter != nil {
				dbCollection.importFilterFunction = collOpts.ImportFilter
			}
			// Register the collection under all of the context's lookup maps.
			dbContext.Scopes[scopeName].Collections[collName] = dbCollection
			collectionID := dbCollection.GetCollectionID()
			dbContext.CollectionByID[collectionID] = dbCollection
			collectionNameMap[collName] = struct{}{}
			collectionIDToRevCacheBackingStore[collectionID] = dbCollection
		}
		dbContext.CollectionNames[scopeName] = collectionNameMap
	}
	// Init the rev cache
	dbContext.revisionCache = NewRevisionCache(
		dbContext.Options.RevisionCacheOptions,
		collectionIDToRevCacheBackingStore,
		dbContext.DbStats.Cache(),
	)
	if syncFunctionsChanged {
		base.InfofCtx(ctx, base.KeyAll, "**NOTE:** %q's sync function has changed. The new function may assign different channels to documents, or permissions to users. You may want to re-sync the database to update these.", base.MD(dbContext.Name))
	}
	// Initialize sg-replicate manager
	dbContext.SGReplicateMgr, err = NewSGReplicateManager(ctx, dbContext, dbContext.CfgSG)
	if err != nil {
		return nil, err
	}
	if dbContext.UseQueryBasedResyncManager() {
		dbContext.ResyncManager = NewResyncManager(metadataStore, metaKeys)
	} else {
		dbContext.ResyncManager = NewResyncManagerDCP(metadataStore, dbContext.UseXattrs(), metaKeys)
	}
	return dbContext, nil
}
// GetOIDCProvider looks up the OIDC provider for the given name. An empty
// providerName returns the default provider, if one is configured.
func (context *DatabaseContext) GetOIDCProvider(providerName string) (*auth.OIDCProvider, error) {
	// If providerName isn't specified, check whether there's a default provider
	if providerName == "" {
		if provider := context.OIDCProviders.GetDefaultProvider(); provider != nil {
			return provider, nil
		}
		return nil, errors.New("No default provider available.")
	}
	provider, ok := context.OIDCProviders[providerName]
	if !ok {
		return nil, base.RedactErrorf("No provider found for provider name %q", base.MD(providerName))
	}
	return provider, nil
}
// _stopOnlineProcesses is called to represent an error condition from startOnlineProcesses, or
// from DatabaseContext.Close. Most of the objects are not safe to close twice, since they have
// internal terminator objects and goroutines that wait on closed channels. Acquire the bucket
// lock, to avoid calling this function multiple times.
func (db *DatabaseContext) _stopOnlineProcesses(ctx context.Context) {
	db.mutationListener.Stop(ctx)
	db.changeCache.Stop(ctx)
	// Each optional component is nilled out after stopping, so a later call skips it.
	if db.ImportListener != nil {
		db.ImportListener.Stop()
		db.ImportListener = nil
	}
	if db.Heartbeater != nil {
		db.Heartbeater.Stop(ctx)
		db.Heartbeater = nil
	}
	if db.SGReplicateMgr != nil {
		db.SGReplicateMgr.Stop()
		db.SGReplicateMgr = nil
	}
}
// Close shuts down the database: signals all background goroutines, stops background
// managers and tasks, stops online processes and the channel cache, then closes the
// bucket and clears per-db stats. After Close, IsClosed reports true.
func (context *DatabaseContext) Close(ctx context.Context) {
	context.BucketLock.Lock()
	defer context.BucketLock.Unlock()
	context.OIDCProviders.Stop()
	// Signal all goroutines watching the terminator channel / cancel context.
	close(context.terminator)
	if context.cancelContextFunc != nil {
		context.cancelContextFunc()
	}
	// Stop All background processors
	bgManagers := context.stopBackgroundManagers()
	// Wait for database background tasks to finish.
	waitForBGTCompletion(ctx, BGTCompletionMaxWait, context.backgroundTasks, context.Name)
	context.sequences.Stop(ctx)
	context._stopOnlineProcesses(ctx)
	// Stop the channel cache and its background tasks.
	context.channelCache.Stop(ctx)
	// Wait for the managers signalled above to actually reach a stopped state.
	waitForBackgroundManagersToStop(ctx, BGTCompletionMaxWait, bgManagers)
	context.Bucket.Close(ctx)
	context.Bucket = nil
	base.RemovePerDbStats(context.Name)
}
// stopBackgroundManagers stops any running BackgroundManager.
// Returns a list of BackgroundManager it signalled to stop.
func (context *DatabaseContext) stopBackgroundManagers() []*BackgroundManager {
	bgManagers := make([]*BackgroundManager, 0)
	// signalStop issues a stop to a manager that exists and is still running, and
	// records it so the caller can wait for the stop to complete. Deduplicates the
	// previously copy-pasted per-manager blocks.
	signalStop := func(mgr *BackgroundManager) {
		if mgr == nil || isBackgroundManagerStopped(mgr.GetRunState()) {
			return
		}
		if err := mgr.Stop(); err == nil {
			bgManagers = append(bgManagers, mgr)
		}
	}
	signalStop(context.ResyncManager)
	signalStop(context.AttachmentCompactionManager)
	signalStop(context.TombstoneCompactionManager)
	return bgManagers
}
// waitForBackgroundManagersToStop waits for the given BackgroundManagers to reach a
// stopped state, polling with exponential backoff, up to a maximum of waitTimeMax.
// If the deadline is reached, a warning naming the still-running managers is logged.
func waitForBackgroundManagersToStop(ctx context.Context, waitTimeMax time.Duration, bgManagers []*BackgroundManager) {
	// A one-shot Timer is the correct construct for a deadline (the previous Ticker
	// was only ever observed once before returning).
	timeout := time.NewTimer(waitTimeMax)
	defer timeout.Stop()
	retryInterval := 1 * time.Millisecond
	const maxRetryInterval = 1 * time.Second
	for {
		select {
		case <-timeout.C:
			// Deadline reached - report whichever managers are still running.
			runningBackgroundManagerNames := ""
			for _, bgManager := range bgManagers {
				if !isBackgroundManagerStopped(bgManager.GetRunState()) {
					runningBackgroundManagerNames += fmt.Sprintf(" %s", bgManager.GetName())
				}
			}
			base.WarnfCtx(ctx, "Background Managers [%s] failed to stop within deadline of %s.", runningBackgroundManagerNames, waitTimeMax)
			return
		case <-time.After(retryInterval):
			stoppedServices := 0
			for _, bgManager := range bgManagers {
				if isBackgroundManagerStopped(bgManager.GetRunState()) {
					stoppedServices++
				}
			}
			if stoppedServices == len(bgManagers) {
				return
			}
			// exponential backoff with max wait
			if retryInterval < maxRetryInterval {
				retryInterval *= 2
				if retryInterval > maxRetryInterval {
					retryInterval = maxRetryInterval
				}
			}
		}
	}
}
// isBackgroundManagerStopped reports whether the given run state represents a process
// that is not running: stopped, completed, errored, or the empty (unset) state.
func isBackgroundManagerStopped(state BackgroundProcessState) bool {
	switch state {
	case BackgroundProcessStateStopped, BackgroundProcessStateCompleted, BackgroundProcessStateError, "":
		return true
	default:
		return false
	}
}
// waitForBGTCompletion waits for all the background tasks to finish, with a shared overall
// budget of waitTimeMax. Time spent waiting on a task that completes is deducted from the
// budget available to the remaining tasks; tasks that don't complete in time are logged.
// NOTE(review): after a timeout, the remaining budget is not reduced, so the total wait can
// exceed waitTimeMax when multiple tasks time out — presumably acceptable; confirm.
func waitForBGTCompletion(ctx context.Context, waitTimeMax time.Duration, tasks []BackgroundTask, dbName string) {
	waitTime := waitTimeMax
	for _, t := range tasks {
		start := time.Now()
		select {
		case <-t.doneChan:
			waitTime -= time.Since(start)
			continue
		case <-time.After(waitTime):
			// Timeout after waiting for background task to terminate.
		}
		base.InfofCtx(ctx, base.KeyAll, "Timeout after %v of waiting for background task %q to "+
			"terminate, database: %s", waitTimeMax, t.taskName, dbName)
	}
}
// IsClosed reports whether this DatabaseContext has been closed (its Bucket set to nil),
// reading under the bucket lock.
func (context *DatabaseContext) IsClosed() bool {
	context.BucketLock.RLock()
	closed := context.Bucket == nil
	context.BucketLock.RUnlock()
	return closed
}
// RestartListener stops and restarts the database's mutation listener.
// For testing only!
func (context *DatabaseContext) RestartListener(ctx context.Context) error {
	context.mutationListener.Stop(ctx)
	// Delay needed to properly stop
	time.Sleep(2 * time.Second)
	context.mutationListener.Init(context.Bucket.GetName(), context.Options.GroupID, context.MetadataKeys)
	feedStats := context.DbStats.Database().CacheFeedMapStats
	return context.mutationListener.Start(ctx, context.Bucket, feedStats.Map, context.Scopes, context.MetadataStore)
}
// RemoveObsoleteDesignDocs removes previous versions of Sync Gateway's design docs found
// on the server, operating on the bucket's default data store. When previewOnly is true
// the underlying removal runs in preview mode.
func (dbCtx *DatabaseContext) RemoveObsoleteDesignDocs(ctx context.Context, previewOnly bool) (removedDesignDocs []string, err error) {
	ds := dbCtx.Bucket.DefaultDataStore()
	if viewStore, ok := ds.(sgbucket.ViewStore); ok {
		return removeObsoleteDesignDocs(ctx, viewStore, previewOnly, dbCtx.UseViews())
	}
	return []string{}, fmt.Errorf("Datastore does not support views")
}
// getDataStores returns all datastores on the database, including metadatastore
func (dbCtx *DatabaseContext) getDataStores() []sgbucket.DataStore {
	stores := make([]sgbucket.DataStore, 0, len(dbCtx.CollectionByID))
	for _, collection := range dbCtx.CollectionByID {
		stores = append(stores, collection.dataStore)
	}
	// The metadata store is only an additional datastore when the default collection
	// isn't already part of the database's collection set.
	if _, ok := dbCtx.CollectionByID[base.DefaultCollectionID]; !ok {
		stores = append(stores, dbCtx.MetadataStore)
	}
	return stores
}
// RemoveObsoleteIndexes removes previous versions of Sync Gateway's indexes found on the
// server. Returns the removed index names; when the database uses non-default collections,
// each name is qualified with its `scope`.`collection` prefix. Datastores that don't
// support N1QL are skipped (recorded as errors); a bucket without N1QL support is a no-op.
func (dbCtx *DatabaseContext) RemoveObsoleteIndexes(ctx context.Context, previewOnly bool) ([]string, error) {
	if !dbCtx.Bucket.IsSupported(sgbucket.BucketStoreFeatureN1ql) {
		return nil, nil
	}

	var errs *base.MultiError
	var removedIndexes []string
	for _, dataStore := range dbCtx.getDataStores() {
		collectionName := fmt.Sprintf("`%s`.`%s`", dataStore.ScopeName(), dataStore.CollectionName())
		n1qlStore, ok := base.AsN1QLStore(dataStore)
		if !ok {
			msg := fmt.Sprintf("Cannot remove obsolete indexes for non-gocb collection %s - skipping.", base.MD(collectionName))
			// Explicit %s verb: passing msg directly as the format string would let a
			// collection name containing '%' be misread as a format directive (go vet printf).
			base.WarnfCtx(ctx, "%s", msg)
			errs = errs.Append(errors.New(msg))
			continue
		}
		collectionRemovedIndexes, err := removeObsoleteIndexes(ctx, n1qlStore, previewOnly, dbCtx.UseXattrs(), dbCtx.UseViews(), sgIndexes)
		if err != nil {
			errs = errs.Append(err)
			continue
		}
		onlyDefaultCollection := dbCtx.OnlyDefaultCollection()
		for _, idxName := range collectionRemovedIndexes {
			if onlyDefaultCollection {
				removedIndexes = append(removedIndexes, idxName)
			} else {
				removedIndexes = append(removedIndexes,
					fmt.Sprintf("%s.%s", collectionName, idxName))
			}
		}
	}
	return removedIndexes, errs.ErrorOrNil()
}
// NotifyTerminatedChanges triggers terminate-check handling for connected continuous
// replications belonging to the given user.
// TODO: The underlying code (NotifyCheckForTermination) doesn't actually leverage the specific username - should be refactored
//
// to remove
func (context *DatabaseContext) NotifyTerminatedChanges(ctx context.Context, username string) {
	userKeys := base.SetOf(base.UserPrefixRoot + username)
	context.mutationListener.NotifyCheckForTermination(ctx, userKeys)
}
// TakeDbOffline transitions the database from Online to Offline: closes all active
// _changes feeds, blocks until in-flight requests drain (via AccessLock), stops the
// change cache, and raises a DB state change event. Returns nil if the database is
// already offline/stopping/resyncing; returns a 503 HTTPError if it is in any other
// non-Online state.
func (dc *DatabaseContext) TakeDbOffline(ctx context.Context, reason string) error {
	if !atomic.CompareAndSwapUint32(&dc.State, DBOnline, DBStopping) {
		dbState := atomic.LoadUint32(&dc.State)
		// If the DB is already transitioning to: offline or is offline silently return
		if dbState == DBOffline || dbState == DBResyncing || dbState == DBStopping {
			return nil
		}
		msg := "Unable to take Database offline, database must be in Online state but was " + RunStateString[dbState]
		if dbState == DBOnline {
			// State flipped back to Online between the CAS and the load: another
			// operation raced us.
			msg = "Unable to take Database offline, another operation was already in progress. Please try again."
		}
		// Explicit %s verb avoids treating a non-constant message as a format string (go vet printf).
		base.InfofCtx(ctx, base.KeyCRUD, "%s", msg)
		return base.NewHTTPError(http.StatusServiceUnavailable, msg)
	}

	// notify all active _changes feeds to close
	close(dc.ExitChanges)

	// Block until all current calls have returned, including _changes feeds
	dc.AccessLock.Lock()
	defer dc.AccessLock.Unlock()

	dc.changeCache.Stop(ctx)

	// set DB state to Offline
	atomic.StoreUint32(&dc.State, DBOffline)

	if err := dc.EventMgr.RaiseDBStateChangeEvent(ctx, dc.Name, "offline", reason, dc.Options.AdminInterface); err != nil {
		base.InfofCtx(ctx, base.KeyCRUD, "Error raising database state change event: %v", err)
	}

	return nil
}
// TakeDbOffline takes the database offline for the given reason, delegating to
// DatabaseContext.TakeDbOffline with the wrapped non-cancellable context.
func (db *Database) TakeDbOffline(nonContextStruct base.NonCancellableContext, reason string) error {
	return db.DatabaseContext.TakeDbOffline(nonContextStruct.Ctx, reason)
}
// Authenticator builds an auth.Authenticator for this database, applying the configured
// session cookie name (falling back to auth.DefaultCookieName), bcrypt cost, and the
// optional channels-per-user warning threshold from unsupported options.
func (context *DatabaseContext) Authenticator(ctx context.Context) *auth.Authenticator {
	context.BucketLock.RLock()
	defer context.BucketLock.RUnlock()

	cookieName := context.Options.SessionCookieName
	if cookieName == "" {
		cookieName = auth.DefaultCookieName
	}

	var warnThreshold *uint32
	if unsupported := context.Options.UnsupportedOptions; unsupported != nil && unsupported.WarningThresholds != nil {
		warnThreshold = unsupported.WarningThresholds.ChannelsPerUser
	}

	// Authenticators are lightweight & stateless, so it's OK to return a new one every time
	return auth.NewAuthenticator(context.MetadataStore, context, auth.AuthenticatorOptions{
		ClientPartitionWindow:    context.Options.ClientPartitionWindow,
		ChannelsWarningThreshold: warnThreshold,
		SessionCookieName:        cookieName,
		BcryptCost:               context.Options.BcryptCost,
		LogCtx:                   ctx,
		Collections:              context.CollectionNames,
		MetaKeys:                 context.MetadataKeys,
	})
}
// IsServerless reports whether the database is configured for serverless mode
// (Options.Serverless).
func (context *DatabaseContext) IsServerless() bool {
	return context.Options.Serverless
}
// GetDatabase makes a Database object wrapping the given DatabaseContext, acting as the
// given user (nil for admin access). The returned error is currently always nil.
func GetDatabase(context *DatabaseContext, user auth.User) (*Database, error) {
	return &Database{DatabaseContext: context, user: user}, nil
}
// CreateDatabase makes a Database object wrapping the given DatabaseContext with no
// associated user (admin access). The returned error is currently always nil.
func CreateDatabase(context *DatabaseContext) (*Database, error) {
	return &Database{DatabaseContext: context}, nil
}
// SameAs reports whether the two Database handles (both non-nil) are backed by the
// same Bucket.
func (db *Database) SameAs(otherdb *Database) bool {
	if db == nil || otherdb == nil {
		return false
	}
	return db.Bucket == otherdb.Bucket
}
// IsCompactRunning reports whether a compaction is currently in progress, based on an
// atomic read of CompactState.
func (db *Database) IsCompactRunning() bool {
	return atomic.LoadUint32(&db.CompactState) == DBCompactRunning
}
// User returns the user this Database handle is acting as (nil for admin access).
func (db *Database) User() auth.User {
	return db.user
}
// SetUser sets the user this Database handle acts as.
func (db *Database) SetUser(user auth.User) {
	db.user = user
}
// ReloadUser reloads the database's User object from the authenticator, in case its
// persistent properties have been changed. No-op when no user is set; returns an error
// if the lookup fails or the user no longer exists.
func (db *Database) ReloadUser(ctx context.Context) error {
	if db.user == nil {
		return nil
	}
	user, err := db.Authenticator(ctx).GetUser(db.user.Name())
	if err != nil {
		return err
	}
	if user == nil {
		return errors.New("User not found during reload")
	}
	db.user = user
	return nil
}
// ////// ALL DOCUMENTS:
// IDRevAndSequence bundles a document ID, its revision ID, and its sequence number,
// as passed to ForEachDocID callbacks.
type IDRevAndSequence struct {
	DocID    string // document ID
	RevID    string // revision ID
	Sequence uint64 // sequence number
}
// ForEachDocIDOptions holds the ForEachDocID options for limiting query results.
type ForEachDocIDOptions struct {
	Startkey string // start key passed to the all-docs query
	Endkey   string // end key passed to the all-docs query
	Limit    uint64 // maximum rows to process; presumably 0 means unlimited — confirm in processForEachDocIDResults
}
// ForEachDocIDFunc is the per-document callback invoked by ForEachDocID. It receives the
// document's ID/rev/sequence and its channel names; presumably returning false or an error
// stops iteration — confirm in processForEachDocIDResults.
type ForEachDocIDFunc func(id IDRevAndSequence, channels []string) (bool, error)
// ForEachDocID iterates over all documents in the collection (bounded by resultsOpts),
// calling the callback function on each. The query iterator is always closed, even when
// row processing fails (the original returned early on error and leaked the iterator).
func (c *DatabaseCollection) ForEachDocID(ctx context.Context, callback ForEachDocIDFunc, resultsOpts ForEachDocIDOptions) error {
	results, err := c.QueryAllDocs(ctx, resultsOpts.Startkey, resultsOpts.Endkey)
	if err != nil {
		return err
	}
	processErr := c.processForEachDocIDResults(ctx, callback, resultsOpts.Limit, results)
	closeErr := results.Close()
	if processErr != nil {
		// Prefer the processing error; the iterator has still been closed above.
		return processErr
	}
	return closeErr
}
// Iterate over the results of an AllDocs query, performing ForEachDocID handling for each row
func (c *DatabaseCollection) processForEachDocIDResults(ctx context.Context, callback ForEachDocIDFunc, limit uint64, results sgbucket.QueryResultIterator) error {
count := uint64(0)
for {
var queryRow AllDocsIndexQueryRow
var found bool
var docid, revid string
var seq uint64
var channels []string
if c.useViews() {
var viewRow AllDocsViewQueryRow
found = results.Next(ctx, &viewRow)
if found {
docid = viewRow.Key
revid = viewRow.Value.RevID
seq = viewRow.Value.Sequence
channels = viewRow.Value.Channels
}
} else {
found = results.Next(ctx, &queryRow)
if found {
docid = queryRow.Id
revid = queryRow.RevID
seq = queryRow.Sequence
channels = make([]string, 0)
// Query returns all channels, but we only want to return active channels
for channelName, removal := range queryRow.Channels {
if removal == nil {
channels = append(channels, channelName)
}
}
}
}