2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
@@ -32,7 +32,7 @@ concurrency:
cancel-in-progress: ${{ !contains(github.ref, 'release/') && !contains(github.ref, 'main') }}

env:
GO_VERSION: &go_version 1.25.6
GO_VERSION: &go_version 1.25.7
GO_TEST_ANNOTATIONS_VERSION: &go_test_annotations_version guyarb/golang-test-annotations@v0.8.0

jobs:
2 changes: 1 addition & 1 deletion Jenkinsfile
@@ -15,7 +15,7 @@ pipeline {
}

tools {
go '1.25.6'
go '1.25.7'
}

stages {
33 changes: 32 additions & 1 deletion base/constants_syncdocs.go
@@ -39,6 +39,8 @@ const (
MetaKeyRolePrefix // "role:"
MetaKeyUserEmailPrefix // "useremail:"
MetaKeySessionPrefix // "session:"
MetaKeyResyncHeartBeaterPrefix // "resync_hb:"
MetaKeyResyncCfgPrefix // "resync_cfg:"
)

var metadataKeyNames = []string{
@@ -55,7 +57,8 @@ var metadataKeyNames = []string{
"role:", // stores a role
"useremail:", // stores a role
"session:", // stores a session

"resync_hb:", // document prefix used to store resync data
"resync_cfg:", // document prefix used to store resync cfg data
}

func (m metadataKey) String() string {
@@ -108,6 +111,8 @@ type MetadataKeys struct {
rolePrefix string
userEmailPrefix string
sessionPrefix string
resyncHeartbeaterPrefix string
resyncCfgPrefix string
}

// sha1HashLength is the number of characters in a sha1
@@ -130,6 +135,8 @@ var DefaultMetadataKeys = &MetadataKeys{
rolePrefix: formatDefaultMetadataKey(MetaKeyRolePrefix),
userEmailPrefix: formatDefaultMetadataKey(MetaKeyUserEmailPrefix),
sessionPrefix: formatDefaultMetadataKey(MetaKeySessionPrefix),
resyncHeartbeaterPrefix: formatDefaultMetadataKey(MetaKeyResyncHeartBeaterPrefix),
resyncCfgPrefix: formatDefaultMetadataKey(MetaKeyResyncCfgPrefix),
}

// NewMetadataKeys returns MetadataKeys for the specified metadataID. If metadataID is an empty string, returns the default (legacy) metadata keys.
@@ -153,6 +160,8 @@ func NewMetadataKeys(metadataID string) *MetadataKeys {
rolePrefix: formatInvertedMetadataKey(metadataID, MetaKeyRolePrefix),
userEmailPrefix: formatInvertedMetadataKey(metadataID, MetaKeyUserEmailPrefix),
sessionPrefix: formatInvertedMetadataKey(metadataID, MetaKeySessionPrefix),
resyncHeartbeaterPrefix: formatInvertedMetadataKey(metadataID, MetaKeyResyncHeartBeaterPrefix),
resyncCfgPrefix: formatInvertedMetadataKey(metadataID, MetaKeyResyncCfgPrefix),
}
}
}
@@ -224,6 +233,28 @@ func (m *MetadataKeys) SGCfgPrefix(groupID string) string {
return m.sgCfgPrefix
}

// ResyncHeartbeaterPrefix returns a document prefix to use for resync heartbeat documents.
//
// format: _sync:{m_$}:resync_hb:[groupID:] (collections)
// format: _sync:resync_hb:[groupID:] (default)
func (m *MetadataKeys) ResyncHeartbeaterPrefix(groupID string) string {
if groupID != "" {
return m.resyncHeartbeaterPrefix + groupID + ":"
}
return m.resyncHeartbeaterPrefix
}

// ResyncCfgPrefix returns a document prefix to use for resync cfg documents.
//
// format: _sync:{m_$}:resync_cfg:[groupID:] (collections)
// format: _sync:resync_cfg:[groupID:] (default)
func (m *MetadataKeys) ResyncCfgPrefix(groupID string) string {
if groupID != "" {
return m.resyncCfgPrefix + groupID + ":"
}
return m.resyncCfgPrefix
}
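
For reference, a minimal sketch of how these prefixes compose; the expected strings are taken from the TestMetadataKeys expectations added below in this change:

// Sketch only: expected prefix values, per TestMetadataKeys below.
keys := NewMetadataKeys("aMetadataID")
_ = keys.ResyncHeartbeaterPrefix("aGroupID")        // "_sync:m_aMetadataID:resync_hb:aGroupID:"
_ = keys.ResyncCfgPrefix("")                        // "_sync:m_aMetadataID:resync_cfg:"
_ = DefaultMetadataKeys.ResyncHeartbeaterPrefix("") // "_sync:resync_hb:"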

// PersistentConfigKey returns a document key to use for persisted database configurations
//
// format: _sync:{m_$}:db_config:[groupID]
10 changes: 10 additions & 0 deletions base/constants_syncdocs_test.go
@@ -140,6 +140,8 @@ func TestMetadataKeys(t *testing.T) {
sessionKey string
backgroundProcessHeartbeatPrefix string // with backgroundID
backgroundProcessStatusPrefix string // with backgroundID
resyncHeartbeatPrefix string
resyncCfgPrefix string
}{
{
metadataID: "",
Expand All @@ -162,6 +164,8 @@ func TestMetadataKeys(t *testing.T) {
sessionKey: "_sync:session:aSessionID",
backgroundProcessHeartbeatPrefix: "_sync:background_process:heartbeat:backgroundID",
backgroundProcessStatusPrefix: "_sync:background_process:status:backgroundID",
resyncHeartbeatPrefix: "_sync:resync_hb:",
resyncCfgPrefix: "_sync:resync_cfg:",
},
{
metadataID: "",
Expand All @@ -184,6 +188,8 @@ func TestMetadataKeys(t *testing.T) {
sessionKey: "_sync:session:aSessionID",
backgroundProcessHeartbeatPrefix: "_sync:background_process:heartbeat:backgroundID",
backgroundProcessStatusPrefix: "_sync:background_process:status:backgroundID",
resyncHeartbeatPrefix: "_sync:resync_hb:aGroupID:",
resyncCfgPrefix: "_sync:resync_cfg:aGroupID:",
},
{
metadataID: "aMetadataID",
Expand All @@ -206,6 +212,8 @@ func TestMetadataKeys(t *testing.T) {
sessionKey: "_sync:session:aMetadataID:aSessionID",
backgroundProcessHeartbeatPrefix: "_sync:m_aMetadataID:background_process:heartbeat:backgroundID",
backgroundProcessStatusPrefix: "_sync:m_aMetadataID:background_process:status:backgroundID",
resyncHeartbeatPrefix: "_sync:m_aMetadataID:resync_hb:",
resyncCfgPrefix: "_sync:m_aMetadataID:resync_cfg:",
},
{
metadataID: "aMetadataID",
Expand All @@ -228,6 +236,8 @@ func TestMetadataKeys(t *testing.T) {
sessionKey: "_sync:session:aMetadataID:aSessionID",
backgroundProcessHeartbeatPrefix: "_sync:m_aMetadataID:background_process:heartbeat:backgroundID",
backgroundProcessStatusPrefix: "_sync:m_aMetadataID:background_process:status:backgroundID",
resyncHeartbeatPrefix: "_sync:m_aMetadataID:resync_hb:aGroupID:",
resyncCfgPrefix: "_sync:m_aMetadataID:resync_cfg:aGroupID:",
},
}

1 change: 1 addition & 0 deletions base/dcp_common.go
@@ -35,6 +35,7 @@ const kCheckpointTimeThreshold = 1 * time.Minute
// DCP Feed IDs are used to build unique DCP identifiers
const DCPCachingFeedID = "SG"
const DCPImportFeedID = "SGI"
const DCPResyncFeedID = "SGR"

type DCPCommon struct {
dbStatsExpvars *expvar.Map
45 changes: 35 additions & 10 deletions base/dcp_sharded.go
@@ -32,6 +32,7 @@ const (
CBGTCfgNodeDefsKnown = SyncDocPrefix + "cfgnodeDefs-known"
CBGTCfgNodeDefsWanted = SyncDocPrefix + "cfgnodeDefs-wanted"
CBGTCfgPlanPIndexes = SyncDocPrefix + "cfgplanPIndexes"
CBGTIndexTypeSyncGatewayResync = "syncGateway-resync-"
)

// firstVersionToSupportCollections represents the earliest Sync Gateway release that supports collections.
Expand All @@ -42,6 +43,18 @@ var firstVersionToSupportCollections = &ComparableBuildVersion{
patch: 0,
}

type DestType int

const (
ImportDestType DestType = iota // "import"
ResyncDestType // "resync"
)

var destTypeNames = []string{
"import",
"resync",
}
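
A String method for DestType is not part of this diff; a minimal sketch, assuming destTypeNames stays index-aligned with the iota values above, could look like:

// String returns the human-readable name of a DestType ("import" or "resync").
// Hypothetical helper, not in this change; relies on destTypeNames being
// index-aligned with the DestType constants.
func (d DestType) String() string {
	return destTypeNames[d]
}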

// nodeExtras is the contents of the JSON value of the cbgt.NodeDef.Extras field as used by Sync Gateway.
type nodeExtras struct {
// Version is the node's version.
@@ -63,7 +76,7 @@ type CbgtContext struct {

// StartShardedDCPFeed initializes and starts a CBGT Manager targeting the provided bucket.
// dbName is used to define a unique path name for local file storage of pindex files
func StartShardedDCPFeed(ctx context.Context, dbName string, configGroup string, uuid string, heartbeater Heartbeater, bucket Bucket, spec BucketSpec, scope string, collections []string, numPartitions uint16, cfg cbgt.Cfg) (*CbgtContext, error) {
func StartShardedDCPFeed(ctx context.Context, dbName string, configGroup string, uuid string, heartbeater Heartbeater, bucket Bucket, spec BucketSpec, scope string, collections []string, numPartitions uint16, cfg cbgt.Cfg, resyncIndex bool) (*CbgtContext, error) {
Collaborator:

This could take a DestType instead of a boolean parameter, which would generally be preferable: it reads better at call sites and extends cleanly to additional feed types. The name DestType might not be quite right, though; maybe ShardedDCPFeedType instead?

It might be worth working out below what is actually needed to split on the feed type, and whether to pass these arguments into this function or to pass a type parameter instead. Revisit this comment after looking through the rest of the code and the comments.
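
For illustration, a minimal sketch of the suggested shape, reusing the DestType enum added in this diff (the wrapper name is hypothetical, not part of the change):

// Hypothetical wrapper per the review comment: callers pass a DestType
// explicitly, and the boolean-based function is invoked internally.
func StartShardedDCPFeedWithType(ctx context.Context, dbName string, configGroup string, uuid string, heartbeater Heartbeater, bucket Bucket, spec BucketSpec, scope string, collections []string, numPartitions uint16, cfg cbgt.Cfg, destType DestType) (*CbgtContext, error) {
	return StartShardedDCPFeed(ctx, dbName, configGroup, uuid, heartbeater, bucket, spec, scope, collections, numPartitions, cfg, destType == ResyncDestType)
}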

// Ensure we don't try to start collections-enabled feed if there are any pre-collection SG nodes in the cluster.
minVersion, err := getMinNodeVersion(cfg)
if err != nil {
@@ -85,7 +98,7 @@ func StartShardedDCPFeed(ctx context.Context, dbName string, configGroup string,
ctx = CorrelationIDLogCtx(ctx, DCPImportFeedID)

// Start Manager. Registers this node in the cfg
err = cbgtContext.StartManager(ctx, dbName, configGroup, bucket, scope, collections, numPartitions)
err = cbgtContext.StartManager(ctx, dbName, configGroup, bucket, scope, collections, numPartitions, resyncIndex)
if err != nil {
return nil, err
}
@@ -120,15 +133,21 @@ func GenerateLegacyIndexName(dbName string) string {
// to the manager's cbgt cfg. Nodes that have registered for this indexType with the manager via
// RegisterPIndexImplType (see importListener.RegisterImportPindexImpl)
// will receive PIndexImpl callbacks (New, Open) for assigned PIndex to initiate DCP processing.
func createCBGTIndex(ctx context.Context, c *CbgtContext, dbName string, configGroupID string, bucket Bucket, scope string, collections []string, numPartitions uint16) error {
func createCBGTIndex(ctx context.Context, c *CbgtContext, dbName string, configGroupID string, bucket Bucket, scope string, collections []string, numPartitions uint16, resyncIndex bool) error {
sourceType := SOURCE_DCP_SG

sourceParams, err := cbgtFeedParams(ctx, scope, collections, dbName)
if err != nil {
return err
}

indexParams, err := cbgtIndexParams(ImportDestKey(dbName, scope, collections))
var destType DestType
if resyncIndex {
destType = ResyncDestType
} else {
destType = ImportDestType
}
indexParams, err := cbgtIndexParams(DestKey(dbName, scope, collections, destType))
if err != nil {
return err
}
@@ -156,7 +175,12 @@ func createCBGTIndex(ctx context.Context, c *CbgtContext, dbName string, configG

// Index types are namespaced by configGroupID to support delete and create of a database targeting the
// same bucket in a config group
indexType := CBGTIndexTypeSyncGatewayImport + configGroupID
var indexType string
if resyncIndex {
indexType = CBGTIndexTypeSyncGatewayResync + configGroupID
} else {
indexType = CBGTIndexTypeSyncGatewayImport + configGroupID
}
err = c.Manager.CreateIndex(
sourceType, // sourceType
c.sourceName, // bucket name
@@ -361,7 +385,7 @@ func initCBGTManager(ctx context.Context, bucket Bucket, spec BucketSpec, cfgSG
}

// StartManager registers this node with cbgt, and the janitor will start feeds on this node.
func (c *CbgtContext) StartManager(ctx context.Context, dbName string, configGroup string, bucket Bucket, scope string, collections []string, numPartitions uint16) (err error) {
func (c *CbgtContext) StartManager(ctx context.Context, dbName string, configGroup string, bucket Bucket, scope string, collections []string, numPartitions uint16, resyncIndex bool) (err error) {
// TODO: Clarify the functional difference between registering the manager as 'wanted' vs 'known'.
registerType := cbgt.NODE_DEFS_WANTED
if err := c.Manager.Start(registerType); err != nil {
@@ -370,7 +394,7 @@ func (c *CbgtContext) StartManager(ctx context.Context, dbName string, configGro
}

// Add the index definition for this feed to the cbgt cfg, in case it's not already present.
err = createCBGTIndex(ctx, c, dbName, configGroup, bucket, scope, collections, numPartitions)
err = createCBGTIndex(ctx, c, dbName, configGroup, bucket, scope, collections, numPartitions, resyncIndex)
if err != nil {
if strings.Contains(err.Error(), "an index with the same name already exists") {
InfofCtx(ctx, KeyCluster, "Duplicate cbgt index detected during index creation (concurrent creation), using existing")
@@ -458,10 +482,11 @@ func (c *CbgtContext) RemoveFeedCredentials(dbName string) {
}

// Format of dest key for retrieval of an import or resync dest from cbgtDestFactories
func ImportDestKey(dbName string, scope string, collections []string) string {
func DestKey(dbName string, scope string, collections []string, destType DestType) string {
sort.Strings(collections)
collectionString := ""
onlyDefault := true
destTypeName := destTypeNames[destType]
for _, collection := range collections {
if collection != DefaultCollection {
onlyDefault = false
@@ -470,9 +495,9 @@ func ImportDestKey(dbName string, scope string, collections []string) string {
}
// format for _default._default
if collectionString == "" || (scope == DefaultScope && onlyDefault) {
return fmt.Sprintf("%s_import", dbName)
return fmt.Sprintf("%s_%s", dbName, destTypeName)
}
return fmt.Sprintf("%s_import_%x", dbName, sha256.Sum256([]byte(collectionString)))
return fmt.Sprintf("%s_%s_%x", dbName, destTypeName, sha256.Sum256([]byte(collectionString)))
}
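
For illustration, a sketch of the keys DestKey produces; the non-default scope and collection names are examples only, and the sha256 suffix is abbreviated:

// Sketch only: example DestKey outputs.
_ = DestKey("db1", DefaultScope, []string{DefaultCollection}, ImportDestType) // "db1_import"
_ = DestKey("db1", DefaultScope, []string{DefaultCollection}, ResyncDestType) // "db1_resync"
_ = DestKey("db1", "scope1", []string{"collection1"}, ImportDestType)         // "db1_import_<64-char sha256 hex>"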

func registerHeartbeatListener(ctx context.Context, heartbeater Heartbeater, cbgtContext *CbgtContext) (*importHeartbeatListener, error) {
2 changes: 1 addition & 1 deletion base/dcp_sharded_test.go
@@ -101,7 +101,7 @@ func TestImportDestKey(t *testing.T) {
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
require.Equal(t, test.key, ImportDestKey(test.dbName, test.scopeName, test.collections))
require.Equal(t, test.key, DestKey(test.dbName, test.scopeName, test.collections, ImportDestType))
})
}
}
12 changes: 6 additions & 6 deletions base/dcp_test.go
@@ -241,7 +241,7 @@ func TestCBGTIndexCreation(t *testing.T) {
}

// Create cbgt index via SG handling
err = createCBGTIndex(ctx, context, tc.dbName, configGroup, bucket, "", nil, 16)
err = createCBGTIndex(ctx, context, tc.dbName, configGroup, bucket, "", nil, 16, false)
require.NoError(t, err)

// Verify single index exists, and matches expected naming
@@ -313,7 +313,7 @@ func TestCBGTIndexCreationSafeLegacyName(t *testing.T) {
require.NoError(t, err, "Unable to create legacy-style index")

// Create cbgt index
err = createCBGTIndex(ctx, context, testDbName, configGroup, bucket, "", nil, 16)
err = createCBGTIndex(ctx, context, testDbName, configGroup, bucket, "", nil, 16, false)
require.NoError(t, err)

// Verify single index created
@@ -322,7 +322,7 @@
assert.Len(t, indexDefsMap, 1)

// Attempt to recreate index
err = createCBGTIndex(ctx, context, testDbName, configGroup, bucket, "", nil, 16)
err = createCBGTIndex(ctx, context, testDbName, configGroup, bucket, "", nil, 16, false)
require.NoError(t, err)

// Verify single index defined (acts as upsert to existing)
@@ -391,7 +391,7 @@ func TestCBGTIndexCreationUnsafeLegacyName(t *testing.T) {
require.NoError(t, err, "Unable to create legacy-style index")

// Create cbgt index
err = createCBGTIndex(ctx, context, unsafeTestDBName, configGroup, bucket, "", nil, 16)
err = createCBGTIndex(ctx, context, unsafeTestDBName, configGroup, bucket, "", nil, 16, false)
require.NoError(t, err)

// Verify single index created
@@ -400,7 +400,7 @@
assert.Len(t, indexDefsMap, 1)

// Attempt to recreate index
err = createCBGTIndex(ctx, context, unsafeTestDBName, configGroup, bucket, "", nil, 16)
err = createCBGTIndex(ctx, context, unsafeTestDBName, configGroup, bucket, "", nil, 16, false)
require.NoError(t, err)

// Verify single index defined (acts as upsert to existing)
@@ -457,7 +457,7 @@ func TestConcurrentCBGTIndexCreation(t *testing.T) {

// StartManager starts the manager and creates the index
log.Printf("Starting manager for %s", managerUUID)
startErr := context.StartManager(ctx, testDBName, configGroup, bucket, "", nil, DefaultImportPartitions)
startErr := context.StartManager(ctx, testDBName, configGroup, bucket, "", nil, DefaultImportPartitions, false)
assert.NoError(t, startErr)
managerWg.Done()
