diff --git a/Makefile b/Makefile index 2239ed7e..e5ecbb29 100644 --- a/Makefile +++ b/Makefile @@ -63,6 +63,7 @@ docker-run: -p 8080:8080 \ --name ${container_name} \ ${image}:${image_tag} + docker rm ${container_name} docker-new: docker-build docker-run @@ -85,6 +86,8 @@ docker-rm: docker-stop compose_file = docker/mongodb-cluster.yml ifeq ($(db_setup),redis) compose_file = docker/redis-cluster.yml +else ifeq ($(db_setup),mongodb-full) + compose_file = docker/mongodb-cluster-full.yml endif compose-new: diff --git a/docker/mongodb-cluster-full.yml b/docker/mongodb-cluster-full.yml new file mode 100644 index 00000000..c3da8a11 --- /dev/null +++ b/docker/mongodb-cluster-full.yml @@ -0,0 +1,87 @@ +version: '3' + +services: + mongodb0: + container_name: mongodb0 + image: mongo + ports: + - 27017:27017 + networks: + - mongo + depends_on: + - mongodb1 + - mongodb2 + links: + - mongodb1 + - mongodb2 + restart: always + entrypoint: [ "/usr/bin/mongod", "--bind_ip_all", "--replSet", "mongoReplSet" ] + + mongoinit: + image: mongo + volumes: + - ./mongodb/init.sh:/scripts/init.sh + networks: + - mongo + depends_on: + - mongodb0 + links: + - mongodb0 + restart: "no" + entrypoint: [ "bash", "-c", "sleep 10 && /scripts/init.sh"] + + mongodb1: + container_name: mongodb1 + image: mongo + ports: + - 27018:27017 + networks: + - mongo + restart: always + entrypoint: [ "/usr/bin/mongod", "--bind_ip_all", "--replSet", "mongoReplSet" ] + + mongodb2: + container_name: mongodb2 + image: mongo + ports: + - 27019:27017 + networks: + - mongo + restart: always + entrypoint: [ "/usr/bin/mongod", "--bind_ip_all", "--replSet", "mongoReplSet" ] + + tcs0: + build: + context: .. + dockerfile: Dockerfile + image: multi-factor-auth:latest + ports: + - 8080:8080 + networks: + - mongo + + tcs1: + build: + context: .. + dockerfile: Dockerfile + image: multi-factor-auth:latest + ports: + - 8081:8080 + networks: + - mongo + + nginx: + image: nginx:latest + volumes: + - ./nginx/nginx.conf:/etc/nginx/nginx.conf:ro + depends_on: + - tcs0 + - tcs1 + networks: + - mongo + ports: + - 5000:5000 + +networks: + mongo: + driver: bridge diff --git a/docker/nginx/nginx.conf b/docker/nginx/nginx.conf new file mode 100644 index 00000000..c0ffecf0 --- /dev/null +++ b/docker/nginx/nginx.conf @@ -0,0 +1,58 @@ +user www-data; +worker_processes auto; +worker_rlimit_nofile 32768; + +error_log /var/log/nginx/error.log; +pid /var/run/nginx.pid; +include /usr/share/nginx/modules/*.conf; +debug_points abort; + +events { + worker_connections 8192; +} + +http { + real_ip_header X-Forwarded-For; + set_real_ip_from 10.0.0.0/8; + proxy_cache_path /var/cache/nginx/cache keys_zone=elasticsearch:10m inactive=60m; + + log_format main '$remote_addr - $remote_user [$time_local] "$request" ' + '$status $body_bytes_sent "$http_referer" ' + '"$http_user_agent" "$http_x_forwarded_for"'; + + access_log /var/log/nginx/access.log main; + error_log /var/log/nginx/error.log; + + sendfile on; + tcp_nopush on; + tcp_nodelay on; + keepalive_timeout 65; + types_hash_max_size 2048; + include /etc/nginx/mime.types; + default_type application/octet-stream; + + upstream tcs { + server tcs0:8080; + server tcs1:8080; + keepalive 16; + } + + server { + listen 5000; + server_name tcs_proxy; + + location / { + proxy_http_version 1.1; + + proxy_connect_timeout 5s; + proxy_read_timeout 10s; + + client_max_body_size 100M; + + proxy_pass http://tcs; + + # timeout for an idle keepalive connection to an upstream server will stay open + keepalive_timeout 50s; + } + } +} diff --git a/go.sum b/go.sum index 4d6f754c..d43ba24a 100644 --- a/go.sum +++ b/go.sum @@ -141,7 +141,6 @@ github.com/mattn/go-isatty v0.0.12/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Ky github.com/mattn/go-isatty v0.0.14/go.mod h1:7GGIvUiUoEMVVmxf/4nioHXj79iQHKdU27kJ6hsGG94= github.com/mattn/go-isatty v0.0.17 h1:BTarxUcIeDqL27Mc+vyvdWYSL28zpIhv3RoTdsLMPng= github.com/mattn/go-isatty v0.0.17/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM= -github.com/mitchellh/mapstructure v1.5.0 h1:jeMsZIYE/09sWLaz43PL7Gy6RuMjD2eJVyuac5Z2hdY= github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg= github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= diff --git a/mongodb/dbClient.go b/mongodb/dbClient.go index 96849870..93889ff4 100644 --- a/mongodb/dbClient.go +++ b/mongodb/dbClient.go @@ -5,11 +5,14 @@ import ( "fmt" "github.com/multiversx/multi-factor-auth-go-service/core" + "github.com/multiversx/multi-factor-auth-go-service/handlers/storage" logger "github.com/multiversx/mx-chain-logger-go" "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/mongo" "go.mongodb.org/mongo-driver/mongo/options" + "go.mongodb.org/mongo-driver/mongo/readpref" + "go.mongodb.org/mongo-driver/mongo/writeconcern" ) var log = logger.GetOrCreate("mongodb") @@ -20,8 +23,12 @@ type CollectionID string const ( // UsersCollectionID specifies mongodb collection for users UsersCollectionID CollectionID = "users" + + // IndexCollectionID specifies mongodb collection for global index + IndexCollectionID CollectionID = "index" ) +const numInitialShardChunks = 4 const incrementIndexStep = 1 const minNumUsersColls = 1 @@ -35,6 +42,11 @@ type counterMongoEntry struct { Value uint32 `bson:"value"` } +type otpInfoWrapper struct { + Key string `bson:"_id"` + OTPInfo *core.OTPInfo `bson:"otpinfo"` +} + type mongodbClient struct { client *mongo.Client db *mongo.Database @@ -111,6 +123,36 @@ func (mdc *mongodbClient) Put(collID CollectionID, key []byte, data []byte) erro opts := options.Update().SetUpsert(true) + log.Trace("Put", "key", string(key), "value", string(data)) + + _, err := coll.UpdateOne(mdc.ctx, filter, update, opts) + if err != nil { + return err + } + + return nil +} + +func (mdc *mongodbClient) PutStruct(collID CollectionID, key []byte, data *core.OTPInfo) error { + coll, ok := mdc.collections[collID] + if !ok { + return ErrCollectionNotFound + } + + otpInfo := &otpInfoWrapper{ + Key: string(key), + OTPInfo: data, + } + + filter := bson.M{"_id": string(key)} + update := bson.M{ + "$set": otpInfo, + } + + opts := options.Update().SetUpsert(true) + + log.Trace("PutStruct", "key", string(key), "value", data.LastTOTPChangeTimestamp) + _, err := coll.UpdateOne(mdc.ctx, filter, update, opts) if err != nil { return err @@ -147,12 +189,49 @@ func (mdc *mongodbClient) Get(collID CollectionID, key []byte) ([]byte, error) { return nil, err } + log.Trace("Get", "key", string(key)) + return entry.Value, nil } +func (mdc *mongodbClient) GetStruct(collID CollectionID, key []byte) (*core.OTPInfo, error) { + coll, ok := mdc.collections[collID] + if !ok { + return nil, ErrCollectionNotFound + } + + filter := bson.D{{Key: "_id", Value: string(key)}} + + entry := &otpInfoWrapper{} + err := coll.FindOne(mdc.ctx, filter).Decode(entry) + if err != nil { + return nil, err + } + + return entry.OTPInfo, nil +} + // Has will return true if the provided key exists in the collection func (mdc *mongodbClient) Has(collID CollectionID, key []byte) error { _, err := mdc.findOne(collID, key) + log.Trace("Has", "key", string(key)) + return err +} + +func (mdc *mongodbClient) HasStruct(collID CollectionID, key []byte) error { + coll, ok := mdc.collections[collID] + if !ok { + return ErrCollectionNotFound + } + + filter := bson.D{{Key: "_id", Value: string(key)}} + + entry := &otpInfoWrapper{} + err := coll.FindOne(mdc.ctx, filter).Decode(entry) + if err != nil { + return err + } + return err } @@ -191,6 +270,104 @@ func (mdc *mongodbClient) GetIndex(collID CollectionID, key []byte) (uint32, err return entry.Value, nil } +// ReadWriteWithCheck will perform read and write operation with a provided checker +func (mdc *mongodbClient) ReadWriteWithCheck( + collID CollectionID, + key []byte, + checker func(data interface{}) (interface{}, error), +) error { + session, err := mdc.client.StartSession() + if err != nil { + return err + } + defer session.EndSession(mdc.ctx) + + wc := writeconcern.New(writeconcern.WMajority()) + txnOptions := options.Transaction().SetWriteConcern(wc) + txnOptions.SetReadPreference(readpref.Primary()) + + sessionCallback := func(ctx mongo.SessionContext) error { + err := session.StartTransaction(txnOptions) + if err != nil { + return err + } + + coll, ok := mdc.collections[collID] + if !ok { + return ErrCollectionNotFound + } + + filter := bson.M{"_id": string(key)} + + entry := &otpInfoWrapper{} + err = coll.FindOne(ctx, filter).Decode(entry) + if err != nil { + _ = session.AbortTransaction(ctx) + return err + } + + retValue, err := checker(entry.OTPInfo) + if err != nil { + _ = session.AbortTransaction(ctx) + return err + } + retValueBytes, ok := retValue.(*core.OTPInfo) + if !ok { + _ = session.AbortTransaction(ctx) + return core.ErrInvalidValue + } + + otpInfo := &otpInfoWrapper{ + Key: string(key), + OTPInfo: retValueBytes, + } + + update := bson.M{ + "$set": otpInfo, + } + + opts := options.Update().SetUpsert(true) + + _, err = coll.UpdateOne(mdc.ctx, filter, update, opts) + if err != nil { + _ = session.AbortTransaction(ctx) + return err + } + + return session.CommitTransaction(ctx) + } + + err = mongo.WithSession(mdc.ctx, session, + func(sctx mongo.SessionContext) error { + return runTxWithRetry(sctx, sessionCallback) + }, + ) + if err != nil { + return err + } + + return nil +} + +func runTxWithRetry(sctx mongo.SessionContext, txnFn func(mongo.SessionContext) error) error { + for { + err := txnFn(sctx) + if err == nil { + return nil + } + + log.Trace("Transaction aborted. Caught exception during transaction.") + + cmdErr, ok := err.(mongo.CommandError) + if ok && cmdErr.HasErrorLabel("TransientTransactionError") { + log.Trace("TransientTransactionError, retrying transaction...") + continue + } + + return err + } +} + // PutIndexIfNotExists will set an index value to the specified key if not already exists func (mdc *mongodbClient) PutIndexIfNotExists(collID CollectionID, key []byte, index uint32) error { coll, ok := mdc.collections[collID] @@ -247,6 +424,29 @@ func (mdc *mongodbClient) IncrementIndex(collID CollectionID, key []byte) (uint3 return entry.Value, nil } +// ShardHashedCollection will shard collection with a hashed shard key +func (mdc *mongodbClient) ShardHashedCollection(collID CollectionID) error { + coll, ok := mdc.collections[collID] + if !ok { + return ErrCollectionNotFound + } + + collectionPath := fmt.Sprintf("%s.%s", mdc.db.Name(), coll.Name()) + + cmd := bson.D{ + {Key: "shardCollection", Value: collectionPath}, + {Key: "key", Value: bson.D{{Key: "_id", Value: "hashed"}}}, + {Key: "numInitialChunks", Value: numInitialShardChunks}, + } + + err := mdc.db.RunCommand(mdc.ctx, cmd).Err() + if err != nil { + return err + } + + return nil +} + // Close will close the mongodb client func (mdc *mongodbClient) Close() error { err := mdc.client.Disconnect(mdc.ctx) diff --git a/mongodb/dbClient_test.go b/mongodb/dbClient_test.go index 653cbc3c..4f0f1cff 100644 --- a/mongodb/dbClient_test.go +++ b/mongodb/dbClient_test.go @@ -303,6 +303,103 @@ func TestMongoDBClient_Remove(t *testing.T) { }) } +// func TestMongoDBClient_IncrementWithTransaction(t *testing.T) { +// t.Parallel() + +// mt := mtest.New(t, mtest.NewOptions().ClientType(mtest.Mock)) +// defer mt.Close() + +// mt.Run("failed to create session", func(mt *mtest.T) { +// mt.Parallel() + +// client, err := mongodb.NewClient(mt.Client, "dbName", 1) +// require.Nil(mt, err) + +// mt.AddMockResponses( +// mtest.CreateCommandErrorResponse(mtest.CommandError{ +// Code: 1, +// Message: expectedErr.Error(), +// }), +// ) + +// _, err = client.IncrementIndex(usersCollID, []byte("key1")) +// require.Equal(mt, expectedErr.Error(), err.Error()) +// }) + +// mt.Run("should work", func(mt *mtest.T) { +// mt.Parallel() + +// mt.AddMockResponses( +// mtest.CreateCursorResponse(1, "foo.bar", mtest.FirstBatch, bson.D{ +// {Key: "_id", Value: "key1"}, +// {Key: "value", Value: 1}, +// }), +// ) + +// client, err := mongodb.NewClient(mt.Client, "dbName", 1) +// require.Nil(mt, err) + +// val, err := client.IncrementIndex(usersCollID, []byte("key1")) +// require.Nil(mt, err) +// require.Equal(mt, uint32(1), val) +// }) +// } + +func TestMongoDBClient_ReadWriteWithCheck(t *testing.T) { + t.Parallel() + + mt := mtest.New(t, mtest.NewOptions().ClientType(mtest.Mock)) + defer mt.Close() + + mt.Run("failed to create session", func(mt *mtest.T) { + mt.Parallel() + + client, err := mongodb.NewClient(mt.Client, "dbName", 1) + require.Nil(mt, err) + + mt.AddMockResponses( + mtest.CreateCommandErrorResponse(mtest.CommandError{ + Code: 1, + Message: expectedErr.Error(), + }), + ) + + checker := func(data interface{}) (interface{}, error) { + return nil, nil + } + + err = client.ReadWriteWithCheck(usersCollID, []byte("key1"), checker) + require.Equal(mt, expectedErr.Error(), err.Error()) + }) + + mt.Run("should work", func(mt *mtest.T) { + mt.Parallel() + + client, err := mongodb.NewClient(mt.Client, "dbName", 1) + require.Nil(mt, err) + + mt.AddMockResponses( + mtest.CreateCursorResponse(1, "foo.bar", mtest.FirstBatch, bson.D{ + {Key: "_id", Value: "key1"}, + {Key: "otpinfo", Value: &core.OTPInfo{LastTOTPChangeTimestamp: 101}}, + }), + mtest.CreateSuccessResponse(), + mtest.CreateSuccessResponse(), + mtest.CreateSuccessResponse(), + ) + + checker := func(data interface{}) (interface{}, error) { + if data.(*core.OTPInfo).LastTOTPChangeTimestamp == 101 { + return &core.OTPInfo{}, nil + } + return nil, errors.New("error") + } + + err = client.ReadWriteWithCheck(usersCollID, []byte("key1"), checker) + require.Nil(mt, err) + }) +} + func TestMongoDBClient_IncrementIndex(t *testing.T) { t.Parallel() diff --git a/mongodb/integrationTests/mongo_test.go b/mongodb/integrationTests/mongo_test.go new file mode 100644 index 00000000..536e43f7 --- /dev/null +++ b/mongodb/integrationTests/mongo_test.go @@ -0,0 +1,55 @@ +package integrationtests + +// func TestMongoDBClient_ConcurrentCalls(t *testing.T) { +// t.Parallel() + +// if os.Getenv("CI") != "" { +// t.Skip("Skipping testing in CI environment") +// } + +// inMemoryMongoDB, err := memongo.StartWithOptions(&memongo.Options{MongoVersion: "4.4.0", ShouldUseReplica: true}) +// require.Nil(t, err) +// defer inMemoryMongoDB.Stop() + +// client, err := mongodb.CreateMongoDBClient(config.MongoDBConfig{ +// URI: inMemoryMongoDB.URI(), +// DBName: memongo.RandomDatabase(), +// }) +// require.Nil(t, err) + +// checker := func(data interface{}) (interface{}, error) { +// return &core.OTPInfo{}, nil +// } + +// err = client.PutStruct(mongodb.UsersCollectionID, []byte("key"), &core.OTPInfo{LastTOTPChangeTimestamp: 101}) +// require.Nil(t, err) + +// numCalls := 60 + +// var wg sync.WaitGroup +// wg.Add(numCalls) +// for i := 0; i < numCalls; i++ { +// go func(idx int) { +// switch idx % 5 { +// case 0: +// err := client.PutStruct(mongodb.UsersCollectionID, []byte("key"), &core.OTPInfo{LastTOTPChangeTimestamp: 101}) +// require.Nil(t, err) +// case 1: +// _, err := client.GetStruct(mongodb.UsersCollectionID, []byte("key")) +// require.Nil(t, err) +// case 2: +// require.Nil(t, client.HasStruct(mongodb.UsersCollectionID, []byte("key"))) +// case 3: +// err := client.ReadWriteWithCheck(mongodb.UsersCollectionID, []byte("key"), checker) +// require.Nil(t, err) +// case 4: +// _, err := client.UpdateTimestamp(mongodb.UsersCollectionID, []byte("key"), 0) +// require.Nil(t, err) +// default: +// assert.Fail(t, "should not hit default") +// } +// wg.Done() +// }(i) +// } +// wg.Wait() +// }