diff --git a/README.md b/README.md index 032193ef..34a21e96 100644 --- a/README.md +++ b/README.md @@ -257,6 +257,10 @@ Any collection metadata mismatches will occur in a task with the type '`verifyCo In this case, '`failed_docs`' contains all the meta data mismatches, in this case an index named '`x_1`'. +# Resumability + +The migration-verifier periodically persists its change stream’s resume token so that, in the event of a catastrophic failure (e.g., memory exhaustion), when restarted the verifier will receive any change events that happened while the verifier was down. + # Performance The migration-verifier optimizes for the case where a migration’s initial sync is completed **and** change events are relatively infrequent. If you start verification before initial sync finishes, or if the source cluster is too busy, the verification may freeze. diff --git a/internal/verifier/change_stream.go b/internal/verifier/change_stream.go index 4b534f04..dfeb53c2 100644 --- a/internal/verifier/change_stream.go +++ b/internal/verifier/change_stream.go @@ -2,15 +2,17 @@ package verifier import ( "context" - "errors" "fmt" "time" "github.com/10gen/migration-verifier/internal/keystring" + "github.com/pkg/errors" + "github.com/rs/zerolog" "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/bson/primitive" "go.mongodb.org/mongo-driver/mongo" "go.mongodb.org/mongo-driver/mongo/options" + "golang.org/x/exp/constraints" ) // ParsedEvent contains the fields of an event that we have parsed from 'bson.Raw'. @@ -32,6 +34,19 @@ type DocKey struct { ID interface{} `bson:"_id"` } +const ( + minChangeStreamPersistInterval = time.Second * 10 + metadataChangeStreamCollectionName = "changeStream" +) + +type UnknownEventError struct { + Event *ParsedEvent +} + +func (uee UnknownEventError) Error() string { + return fmt.Sprintf("Unknown event type: %#q", uee.Event.OpType) +} + // HandleChangeStreamEvent performs the necessary work for change stream events that occur during // operation. func (verifier *Verifier) HandleChangeStreamEvent(ctx context.Context, changeEvent *ParsedEvent) error { @@ -50,7 +65,7 @@ func (verifier *Verifier) HandleChangeStreamEvent(ctx context.Context, changeEve case "update": return verifier.InsertChangeEventRecheckDoc(ctx, changeEvent) default: - return errors.New(`Not supporting: "` + changeEvent.OpType + `" events`) + return UnknownEventError{Event: changeEvent} } } @@ -67,121 +82,241 @@ func (verifier *Verifier) GetChangeStreamFilter() []bson.D { return []bson.D{stage} } -// StartChangeStream starts the change stream. -func (verifier *Verifier) StartChangeStream(ctx context.Context, startTime *primitive.Timestamp) error { - streamReader := func(cs *mongo.ChangeStream) { - var changeEvent ParsedEvent - for { - select { - // if the context is cancelled return immmediately - case <-ctx.Done(): - return - // if the changeStreamEnderChan has a message, we have moved to the Recheck phase, obtain - // the remaining changes, but when TryNext returns false, we will exit, since there should - // be no message until the user has guaranteed writes to the source have ended. 
- case <-verifier.changeStreamEnderChan: - for cs.TryNext(ctx) { - if err := cs.Decode(&changeEvent); err != nil { - verifier.logger.Fatal().Err(err).Msg("Failed to decode change event") - } - err := verifier.HandleChangeStreamEvent(ctx, &changeEvent) - if err != nil { - verifier.changeStreamErrChan <- err - verifier.logger.Fatal().Err(err).Msg("Error handling change event") - } - } - verifier.mux.Lock() - verifier.changeStreamRunning = false - if verifier.lastChangeEventTime != nil { - verifier.srcStartAtTs = verifier.lastChangeEventTime - } - verifier.mux.Unlock() - // since we have started Recheck, we must signal that we have - // finished the change stream changes so that Recheck can continue. - verifier.changeStreamDoneChan <- struct{}{} - // since the changeStream is exhausted, we now return - verifier.logger.Debug().Msg("Change stream is done") - return - // the default case is that we are still in the Check phase, in the check phase we still - // use TryNext, but we do not exit if TryNext returns false. - default: - if next := cs.TryNext(ctx); !next { - continue - } - if err := cs.Decode(&changeEvent); err != nil { - verifier.logger.Fatal().Err(err).Msg("") - } - err := verifier.HandleChangeStreamEvent(ctx, &changeEvent) - if err != nil { - verifier.changeStreamErrChan <- err - return - } +func (verifier *Verifier) iterateChangeStream(ctx context.Context, cs *mongo.ChangeStream) { + var changeEvent ParsedEvent + + var lastPersistedTime time.Time + + persistResumeTokenIfNeeded := func() error { + if time.Since(lastPersistedTime) <= minChangeStreamPersistInterval { + return nil + } + + err := verifier.persistChangeStreamResumeToken(ctx, cs) + if err == nil { + lastPersistedTime = time.Now() + } + + return err + } + + for { + var err error + + for cs.TryNext(ctx) { + if err = cs.Decode(&changeEvent); err != nil { + err = errors.Wrap(err, "failed to decode change event") + break + } + err = verifier.HandleChangeStreamEvent(ctx, &changeEvent) + if err != nil { + err = errors.Wrap(err, "failed to handle change event") + break + } + } + + if cs.Err() != nil { + err = errors.Wrap( + cs.Err(), + "change stream iteration failed", + ) + } + + if err == nil { + err = persistResumeTokenIfNeeded() + } + + if err != nil { + if !errors.Is(err, context.Canceled) { + verifier.changeStreamErrChan <- err } + + return + } + + select { + // If the changeStreamEnderChan has a message, the user has indicated that + // source writes are ended. This means we should exit rather than continue + // reading the change stream since there should be no more events. + case <-verifier.changeStreamEnderChan: + verifier.mux.Lock() + verifier.changeStreamRunning = false + if verifier.lastChangeEventTime != nil { + verifier.srcStartAtTs = verifier.lastChangeEventTime + } + verifier.mux.Unlock() + // since we have started Recheck, we must signal that we have + // finished the change stream changes so that Recheck can continue. + verifier.changeStreamDoneChan <- struct{}{} + // since the changeStream is exhausted, we now return + verifier.logger.Debug().Msg("Change stream is done") + return + default: } } +} + +// StartChangeStream starts the change stream. 
+func (verifier *Verifier) StartChangeStream(ctx context.Context) error { pipeline := verifier.GetChangeStreamFilter() opts := options.ChangeStream().SetMaxAwaitTime(1 * time.Second) - if startTime != nil { - opts = opts.SetStartAtOperationTime(startTime) - verifier.srcStartAtTs = startTime + + savedResumeToken, err := verifier.loadChangeStreamResumeToken(ctx) + if err != nil { + return errors.Wrap(err, "failed to load persisted change stream resume token") + } + + csStartLogEvent := verifier.logger.Info() + + if savedResumeToken != nil { + logEvent := csStartLogEvent. + Stringer("resumeToken", savedResumeToken) + + ts, err := extractTimestampFromResumeToken(savedResumeToken) + if err == nil { + logEvent = addUnixTimeToLogEvent(ts.T, logEvent) + } else { + verifier.logger.Warn(). + Err(err). + Msg("Failed to extract timestamp from persisted resume token.") + } + + logEvent.Msg("Starting change stream from persisted resume token.") + + opts = opts.SetStartAfter(savedResumeToken) + } else { + csStartLogEvent.Msg("Starting change stream from current source cluster time.") } + sess, err := verifier.srcClient.StartSession() if err != nil { - return err + return errors.Wrap(err, "failed to start session") } sctx := mongo.NewSessionContext(ctx, sess) srcChangeStream, err := verifier.srcClient.Watch(sctx, pipeline, opts) + if err != nil { + return errors.Wrap(err, "failed to open change stream") + } + + err = verifier.persistChangeStreamResumeToken(ctx, srcChangeStream) if err != nil { return err } - if startTime == nil { - resumeToken := srcChangeStream.ResumeToken() - if resumeToken == nil { - return errors.New("Resume token is missing; cannot choose start time") - } - // Change stream token is always a V1 keystring in the _data field - resumeTokenDataValue := resumeToken.Lookup("_data") - resumeTokenData, ok := resumeTokenDataValue.StringValueOK() - if !ok { - return fmt.Errorf("Resume token _data is missing or the wrong type: %v", - resumeTokenDataValue.Type) - } - resumeTokenBson, err := keystring.KeystringToBson(keystring.V1, resumeTokenData) - if err != nil { - return err - } - // First element is the cluster time we want - resumeTokenTime, ok := resumeTokenBson[0].Value.(primitive.Timestamp) - if !ok { - return errors.New("Resume token lacks a cluster time") - } - verifier.srcStartAtTs = &resumeTokenTime - // On sharded servers the resume token time can be ahead of the actual cluster time by one - // increment. In that case we must use the actual cluster time or we will get errors. 
- clusterTimeRaw := sess.ClusterTime() - clusterTimeInner, err := clusterTimeRaw.LookupErr("$clusterTime") - if err != nil { - return err - } - clusterTimeTsVal, err := bson.Raw(clusterTimeInner.Value).LookupErr("clusterTime") - if err != nil { - return err - } - var clusterTimeTs primitive.Timestamp - clusterTimeTs.T, clusterTimeTs.I, ok = clusterTimeTsVal.TimestampOK() - if !ok { - return errors.New("Cluster time is not a timestamp") - } + csTimestamp, err := extractTimestampFromResumeToken(srcChangeStream.ResumeToken()) + if err != nil { + return errors.Wrap(err, "failed to extract timestamp from change stream's resume token") + } - verifier.logger.Debug().Msgf("Initial cluster time is %+v", clusterTimeTs) - if clusterTimeTs.Compare(resumeTokenTime) < 0 { - verifier.srcStartAtTs = &clusterTimeTs - } + clusterTime, err := getClusterTimeFromSession(sess) + if err != nil { + return errors.Wrap(err, "failed to read cluster time from session") + } + + verifier.srcStartAtTs = &csTimestamp + if csTimestamp.After(clusterTime) { + verifier.srcStartAtTs = &clusterTime } + verifier.mux.Lock() verifier.changeStreamRunning = true verifier.mux.Unlock() - go streamReader(srcChangeStream) + + go verifier.iterateChangeStream(ctx, srcChangeStream) + return nil } + +func addUnixTimeToLogEvent[T constraints.Integer](unixTime T, event *zerolog.Event) *zerolog.Event { + return event.Time("clockTime", time.Unix(int64(unixTime), int64(0))) +} + +func (v *Verifier) getChangeStreamMetadataCollection() *mongo.Collection { + return v.metaClient.Database(v.metaDBName).Collection(metadataChangeStreamCollectionName) +} + +func (verifier *Verifier) loadChangeStreamResumeToken(ctx context.Context) (bson.Raw, error) { + coll := verifier.getChangeStreamMetadataCollection() + + token, err := coll.FindOne( + ctx, + bson.D{{"_id", "resumeToken"}}, + ).Raw() + + if errors.Is(err, mongo.ErrNoDocuments) { + return nil, nil + } + + return token, err +} + +func (verifier *Verifier) persistChangeStreamResumeToken(ctx context.Context, cs *mongo.ChangeStream) error { + token := cs.ResumeToken() + + coll := verifier.getChangeStreamMetadataCollection() + _, err := coll.ReplaceOne( + ctx, + bson.D{{"_id", "resumeToken"}}, + token, + options.Replace().SetUpsert(true), + ) + + if err == nil { + ts, err := extractTimestampFromResumeToken(token) + + logEvent := verifier.logger.Debug() + + if err == nil { + logEvent = addUnixTimeToLogEvent(ts.T, logEvent) + } else { + verifier.logger.Warn().Err(err). 
+ Msg("failed to extract resume token timestamp") + } + + logEvent.Msg("Persisted change stream resume token.") + + return nil + } + + return errors.Wrapf(err, "failed to persist change stream resume token (%v)", token) +} + +func extractTimestampFromResumeToken(resumeToken bson.Raw) (primitive.Timestamp, error) { + tokenStruct := struct { + Data string `bson:"_data"` + }{} + + // Change stream token is always a V1 keystring in the _data field + err := bson.Unmarshal(resumeToken, &tokenStruct) + if err != nil { + return primitive.Timestamp{}, errors.Wrapf(err, "failed to extract %#q from resume token (%v)", "_data", resumeToken) + } + + resumeTokenBson, err := keystring.KeystringToBson(keystring.V1, tokenStruct.Data) + if err != nil { + return primitive.Timestamp{}, err + } + // First element is the cluster time we want + resumeTokenTime, ok := resumeTokenBson[0].Value.(primitive.Timestamp) + if !ok { + return primitive.Timestamp{}, errors.Errorf("resume token data's (%+v) first element is of type %T, not a timestamp", resumeTokenBson, resumeTokenBson[0].Value) + } + + return resumeTokenTime, nil +} + +func getClusterTimeFromSession(sess mongo.Session) (primitive.Timestamp, error) { + ctStruct := struct { + ClusterTime struct { + ClusterTime primitive.Timestamp `bson:"clusterTime"` + } `bson:"$clusterTime"` + }{} + + clusterTimeRaw := sess.ClusterTime() + err := bson.Unmarshal(sess.ClusterTime(), &ctStruct) + if err != nil { + return primitive.Timestamp{}, errors.Wrapf(err, "failed to find clusterTime in session cluster time document (%v)", clusterTimeRaw) + } + + return ctStruct.ClusterTime.ClusterTime, nil +} diff --git a/internal/verifier/change_stream_test.go b/internal/verifier/change_stream_test.go index da3977a5..6412eb12 100644 --- a/internal/verifier/change_stream_test.go +++ b/internal/verifier/change_stream_test.go @@ -3,9 +3,12 @@ package verifier import ( "context" "testing" + "time" + "github.com/pkg/errors" "github.com/stretchr/testify/require" "go.mongodb.org/mongo-driver/bson" + "go.mongodb.org/mongo-driver/bson/primitive" "go.mongodb.org/mongo-driver/mongo" ) @@ -27,6 +30,102 @@ func TestChangeStreamFilter(t *testing.T) { }, verifier.GetChangeStreamFilter()) } +// TestChangeStreamResumability creates a verifier, starts its change stream, +// terminates that verifier, updates the source cluster, starts a new +// verifier with change stream, and confirms that things look as they should. +func (suite *MultiSourceVersionTestSuite) TestChangeStreamResumability() { + func() { + verifier1 := buildVerifier(suite.T(), suite.srcMongoInstance, suite.dstMongoInstance, suite.metaMongoInstance) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + err := verifier1.StartChangeStream(ctx) + suite.Require().NoError(err) + }() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + _, err := suite.srcMongoClient. + Database("testDb"). + Collection("testColl"). 
+ InsertOne( + ctx, + bson.D{{"_id", "heyhey"}}, + ) + suite.Require().NoError(err) + + verifier2 := buildVerifier(suite.T(), suite.srcMongoInstance, suite.dstMongoInstance, suite.metaMongoInstance) + + suite.Require().Empty( + suite.fetchVerifierRechecks(ctx, verifier2), + "no rechecks should be enqueued before starting change stream", + ) + + newTime := suite.getClusterTime(ctx, suite.srcMongoClient) + + err = verifier2.StartChangeStream(ctx) + suite.Require().NoError(err) + + suite.Require().NotNil(verifier2.srcStartAtTs) + + suite.Assert().False( + verifier2.srcStartAtTs.After(newTime), + "verifier2's change stream should be no later than this new session", + ) + + recheckDocs := []bson.M{} + + require.Eventually( + suite.T(), + func() bool { + recheckDocs = suite.fetchVerifierRechecks(ctx, verifier2) + + return len(recheckDocs) > 0 + }, + time.Minute, + 500*time.Millisecond, + "the verifier should enqueue a recheck", + ) + + suite.Assert().Equal( + bson.M{ + "db": "testDb", + "coll": "testColl", + "generation": int32(0), + "docID": "heyhey", + }, + recheckDocs[0]["_id"], + "recheck doc should have expected ID", + ) +} + +func (suite *MultiSourceVersionTestSuite) getClusterTime(ctx context.Context, client *mongo.Client) primitive.Timestamp { + sess, err := client.StartSession() + suite.Require().NoError(err, "should start session") + + sctx := mongo.NewSessionContext(ctx, sess) + suite.Require().NoError(sess.Client().Ping(sctx, nil)) + + newTime, err := getClusterTimeFromSession(sess) + suite.Require().NoError(err, "should fetch cluster time") + + return newTime +} + +func (suite *MultiSourceVersionTestSuite) fetchVerifierRechecks(ctx context.Context, verifier *Verifier) []bson.M { + recheckDocs := []bson.M{} + + recheckColl := verifier.verificationDatabase().Collection(recheckQueue) + cursor, err := recheckColl.Find(ctx, bson.D{}) + + if !errors.Is(err, mongo.ErrNoDocuments) { + suite.Require().NoError(err) + suite.Require().NoError(cursor.All(ctx, &recheckDocs)) + } + + return recheckDocs +} + func (suite *MultiSourceVersionTestSuite) TestStartAtTimeNoChanges() { verifier := buildVerifier(suite.T(), suite.srcMongoInstance, suite.dstMongoInstance, suite.metaMongoInstance) ctx, cancel := context.WithCancel(context.Background()) @@ -39,7 +138,7 @@ func (suite *MultiSourceVersionTestSuite) TestStartAtTimeNoChanges() { suite.Require().NoError(err) origStartTs := sess.OperationTime() suite.Require().NotNil(origStartTs) - err = verifier.StartChangeStream(ctx, origStartTs) + err = verifier.StartChangeStream(ctx) suite.Require().NoError(err) suite.Require().Equal(verifier.srcStartAtTs, origStartTs) verifier.changeStreamEnderChan <- struct{}{} @@ -59,7 +158,7 @@ func (suite *MultiSourceVersionTestSuite) TestStartAtTimeWithChanges() { suite.Require().NoError(err) origStartTs := sess.OperationTime() suite.Require().NotNil(origStartTs) - err = verifier.StartChangeStream(ctx, origStartTs) + err = verifier.StartChangeStream(ctx) suite.Require().NoError(err) suite.Require().Equal(verifier.srcStartAtTs, origStartTs) _, err = suite.srcMongoClient.Database("testDb").Collection("testColl").InsertOne( @@ -94,7 +193,7 @@ func (suite *MultiSourceVersionTestSuite) TestNoStartAtTime() { suite.Require().NoError(err) origStartTs := sess.OperationTime() suite.Require().NotNil(origStartTs) - err = verifier.StartChangeStream(ctx, nil) + err = verifier.StartChangeStream(ctx) suite.Require().NoError(err) suite.Require().NotNil(verifier.srcStartAtTs) 
suite.Require().LessOrEqual(origStartTs.Compare(*verifier.srcStartAtTs), 0) diff --git a/internal/verifier/check.go b/internal/verifier/check.go index b7a1338d..fa5b2fd0 100644 --- a/internal/verifier/check.go +++ b/internal/verifier/check.go @@ -6,7 +6,6 @@ import ( "sync" "time" - "github.com/10gen/migration-verifier/internal/retry" mapset "github.com/deckarep/golang-set/v2" "github.com/pkg/errors" "go.mongodb.org/mongo-driver/bson" @@ -162,17 +161,10 @@ func (verifier *Verifier) CheckDriver(ctx context.Context, filter map[string]any verifier.mux.RUnlock() if !csRunning { verifier.logger.Debug().Msg("Change stream not running; starting change stream") - retryer := retry.New(retry.DefaultDurationLimit).SetRetryOnUUIDNotSupported() - // Ignore the error from this call -- if it fails, we use an alternate method - // where we use the change stream's initial resume token. - startAtTs, _ := GetLastOpTimeAndSyncShardClusterTime(ctx, - verifier.logger, - retryer, - verifier.srcClient, - true) - err = verifier.StartChangeStream(ctx, startAtTs) + + err = verifier.StartChangeStream(ctx) if err != nil { - return err + return errors.Wrap(err, "failed to start change stream on source") } } // Log out the verification status when initially booting up so it's easy to see the current state diff --git a/internal/verifier/migration_verifier_test.go b/internal/verifier/migration_verifier_test.go index dc244fbc..82f4eac8 100644 --- a/internal/verifier/migration_verifier_test.go +++ b/internal/verifier/migration_verifier_test.go @@ -510,7 +510,9 @@ func (suite *MultiMetaVersionTestSuite) TestFailedVerificationTaskInsertions() { suite.Require().NoError(err) event.OpType = "flibbity" err = verifier.HandleChangeStreamEvent(ctx, &event) - suite.Require().Equal(fmt.Errorf(`Not supporting: "flibbity" events`), err) + badEventErr := UnknownEventError{} + suite.Require().ErrorAs(err, &badEventErr) + suite.Assert().Equal("flibbity", badEventErr.Event.OpType) verifier.generation++ func() { diff --git a/internal/verifier/recheck.go b/internal/verifier/recheck.go index 3aaf7267..c9ba7f8a 100644 --- a/internal/verifier/recheck.go +++ b/internal/verifier/recheck.go @@ -86,7 +86,10 @@ func (verifier *Verifier) insertRecheckDocs( SetFilter(filterDoc).SetReplacement(recheckDoc).SetUpsert(true)) } _, err := verifier.verificationDatabase().Collection(recheckQueue).BulkWrite(ctx, models) - verifier.logger.Debug().Msgf("Persisted %d recheck doc(s) for generation %d", len(models), generation) + + if err == nil { + verifier.logger.Debug().Msgf("Persisted %d recheck doc(s) for generation %d", len(models), generation) + } return err } diff --git a/internal/verifier/util.go b/internal/verifier/util.go index eacb7abb..a51fe84b 100644 --- a/internal/verifier/util.go +++ b/internal/verifier/util.go @@ -1,17 +1,10 @@ package verifier import ( - "context" "fmt" "strings" - "github.com/10gen/migration-verifier/internal/logger" "github.com/10gen/migration-verifier/internal/partitions" - "github.com/10gen/migration-verifier/internal/retry" - "github.com/10gen/migration-verifier/internal/util" - "github.com/pkg/errors" - "go.mongodb.org/mongo-driver/bson" - "go.mongodb.org/mongo-driver/bson/primitive" "go.mongodb.org/mongo-driver/mongo" ) @@ -96,61 +89,3 @@ type QueryFilter struct { Namespace string `bson:"namespace" json:"namespace"` To string `bson:"to,omitempty" json:"to,omitempty"` } - -// GetLastOpTimeAndSyncShardClusterTime retrieves the last operation time on the source. 
If maxClusterTime is provided, -// the `AppendOplogNote` will only be performed on the shards that has a $clusterTime lower than the provided value. -// This function is exposed for testing purposes. -// This is slightly modified from the code in mongosync to use *mongo.Client instead of the -// mongosync util.Client. -func GetLastOpTimeAndSyncShardClusterTime( - ctx context.Context, - logger *logger.Logger, - retryer retry.Retryer, - client *mongo.Client, - retryOnLockFailed bool, -) (*primitive.Timestamp, error) { - // 'AppendOplogNote' will perform a no-op write on the source cluster. When run against - // sharded clusters, this command will perform the no-op write on all shards. In receiving - // the shard responses, the driver notes the highest $clusterTime amongst all of them. - var response bson.Raw - appendOplogNoteCmd := bson.D{ - primitive.E{Key: "appendOplogNote", Value: 1}, - primitive.E{Key: "data", Value: primitive.E{Key: "migration-verifier", Value: "last op fetching"}}} - - if retryOnLockFailed { - retryer = retryer.WithErrorCodes(util.LockFailed) - } - err := retryer.RunForTransientErrorsOnly(ctx, logger, func(ri *retry.Info) error { - ri.Log(logger.Logger, - "appendOplogNote", - "source", - "", - "", - fmt.Sprintf("Running appendOplogNote command. %v", appendOplogNoteCmd)) - ret := client.Database("admin").RunCommand(ctx, appendOplogNoteCmd) - var err error - if response, err = ret.Raw(); err != nil { - return err - } - - return nil - }) - - // When we issue a maxClusterTime lower than any shard's current $cluster_time, we will receive an StaleClusterTime error. - // The command will essentially be a noop on that particular shard. - // Since mongos will broadcast the command to all the shards, this error doesn't affect correctness. - if err != nil && !util.IsStaleClusterTimeError(err) { - return nil, errors.Wrap(err, - "failed to issue appendOplogNote command on source cluster") - } - - // Get the `operationTime` from the response and return it. - rawOperationTime, err := response.LookupErr("operationTime") - if err != nil { - return nil, errors.Wrap(err, - "failed to get operationTime from source cluster's appendOplogNote response") - } - - t, i := rawOperationTime.Timestamp() - return &primitive.Timestamp{T: t, I: i}, nil -}
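
For readers who want the resumability behavior in isolation: the sketch below is not part of the diff. Using only the official Go driver API, it shows the same pattern that change_stream.go implements above — persist the change stream's resume token at most once per interval, and on startup resume via SetStartAfter if a token was previously saved. The connection URI, the "verifier_meta" database name, and the hard-coded 10-second interval are illustrative assumptions, not the verifier's actual configuration (the interval merely mirrors minChangeStreamPersistInterval).

```go
package main

import (
	"context"
	"errors"
	"log"
	"time"

	"go.mongodb.org/mongo-driver/bson"
	"go.mongodb.org/mongo-driver/mongo"
	"go.mongodb.org/mongo-driver/mongo/options"
)

func main() {
	ctx := context.Background()

	// Placeholder URI; the verifier builds its clients from its own configuration.
	client, err := mongo.Connect(ctx, options.Client().ApplyURI("mongodb://localhost:27017"))
	if err != nil {
		log.Fatal(err)
	}
	defer client.Disconnect(ctx)

	// Stand-in for the verifier's metadata collection ("changeStream" above).
	metaColl := client.Database("verifier_meta").Collection("changeStream")

	opts := options.ChangeStream().SetMaxAwaitTime(time.Second)

	// Resume from a previously persisted token, if any.
	saved, err := metaColl.FindOne(ctx, bson.D{{"_id", "resumeToken"}}).Raw()
	if err == nil {
		opts = opts.SetStartAfter(saved)
	} else if !errors.Is(err, mongo.ErrNoDocuments) {
		log.Fatal(err)
	}

	cs, err := client.Watch(ctx, mongo.Pipeline{}, opts)
	if err != nil {
		log.Fatal(err)
	}
	defer cs.Close(ctx)

	var lastPersisted time.Time

	for {
		// Drain whatever events are currently available.
		for cs.TryNext(ctx) {
			var event bson.M
			if err := cs.Decode(&event); err != nil {
				log.Fatal(err)
			}
			log.Printf("handled %v event", event["operationType"])
		}
		if cs.Err() != nil {
			log.Fatal(cs.Err())
		}

		// Persist the latest resume token at most every 10 seconds
		// (cf. minChangeStreamPersistInterval in the diff).
		if time.Since(lastPersisted) > 10*time.Second {
			_, err := metaColl.ReplaceOne(
				ctx,
				bson.D{{"_id", "resumeToken"}},
				cs.ResumeToken(),
				options.Replace().SetUpsert(true),
			)
			if err != nil {
				log.Fatal(err)
			}
			lastPersisted = time.Now()
		}
	}
}
```

If the process is killed and restarted, the saved token lets the new change stream replay any events that occurred while it was down — the behavior the new TestChangeStreamResumability test exercises against the verifier itself.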
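
Similarly illustrative (and simplified — the real UnknownEventError carries a *ParsedEvent rather than a bare string): the typed error added in change_stream.go lets callers branch with errors.As instead of comparing formatted error strings, which is what the updated assertion in migration_verifier_test.go relies on.

```go
package main

import (
	"errors"
	"fmt"
)

// UnknownEventError mirrors the type added in change_stream.go; it is
// repeated here, in reduced form, only so this sketch compiles on its own.
type UnknownEventError struct {
	OpType string
}

func (uee UnknownEventError) Error() string {
	return fmt.Sprintf("Unknown event type: %#q", uee.OpType)
}

// handle stands in for HandleChangeStreamEvent's opType switch.
func handle(opType string) error {
	switch opType {
	case "insert", "delete", "replace", "update":
		return nil
	default:
		return UnknownEventError{OpType: opType}
	}
}

func main() {
	err := handle("flibbity")

	// errors.As recovers the offending event type directly, so callers no
	// longer need to string-match the error message.
	var unknownErr UnknownEventError
	if errors.As(err, &unknownErr) {
		fmt.Printf("skipping unsupported event type %q\n", unknownErr.OpType)
	}
}
```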