Doubled buckets #619
Thanks, amazing work. I especially like how you leveraged TLA+ for this. My comments below.
Associated tickets
Changelog
1. First iteration fixes
- opts field instead of the generation in the _bucket;
- sync of the _bucket after master switch now doesn't have to be cancelled synchronously;
- M.rebalance_allowed renamed to M.bucket_are_in_sync;
2. Second iteration fixes
- generation renamed to send_generation.
3. Third iteration fixes
- send_generation renamed back to generation.
4. Fourth iteration fixes
- PREPARED renamed to READONLY to comply with another RFC.
- Moved the "Allow to skip disabled nodes when syncing" part to alternatives.
Disclaimer for users
Currently, during rebalancing it's unsafe to make vshard requests if a master change is possible. Read-only requests may return invalid data, and write requests can be lost even after the user has received a write confirmation.
Thus, until this bunch of problems is fixed in vshard, users must disable failover prior to moving any bucket.
P.S. Note that vshard is supposed to be used via the vshard.router.call API, not via vshard.router.route*() + rs:call*(), which is unsafe.
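For illustration, here is a minimal sketch of the two usage styles. The vshard.router APIs are real, but the storage function customer_add and its arguments are made up:

```lua
local vshard = require('vshard')

local bucket_id = vshard.router.bucket_id_strcrc32('some_customer_key')

-- Recommended: vshard.router.call() re-resolves the bucket and retries on
-- WRONG_BUCKET errors, so it follows the bucket while it is being moved.
local ok, err = vshard.router.call(bucket_id, 'write', 'customer_add',
                                   {{customer_id = 1, name = 'Alice'}},
                                   {timeout = 5})

-- Unsafe: route() + replicaset:callrw() pins the request to whatever
-- replicaset the router currently believes owns the bucket and performs
-- no bucket check on the storage side.
local replicaset = vshard.router.route(bucket_id)
local res = replicaset:callrw('customer_add',
                              {{customer_id = 1, name = 'Alice'}},
                              {timeout = 5})
```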
Here's the list of tickets which must be closed for safe requests during rebalancing when using clean vshard:
This RFC focuses on the last two problems.
However, not only is vshard itself unsafe: high-level products (such as crud (tarantool/crud#448) or tdg (https://github.com/tarantool/tdg2/issues/2175)) also use vshard's API unsafely, which leads to an even higher chance of broken requests.
Projects which use crud or tdg (not all requests are unsafe here, see the issue in tdg) cannot rely on RW or RO requests during rebalancing at all; the load on the cluster should be stopped during rebalancing. Requests can read incorrect data, and write requests can be lost. After rebalancing ends, the bucket cache on all of the routers must be emptied.
Here's the list of tickets which must be closed for safe requests during rebalancing in high-level projects:
- router.map_callro #333 (required for tdg)
- map_callrw with split args #559 (required for tdg)
These vshard issues are already designed and are waiting to be implemented. The following issues should be resolved by the projects themselves:
Disclaimer for reviewers and developers
I'm using a TLA+ specification of the storage to prove that these problems with doubled buckets are real and that the changes I'm proposing in this RFC solve them. You can find the TLA+ spec and the implemented solutions here.
However, note that sophisticated cases of doubled buckets require a big depth of states (e.g. the stray TCP issue's depth is 15-19), which in the case of a general specification (e.g. proofs/tla/test/storage/DoubledBucketsTest.tla) requires hours of execution on my hardware at a constant 100% on all cores to reach. And even this spec is very limited (no more than 1 reorder or drop of messages in the network, the maximum number of bucket sends is 2, a connection from one instance to another doesn't have more than 3 unprocessed messages, refs are disabled completely, 1 instance in each of the 3 replicasets).
We'll have to test general specifications on stronger hardware, but even then we cannot be sure that the doubled bucket problem is completely solved: it may be reproducible outside of the constraints we set, or it may be missing from the TLA+ spec, since the spec is just an abstraction of the real code.
Still, some testing of the algorithm is better than nothing. We should keep developing the TLA+ spec, since any error in the bucket rebalancing algorithm causes data loss and inconsistencies.
1. Problem overview
The doubled bucket problem is when there are two or more ACTIVE buckets with the same bucket_id on the masters of different shards. This is a crucial problem, since routers don't know which of these buckets is the real one and will send requests to a random one, which leads to incorrect reads and lost writes.
During an incident it was discovered that it's very easy to get doubled buckets in the cluster if failover is enabled and changes masters (#576). Here's what happened:
1. Instance 1 is the master of rs1: it sends bucket N to another replicaset and makes it SENT; on the other shard the bucket is ACTIVE. Another Instance 2 in the replicaset doesn't have that change yet, the bucket is ACTIVE there. A master switch happens.
2. Instance 2 is the new master, it has replication lag from the old master. It starts to process RW on top of bucket N, then the update of the bucket N state comes from the old master.
3. Replication gets stopped due to "RW refs break replication when master changes" #573: a SENT bucket cannot have RW refs. The bucket remains in the ACTIVE state.
4. Two ACTIVE buckets with number N exist in the cluster. All of the updates for the bucket in rs1 will be lost as soon as replication is restored and the bucket gets deleted.
If Instance 2 doesn't have RW refs in the 3rd step, then the data is lost even without breaking the replication, but that window only exists for a small amount of time.
This problem has a really small depth (6-7) in the TLA spec (see MasterDoubledTest.tla or DoubledBucketsSmallTest.tla) and consequently can be reproduced very easily. It's a very dangerous bug. It probably happens in clusters pretty often, but the consistency of requests just wasn't checked.
There's another way doubled buckets can happen (#214), which is not related to a master switch:
1. rs1 sends a bucket to rs2. The bucket becomes SENDING on rs1, but the message which is supposed to add a RECEIVING bucket to rs2 is lost.
2. rs1 times out on sending the bucket, recreates the connection, checks the bucket on rs2, and makes the bucket ACTIVE.
3. rs1 sends the same bucket to rs3.
4. The stray message to rs2 arrives and makes the bucket on rs2 RECEIVING.
5. rs2's recovery goes to rs1, doesn't find the bucket there (or finds it there as SENT or GARBAGE) and makes the bucket ACTIVE.
6. The bucket is ACTIVE on rs2 and rs3.
The important words here are "recreates the connection": TCP guarantees the order of messages only within one connection, so after recreation the new request (the recovery check) may arrive earlier than the older one (the bucket_recv call).
The depth of the problem is 15-19 in the spec (see StrayTCPDoubledTest.tla and DoubledBucketsTest.tla). It's very difficult to encounter such a situation in real life, but it's still possible and we should fix it too.
2. Solution
2.0 Summary
From now on, rebalancing of a replicaset won't work if there are unavailable nodes in the replicaset. Even if a replica is manually disabled, we cannot allow rebalancing in the replicaset; it's way too unsafe, since the user may enable the instance back at any time. We expect the user to have a fully working cluster. Rebalancing happens very rarely, and we can require that from the user, IMHO.
The rebalance_apply_routes process works as follows (please note that another RFC for #573 also adds the READONLY state of the bucket):
1. rw_lock is set on the buckets, no new RW refs locally. Solution for "Rebalancer should firstly try to pick buckets which already have no refs" #351.
2. The buckets become SENDING and increment their generation version (persisted); then wait for the vclock of all connected nodes to reach the master's vclock. Solution for "Rebalancer should firstly try to pick buckets which already have no refs" #351.
3. The generation version is passed to bucket_recv().
After a node becomes a master:
- The _bucket space is synced. Needed for "Stray TCP message with big delay may duplicate a bucket" #214.
- rebalancer_apply_routes or recovery_bucket_stat are not served before these checks are completed.
Solution for #214. Recovery uses the bucket generation from now on. So, firstly, the recovery goes to the sender:
- if there's a bucket with a greater generation, the local one is GARBAGE;
- if the generation is equal to the local one, we use the old logic;
- if the bucket is missing on the sender, all masters of the cluster are scanned (details in 2.2).
The format of the _bucket space gets a new opts field; it has the map type and it's nullable for backward compatibility (see the sketch in 2.2).
2.1 Doubled buckets after master switch (#576)
Solution summary: after making a bunch of buckets SENDING, we synchronize with our replicaset and do not send until all of the nodes confirm the update.
In order to reproduce the issue, the new master must have replication lag from the old master and must not get the update of the sent bucket statuses. This way we get a situation where there are two ACTIVE buckets until the new master synchronizes with the old one.
The simplest solution here is to make the current master synchronize with all instances in the replicaset as soon as it makes the bucket SENDING. The master cannot make the bucket SENT until all of the nodes have confirmed that they have the bucket in SENDING.
But in order to fortify the checks, I propose not to start sending the bucket to another replicaset at all until all the nodes confirm the SENDING bucket state. This also decreases the load on the network if some nodes in the replicaset are unavailable: syncing with the replicaset is way cheaper than sending the data of a bucket, and should be done first.
We don't restore the state of the bucket in bucket_send() if we didn't manage to sync with the replicaset within the timeout. The recovery service will do that for us in the future, just don't forget to wake it up in case of error.
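A rough sketch of the send-side flow under this proposal. The helpers next_generation(), wait_for_replicas_vclock() and recovery_wakeup() are made up for illustration, they are not existing vshard functions:

```lua
-- Mark the bucket SENDING and bump its generation in one transaction,
-- then refuse to transfer any data until every instance in the
-- replicaset has confirmed the change by reaching the master's vclock.
local function bucket_send_start(bucket_id, destination, timeout)
    box.begin()
    box.space._bucket:update(bucket_id, {
        {'=', 'status', 'SENDING'},
        {'=', 'destination', destination},
        -- The 'opts' layout is the one proposed in section 2.2.
        {'=', 'opts', {generation = next_generation(bucket_id)}},
    })
    box.commit()

    -- Hypothetical helper: waits until every replica has replicated
    -- the master's current vclock, i.e. has seen the SENDING status.
    if not wait_for_replicas_vclock(box.info.vclock, timeout) then
        -- Do not roll the bucket back here: the recovery service is
        -- responsible for that, so just wake it up and give up.
        recovery_wakeup()
        return nil, 'timed out while syncing SENDING with the replicaset'
    end
    -- Only now is it safe to actually call bucket_recv() on the target.
    return true
end
```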
Rejected alternatives for #576
Alternative 1. Synchro _bucket space
In the case of a synchronous _bucket and election failover, it's enough to make sure that the majority of the nodes got the change of the SENDING bucket. A node which didn't get that change won't be able to become a master in the future, so the new master will always have a consistent state of the _bucket.
The only problem here is that Tarantool is not ready right now to become a synchronous-only DB, and we cannot make the _bucket synchronous while there are async spaces. Otherwise, users will start to encounter SplitBrain replication errors.
Alternative 2. Allow to skip disabled nodes when syncing
It was proposed by @sergepetrenko to allow rebalancing when there are disabled nodes in the replicaset and to skip them when we need to sync the SENDING or READONLY state. However, this cannot be done, because it's way too unsafe. Even allowing nodes excluded from the configuration to be skipped is unsafe (described in #623, leave as is for now).
Our own above-standing projects may disable/enable instances whenever they want (currently cartridge disables an instance when its roles weren't applied, either because applying is not done yet or because of an error during role applying; the storage may remain in this disabled state as long as needed, and at any time the node may become enabled again).
Now we consider the situation:
Why is expelling better? Because expelling deletes the node from the _cluster space, which makes the node rejoin (that guarantees us that the node will have a correct bucket state after the rejoin is done) [1]. We have the same logic in Tarantool 3, but it's not enabled by default [2].
2.2 Doubled buckets after stray TCP (#214)
Solution summary: recovery uses bucket generations and fullscans the masters of the cluster. Recovery and rebalancer are not started until the new master synchronizes with the other instances in the replicaset.
We have the problem with a clean stray TCP message, described in the "Problem overview" part. The simplest way, which may seem to solve it, is to make the recovery service fullscan the cluster for buckets. But unfortunately, it doesn't work (see the alternatives), so we must move towards bucket versioning here.
From now on the _bucket space will look like this:
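A sketch of what the new tuple format could look like. The first three fields match the current _bucket format; the exact layout of opts is up to the implementation:

```lua
box.space._bucket:format({
    {name = 'id', type = 'unsigned'},
    {name = 'status', type = 'string'},
    {name = 'destination', type = 'string', is_nullable = true},
    -- The new nullable map with bucket options.
    {name = 'opts', type = 'map', is_nullable = true},
})

-- Example tuple of a bucket that has been transferred once:
-- {1, 'ACTIVE', nil, {generation = 1}}
```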
The new field is opts, which has the map type. This is done in order to simplify adding new fields in the future. The opts field is nullable for backward compatibility. There may be only one option for now: opts.generation (below I refer to it as generation instead of opts.generation). We want the generation to be persistent, so that after a restart recovery could still work.
When a bucket is sent, its generation is incremented (we make the bucket SENDING and increment the generation in one transaction) and is sent alongside the data of the bucket to bucket_recv; the receiver side persists that generation in its _bucket.
Recovery uses that generation in order to distinguish which bucket is more recent if it cannot find the bucket on the sender node. So, firstly, the node goes to the sender: if there's a bucket with any state and a greater generation, the local one is GARBAGE, we don't care about the status here. If the bucket generation is equal to the local one, we use the old logic. If the bucket is missing from the remote node, then we fullscan all masters of the cluster. When all of the nodes have replied: if there exists a higher generation, the local bucket is GARBAGE, ACTIVE otherwise.
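A sketch of that decision logic for one local bucket. The helpers remote_bucket_stat(), masters() and recover_by_status() are hypothetical, used only to keep the example short:

```lua
-- local_bucket is the local _bucket tuple, sender_uuid is the replicaset
-- the bucket was being transferred to/from according to 'destination'.
local function recover_bucket(local_bucket, sender_uuid)
    local opts = local_bucket.opts
    local local_gen = (opts ~= nil and opts.generation) or 0

    local remote = remote_bucket_stat(sender_uuid, local_bucket.id)
    if remote ~= nil then
        if (remote.generation or 0) > local_gen then
            -- The sender has seen a newer transfer of this bucket, so
            -- whatever we have locally is stale.
            return 'GARBAGE'
        end
        -- Not a newer generation: fall back to the old status-based logic.
        return recover_by_status(local_bucket, remote)
    end

    -- The sender knows nothing about the bucket: ask every master.
    local max_gen = 0
    for _, master_uuid in ipairs(masters()) do
        local b = remote_bucket_stat(master_uuid, local_bucket.id)
        if b ~= nil and (b.generation or 0) > max_gen then
            max_gen = b.generation
        end
    end
    if max_gen > local_gen then
        return 'GARBAGE'
    end
    return 'ACTIVE'
end
```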
Why do we scan only the masters, you may ask? The following can happen:
1. rs1 sends the bucket to rs2, the message is lost, the bucket is recovered. Generation is 2 on rs1 and rs2.
2. rs1 sends the bucket to rs3. Generation is 3 on rs1 and rs3.
3. A master switch happens on rs3, the new node doesn't have the bucket at all.
4. rs2 gets the stray message, makes the bucket RECEIVING with generation 2, scans the masters only, doesn't find the bucket at all and recovers it to ACTIVE.
5. rs3 gets the change from the old master: two ACTIVE buckets.
In order to fight this, we make the new master sync the _bucket space. And before this sync happens, the node doesn't start the rebalancer or recovery services; it also doesn't respond to the requests for these services from other nodes (e.g. rebalance_apply_routes or recovery_bucket_stat).
The choice here is either a fullscan of ALL replicas in the cluster, or the new master always syncing with its replicaset. I like the latter more, since the number of replicas may be 2-5 times the number of masters (it really can; I've never seen more than 3, but I've heard of them).
This becomes a new background service, on_master_enable_service, which will be started when the instance becomes RW in auto mode, and during reconfiguration in the case of "manual" master mode. It's not a loop (which is common among other services), but a function which is executed in the background once the node becomes a master and then exits.
It's a service, since we should not block the reconfiguration for that. It's crucial that before starting the new service the old one already cannot interfere with our flags, so we use testcancel before any change of M.bucket_are_in_sync (see below).
Right before starting the new service the old one is cancelled, the flag M.bucket_are_in_sync is set to false, and the rebalancer and recovery services won't be able to start until it becomes true; the node also doesn't respond to requests from these services.
This service will wait for the current node to reach the vclock of all instances in the replicaset in order to get the latest updates of the _bucket space. As soon as these conditions are satisfied, M.bucket_are_in_sync is set to true, rebalancing is allowed, the service dies, and now recovery and rebalancer can do their stuff.
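A rough sketch of how such a one-shot service could look. M stands for the internal module table of vshard.storage, and wait_for_replicaset_vclock() is a hypothetical helper; none of this is the final implementation:

```lua
local fiber = require('fiber')

local function on_master_enable_service_f()
    -- The previous instance of the service is already cancelled at this
    -- point, so it cannot flip the flag back after we reset it.
    fiber.testcancel()
    M.bucket_are_in_sync = false

    -- Wait until the local vclock reaches the vclock of every instance
    -- in the replicaset, so the local _bucket view is up to date.
    wait_for_replicaset_vclock()

    fiber.testcancel()
    M.bucket_are_in_sync = true
    -- Recovery and rebalancer check this flag before doing anything,
    -- and requests like recovery_bucket_stat are rejected until it is true.
end

local function on_master_enable()
    if M.on_master_enable_fiber ~= nil then
        -- Cancel the old service; pcall() in case it has already died.
        pcall(M.on_master_enable_fiber.cancel, M.on_master_enable_fiber)
    end
    M.on_master_enable_fiber = fiber.new(on_master_enable_service_f)
end
```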
Rejected alternatives for #214
Alternative 1. Recovery fullscan and that's it
The simplest solution here may seem to be making the recovery service fullscan the cluster if it finds the bucket in the SENT or GARBAGE state, or missing, on the sender. But, unfortunately, it won't work:
1. rs1 sends the bucket to rs2, the message is lost, rs1 recovers the bucket from SENDING to ACTIVE.
2. rs1 sends the bucket to rs3 and makes the bucket SENT; the message which is supposed to make the bucket ACTIVE on rs3 is lost (final = true in code). Meanwhile the stray message reaches rs2, so the bucket is RECEIVING there too.
3. rs2 and rs3 are supposed to decide which of their RECEIVING buckets is correct, but they won't be able to do that without knowing which of them came later.
The recovery should make the bucket on rs3 ACTIVE, since it may have data which was written on rs1 while the bucket was ACTIVE between the bucket sends. The bucket on rs2 consequently should become GARBAGE.
Fullscan won't work, to my deepest regret. We need to apply a more sophisticated solution here.
Alternative 2. Never delete the bucket info from space, even for transferred buckets.
This is an alternative to the fullscan of the cluster when the bucket is not found.
I know of clients with tens (or even hundreds) of millions of buckets, and that solution requires storing the versions of all buckets in the cluster (at least once the cluster has worked long enough), and I'm afraid that it may become costly to store.
Rebalancing happens once a year or something like that, and it happens when the load on the cluster is disabled or minimal. It's better to increase the load on the network once a year than to make all users pay with RAM for rebalancing.
Alternative 3.
generation field instead of the opts in the _bucket
The initial proposition was to use the following format:
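Something along these lines, with generation as a plain top-level field (an illustration of the rejected variant, not an exact schema):

```lua
box.space._bucket:format({
    {name = 'id', type = 'unsigned'},
    {name = 'status', type = 'string'},
    {name = 'destination', type = 'string', is_nullable = true},
    {name = 'generation', type = 'unsigned', is_nullable = true},
})
```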
What if in the future we want to persist some new info about the buckets? Then we'll have to add new top-level fields to the bucket again and again, and it'll become similar to the
_func space, which has accumulated a long list of top-level fields over time.
I propose not to add the
generation field, but instead to introduce the new opts field, which will have the map type. It'd be great to place the destination there too, but I'm afraid we might break too much above-standing code (in our products and also in the clients' code). We could build indexes over the map when we'll need them in the future: link
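For example, Tarantool can index inside a map field via a JSON path, so a secondary index over opts.generation could be added later without changing the tuple layout (a sketch, not part of this RFC):

```lua
-- Hypothetical non-unique index over the generation stored inside opts.
box.space._bucket:create_index('generation', {
    unique = false,
    parts = {{field = 'opts', type = 'unsigned', path = 'generation',
              is_nullable = true}},
})
```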
Alternative 4. Try using the generation for making discovery cheaper
It'd be great if we could use the generation in the future in order to make the discovery cheaper: link. Unfortunately, the generation which is proposed in this RFC won't help us to solve that issue. It's proposed to use prev_generation_of_this_bucket + 1, which won't allow us to make the discovery cheaper: rs1 sends bucket 1 to rs2, the generation on rs1 and rs2 becomes 1 (the maximum over generations), then rs1 sends bucket 2 to rs2, the generation remains 1 on both storages, and the router won't be able to notice the change, even though it should.
So, assigning the new generation as max_local_generation + 1 is required for that to work. But this will break the logic of the doubled-bucket generation: the generation which is needed for making the discovery process cheaper must have completely different logic from the generation which is used for deduplication of the buckets.
These two different generations can be merged into one (see the bucket vclock below), but that is way too costly memory-wise, and we decided to add a new generation to opts for making discovery cheaper when it's needed.
Vclock for buckets
Drop the generation, let's save a vclock for every bucket instead! The vclock looks as follows:
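A sketch of what such a per-bucket vclock could look like inside opts (purely illustrative, the replicaset ids here are made up):

```lua
-- One counter per replicaset that has ever sent this bucket; the sender
-- increments its own component on every transfer of the bucket.
local bucket_opts = {
    vclock = {
        ['rs_1_uuid'] = 3,
        ['rs_2_uuid'] = 1,
    },
}
```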
In the RFC for doubled buckets all checks are done only for the destination field: e.g. if the bucket is not found on the sender, then we fullscan the cluster and check only the destination (sender id) component of the vclock. It'll work the same: if the found vclock[sender] > local vclock[sender], then GARBAGE, ACTIVE otherwise.
1. Storage
When storage starts, it initializes bucket vclock as {<r_id> =
max over all vclock[r_id]}. It updates the maximums in that bucket vclock from theon_replacetrigger on_bucket. There're N indexes, built over every replicaset from vclock on_bucket. When replicaset is deleted, index is dropped, when added - new index is created. All of the fields in vclocks are nullable, null = 0.2. Router
When router starts, it has a table
{rs_id = <vclock>}, all vclocks are initialized with 0s. It goes to every replicaset with this vclock, if the keys in vclock (replicaset_ids) from the router and storage doesn't match, storage returns error, incorrect configuration. Then storage goes over vclock from the router, for every replicaset it sends buckets with bsn > router's. The storage also returns the maximum vclock of sent buckets, so that router can iteratively update the map.There're still problems with redownloading of the map on reconnect/storage restart, if the max bucket vclock is not persisted. But speaking of
Can't download removal of sent and deleted bucketspart, I don't think it's a problem: yeah, most of the storages will just delete the buckets, they will send nothing in response to discovery of the router, but one of replicasets will get a lot of buckets, which will be sent to router and it will update the route map, as needed.Beta Was this translation helpful? Give feedback.
All reactions