-
Notifications
You must be signed in to change notification settings - Fork 1
Integrate mongo db client #62
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: feat/mongo-db-concurrency
Are you sure you want to change the base?
Conversation
This reverts commit 5b336f3.
core/interface.go
Outdated
| IsInterfaceNil() bool | ||
| } | ||
|
|
||
| type StorageWithIndexChecker interface { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
missing comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
| } | ||
|
|
||
| // ShardedStorageFactory defines the methods available for a sharded storage factory | ||
| type ShardedStorageFactory interface { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
perhaps this interface should be renamed as well to StorageWithIndexFactory or similar
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
| return handler.bucket.Has(key) | ||
| } | ||
|
|
||
| func (handler *bucketIndexHandler) UpdateWithCheck(key []byte, fn func(data interface{}) (interface{}, error)) error { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
missing comment
also, this should be protected by a mutex(which should be different than the current mut which is used for last index key), as another Put may happen right between Get and Put..and reuse the mutex on Put/Get
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
here we would be able to use the new keymutex defined in the other PR, to get a critical section per same key.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
| mut sync.RWMutex | ||
| } | ||
|
|
||
| // NewMongoDBIndexHandler returns a new instance of a bucket index handler |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| // NewMongoDBIndexHandler returns a new instance of a bucket index handler | |
| // NewMongoDBIndexHandler returns a new instance of a mongo db index handler |
| return bucket.Has(key) | ||
| } | ||
|
|
||
| func (sswi *shardedStorageWithIndex) UpdateWithCheck(key []byte, fn func(data interface{}) (interface{}, error)) error { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
missing comment
mongodb/mongoDBClientFactory.go
Outdated
| ) | ||
|
|
||
| const ( | ||
| connectTimeoutSec = 60 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should we consider moving these constants to MongoDBConfig ?
mongodb/mongodbClientWrapper.go
Outdated
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍
| } | ||
|
|
||
| return nil | ||
| PutCalled func(coll mongodb.CollectionID, key []byte, data []byte) error |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
file name should be renamed as well
| return uint32(len(mock.cache)), nil | ||
| } | ||
|
|
||
| func (mock *shardedStorageWithIndexMock) UpdateWithCheck(key []byte, fn func(data interface{}) (interface{}, error)) error { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
missing comment
core/interface.go
Outdated
| Close() error | ||
| AllocateBucketIndex() (uint32, error) | ||
| GetLastIndex() (uint32, error) | ||
| UpdateWithCheck(key []byte, fn func(data interface{}) (interface{}, error)) error |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
perhaps rename this interface to IndexHandler, as now it is implemented by mongodbIndexHandler as well
| } | ||
|
|
||
| err = handler.saveNewIndex(0) | ||
| err = saveNewIndex(handler.bucket, 0) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why was this changed to give the bucket as parameter instead of having saveNewIndex as method?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
i changed to be able to use saveNewIndex also in mongoIndexHandler
| } | ||
|
|
||
| // ShardedStorageFactory defines the methods available for a sharded storage factory | ||
| type ShardedStorageFactory interface { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍
| return handler.bucket.Has(key) | ||
| } | ||
|
|
||
| func (handler *bucketIndexHandler) UpdateWithCheck(key []byte, fn func(data interface{}) (interface{}, error)) error { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
here we would be able to use the new keymutex defined in the other PR, to get a critical section per same key.
mongodb/dbClient.go
Outdated
| filter := bson.D{{Key: "_id", Value: string(key)}} | ||
|
|
||
| entry := &mongoEntry{} | ||
| err := coll.FindOne(sessCtx, filter).Decode(entry) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we use this instead?
https://www.mongodb.com/docs/v6.0/reference/method/db.collection.findOneAndUpdate/
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
i did not use it initially since we store counter as byte array and we need to split the operation to do the conversions
| return nil, err | ||
| } | ||
|
|
||
| bucketIndexHandlers := make(map[uint32]core.BucketIndexHandler, 1) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think you need the len
| return nil, err | ||
| } | ||
|
|
||
| bucketIDProvider, err := bucket.NewBucketIDProvider(1) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would leave a todo to refactor the bucket usage with mongo, I think we can do without it as we anyway don't have direct control on the shards
| return handler, nil | ||
| } | ||
|
|
||
| err = saveNewIndex(handler.storer, initialIndexValue) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is not safe if we configure multiple MFA instances
it should be through a transaction instead, like allocate Index.
also since we always write at the same key, this can be a bottleneck (will not be more performant no matter how many shards are configured as the same key will always be allocated to a single shard). - maybe add a todo here to see how we can improve.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if there is no provided way from Mongo, maybe we can use the same bucket mechanism we have with localDB? (it would require that we can specify the shard in which we write/ from which we read)
|
Blocked for now, currently it is not a priority. It has to be refactored to use mongodb transactions in a cleaner way in service resolver. |
No description provided.