diff --git a/src/chttpd/src/chttpd_db.erl b/src/chttpd/src/chttpd_db.erl index a43baeae485..50df08a212a 100644 --- a/src/chttpd/src/chttpd_db.erl +++ b/src/chttpd/src/chttpd_db.erl @@ -294,7 +294,8 @@ handle_compact_req(#httpd{method = 'POST'} = Req, Db) -> chttpd:validate_ctype(Req, "application/json"), case Req#httpd.path_parts of [_DbName, <<"_compact">>] -> - ok = fabric:compact(Db), + SrcGen = list_to_integer(chttpd:qs_value(Req, "gen", "0")), + ok = fabric:compact({Db, SrcGen}), send_json(Req, 202, {[{ok, true}]}); [DbName, <<"_compact">>, DesignName | _] -> case ddoc_cache:open(DbName, <<"_design/", DesignName/binary>>) of @@ -427,9 +428,10 @@ handle_design_info_req(Req, _Db, _DDoc) -> create_db_req(#httpd{} = Req, DbName) -> couch_httpd:verify_is_server_admin(Req), ShardsOpt = parse_shards_opt(Req), + MaxGenOpt = parse_generations_opt(Req), EngineOpt = parse_engine_opt(Req), DbProps = parse_partitioned_opt(Req), - Options = lists:append([ShardsOpt, [{props, DbProps}], EngineOpt]), + Options = lists:append([ShardsOpt, MaxGenOpt, [{props, DbProps}], EngineOpt]), DocUrl = absolute_uri(Req, "/" ++ couch_util:url_encode(DbName)), case fabric:create_db(DbName, Options) of ok -> @@ -853,6 +855,19 @@ db_req(#httpd{method = 'GET', path_parts = [_, <<"_revs_limit">>]} = Req, Db) -> send_json(Req, fabric:get_revs_limit(Db)); db_req(#httpd{path_parts = [_, <<"_revs_limit">>]} = Req, _Db) -> send_method_not_allowed(Req, "PUT,GET"); +db_req(#httpd{method = 'PUT', path_parts = [_, <<"_max_generation">>]} = Req, Db) -> + Options = [{user_ctx, Req#httpd.user_ctx}], + case chttpd:json_body(Req) of + MaxGen when is_integer(MaxGen), MaxGen > 0 -> + case fabric:set_max_generation(Db, MaxGen, Options) of + ok -> + send_json(Req, {[{<<"ok">>, true}]}); + Error -> + throw(Error) + end; + _ -> + throw({bad_request, "`max_generation` must be positive integer"}) + end; db_req(#httpd{method = 'PUT', path_parts = [_, <<"_purged_infos_limit">>]} = Req, Db) -> Options = [{user_ctx, Req#httpd.user_ctx}], case chttpd:json_body(Req) of @@ -1983,6 +1998,10 @@ parse_shards_opt(Param, Req, Default) -> false -> throw({bad_request, Err}) end. +parse_generations_opt(Req) -> + Val = chttpd:qs_value(Req, "gen", "0"), + [{max_generation, list_to_integer(Val)}]. + parse_engine_opt(Req) -> case chttpd:qs_value(Req, "engine") of undefined -> diff --git a/src/couch/doc/generational-compaction/crash-safety.md b/src/couch/doc/generational-compaction/crash-safety.md new file mode 100644 index 00000000000..f396e127e3d --- /dev/null +++ b/src/couch/doc/generational-compaction/crash-safety.md @@ -0,0 +1,241 @@ +# Crash safety + +In 3.4 the process for ending compaction in `finish_compaction` goes like this: + + 0. state during compaction: + + db.couch + db.couch.compact.data + db.couch.compact.meta + + 1. rename the data file: + + db.couch + db.couch.compact + db.couch.compact.meta + + 2. delete the original DB file: + + db.couch.compact + db.couch.compact.meta + + 3. rename the .compact file: + + db.couch + db.couch.compact.meta + + 4. delete the meta file: + + db.couch + +Crash safety: if a crash occurs after step 2, then on restart the +`db.couch.compact` file is taken to be the canonical file, renamed to +`db.couch`, and we continue as normal. + +Question: why can't we get the same behaviour by directly renaming +`db.couch.compact.data` to `db.couch`, producing the same state as after step 3? 
+Rename in the same directory is atomic, and by this point we will already have
+stopped writing to the original `db.couch` file in order to perform the
+switch-over.
+
+Answer: this might not be true on Windows, where you cannot remove a file that
+is in use; we have to close `db.couch`, remove it, then rename the compacted
+file into its place. The rename of `.compact.data` to `.compact` signals a state
+where compaction completed and we were in the middle of swapping the files over,
+so nothing new has been written to `db.couch` and we can resume by using
+`db.couch.compact`. This is not necessarily the case if `db.couch.compact.data`
+still exists.
+
+After `open_db_file` returns, the code in `init` that reads/creates the DB
+header also deletes all the compaction files (`.compact`, `.compact.data`,
+`.compact.meta`) if the header was newly created, rather than read from existing
+file data.
+
+For generational compaction, there are several cases to consider.
+
+
+## A. Compacting gen 0
+
+In this case we're appending to `db.1.couch`; references to new data written
+there end up in `db.couch.compact.data`, along with references to _existing_
+data that was already in `db.{1,2,...}.couch`. The following files exist during
+compaction:
+
+    db.couch
+    db.1.couch
+    db.couch.compact.data
+    db.couch.compact.meta
+
+On completion, none of the generational files will be removed. Therefore all
+pointers in `db.couch` and `db.couch.compact.data` remain valid and we are free
+to use either file as our canonical DB file. The original cleanup procedure can
+be used without modification.
+
+
+## B. Compacting gen 1, 2, ...
+
+One generation up from 0, we have data being moved from gen 1 to 2, or in
+general from G to G+1. Files existing during compaction are:
+
+    db.couch
+    db.1.couch
+    db.2.couch
+    db.couch.compact.data
+    db.couch.compact.meta
+
+When compacting gen 1, pointers in `db.couch` that refer to gen 1 become
+pointers to gen 2 in `db.couch.compact.data`. Also, data from `db.couch` has to
+be moved to `db.couch.compact.data` in order to avoid being lost on completion.
+All pointers to gen 2 and above remain unmodified.
+
+On completion, `db.1.couch` will be removed on the grounds that all its data has
+been moved to `db.2.couch` and nothing new has been written to `db.1.couch`.
+This is because the compactor is the only thing that writes to `db.G.couch` and
+only one compaction per shard runs at a time.
+
+However, it is only safe to remove `db.1.couch` when nothing is referring to it
+any more, i.e. after `db.couch.compact.data` is renamed to `db.couch.compact`.
+Once this has happened, any future DB access will either use `db.couch.compact`,
+or its contents after it has been moved to `db.couch`, not the original
+`db.couch`, and so the old pointers into `db.1.couch` have expired.
+
+At any point before `db.couch.compact.data` is renamed, the data in `db.1.couch`
+is still being referenced, and so it cannot be removed.
+
+Therefore we can extend the cleanup process to:
+
+1. Rename `db.couch.compact.data` to `db.couch.compact`
+2. Delete `db.1.couch`
+3. Delete `db.couch`
+4. Rename `db.couch.compact` to `db.couch`
+5. Delete `db.couch.compact.meta`
+
+The reason for putting the deletion of `db.1.couch` as early as possible is to
+reduce the set of crash scenarios where this file remains in place. If
+`db.1.couch` remains after a crash, this is not _unsafe_ (i.e. it does not cause
+a consistency problem or data loss) but it does leave a pile of unreferenced
+data that needs to be cleaned up.
The simplest way to achieve this would be to
+re-run compaction of gen 1, which could be triggered by noticing the file has an
+active size of 0. Possibly Smoosh could prioritise the generation with the
+largest proportion of garbage when deciding what to compact next.
+
+
+## C. Compacting the last generation
+
+Say the DB has a maximum generation of 2. This means that normally, the existing
+files are:
+
+    db.couch
+    db.1.couch
+    db.2.couch
+
+During compaction of gen 2, a temporary additional generation is created along
+with the usual compaction files:
+
+    db.couch
+    db.1.couch
+    db.2.couch
+    db.2.couch.compact.maxgen
+    db.couch.compact.data
+    db.couch.compact.meta
+
+Live data is copied from `db.2.couch` to `db.2.couch.compact.maxgen`, but the
+pointers stored in `db.couch.compact.data` refer to gen 2, with the intention
+that `db.2.couch.compact.maxgen` will eventually be renamed to `db.2.couch`
+rather than letting the generation number grow indefinitely.
+
+This requires a further change to the cleanup process:
+
+1. Rename `db.couch.compact.data` to `db.couch.compact`
+2. Delete `db.2.couch`
+3. Rename `db.2.couch.compact.maxgen` to `db.2.couch`
+4. Delete `db.couch`
+5. Rename `db.couch.compact` to `db.couch`
+6. Delete `db.couch.compact.meta`
+
+A crash after steps 1, 2, or 3 produces one of these states:
+
+    Step 1                    Step 2                    Step 3
+    ------------------------- ------------------------- -------------------------
+    db.couch                  db.couch                  db.couch
+    db.2.couch (old)                                    db.2.couch (new)
+    db.2.couch.compact.maxgen db.2.couch.compact.maxgen
+    db.couch.compact          db.couch.compact          db.couch.compact
+
+These states have an ambiguity to them; all of them will cause `db.couch` to be
+used when the DB is re-opened, but it's not clear whether the data in
+`db.2.couch` is valid for that file or not. It creates a state where you need to
+determine which data to preserve based on the presence of all the other files,
+which is complicated. In state 1 you can continue using the old `db.2.couch`,
+but in the other states you need to decide to open `db.couch.compact` instead
+and possibly clean up the `db.2.couch.compact.maxgen` file. This suggests that
+removing `db.couch` earlier in the process would be an improvement:
+
+1. Rename `db.couch.compact.data` to `db.couch.compact`
+2. Delete `db.couch`
+3. Delete `db.2.couch`
+4. Rename `db.2.couch.compact.maxgen` to `db.2.couch`
+5. Rename `db.couch.compact` to `db.couch`
+6. Delete `db.couch.compact.meta`
+
+Having done that, we need to extend the recovery path where we fail to find
+`db.couch` and check for `db.couch.compact`. The possible crash states of the
+above process are:
+
+    Step 1                    Step 2                    Step 3                    Step 4
+    ------------------------- ------------------------- ------------------------- -------------------------
+    db.couch
+    db.2.couch (old)          db.2.couch (old)                                    db.2.couch (new)
+    db.2.couch.compact.maxgen db.2.couch.compact.maxgen db.2.couch.compact.maxgen
+    db.couch.compact          db.couch.compact          db.couch.compact          db.couch.compact
+
+In the first state, we would open `db.couch` on restart, and this refers to data
+in the _old_ `db.2.couch`. We can continue to use this and either leave
+`db.2.couch.compact.maxgen` in place for some future compaction of gen 2, or we
+can delete it as we're not using any data in it.
+
+In all the other states we will use `db.couch.compact` and therefore need to
+complete the process of moving `db.2.couch.compact.maxgen` to `db.2.couch`
+before using it.
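+
+In code terms the swap could look something like the following sketch (the
+helper names here are illustrative, not the ones used in this change):
+
+```erlang
+%% Hypothetical sketch: promote a compacted max-generation file into place.
+%% Deleting the old generation file before the rename means a crash between
+%% the two steps still leaves the .compact.maxgen file as the only copy, so
+%% the routine can simply be re-run on the next open.
+promote_maxgen_file(GenPath, CompactMaxGenPath) ->
+    case filelib:is_file(CompactMaxGenPath) of
+        true ->
+            ok = delete_if_exists(GenPath),
+            ok = file:rename(CompactMaxGenPath, GenPath);
+        false ->
+            ok
+    end.
+
+delete_if_exists(Path) ->
+    case file:delete(Path) of
+        ok -> ok;
+        {error, enoent} -> ok
+    end.
+```
+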
If `db.2.couch` exists, we remove it, and then we rename +`db.2.couch.compact.maxgen` to `db.2.couch`. This process is safe if we crash +after removing the old `db.2.couch`. + +We could also try to resolve these two questions independently: + +1. What to do if both `db.couch` and `db.couch.compact` exist +2. What to do if `db.G.couch` where `G > max_generation` exists + +But as we've seen, the cleanup operations for both these questions create states +where the mutual ordering of their operations is important and it would be wise +to minimise the set of possible such states. Therefore we suggest the following +recovery routine: + +1. Attempt to open `db.couch`. If this succeeds, remove any generation files + above `max_generation`. Otherwise... + +2. If `db.M+1.couch` where `M = max_generation` exists, then remove `db.M.couch` + then rename `db.M+1.couch` to `db.M.couch` + +3. Rename `db.couch.compact` to `db.couch` + +4. Open `db.couch` + +This process works correctly if the generation files are cleaned up _before_ the +rename of `db.couch.compact` to `db.couch` in `finish_compaction`. Otherwise the +`.compact` file that indicates the partial completion of this process may not +exist following a crash and this makes it harder to tell which generation files +are valid on recovery. + +This also highlights that letting users modify `max_generation` for a DB is not +safe while compaction is happening, because it may lead to confusion during +recovery that could cause data loss (i.e. mistaken deletion of +`db.2.couch.compact.maxgen`). + +If we allow post-creation changes to `max_generation`, then: + +- Increasing it is "free"; all existing data remains valid but it simply becomes + _possible_ for future compactions to create higher generations. + +- Decreasing it requires a "reversed" compaction to move data from higher + generations to lower ones followed by deleting the emptied generation(s). And + so this change should properly be thought of as requesting a compaction with + special properties. diff --git a/src/couch/doc/generational-compaction/design.md b/src/couch/doc/generational-compaction/design.md new file mode 100644 index 00000000000..b0369a4539c --- /dev/null +++ b/src/couch/doc/generational-compaction/design.md @@ -0,0 +1,165 @@ +# Generational compaction + +This describes where the implementation of this feature is -- what design we +have ended up with and why. I'm writing this up to gather feedback from the +CouchDB devs on any issues this might cause, things we might have overlooked, +and to make decisions on unresolved matters. + + +## Compaction in CouchDB 3.x + +We'll briefly describe relevant details of CouchDB's storage model and +compaction process and how they work today, before describing how we've changed +them. + +- A database shard is stored in a single file, with a name like `dbname.t.couch` + where `t` is a timestamp like `1743602741`. This contains both _data_, that is + document bodies and attachments the user has stored, as well as _structure_, + i.e. the internal data structures used by the CouchDB storage engine. This + includes the by-id and by-seq B+trees, the revtrees of all the documents, + purge history information, the database header, and any other bookkeeping + information needed by the engine. + +- Document bodies are written to disk as a single blob, and a pointer to its + location is stored as a single integer in the `#leaf.ptr` field. 
+
+- Attachments may be stored in multiple chunks, and their pointers in the
+  attachment `Sp` field look like `[{Start, Length}, ...]`, i.e. a list of one
+  or more pairs giving the start offset and length of each chunk.
+
+- Attachments are considered part of the document data; their metadata and
+  pointers are stored in the document body, and editing attachments creates a
+  new document revision. This means if you issue a `PUT /db/doc` to create a
+  document, followed by `PUT /db/doc/att` to add an attachment, then a fresh
+  copy of the document body will be created with the attachment metadata in it.
+  There will be two copies of the document body on disk; one at revpos 1
+  without the attachment, and one at revpos 2 with the attachment.
+
+- Compaction works by creating a new empty database file named
+  `db.t.couch.compact.data`, which we'll call the _compaction target_. For each
+  document in `db.t.couch`, the doc body and attachments for the leaf revisions
+  are copied to the compaction target, along with a new revtree whose leaves
+  point at the new copies. The body/attachments for non-leaf revisions are not
+  copied over. When every document has been visited, the compaction target is
+  renamed to `db.t.couch` and becomes the new active shard file.
+
+- Compaction discards two types of garbage that will become more distinct under
+  a generational model. First, it discards garbage _data_: document bodies and
+  attachments which are not referenced by leaf revisions. Second, it discards
+  garbage _structure_: it builds new by-id and by-seq B+trees (among other
+  things) in the compaction target and thereby discards old B+tree nodes which
+  are no longer referenced from the current tree roots.
+
+- An extra file named `db.t.couch.compact.meta` is created when compaction
+  starts, to indicate an ongoing compaction of that shard and to store any
+  process state needed to resume it if it crashes. This file can serve as a lock
+  to prevent multiple compactions of the same shard happening concurrently.
+
+
+## The generational model
+
+The main motivation for the generational model is that if a document is rarely
+changed, then the same revision of it will be repeatedly copied on each
+compaction. For large documents/attachments this creates an especially large
+amount of overhead. Compaction still removes garbage _structure_ but also spends
+most of its time copying an almost unchanged corpus of _data_ on each run. The
+generational model aims to separate data from structure so that in many cases
+compaction will run much faster, by avoiding repeatedly copying the same set of
+data.
+
+At the same time, there are compatibility concerns. If we change how data is
+stored, it becomes impossible to roll back to a previous release in the event of
+production problems. We would therefore like to introduce as few changes as
+possible and only change how data is stored if the user has explicitly opted in
+to turn this feature on. Ideally we can do this without needing a lot of
+conditional code paths that would increase complexity.
+
+### Design and implementation changes
+
+With these aims in mind we have ended up with the following design, which we
+have a (mostly) working implementation of:
+
+- A shard continues to be stored in a file named `db.t.couch`, which we will now
+  refer to as "generation zero" or _gen-0_. All new data (including that from
+  replication) is written to this file, and the pointer format for bodies and
+  attachments remains unchanged for data stored in this file.
This means we
+  continue to write data that can be read by 3.x in the gen-0 file.
+
+- A shard may also have a number of _generation_ files named `db.t.n.couch`
+  where `n` > 0. We'll use the shorthand _gen-n_ to refer to the file
+  `db.t.n.couch`. These files only store _data_, not _structure_, i.e. document
+  bodies and attachments can be written here, not B+tree nodes. When a datum is
+  stored in one of these files, its pointer is represented as `{Gen, Ptr}` where
+  `Gen` is the generation number (the `n` in the filename) and `Ptr` is the
+  regular pointer value (an int for bodies, and `[{Start, Length}, ...]` for
+  attachments). The sketch after this list shows how such pointers are
+  interpreted.
+
+- The number of generation files is capped by a field called `max_generations`
+  stored in the database header. This field is not set by default, meaning the
+  database will not create generation files. The user would need to explicitly
+  set this to a value greater than 0 to turn the feature on.
+
+- Compaction is now parameterised by a generation `G`. It works as it does in
+  3.x, with the alteration that leaf bodies or attachments currently stored in
+  generation `G` are moved to `G+1`, unless `G` is the maximum allowed
+  generation. That is, if a datum is currently stored in gen-0, it is appended
+  to the existing gen-1 file, rather than being stored in the compaction target.
+  If it is currently in gen-1, it is copied to gen-2, and so on up to the max
+  generation.
+
+- The compactor still creates the compaction target `db.t.couch.compact.data`
+  and builds new database structures in that file, and renames it to
+  `db.t.couch` on completion.
+
+- Any leaf data that is not in the compaction generation `G` is left where it
+  is. If it is in gen-0, then it must be copied to the compaction target to
+  avoid being lost when compaction completes. This means that compactions of `G`
+  above 0 also implicitly compact the gen-0 file as it would be in 3.x. It also
+  means that if generational compaction is not enabled, compaction continues to
+  work as it does in 3.x.
+
+- Put another way, data is only copied to somewhere else if it is in gen-0 (when
+  it is copied to the compaction target) or if it is in gen `G` (when it is
+  copied to `G+1`). For all other data, its existing pointer is retained in the
+  new leaves built by the compactor.
+
+- Because attachment pointers are part of the document data, moving an
+  attachment requires creating a new copy of the document even if the document
+  itself is not being moved. For example, if a document is stored in gen-1 but
+  has an attachment stored in gen-2, and we are compacting with `G=2`, then the
+  attachment will be moved to gen-3. Since this changes its pointer, we need to
+  write a fresh copy of the document data in gen-1. When gen-1 is compacted
+  we'll end up with only one copy of the document in gen-2.
+
+- We still have the constraint that only one compaction of a shard can run at
+  once, for _any generation_. If a compaction with `G=0` is running, you cannot
+  start one at `G=1`.
+
+- Since the compactor is the only thing that ever writes data to gen-1 or above,
+  and only one compactor per shard runs at once, the generation's file can be
+  deleted on completion. For example, if compaction is running for `G=1`, it
+  will copy any leaf data in gen-1 into gen-2, and nothing new will have been
+  written to gen-1 while it was running. This means all data in gen-1 is now
+  unreferenced and can be trivially dropped by deleting/truncating the file.
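+
+To make the pointer handling above concrete, here is a small sketch -- not the
+code in this change; the function names are illustrative only -- of how a
+stored pointer maps to a generation and how the compactor picks a destination
+generation (`SrcGen` is the generation being compacted, `MaxGen` the configured
+cap):
+
+```erlang
+%% Hypothetical sketch. A 3.x-style "bare" pointer (an integer for a body, a
+%% list of {Start, Length} pairs for an attachment) implies generation 0; a
+%% {Gen, Ptr} tuple names an explicit generation file such as db.t.Gen.couch.
+split_pointer({Gen, Ptr}) when is_integer(Gen), Gen > 0 ->
+    {Gen, Ptr};
+split_pointer(Ptr) ->
+    {0, Ptr}.
+
+%% Destination rule: data in the generation being compacted moves up one
+%% generation, capped at MaxGen; data in any other generation stays in its
+%% current generation (gen-0 data is rewritten into the new compaction target
+%% but remains gen-0). With MaxGen = 0 this reduces to the 3.x behaviour.
+destination(DataGen, SrcGen, MaxGen) when DataGen =:= SrcGen ->
+    erlang:min(SrcGen + 1, MaxGen);
+destination(DataGen, _SrcGen, _MaxGen) ->
+    DataGen.
+```
+
+In the actual change this choice corresponds roughly to `increment_generation/2`
+in `couch_bt_engine` and `pick_target_generation/3` in the compactor.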
+ +### Observations + +Taken together, these changes mean that if the feature is not explicitly turned +on by the user, compaction continues to work as it does today, and data +continues to be written in a format that can be read by 3.x, making rollback +possible. + +If the feature is turned on by setting `max_generations`, data written to gen-0 +remains 3.x compatible but the header will no longer be intelligible to 3.x as +it contains an extra field and may have its version incremented, so this might +be a change you cannot roll back from. It should be an error to reduce the +number of generations as this would lose data. + +The implementation also works without needing feature checks everywhere. It +works consistently all the time subject to the limit imposed by +`max_generations` which defaults to zero (i.e. feature is turned off). + +Writing pointers to gen-0 data using a 3.x-style pointer, rather than writing +`{Gen, Ptr}` with `Gen=0`, provides a free compatibility check. Since the code +needs to continue to understand old-style "bare" pointers, we know it can handle +files from 3.x installations without needing to explicitly test using files +retained from an actual 3.x system. diff --git a/src/couch/include/couch_db.hrl b/src/couch/include/couch_db.hrl index 9c1df21b690..b369322fba9 100644 --- a/src/couch/include/couch_db.hrl +++ b/src/couch/include/couch_db.hrl @@ -80,7 +80,7 @@ update_seq = 0, deleted = false, rev_tree = [], - sizes = #size_info{} + sizes = [#size_info{}] }). -record(httpd, { diff --git a/src/couch/src/couch_att.erl b/src/couch/src/couch_att.erl index 5ca3927e703..cf2d755fab5 100644 --- a/src/couch/src/couch_att.erl +++ b/src/couch/src/couch_att.erl @@ -828,6 +828,8 @@ attachment_field_api_test_() -> ]}. attachment_disk_term_test_() -> + FakeGen = 0, + FakeSp = 1337, BaseAttachment = new([ {name, <<"empty">>}, {type, <<"application/octet-stream">>}, @@ -835,13 +837,13 @@ attachment_disk_term_test_() -> {disk_len, 0}, {md5, <<212, 29, 140, 217, 143, 0, 178, 4, 233, 128, 9, 152, 236, 248, 66, 126>>}, {revpos, 4}, - {data, {stream, {couch_bt_engine_stream, {fake_fd, fake_sp}}}}, + {data, {stream, {couch_bt_engine_stream, {fake_fd, FakeGen, FakeSp}}}}, {encoding, identity} ]), BaseDiskTerm = { <<"empty">>, <<"application/octet-stream">>, - fake_sp, + FakeSp, 0, 0, 4, @@ -851,7 +853,7 @@ attachment_disk_term_test_() -> Headers = [{<<"X-Foo">>, <<"bar">>}], ExtendedAttachment = store(headers, Headers, BaseAttachment), ExtendedDiskTerm = {BaseDiskTerm, [{headers, Headers}]}, - FakeDb = test_util:fake_db([{engine, {couch_bt_engine, #st{fd = fake_fd}}}]), + FakeDb = test_util:fake_db([{engine, {couch_bt_engine, #st{fd = {fake_fd, fake_ref}}}}]), {"Disk term tests", [ ?_assertEqual(BaseDiskTerm, to_disk_term(BaseAttachment)), ?_assertEqual(BaseAttachment, from_disk_term(FakeDb, BaseDiskTerm)), diff --git a/src/couch/src/couch_bt_engine.erl b/src/couch/src/couch_bt_engine.erl index ad84e0db853..4a483f0df24 100644 --- a/src/couch/src/couch_bt_engine.erl +++ b/src/couch/src/couch_bt_engine.erl @@ -35,6 +35,8 @@ get_disk_version/1, get_doc_count/1, get_epochs/1, + get_fd/1, + get_fd/2, get_purge_seq/1, get_oldest_purge_seq/1, get_purge_infos_limit/1, @@ -47,6 +49,7 @@ get_uuid/1, set_revs_limit/2, + set_max_generation/2, set_purge_infos_limit/2, set_security/2, set_props/2, @@ -60,6 +63,7 @@ serialize_doc/2, write_doc_body/2, + write_doc_body/3, write_doc_infos/3, purge_docs/3, copy_purge_infos/2, @@ -67,6 +71,7 @@ commit_data/1, open_write_stream/2, + open_write_stream/3, 
open_read_stream/2, is_active_stream/2, @@ -76,12 +81,14 @@ fold_purge_infos/5, count_changes_since/2, - start_compaction/4, + start_compaction/5, + open_additional_generation_file/3, + increment_generation/2, finish_compaction/4 ]). -export([ - init_state/4 + init_state/5 ]). -export([ @@ -128,8 +135,19 @@ delete(RootDir, FilePath, Async) -> %% as a recovery. delete_compaction_files(RootDir, FilePath, [{context, compaction}]), - % Delete the actual database file - couch_file:delete(RootDir, FilePath, Async). + % Delete the actual database files + delete_generational_files(RootDir, FilePath, 0, Async). + +delete_generational_files(RootDir, FilePath, Gen, Async) -> + GenPath = generation_file_path(FilePath, Gen), + case couch_file:delete(RootDir, GenPath, Async) of + ok -> + delete_generational_files(RootDir, FilePath, Gen + 1, Async); + {error, enoent} -> + ok; + Error -> + Error + end. delete_compaction_files(RootDir, FilePath, DelOpts) -> lists:foreach( @@ -155,8 +173,10 @@ init(FilePath, Options) -> delete_compaction_files(FilePath), Header0 = couch_bt_engine_header:new(), Header1 = init_set_props(Fd, Header0, Options), - ok = couch_file:write_header(Fd, Header1), - Header1; + MaxGen = proplists:get_value(max_generation, Options, 0), + Header2 = couch_bt_engine_header:set(Header1, [{max_generation, MaxGen}]), + ok = couch_file:write_header(Fd, Header2), + Header2; false -> case couch_file:read_header(Fd) of {ok, Header0} -> @@ -168,47 +188,75 @@ init(FilePath, Options) -> Header0 end end, - {ok, init_state(FilePath, Fd, Header, Options)}. + OpenGen = couch_bt_engine_header:max_generation(Header), + GenFds = maybe_open_generation_files(FilePath, OpenGen, Options), + {ok, init_state(FilePath, Fd, GenFds, Header, Options)}. terminate(_Reason, St) -> % If the reason we died is because our fd disappeared % then we don't need to try closing it again. - Ref = St#st.fd_monitor, - if - Ref == closed -> - ok; - true -> - ok = couch_file:close(St#st.fd), - receive - {'DOWN', Ref, _, _, _} -> - ok - after 500 -> - ok - end - end, - couch_util:shutdown_sync(St#st.fd), + lists:foreach( + fun({Fd, Ref}) -> + if + Ref == closed -> + ok; + true -> + ok = couch_file:close(Fd), + receive + {'DOWN', Ref, _, _, _} -> + ok + after 500 -> + ok + end + end, + couch_util:shutdown_sync(Fd) + end, + [St#st.fd | St#st.gen_fds] + ), ok. handle_db_updater_call(Msg, St) -> {stop, {invalid_call, Msg}, {invalid_call, Msg}, St}. -handle_db_updater_info({'DOWN', Ref, _, _, _}, #st{fd_monitor = Ref} = St) -> - {stop, normal, St#st{fd = undefined, fd_monitor = closed}}. - -incref(St) -> - {ok, St#st{fd_monitor = erlang:monitor(process, St#st.fd)}}. - -decref(St) -> - true = erlang:demonitor(St#st.fd_monitor, [flush]), +handle_db_updater_info({'DOWN', Ref, _, _, _}, #st{} = St) -> + {Fd, GenFds} = update_fd_monitors(St#st.fd, St#st.gen_fds, Ref), + St1 = St#st{fd = Fd, gen_fds = GenFds}, + {stop, normal, St1}. + +update_fd_monitors({_Fd, Ref}, GenFds, Ref) -> + {{undefined, closed}, GenFds}; +update_fd_monitors(Fd, [{_Fd, Ref} | Rest], Ref) -> + {Fd, [{undefined, closed} | Rest]}; +update_fd_monitors(Fd0, [First | Rest], Ref) -> + {Fd, GenFds} = update_fd_monitors(Fd0, Rest, Ref), + {Fd, [First | GenFds]}. + +incref(#st{fd = {Fd, _}, gen_fds = GenFds0} = St) -> + Ref = erlang:monitor(process, Fd), + GenFds = [{GenFd, erlang:monitor(process, GenFd)} || {GenFd, _} <- GenFds0], + {ok, St#st{fd = {Fd, Ref}, gen_fds = GenFds}}. 
+ +decref(#st{} = St) -> + lists:foreach( + fun({_, Ref}) -> + true = erlang:demonitor(Ref, [flush]) + end, + [St#st.fd | St#st.gen_fds] + ), ok. -monitored_by(St) -> - case erlang:process_info(St#st.fd, monitored_by) of - {monitored_by, Pids} -> - lists:filter(fun is_pid/1, Pids); - _ -> - [] - end. +monitored_by(#st{fd = Fd, gen_fds = GenFds}) -> + lists:flatmap( + fun(GenFd) -> + case erlang:process_info(GenFd, monitored_by) of + {monitored_by, Pids} -> + lists:filter(fun is_pid/1, Pids); + _ -> + [] + end + end, + [GenFd || {GenFd, _} <- [Fd | GenFds]] + ). get_compacted_seq(#st{header = Header}) -> couch_bt_engine_header:get(Header, compacted_seq). @@ -248,25 +296,27 @@ get_revs_limit(#st{header = Header}) -> couch_bt_engine_header:get(Header, revs_limit). get_size_info(#st{} = St) -> - {ok, FileSize} = couch_file:bytes(St#st.fd), {ok, DbReduction} = couch_btree:full_reduce(St#st.id_tree), - SizeInfo0 = element(3, DbReduction), - SizeInfo = - case SizeInfo0 of - SI when is_record(SI, size_info) -> - SI; - {AS, ES} -> - #size_info{active = AS, external = ES}; - AS -> - #size_info{active = AS} + SizeInfos = upgrade_sizes_to_list(element(3, DbReduction)), + lists:zipwith( + fun(Gen, SI) -> + Fd = get_fd(St, Gen), + {ok, FileSize} = couch_file:bytes(Fd), + ActiveSize = + case Gen of + 0 -> active_size(St, SI); + _ -> SI#size_info.active + end, + ExternalSize = SI#size_info.external, + [ + {active, ActiveSize}, + {external, ExternalSize}, + {file, FileSize} + ] end, - ActiveSize = active_size(St, SizeInfo), - ExternalSize = SizeInfo#size_info.external, - [ - {active, ActiveSize}, - {external, ExternalSize}, - {file, FileSize} - ]. + lists:seq(0, length(SizeInfos) - 1), + SizeInfos + ). partition_size_cb(traverse, Key, {DC, DDC, Sizes}, {Partition, DCAcc, DDCAcc, SizesAcc}) -> case couch_partition:is_member(Key, Partition) of @@ -294,7 +344,8 @@ get_partition_info(#st{} = St, Partition) -> InitAcc = {Partition, 0, 0, #size_info{}}, Options = [{start_key, StartKey}, {end_key, EndKey}], {ok, _, OutAcc} = couch_btree:fold(St#st.id_tree, Fun, InitAcc, Options), - {Partition, DocCount, DocDelCount, SizeInfo} = OutAcc, + {Partition, DocCount, DocDelCount, SizeInfo0} = OutAcc, + SizeInfo = couch_db_updater:sum_sizes(SizeInfo0), [ {partition, Partition}, {doc_count, DocCount}, @@ -310,7 +361,7 @@ get_security(#st{header = Header} = St) -> undefined -> []; Pointer -> - {ok, SecProps} = couch_file:pread_term(St#st.fd, Pointer), + {ok, SecProps} = couch_file:pread_term(get_fd(St), Pointer), SecProps end. @@ -319,7 +370,7 @@ get_props(#st{header = Header} = St) -> undefined -> []; Pointer -> - {ok, Props} = couch_file:pread_term(St#st.fd, Pointer), + {ok, Props} = couch_file:pread_term(get_fd(St), Pointer), Props end. @@ -338,6 +389,23 @@ set_revs_limit(#st{header = Header} = St, RevsLimit) -> }, {ok, increment_update_seq(NewSt)}. +set_max_generation(#st{filepath = FilePath, gen_fds = GenFds, header = Header} = St, MaxGen) -> + CurrentMaxGen = couch_bt_engine_header:max_generation(Header), + if + MaxGen >= CurrentMaxGen -> + NewSt = St#st{ + header = couch_bt_engine_header:set(Header, [ + {max_generation, MaxGen} + ]), + needs_commit = true + }, + GenFds1 = open_missing_generation_files(FilePath, GenFds, MaxGen), + NewSt1 = NewSt#st{gen_fds = GenFds1}, + {ok, increment_update_seq(NewSt1)}; + true -> + {error, invalid_max_generation} + end. 
+ set_purge_infos_limit(#st{header = Header} = St, PurgeInfosLimit) -> NewSt = St#st{ header = couch_bt_engine_header:set(Header, [ @@ -349,7 +417,7 @@ set_purge_infos_limit(#st{header = Header} = St, PurgeInfosLimit) -> set_security(#st{header = Header} = St, NewSecurity) -> Options = [{compression, St#st.compression}], - {ok, Ptr, _} = couch_file:append_term(St#st.fd, NewSecurity, Options), + {ok, Ptr, _} = couch_file:append_term(get_fd(St), NewSecurity, Options), NewSt = St#st{ header = couch_bt_engine_header:set(Header, [ {security_ptr, Ptr} @@ -360,7 +428,7 @@ set_security(#st{header = Header} = St, NewSecurity) -> set_props(#st{header = Header} = St, Props) -> Options = [{compression, St#st.compression}], - {ok, Ptr, _} = couch_file:append_term(St#st.fd, Props, Options), + {ok, Ptr, _} = couch_file:append_term(get_fd(St), Props, Options), NewSt = St#st{ header = couch_bt_engine_header:set(Header, [ {props_ptr, Ptr} @@ -390,7 +458,9 @@ open_local_docs(#st{} = St, DocIds) -> ). read_doc_body(#st{} = St, #doc{} = Doc) -> - {ok, {Body, Atts}} = couch_file:pread_term(St#st.fd, Doc#doc.body), + {Gen, Ptr} = couch_db_updater:generation_pointer(Doc#doc.body), + Fd = get_fd(St, Gen), + {ok, {Body, Atts}} = couch_file:pread_term(Fd, Ptr), Doc#doc{ body = Body, atts = Atts @@ -427,12 +497,28 @@ serialize_doc(#st{} = St, #doc{} = Doc) -> meta = [{comp_body, Body} | Doc#doc.meta] }. +get_fd(#st{fd = {Fd, _}}) -> + Fd. + +get_fd(#st{fd = {Fd, _}}, 0) -> + Fd; +get_fd(#st{gen_fds = GenFds}, Gen) -> + {Fd, _} = lists:nth(Gen, GenFds), + Fd. + write_doc_body(St, #doc{} = Doc) -> - #st{ - fd = Fd - } = St, + % New doc bodies start with generation 0 + write_doc_body(St, Doc, {0, 0}). + +write_doc_body(St, #doc{} = Doc, {FdGen, PtrGen}) -> + Fd = get_fd(St, FdGen), {ok, Ptr, Written} = couch_file:append_raw_chunk(Fd, Doc#doc.body), - {ok, Doc#doc{body = Ptr}, Written}. + GenPtr = + case PtrGen of + 0 -> Ptr; + G -> {G, Ptr} + end, + {ok, Doc#doc{body = GenPtr}, Written}. write_doc_infos(#st{} = St, Pairs, LocalDocs) -> #st{ @@ -553,10 +639,10 @@ copy_purge_infos(#st{} = St, PurgeInfos) -> commit_data(St) -> #st{ - fd = Fd, header = OldHeader, needs_commit = NeedsCommit } = St, + Fd = get_fd(St), NewHeader = update_header(St, OldHeader), @@ -575,13 +661,20 @@ commit_data(St) -> end. open_write_stream(#st{} = St, Options) -> - couch_stream:open({couch_bt_engine_stream, {St#st.fd, []}}, Options). + open_write_stream(St, {0, 0}, Options). + +open_write_stream(#st{} = St, {FdGen, PtrGen}, Options) -> + Fd = get_fd(St, FdGen), + couch_stream:open({couch_bt_engine_stream, {Fd, PtrGen, []}}, Options). -open_read_stream(#st{} = St, StreamSt) -> - {ok, {couch_bt_engine_stream, {St#st.fd, StreamSt}}}. +open_read_stream(#st{} = St, StreamSt0) -> + {Gen, StreamSt} = couch_db_updater:generation_pointer(StreamSt0), + Fd = get_fd(St, Gen), + {ok, {couch_bt_engine_stream, {Fd, Gen, StreamSt}}}. -is_active_stream(#st{} = St, {couch_bt_engine_stream, {Fd, _}}) -> - St#st.fd == Fd; +is_active_stream(#st{} = St, {couch_bt_engine_stream, {Fd, _, _}}) -> + Fds = [GenFd || {GenFd, _} <- [St#st.fd | St#st.gen_fds]], + lists:member(Fd, Fds); is_active_stream(_, _) -> false. @@ -633,18 +726,18 @@ count_changes_since(St, SinceSeq) -> {ok, Changes} = couch_btree:fold_reduce(BTree, FoldFun, 0, Opts), Changes. 
-start_compaction(St, DbName, Options, Parent) -> - Args = [St, DbName, Options, Parent], +start_compaction(St, DbName, SrcGen, Options, Parent) -> + Args = [St, DbName, SrcGen, Options, Parent], Pid = spawn_link(couch_bt_engine_compactor, start, Args), {ok, St, Pid}. -finish_compaction(OldState, DbName, Options, CompactFilePath) -> - {ok, NewState1} = ?MODULE:init(CompactFilePath, Options), +finish_compaction(OldState, DbName, Options, {CompactFilePath, SrcGen}) -> + {ok, NewState1} = ?MODULE:init(CompactFilePath, [compacting | Options]), OldSeq = get_update_seq(OldState), NewSeq = get_update_seq(NewState1), case OldSeq == NewSeq of true -> - finish_compaction_int(OldState, NewState1); + finish_compaction_int(OldState, NewState1, SrcGen); false -> Level = list_to_existing_atom( config:get( @@ -657,7 +750,7 @@ finish_compaction(OldState, DbName, Options, CompactFilePath) -> [OldSeq, NewSeq] ), ok = decref(NewState1), - start_compaction(OldState, DbName, Options, self()) + start_compaction(OldState, DbName, SrcGen, Options, self()) end. id_tree_split(#full_doc_info{} = Info) -> @@ -811,7 +904,7 @@ set_update_seq(#st{header = Header} = St, UpdateSeq) -> copy_security(#st{header = Header} = St, SecProps) -> Options = [{compression, St#st.compression}], - {ok, Ptr, _} = couch_file:append_term(St#st.fd, SecProps, Options), + {ok, Ptr, _} = couch_file:append_term(get_fd(St), SecProps, Options), {ok, St#st{ header = couch_bt_engine_header:set(Header, [ {security_ptr, Ptr} @@ -821,7 +914,7 @@ copy_security(#st{header = Header} = St, SecProps) -> copy_props(#st{header = Header} = St, Props) -> Options = [{compression, St#st.compression}], - {ok, Ptr, _} = couch_file:append_term(St#st.fd, Props, Options), + {ok, Ptr, _} = couch_file:append_term(get_fd(St), Props, Options), {ok, St#st{ header = couch_bt_engine_header:set(Header, [ {props_ptr, Ptr} @@ -834,12 +927,13 @@ open_db_file(FilePath, Options) -> {ok, Fd} -> {ok, Fd}; {error, enoent} -> - % Couldn't find file. is there a compact version? This ca + % Couldn't find file. is there a compact version? This can % happen (rarely) if we crashed during the file switch. case couch_file:open(FilePath ++ ".compact", [nologifmissing]) of {ok, Fd} -> Fmt = "Recovering from compaction file: ~s~s", couch_log:info(Fmt, [FilePath, ".compact"]), + cleanup_any_compacted_generation(FilePath), ok = file:rename(FilePath ++ ".compact", FilePath), ok = couch_file:sync(Fd), {ok, Fd}; @@ -850,7 +944,82 @@ open_db_file(FilePath, Options) -> throw(Error) end. -init_state(FilePath, Fd, Header0, Options) -> +generation_file_path(FilePath, 0) -> + FilePath; +generation_file_path(FilePath, Gen) -> + G = integer_to_list(Gen), + string:replace(FilePath, ".couch", "." ++ G ++ ".couch", trailing). + +open_generation_file(FilePath, Gen, Options) -> + open_generation_file(FilePath, Gen, "", Options). + +open_generation_file(FilePath, Gen, Suffix, Options) -> + GenFilePath = generation_file_path(FilePath, Gen) ++ Suffix, + Fd = + case couch_file:open(GenFilePath, [nologifmissing | Options]) of + {ok, Db} -> + Db; + {error, enoent} -> + {ok, Db} = couch_file:open(GenFilePath, [create]), + Db + end, + {Fd, erlang:monitor(process, Fd)}. + +open_generation_files(_FilePath, 0, _Options) -> + []; +open_generation_files(FilePath, Generations, Options) -> + lists:map( + fun(Gen) -> + open_generation_file(FilePath, Gen, Options) + end, + lists:seq(1, Generations) + ). + +open_missing_generation_files(FilePath, GenFds, MaxGen) -> + open_missing_generation_files(FilePath, GenFds, MaxGen, 1). 
+ +open_missing_generation_files(_FilePath, GenFds, MaxGen, Gen) when Gen > MaxGen -> + GenFds; +open_missing_generation_files(FilePath, [], MaxGen, Gen) -> + Rest = open_missing_generation_files(FilePath, [], MaxGen, Gen + 1), + Fd = open_generation_file(FilePath, Gen, []), + [Fd | Rest]; +open_missing_generation_files(FilePath, [Fd | Rest], MaxGen, Gen) -> + [Fd | open_missing_generation_files(FilePath, Rest, MaxGen, Gen + 1)]. + +maybe_open_generation_files(FilePath, Generations, Options) -> + case lists:member(compacting, Options) of + true -> []; + false -> open_generation_files(FilePath, Generations, Options) + end. + +open_additional_generation_file(#st{} = St, Gen, Options) -> + #st{ + filepath = FilePath, + header = Header, + gen_fds = GenFds + } = St, + MaxGen = couch_bt_engine_header:max_generation(Header), + case {MaxGen, Gen} of + {0, _} -> + GenFds; + {M, M} -> + Fd = open_generation_file(FilePath, M, ".compact.maxgen", Options), + GenFds ++ [Fd]; + _ -> + GenFds + end. + +reopen_generation_file(FilePath, Fds, Gen) -> + reopen_generation_file(FilePath, Gen, Fds, Gen). + +reopen_generation_file(FilePath, Gen, [_ | Fds], 1) -> + Fd = open_generation_file(FilePath, Gen, []), + [Fd | Fds]; +reopen_generation_file(FilePath, Gen, [Fd | Fds], G) -> + [Fd | reopen_generation_file(FilePath, Gen, Fds, G - 1)]. + +init_state(FilePath, Fd, GenFds, Header0, Options) -> ok = couch_file:sync(Fd), Compression = couch_compress:get_compression_method(), @@ -900,8 +1069,8 @@ init_state(FilePath, Fd, Header0, Options) -> St = #st{ filepath = FilePath, - fd = Fd, - fd_monitor = erlang:monitor(process, Fd), + fd = {Fd, erlang:monitor(process, Fd)}, + gen_fds = GenFds, header = Header, needs_commit = false, id_tree = IdTree, @@ -1068,16 +1237,26 @@ disk_tree(RevTree) -> sizes = Sizes, atts = Atts } = Leaf, - {?b2i(Del), Ptr, Seq, split_sizes(Sizes), Atts} + {?b2i(Del), couch_db_updater:canonical_pointer(Ptr), Seq, split_sizes(Sizes), Atts} end, RevTree ). -split_sizes(#size_info{} = SI) -> - {SI#size_info.active, SI#size_info.external}. +split_sizes(Sizes) -> + couch_db_updater:map_sizes( + fun(#size_info{active = Active, external = External}) -> + {Active, External} + end, + Sizes + ). -join_sizes({Active, External}) when is_integer(Active), is_integer(External) -> - #size_info{active = Active, external = External}. +join_sizes(Sizes) -> + couch_db_updater:map_sizes( + fun({Active, External}) when is_integer(Active), is_integer(External) -> + #size_info{active = Active, external = External} + end, + Sizes + ). reduce_sizes(nil, _) -> nil; @@ -1088,11 +1267,23 @@ reduce_sizes(#size_info{} = S1, #size_info{} = S2) -> active = S1#size_info.active + S2#size_info.active, external = S1#size_info.external + S2#size_info.external }; +reduce_sizes([], S) -> + S; +reduce_sizes(S, []) -> + S; +reduce_sizes([S1 | R1], [S2 | R2]) -> + [reduce_sizes(S1, S2) | reduce_sizes(R1, R2)]; reduce_sizes(S1, S2) -> - US1 = couch_db_updater:upgrade_sizes(S1), - US2 = couch_db_updater:upgrade_sizes(S2), + US1 = upgrade_sizes_to_list(S1), + US2 = upgrade_sizes_to_list(S2), reduce_sizes(US1, US2). +upgrade_sizes_to_list(Sizes) -> + case couch_db_updater:upgrade_sizes(Sizes) of + S when is_list(S) -> S; + S -> [S] + end. + active_size(#st{} = St, #size_info{} = SI) -> Trees = [ St#st.id_tree, @@ -1170,10 +1361,11 @@ fold_docs_reduce_to_count(Reds) -> FinalRed = couch_btree:final_reduce(RedFun, Reds), element(1, FinalRed). 
-finish_compaction_int(#st{} = OldSt, #st{} = NewSt1) -> +finish_compaction_int(#st{} = OldSt, #st{} = NewSt1, SrcGen) -> #st{ filepath = FilePath, - local_tree = OldLocal + local_tree = OldLocal, + gen_fds = OldGenFds } = OldSt, #st{ filepath = CompactDataPath, @@ -1197,6 +1389,18 @@ finish_compaction_int(#st{} = OldSt, #st{} = NewSt1) -> local_tree = NewLocal2 }), + % CRASH SAFETY: this function must do these operations in this order: + % + % - Rename db.couch.compact.data to db.couch.compact + % - Delete the original db.couch + % - Deal with deleting/renaming generation files + % - Rename db.couch.compact to db.couch + % - Delete db.couch.compact.meta + % + % This order is also followed when recovering from a crash that leaves + % compaction files in place. Deviating from this order may create an + % ambiguous state that makes recovery from crashes harder. + % Rename our *.compact.data file to *.compact so that if we % die between deleting the old file and renaming *.compact % we can recover correctly. @@ -1206,6 +1410,28 @@ finish_compaction_int(#st{} = OldSt, #st{} = NewSt1) -> RootDir = config:get("couchdb", "database_dir", "."), couch_file:delete(RootDir, FilePath), + % Assuming that the compactor is the only process that writes to generation + % files, and there is only one compaction per shard running at any time, + % then the compacted generation file G will have had all its data moved to + % G+1 and it will not have gained any data from G-1. Therefore it is empty, + % contains no data referenced by the root .couch file, and can be deleted + % and a fresh file opened in its place. + + MaxGen = couch_bt_engine_header:max_generation(Header), + + NewGenFds = + case SrcGen of + 0 -> + OldGenFds; + MaxGen -> + cleanup_compacted_max_generation(FilePath, MaxGen), + reopen_generation_file(FilePath, OldGenFds, MaxGen); + Gen -> + GenFilePath = generation_file_path(FilePath, Gen), + couch_file:delete(RootDir, GenFilePath), + reopen_generation_file(FilePath, OldGenFds, Gen) + end, + % Move our compacted file into its final location ok = file:rename(FilePath ++ ".compact", FilePath), @@ -1213,15 +1439,59 @@ finish_compaction_int(#st{} = OldSt, #st{} = NewSt1) -> % the compaction file. couch_file:delete(RootDir, FilePath ++ ".compact.meta"), - % We're finished with our old state - decref(OldSt), + % We're finished with our old state; the gen_fds are excluded from this as + % they continue to be used by the new state. + decref(OldSt#st{gen_fds = []}), + + {_, DstGen} = increment_generation(NewSt2, SrcGen), % And return our finished new state {ok, NewSt2#st{ - filepath = FilePath + filepath = FilePath, + gen_fds = NewGenFds }, - undefined}. + DstGen}. + +increment_generation(#st{header = Header}, Gen) -> + MaxGen = couch_bt_engine_header:max_generation(Header), + case {MaxGen, Gen} of + {0, _} -> {0, 0}; + {M, M} -> {M + 1, M}; + {_, G} -> {G + 1, G + 1} + end. + +cleanup_any_compacted_generation(FilePath) -> + Dir = filename:dirname(FilePath), + {ok, Filenames} = file:list_dir(Dir), + lists:foreach( + fun(Filename) -> + CompactPath = filename:join(Dir, Filename), + [NewPath | _] = string:replace(CompactPath, ".compact.maxgen", "", trailing), + case NewPath of + CompactPath -> + ok; + GenPath -> + move_compacted_generation_file(GenPath, CompactPath) + end + end, + Filenames + ). + +cleanup_compacted_max_generation(FilePath, MaxGen) -> + GenPath = generation_file_path(FilePath, MaxGen), + CompactPath = GenPath ++ ".compact.maxgen", + move_compacted_generation_file(GenPath, CompactPath). 
+ +move_compacted_generation_file(GenPath, CompactPath) -> + case is_file(CompactPath) of + true -> + RootDir = config:get("couchdb", "database_dir", "."), + couch_file:delete(RootDir, GenPath), + ok = file:rename(CompactPath, GenPath); + false -> + ok + end. is_file(Path) -> case file:read_file_info(Path, [raw]) of diff --git a/src/couch/src/couch_bt_engine.hrl b/src/couch/src/couch_bt_engine.hrl index e3c1d49831a..f55c0c85f46 100644 --- a/src/couch/src/couch_bt_engine.hrl +++ b/src/couch/src/couch_bt_engine.hrl @@ -13,7 +13,7 @@ -record(st, { filepath, fd, - fd_monitor, + gen_fds = [], % deprecated but keeping it here to avoid altering the record size fsync_options_deprecated, header, diff --git a/src/couch/src/couch_bt_engine_compactor.erl b/src/couch/src/couch_bt_engine_compactor.erl index 8ed55b5c399..4ce552366f0 100644 --- a/src/couch/src/couch_bt_engine_compactor.erl +++ b/src/couch/src/couch_bt_engine_compactor.erl @@ -13,7 +13,7 @@ -module(couch_bt_engine_compactor). -export([ - start/4 + start/5 ]). -include_lib("couch/include/couch_db.hrl"). @@ -21,6 +21,7 @@ -record(comp_st, { db_name, + src_generation, old_st, new_st, meta_fd, @@ -47,14 +48,17 @@ -define(COMP_EVENT(Name), ignore). -endif. -start(#st{} = St, DbName, Options, Parent) -> +start(#st{header = Header} = St, DbName, SrcGen0, Options, Parent) -> erlang:put(io_priority, {db_compact, DbName}), couch_log:debug("Compaction process spawned for db \"~s\"", [DbName]), + MaxGen = couch_bt_engine_header:max_generation(Header), + SrcGen = erlang:min(SrcGen0, MaxGen), + couch_db_engine:trigger_on_compact(DbName), ?COMP_EVENT(init), - {ok, InitCompSt} = open_compaction_files(DbName, St, Options), + {ok, InitCompSt} = open_compaction_files(DbName, SrcGen, St, Options), ?COMP_EVENT(files_opened), Stages = [ @@ -84,10 +88,10 @@ start(#st{} = St, DbName, Options, Parent) -> ok = couch_file:close(MetaFd), ?COMP_EVENT(before_notify), - Msg = {compact_done, couch_bt_engine, FinalNewSt#st.filepath}, + Msg = {compact_done, couch_bt_engine, {FinalNewSt#st.filepath, SrcGen}}, gen_server:cast(Parent, Msg). 
-open_compaction_files(DbName, OldSt, Options) -> +open_compaction_files(DbName, SrcGen, OldSt, Options) -> #st{ filepath = DbFilePath, header = SrcHdr @@ -97,6 +101,7 @@ open_compaction_files(DbName, OldSt, Options) -> {ok, DataFd, DataHdr} = open_compaction_file(DataFile), {ok, MetaFd, MetaHdr} = open_compaction_file(MetaFile), DataHdrIsDbHdr = couch_bt_engine_header:is_header(DataHdr), + GenFds = couch_bt_engine:open_additional_generation_file(OldSt, SrcGen, Options), CompSt = case {DataHdr, MetaHdr} of {#comp_header{} = A, #comp_header{} = A} -> @@ -104,11 +109,12 @@ open_compaction_files(DbName, OldSt, Options) -> % before trying to swap out with the original db DbHeader = A#comp_header.db_header, St0 = couch_bt_engine:init_state( - DataFile, DataFd, DbHeader, Options + DataFile, DataFd, GenFds, DbHeader, Options ), St1 = bind_emsort(St0, MetaFd, A#comp_header.meta_st), #comp_st{ db_name = DbName, + src_generation = SrcGen, old_st = OldSt, new_st = St1, meta_fd = MetaFd, @@ -121,11 +127,12 @@ open_compaction_files(DbName, OldSt, Options) -> Header = couch_bt_engine_header:from(SrcHdr), ok = reset_compaction_file(MetaFd, Header), St0 = couch_bt_engine:init_state( - DataFile, DataFd, DataHdr, Options + DataFile, DataFd, GenFds, DataHdr, Options ), St1 = bind_emsort(St0, MetaFd, nil), #comp_st{ db_name = DbName, + src_generation = SrcGen, old_st = OldSt, new_st = St1, meta_fd = MetaFd, @@ -136,10 +143,11 @@ open_compaction_files(DbName, OldSt, Options) -> Header = couch_bt_engine_header:from(SrcHdr), ok = reset_compaction_file(DataFd, Header), ok = reset_compaction_file(MetaFd, Header), - St0 = couch_bt_engine:init_state(DataFile, DataFd, Header, Options), + St0 = couch_bt_engine:init_state(DataFile, DataFd, GenFds, Header, Options), St1 = bind_emsort(St0, MetaFd, nil), #comp_st{ db_name = DbName, + src_generation = SrcGen, old_st = OldSt, new_st = St1, meta_fd = MetaFd, @@ -211,7 +219,7 @@ copy_purge_infos(OldSt, NewSt0, Infos, MinPurgeSeq, Retry) -> NewIdTreeState = couch_bt_engine_header:id_tree_state(NewSt0#st.header), MetaFd = couch_emsort:get_fd(NewSt0#st.id_tree), MetaState = couch_emsort:get_state(NewSt0#st.id_tree), - NewSt1 = bind_id_tree(NewSt0, NewSt0#st.fd, NewIdTreeState), + NewSt1 = bind_id_tree(NewSt0, couch_bt_engine:get_fd(NewSt0), NewIdTreeState), #st{ id_tree = NewIdTree0, @@ -287,6 +295,7 @@ copy_purge_infos(OldSt, NewSt0, Infos, MinPurgeSeq, Retry) -> copy_compact(#comp_st{} = CompSt) -> #comp_st{ db_name = DbName, + src_generation = SrcGen, old_st = St, new_st = NewSt0, retry = Retry @@ -323,7 +332,7 @@ copy_compact(#comp_st{} = CompSt) -> if AccUncopiedSize2 >= BufferSize -> NewSt2 = copy_docs( - St, AccNewSt, lists:reverse([DocInfo | AccUncopied]), Retry + St, SrcGen, AccNewSt, lists:reverse([DocInfo | AccUncopied]), Retry ), AccCopiedSize2 = AccCopiedSize + AccUncopiedSize2, if @@ -371,7 +380,7 @@ copy_compact(#comp_st{} = CompSt) -> [{start_key, NewUpdateSeq + 1}] ), - NewSt3 = copy_docs(St, NewSt2, lists:reverse(Uncopied), Retry), + NewSt3 = copy_docs(St, SrcGen, NewSt2, lists:reverse(Uncopied), Retry), ?COMP_EVENT(seq_done), @@ -390,7 +399,13 @@ copy_compact(#comp_st{} = CompSt) -> new_st = NewSt6 }. -copy_docs(St, #st{} = NewSt, MixedInfos, Retry) -> +pick_target_generation(SrcGen, DstGen, DataGen) -> + case DataGen of + SrcGen -> DstGen; + _ -> {DataGen, DataGen} + end. 
+ +copy_docs(St, SrcGen, #st{} = NewSt, MixedInfos, Retry) -> DocInfoIds = [Id || #doc_info{id = Id} <- MixedInfos], LookupResults = couch_btree:lookup(St#st.id_tree, DocInfoIds), % COUCHDB-968, make sure we prune duplicates during compaction @@ -405,9 +420,14 @@ copy_docs(St, #st{} = NewSt, MixedInfos, Retry) -> fun(Info) -> {NewRevTree, FinalAcc} = couch_key_tree:mapfold( fun - ({RevPos, RevId}, #leaf{ptr = Sp} = Leaf, leaf, SizesAcc) -> - {Body, AttInfos} = copy_doc_attachments(St, Sp, NewSt), - #size_info{external = OldExternalSize} = Leaf#leaf.sizes, + ({RevPos, RevId}, #leaf{ptr = LeafPtr} = Leaf, leaf, SizesAcc) -> + {DocGen, _} = couch_db_updater:generation_pointer(LeafPtr), + DstGen = couch_bt_engine:increment_generation(St, SrcGen), + {Body, AttsChanged, AttInfos} = copy_doc_attachments( + St, NewSt, LeafPtr, SrcGen, DstGen + ), + #size_info{active = OldActiveSize, external = OldExternalSize} = + Leaf#leaf.sizes, ExternalSize = case OldExternalSize of 0 when is_binary(Body) -> @@ -417,19 +437,28 @@ copy_docs(St, #st{} = NewSt, MixedInfos, Retry) -> N -> N end, - Doc0 = #doc{ - id = Info#full_doc_info.id, - revs = {RevPos, [RevId]}, - deleted = Leaf#leaf.deleted, - body = Body, - atts = AttInfos - }, - Doc1 = couch_bt_engine:serialize_doc(NewSt, Doc0), - {ok, Doc2, ActiveSize} = - couch_bt_engine:write_doc_body(NewSt, Doc1), + {NewPtr, ActiveSize} = + case {AttsChanged, DocGen} of + {false, Gen} when Gen > 0, Gen =/= SrcGen -> + {LeafPtr, OldActiveSize}; + _Else -> + Doc0 = #doc{ + id = Info#full_doc_info.id, + revs = {RevPos, [RevId]}, + deleted = Leaf#leaf.deleted, + body = Body, + atts = AttInfos + }, + Doc1 = couch_bt_engine:serialize_doc(NewSt, Doc0), + NewGen = pick_target_generation(SrcGen, DstGen, DocGen), + {ok, Doc2, NewActiveSize} = couch_bt_engine:write_doc_body( + NewSt, Doc1, NewGen + ), + {Doc2#doc.body, NewActiveSize} + end, AttSizes = [{element(3, A), element(4, A)} || A <- AttInfos], NewLeaf = Leaf#leaf{ - ptr = Doc2#doc.body, + ptr = NewPtr, sizes = #size_info{ active = ActiveSize, external = ExternalSize @@ -443,17 +472,11 @@ copy_docs(St, #st{} = NewSt, MixedInfos, Retry) -> {0, 0, []}, Info#full_doc_info.rev_tree ), - {FinalAS, FinalES, FinalAtts} = FinalAcc, - TotalAttSize = lists:foldl(fun({_, S}, A) -> S + A end, 0, FinalAtts), - NewActiveSize = FinalAS + TotalAttSize, - NewExternalSize = FinalES + TotalAttSize, + GenSizes = couch_db_updater:map_fold_sizes(FinalAcc), ?COMP_EVENT(seq_copy), Info#full_doc_info{ rev_tree = NewRevTree, - sizes = #size_info{ - active = NewActiveSize, - external = NewExternalSize - } + sizes = GenSizes } end, NewInfos0 @@ -504,8 +527,10 @@ copy_docs(St, #st{} = NewSt, MixedInfos, Retry) -> update_compact_task(length(NewInfos)), NewSt#st{id_tree = IdEms, seq_tree = SeqTree}. 
-copy_doc_attachments(#st{} = SrcSt, SrcSp, DstSt) -> - {ok, {BodyData, BinInfos0}} = couch_file:pread_term(SrcSt#st.fd, SrcSp), +copy_doc_attachments(#st{} = SrcSt, DstSt, LeafPtr, SrcGen, DstGen) -> + {DocGen, SrcSp} = couch_db_updater:generation_pointer(LeafPtr), + Fd = couch_bt_engine:get_fd(SrcSt, DocGen), + {ok, {BodyData, BinInfos0}} = couch_file:pread_term(Fd, SrcSp), BinInfos = case BinInfos0 of _ when is_binary(BinInfos0) -> @@ -515,7 +540,7 @@ copy_doc_attachments(#st{} = SrcSt, SrcSp, DstSt) -> BinInfos0 end, % copy the bin values - NewBinInfos = lists:map( + NewBinInfos0 = lists:map( fun ({Name, Type, BinSp, AttLen, RevPos, ExpectedMd5}) -> % 010 UPGRADE CODE @@ -526,10 +551,17 @@ copy_doc_attachments(#st{} = SrcSt, SrcSp, DstSt) -> couch_stream:close(DstStream), {ok, NewBinSp} = couch_stream:to_disk_term(NewStream), couch_util:check_md5(ExpectedMd5, ActualMd5), - {Name, Type, NewBinSp, AttLen, AttLen, RevPos, ExpectedMd5, identity}; + {true, {Name, Type, NewBinSp, AttLen, AttLen, RevPos, ExpectedMd5, identity}}; + ( + {_Name, _Type, {AttGen, _BinSp}, _AttLen, _DiskLen, _RevPos, _ExpectedMd5, _Enc1} = + BinInfo + ) when AttGen =/= SrcGen -> + {false, BinInfo}; ({Name, Type, BinSp, AttLen, DiskLen, RevPos, ExpectedMd5, Enc1}) -> + {AttGen, _} = couch_db_updater:generation_pointer(BinSp), + NewGen = pick_target_generation(SrcGen, DstGen, AttGen), {ok, SrcStream} = couch_bt_engine:open_read_stream(SrcSt, BinSp), - {ok, DstStream} = couch_bt_engine:open_write_stream(DstSt, []), + {ok, DstStream} = couch_bt_engine:open_write_stream(DstSt, NewGen, []), ok = couch_stream:copy(SrcStream, DstStream), {NewStream, AttLen, _, ActualMd5, _IdentityMd5} = couch_stream:close(DstStream), @@ -546,11 +578,13 @@ copy_doc_attachments(#st{} = SrcSt, SrcSp, DstSt) -> _ -> Enc1 end, - {Name, Type, NewBinSp, AttLen, DiskLen, RevPos, ExpectedMd5, Enc} + {true, {Name, Type, NewBinSp, AttLen, DiskLen, RevPos, ExpectedMd5, Enc}} end, BinInfos ), - {BodyData, NewBinInfos}. + {Statuses, NewBinInfos} = lists:unzip(NewBinInfos0), + Changed = lists:foldl(fun(A, B) -> A orelse B end, false, Statuses), + {BodyData, Changed, NewBinInfos}. sort_meta_data(#comp_st{new_st = St0} = CompSt) -> ?COMP_EVENT(md_sort_init), @@ -573,10 +607,10 @@ sort_meta_data(#comp_st{new_st = St0} = CompSt) -> copy_meta_data(#comp_st{new_st = St} = CompSt) -> #st{ - fd = Fd, header = Header, id_tree = Src } = St, + Fd = couch_bt_engine:get_fd(St), SrcFd = couch_emsort:get_fd(Src), DstState = couch_bt_engine_header:id_tree_state(Header), {ok, IdTree0} = couch_btree:open(DstState, Fd, [ @@ -648,13 +682,13 @@ commit_compaction_data(#comp_st{new_st = St} = CompSt) -> }; commit_compaction_data(#st{} = St) -> commit_compaction_data(St, couch_emsort:get_fd(St#st.id_tree)), - commit_compaction_data(St, St#st.fd). + commit_compaction_data(St, couch_bt_engine:get_fd(St)). 
commit_compaction_data(#st{header = OldHeader} = St0, Fd) -> DataState = couch_bt_engine_header:id_tree_state(OldHeader), MetaFd = couch_emsort:get_fd(St0#st.id_tree), MetaState = couch_emsort:get_state(St0#st.id_tree), - St1 = bind_id_tree(St0, St0#st.fd, DataState), + St1 = bind_id_tree(St0, couch_bt_engine:get_fd(St0), DataState), Header = couch_bt_engine:update_header(St1, St1#st.header), CompHeader = #comp_header{ db_header = Header, diff --git a/src/couch/src/couch_bt_engine_header.erl b/src/couch/src/couch_bt_engine_header.erl index 3581b1e398f..9f0e8412da0 100644 --- a/src/couch/src/couch_bt_engine_header.erl +++ b/src/couch/src/couch_bt_engine_header.erl @@ -38,7 +38,8 @@ revs_limit/1, uuid/1, epochs/1, - compacted_seq/1 + compacted_seq/1, + max_generation/1 ]). -include_lib("stdlib/include/assert.hrl"). @@ -71,7 +72,8 @@ epochs, compacted_seq, purge_infos_limit = 1000, - props_ptr + props_ptr, + max_generation = 0 }). -define(PARTITION_DISK_VERSION, 8). @@ -87,7 +89,8 @@ from(Header0) -> #db_header{ uuid = Header#db_header.uuid, epochs = Header#db_header.epochs, - compacted_seq = Header#db_header.compacted_seq + compacted_seq = Header#db_header.compacted_seq, + max_generation = Header#db_header.max_generation }. is_header(Header) -> @@ -179,6 +182,9 @@ compacted_seq(Header) -> purge_infos_limit(Header) -> get_field(Header, purge_infos_limit). +max_generation(Header) -> + get_field(Header, max_generation, 0). + get_field(Header, Field) -> get_field(Header, Field, undefined). diff --git a/src/couch/src/couch_bt_engine_stream.erl b/src/couch/src/couch_bt_engine_stream.erl index 253877e7728..0f2599b7738 100644 --- a/src/couch/src/couch_bt_engine_stream.erl +++ b/src/couch/src/couch_bt_engine_stream.erl @@ -20,41 +20,43 @@ to_disk_term/1 ]). -foldl({_Fd, []}, _Fun, Acc) -> +foldl({_Fd, _Gen, []}, _Fun, Acc) -> Acc; -foldl({Fd, [{Pos, _} | Rest]}, Fun, Acc) -> - foldl({Fd, [Pos | Rest]}, Fun, Acc); -foldl({Fd, [Bin | Rest]}, Fun, Acc) when is_binary(Bin) -> +foldl({Fd, Gen, [{Pos, _} | Rest]}, Fun, Acc) -> + foldl({Fd, Gen, [Pos | Rest]}, Fun, Acc); +foldl({Fd, Gen, [Bin | Rest]}, Fun, Acc) when is_binary(Bin) -> % We're processing the first bit of data % after we did a seek for a range fold. - foldl({Fd, Rest}, Fun, Fun(Bin, Acc)); -foldl({Fd, [Pos | Rest]}, Fun, Acc) when is_integer(Pos) -> + foldl({Fd, Gen, Rest}, Fun, Fun(Bin, Acc)); +foldl({Fd, Gen, [Pos | Rest]}, Fun, Acc) when is_integer(Pos) -> {ok, Bin} = couch_file:pread_binary(Fd, Pos), - foldl({Fd, Rest}, Fun, Fun(Bin, Acc)). + foldl({Fd, Gen, Rest}, Fun, Fun(Bin, Acc)). -seek({Fd, [{Pos, Length} | Rest]}, Offset) -> +seek({Fd, Gen, [{Pos, Length} | Rest]}, Offset) -> case Length =< Offset of true -> - seek({Fd, Rest}, Offset - Length); + seek({Fd, Gen, Rest}, Offset - Length); false -> - seek({Fd, [Pos | Rest]}, Offset) + seek({Fd, Gen, [Pos | Rest]}, Offset) end; -seek({Fd, [Pos | Rest]}, Offset) when is_integer(Pos) -> +seek({Fd, Gen, [Pos | Rest]}, Offset) when is_integer(Pos) -> {ok, Bin} = couch_file:pread_binary(Fd, Pos), case iolist_size(Bin) =< Offset of true -> - seek({Fd, Rest}, Offset - size(Bin)); + seek({Fd, Gen, Rest}, Offset - size(Bin)); false -> <<_:Offset/binary, Tail/binary>> = Bin, - {ok, {Fd, [Tail | Rest]}} + {ok, {Fd, Gen, [Tail | Rest]}} end. -write({Fd, Written}, Data) when is_pid(Fd) -> +write({Fd, Gen, Written}, Data) when is_pid(Fd) -> {ok, Pos, _} = couch_file:append_binary(Fd, Data), - {ok, {Fd, [{Pos, iolist_size(Data)} | Written]}}. + {ok, {Fd, Gen, [{Pos, iolist_size(Data)} | Written]}}. 
-finalize({Fd, Written}) -> - {ok, {Fd, lists:reverse(Written)}}. +finalize({Fd, Gen, Written}) -> + {ok, {Fd, Gen, lists:reverse(Written)}}. -to_disk_term({_Fd, Written}) -> - {ok, Written}. +to_disk_term({_Fd, 0, Written}) -> + {ok, Written}; +to_disk_term({_Fd, Gen, Written}) -> + {ok, {Gen, Written}}. diff --git a/src/couch/src/couch_db.erl b/src/couch/src/couch_db.erl index e33e695c02e..608a263358e 100644 --- a/src/couch/src/couch_db.erl +++ b/src/couch/src/couch_db.erl @@ -63,6 +63,7 @@ is_partitioned/1, set_revs_limit/2, + set_max_generation/2, set_purge_infos_limit/2, set_security/2, set_user_ctx/2, @@ -120,6 +121,7 @@ owner_of/2, start_compact/1, + start_compact/2, cancel_compact/1, wait_for_compaction/1, wait_for_compaction/2, @@ -257,7 +259,10 @@ monitor(#db{main_pid = MainPid}) -> erlang:monitor(process, MainPid). start_compact(#db{} = Db) -> - gen_server:call(Db#db.main_pid, start_compact). + start_compact(Db, 0). + +start_compact(#db{} = Db, Generation) -> + gen_server:call(Db#db.main_pid, {start_compact, Generation}). cancel_compact(#db{main_pid = Pid}) -> gen_server:call(Pid, cancel_compact). @@ -536,6 +541,12 @@ purge_client_exists(DbName, DocId, Props) -> true end. +set_max_generation(#db{main_pid = Pid} = Db, MaxGen) when MaxGen > 0 -> + check_is_admin(Db), + gen_server:call(Pid, {set_max_generation, MaxGen}, infinity); +set_max_generation(_Db, _MaxGen) -> + throw(invalid_max_generation). + set_purge_infos_limit(#db{main_pid = Pid} = Db, Limit) when Limit > 0 -> check_is_admin(Db), gen_server:call(Pid, {set_purge_infos_limit, Limit}, infinity); @@ -614,7 +625,7 @@ get_db_info(Db) -> } = Db, {ok, DocCount} = get_doc_count(Db), {ok, DelDocCount} = get_del_doc_count(Db), - SizeInfo = couch_db_engine:get_size_info(Db), + SizeInfos = couch_db_engine:get_size_info(Db), DiskVersion = couch_db_engine:get_disk_version(Db), Uuid = case get_uuid(Db) of @@ -639,7 +650,7 @@ get_db_info(Db) -> {update_seq, get_update_seq(Db)}, {purge_seq, couch_db_engine:get_purge_seq(Db)}, {compact_running, Compactor /= nil}, - {sizes, {SizeInfo}}, + {sizes, lists:map(fun(SI) -> {SI} end, SizeInfos)}, {instance_start_time, StartTime}, {disk_format_version, DiskVersion}, {committed_update_seq, CommittedUpdateSeq}, diff --git a/src/couch/src/couch_db_engine.erl b/src/couch/src/couch_db_engine.erl index 54f2c148287..31aa6475ed9 100644 --- a/src/couch/src/couch_db_engine.erl +++ b/src/couch/src/couch_db_engine.erl @@ -276,6 +276,9 @@ -callback set_revs_limit(DbHandle :: db_handle(), RevsLimit :: pos_integer()) -> {ok, NewDbHandle :: db_handle()}. +-callback set_max_generation(DbHandle :: db_handle(), MaxGen :: pos_integer()) -> + {ok, NewDbHandle :: db_handle()}. + -callback set_purge_infos_limit(DbHandle :: db_handle(), Limit :: pos_integer()) -> {ok, NewDbHandle :: db_handle()}. @@ -628,6 +631,7 @@ -callback start_compaction( DbHandle :: db_handle(), DbName :: binary(), + Generation :: non_neg_integer(), Options :: db_open_options(), Parent :: pid() ) -> @@ -684,6 +688,7 @@ set_revs_limit/2, set_security/2, + set_max_generation/2, set_purge_infos_limit/2, set_props/2, @@ -711,7 +716,7 @@ fold_purge_infos/5, count_changes_since/2, - start_compaction/1, + start_compaction/2, finish_compaction/2, trigger_on_compact/1 ]). @@ -850,6 +855,15 @@ set_revs_limit(#db{} = Db, RevsLimit) -> {ok, NewSt} = Engine:set_revs_limit(EngineState, RevsLimit), {ok, Db#db{engine = {Engine, NewSt}}}. 
+set_max_generation(#db{} = Db, MaxGen) -> + #db{engine = {Engine, EngineState}} = Db, + case Engine:set_max_generation(EngineState, MaxGen) of + {ok, NewSt} -> + {ok, Db#db{engine = {Engine, NewSt}}}; + Error -> + Error + end. + set_purge_infos_limit(#db{} = Db, PurgedDocsLimit) -> #db{engine = {Engine, EngineState}} = Db, {ok, NewSt} = Engine:set_purge_infos_limit(EngineState, PurgedDocsLimit), @@ -952,14 +966,14 @@ count_changes_since(#db{} = Db, StartSeq) -> #db{engine = {Engine, EngineState}} = Db, Engine:count_changes_since(EngineState, StartSeq). -start_compaction(#db{} = Db) -> +start_compaction(#db{} = Db, Generation) -> #db{ engine = {Engine, EngineState}, name = DbName, options = Options } = Db, {ok, NewEngineState, Pid} = Engine:start_compaction( - EngineState, DbName, Options, self() + EngineState, DbName, Generation, Options, self() ), {ok, Db#db{ engine = {Engine, NewEngineState}, @@ -974,8 +988,9 @@ finish_compaction(Db, CompactInfo) -> } = Db, NewDb = case Engine:finish_compaction(St, DbName, Options, CompactInfo) of - {ok, NewState, undefined} -> + {ok, NewState, DstGen} when is_integer(DstGen) -> couch_event:notify(DbName, compacted), + couch_event:notify(DbName, {compacted_into_generation, DstGen}), Db#db{ engine = {Engine, NewState}, compactor_pid = nil diff --git a/src/couch/src/couch_db_updater.erl b/src/couch/src/couch_db_updater.erl index 3f6c8886dc5..e9851bb301e 100644 --- a/src/couch/src/couch_db_updater.erl +++ b/src/couch/src/couch_db_updater.erl @@ -13,8 +13,9 @@ -module(couch_db_updater). -behaviour(gen_server). --export([add_sizes/3, upgrade_sizes/1]). +-export([add_sizes/3, map_sizes/2, map_fold_sizes/1, sum_sizes/1, upgrade_sizes/1]). -export([init/1, terminate/2, handle_call/3, handle_cast/2, handle_info/2]). +-export([generation_pointer/1, canonical_pointer/1]). -include_lib("couch/include/couch_db.hrl"). -include("couch_db_int.hrl"). 
@@ -62,8 +63,8 @@ terminate(Reason, Db) -> handle_call(get_db, _From, Db) -> {reply, {ok, Db}, Db}; -handle_call(start_compact, _From, Db) -> - {noreply, NewDb} = handle_cast(start_compact, Db), +handle_call({start_compact, Generation}, _From, Db) -> + {noreply, NewDb} = handle_cast({start_compact, Generation}, Db), {reply, {ok, NewDb#db.compactor_pid}, NewDb}; handle_call(compactor_pid, _From, #db{compactor_pid = Pid} = Db) -> {reply, Pid, Db}; @@ -88,6 +89,14 @@ handle_call({set_revs_limit, Limit}, _From, Db) -> Db3 = commit_data(Db2), ok = couch_server:db_updated(Db3), {reply, ok, Db3}; +handle_call({set_max_generation, MaxGen}, _From, Db) -> + case couch_db_engine:set_max_generation(Db, MaxGen) of + {ok, Db2} -> + ok = couch_server:db_updated(Db2), + {reply, ok, Db2}; + {error, Error} -> + {reply, Error, Db} + end; handle_call({set_purge_infos_limit, Limit}, _From, Db) -> {ok, Db2} = couch_db_engine:set_purge_infos_limit(Db, Limit), ok = couch_server:db_updated(Db2), @@ -127,7 +136,7 @@ handle_cast({load_validation_funs, ValidationFuns}, Db) -> Db2 = Db#db{validate_doc_funs = ValidationFuns}, ok = couch_server:db_updated(Db2), {noreply, Db2}; -handle_cast(start_compact, Db) -> +handle_cast({start_compact, Generation}, Db) -> case Db#db.compactor_pid of nil -> % For now we only support compacting to the same @@ -142,7 +151,7 @@ handle_cast(start_compact, Db) -> ) ), couch_log:Level("Starting compaction for db \"~s\" at ~p", Args), - {ok, Db2} = couch_db_engine:start_compaction(Db), + {ok, Db2} = couch_db_engine:start_compaction(Db, Generation), ok = couch_server:db_updated(Db2), {noreply, Db2}; _ -> @@ -378,14 +387,10 @@ flush_trees( {0, 0, []}, Unflushed ), - {FinalAS, FinalES, FinalAtts} = FinalAcc, - TotalAttSize = lists:foldl(fun({_, S}, A) -> S + A end, 0, FinalAtts), + GenSizes = map_fold_sizes(FinalAcc), NewInfo = InfoUnflushed#full_doc_info{ rev_tree = Flushed, - sizes = #size_info{ - active = FinalAS + TotalAttSize, - external = FinalES + TotalAttSize - } + sizes = GenSizes }, flush_trees(Db, RestUnflushed, [NewInfo | AccFlushed]). @@ -416,7 +421,40 @@ check_doc_atts(Db, Doc) -> end end. -add_sizes(leaf, #leaf{sizes = Sizes, atts = AttSizes}, Acc) -> +canonical_pointer({0, Ptr}) -> + Ptr; +canonical_pointer(Ptr) -> + Ptr. + +generation_pointer(Ptr) when is_integer(Ptr) -> + {0, Ptr}; +generation_pointer(Ptr) when is_list(Ptr) -> + {0, Ptr}; +generation_pointer(undefined) -> + {0, undefined}; +generation_pointer({Gen, Ptr}) -> + {Gen, Ptr}. + +add_sizes(leaf, #leaf{ptr = Ptr} = Leaf, Acc) -> + {Gen, _} = generation_pointer(Ptr), + case add_sizes_int(Leaf, Gen, Acc) of + [A] -> A; + A -> A + end; +add_sizes(_, #leaf{}, Acc) -> + % For intermediate nodes external and active contribution is 0 + Acc. + +add_sizes_int(Leaf, Gen, []) -> + add_sizes_int(Leaf, Gen, [{0, 0, []}]); +add_sizes_int(Leaf, 0, [Acc | Rest]) -> + [add_sizes_leaf(Leaf, Acc) | Rest]; +add_sizes_int(Leaf, Gen, [Acc | Rest]) -> + [Acc | add_sizes_int(Leaf, Gen - 1, Rest)]; +add_sizes_int(Leaf, Gen, Acc) -> + add_sizes_int(Leaf, Gen, [Acc]). + +add_sizes_leaf(#leaf{sizes = Sizes, atts = AttSizes}, Acc) -> % Maybe upgrade from disk_size only #size_info{ active = ActiveSize, @@ -426,11 +464,41 @@ add_sizes(leaf, #leaf{sizes = Sizes, atts = AttSizes}, Acc) -> NewASAcc = ActiveSize + ASAcc, NewESAcc = ExternalSize + ESAcc, NewAttsAcc = lists:umerge(AttSizes, AttsAcc), - {NewASAcc, NewESAcc, NewAttsAcc}; -add_sizes(_, #leaf{}, Acc) -> - % For intermediate nodes external and active contribution is 0 - Acc. 
+ {NewASAcc, NewESAcc, NewAttsAcc}. + +map_fold_sizes(FinalSizesAcc) -> + map_sizes(fun fold_sizes/1, FinalSizesAcc). +map_sizes(Fun, Sizes) when is_list(Sizes) -> + lists:map(Fun, Sizes); +map_sizes(Fun, Sizes) -> + Fun(Sizes). + +fold_sizes({ActiveSize, ExternalSize, AttSizes}) -> + TotalAttSize = lists:foldl(fun({_, S}, A) -> S + A end, 0, AttSizes), + #size_info{ + active = ActiveSize + TotalAttSize, + external = ExternalSize + TotalAttSize + }. + +sum_sizes(SI) when is_list(SI) -> + lists:foldl( + fun(A, B) -> + #size_info{ + active = A#size_info.active + B#size_info.active, + external = A#size_info.external + B#size_info.external + } + end, + #size_info{active = 0, external = 0}, + SI + ); +sum_sizes(#size_info{} = SI) -> + SI. + +upgrade_sizes([S]) -> + upgrade_sizes(S); +upgrade_sizes(S) when is_list(S) -> + lists:map(fun upgrade_sizes/1, S); upgrade_sizes(#size_info{} = SI) -> SI; upgrade_sizes({D, E}) -> @@ -784,9 +852,9 @@ estimate_size(#full_doc_info{} = FDI) -> (_Rev, _Value, branch, SizesAcc) -> SizesAcc end, - {_, FinalES, FinalAtts} = couch_key_tree:fold(Fun, {0, 0, []}, RevTree), - TotalAttSize = lists:foldl(fun({_, S}, A) -> S + A end, 0, FinalAtts), - FinalES + TotalAttSize. + Sizes = couch_key_tree:fold(Fun, {0, 0, []}, RevTree), + #size_info{external = ES} = sum_sizes(map_fold_sizes(Sizes)), + ES. purge_docs(Db, []) -> {ok, Db, []}; diff --git a/src/couch/src/couch_httpd_db.erl b/src/couch/src/couch_httpd_db.erl index 5c566ed789a..dd11002e349 100644 --- a/src/couch/src/couch_httpd_db.erl +++ b/src/couch/src/couch_httpd_db.erl @@ -203,7 +203,8 @@ handle_compact_req(#httpd{method = 'POST'} = Req, Db) -> ok = couch_db:check_is_admin(Db), couch_httpd:validate_ctype(Req, "application/json"), _ = couch_httpd:body(Req), - {ok, _} = couch_db:start_compact(Db), + SrcGen = list_to_integer(couch_httpd:qs_value(Req, "gen", "0")), + {ok, _} = couch_db:start_compact(Db, SrcGen), send_json(Req, 202, {[{ok, true}]}); [_DbName, <<"_compact">>, DesignName | _] -> DesignId = <<"_design/", DesignName/binary>>, diff --git a/src/couch/test/eunit/couch_bt_engine_compactor_tests.erl b/src/couch/test/eunit/couch_bt_engine_compactor_tests.erl index 4ed57668efa..63ede51fb7f 100644 --- a/src/couch/test/eunit/couch_bt_engine_compactor_tests.erl +++ b/src/couch/test/eunit/couch_bt_engine_compactor_tests.erl @@ -19,8 +19,11 @@ -define(WAIT_DELAY_COUNT, 50). setup() -> + setup(0). + +setup(MaxGen) -> DbName = ?tempdb(), - {ok, Db} = couch_db:create(DbName, [?ADMIN_CTX]), + {ok, Db} = couch_db:create(DbName, [?ADMIN_CTX, {max_generation, MaxGen}]), ok = couch_db:close(Db), create_docs(DbName), DbName. @@ -40,25 +43,57 @@ basic_compaction_test_() -> fun setup/0, fun teardown/1, [ - fun compaction_resume/1, + fun compaction_resume_0/1, fun is_compacting_works/1 ] } }. -compaction_resume(DbName) -> +generational_compaction_test_() -> + { + setup, + fun test_util:start_couch/0, + fun test_util:stop_couch/1, + { + foreach, + fun() -> setup(2) end, + fun teardown/1, + [ + fun compaction_resume_0/1, + fun compaction_resume_1/1, + fun compaction_resume_2/1, + fun is_compacting_works/1 + ] + } + }. + +compaction_resume_0(DbName) -> + compaction_resume(DbName, 0). + +compaction_resume_1(DbName) -> + compaction_resume(DbName, 1). + +compaction_resume_2(DbName) -> + compaction_resume(DbName, 2). 
+ +compaction_resume(DbName, Gen) -> + lists:foreach( + fun(G) -> compact_db(DbName, G) end, + lists:seq(0, Gen - 1) + ), + ?_test(begin check_db_validity(DbName), - compact_db(DbName), + compact_db(DbName, Gen), check_db_validity(DbName), % Force an error when copying document ids with_mecked_emsort(fun() -> - compact_db(DbName) + compact_db(DbName, Gen) end), check_db_validity(DbName), - compact_db(DbName), + compact_db(DbName, Gen), check_db_validity(DbName) end). @@ -120,8 +155,12 @@ create_docs(DbName) -> end). compact_db(DbName) -> + compact_db(DbName, 0). + +compact_db(DbName, Gen) -> + couch_log:error("compact_db(Gen = ~p)", [Gen]), couch_util:with_db(DbName, fun(Db) -> - {ok, _} = couch_db:start_compact(Db) + {ok, _} = couch_db:start_compact(Db, Gen) end), wait_db_compact_done(DbName, ?WAIT_DELAY_COUNT). diff --git a/src/couch/test/eunit/couch_stream_tests.erl b/src/couch/test/eunit/couch_stream_tests.erl index f29c04c82c3..fb9a380b637 100644 --- a/src/couch/test/eunit/couch_stream_tests.erl +++ b/src/couch/test/eunit/couch_stream_tests.erl @@ -14,7 +14,7 @@ -include_lib("couch/include/couch_eunit.hrl"). --define(ENGINE(FdVar), {couch_bt_engine_stream, {FdVar, []}}). +-define(ENGINE(FdVar), {couch_bt_engine_stream, {FdVar, 0, []}}). setup() -> {ok, Fd} = couch_file:open(?tempfile(), [create, overwrite]), diff --git a/src/couch/test/eunit/couchdb_file_compression_tests.erl b/src/couch/test/eunit/couchdb_file_compression_tests.erl index 122900d4ba5..dcb7ec84673 100644 --- a/src/couch/test/eunit/couchdb_file_compression_tests.erl +++ b/src/couch/test/eunit/couchdb_file_compression_tests.erl @@ -208,10 +208,20 @@ view_external_size(DbName) -> external_size(Info). active_size(Info) -> - couch_util:get_nested_json_value({Info}, [sizes, active]). + Size = + case couch_util:get_value(sizes, Info) of + [{Size} | _] -> Size; + {Size} -> Size + end, + couch_util:get_value(active, Size). external_size(Info) -> - couch_util:get_nested_json_value({Info}, [sizes, external]). + Size = + case couch_util:get_value(sizes, Info) of + [{Size} | _] -> Size; + {Size} -> Size + end, + couch_util:get_value(external, Size). wait_compaction(DbName, Kind, Line) -> WaitFun = fun() -> diff --git a/src/couch_mrview/test/eunit/couch_mrview_purge_docs_tests.erl b/src/couch_mrview/test/eunit/couch_mrview_purge_docs_tests.erl index 44500bdf1bb..e242dee2452 100644 --- a/src/couch_mrview/test/eunit/couch_mrview_purge_docs_tests.erl +++ b/src/couch_mrview/test/eunit/couch_mrview_purge_docs_tests.erl @@ -565,7 +565,12 @@ db_disk_size(DbName) -> active_size(Info). active_size(Info) -> - couch_util:get_nested_json_value({Info}, [sizes, active]). + Size = + case couch_util:get_value(sizes, Info) of + [{Size} | _] -> Size; + {Size} -> Size + end, + couch_util:get_value(active, Size). wait_compaction(DbName, Kind, Line) -> WaitFun = fun() -> diff --git a/src/docs/src/api/database/common.rst b/src/docs/src/api/database/common.rst index bfc86118f54..388cb6f6cca 100644 --- a/src/docs/src/api/database/common.rst +++ b/src/docs/src/api/database/common.rst @@ -156,6 +156,9 @@ :query integer n: Replicas. The number of copies of the database in the cluster. The default is 3, unless overridden in the :config:option:`cluster config ` . + :query integer gen: Maximum generation. The number of generation files in + which to store database content. The default is 0, which means the + database will not use generational storage. :query boolean partitioned: Whether to create a partitioned database. Default is false. :
header Content-Type: - :mimetype:`application/json` + - :mimetype:`text/plain; charset=utf-8` + :>json boolean ok: Operation status + :code 200: Request completed successfully + :code 400: Invalid JSON data + :code 401: Unauthorized request to a protected API + :code 403: Insufficient permissions / :ref:`Too many requests with invalid credentials` + + **Request**: + + .. code-block:: http + + PUT /db/_max_generation HTTP/1.1 + Accept: application/json + Content-Length: 1 + Content-Type: application/json + Host: localhost:5984 + + 2 + + **Response**: + + .. code-block:: http + + HTTP/1.1 200 OK + Cache-Control: must-revalidate + Content-Length: 12 + Content-Type: application/json + Date: Wed, 14 Jun 2017 14:45:34 GMT + Server: CouchDB (Erlang/OTP) + + { + "ok": true + } + .. _api/db/missing_revs: =======================
diff --git a/src/docs/src/maintenance/compaction.rst b/src/docs/src/maintenance/compaction.rst
index abc0cfbb0c7..c5684214d55 100644
--- a/src/docs/src/maintenance/compaction.rst
+++ b/src/docs/src/maintenance/compaction.rst
@@ -377,3 +377,60 @@ exist anymore) you can trigger a :ref:`view cleanup `::
 .. code-block:: javascript

     {"ok":true}
+
+Generational Compaction
+=======================
+
+Normally, compaction works by copying all live data (the document bodies and
+attachments for the latest versions of all documents) into a new file with the
+``.compact`` extension, and then replacing the normal database file with this
+new one. That is, if the database resides in a file named ``mydb.couch``, then
+compaction copies its live data into ``mydb.couch.compact``, and then renames
+``mydb.couch.compact`` to ``mydb.couch``.
+
+If you have a lot of documents that do not change for long periods of time, then
+compaction will have to copy a lot of the same data every time it runs. For
+large data sets, especially those where individual documents or attachments are
+very large, this can be wasteful and leads to compaction taking a long time
+to run.
+
+To improve compaction times for such data sets, CouchDB offers a mode called
+`generational compaction`. When this is enabled, the database is stored in
+multiple files known as `generations`, named ``mydb.1.couch``, ``mydb.2.couch``
+and so on, rather than in a single file. Rather than building a whole new copy
+of the ``mydb.couch`` file, generational compaction `promotes` live data from
+its current generation to the next one up.
+
+All new document and attachment data is originally stored in `generation 0`, and
+when it is compacted, its live data is promoted to `generation 1`. The next time
+generation 0 is compacted, CouchDB will not have to re-copy all the data that
+was already promoted to generation 1.
+
+Generational storage and compaction can be enabled either by setting the ``gen``
+parameter when the database is first created:
+
+.. code-block:: bash
+
+    curl -X PUT http://admin:pass@localhost:5984/dbname?gen=G
+
+Or by setting the ``max_generation`` property on an existing database:
+
+.. code-block:: bash
+
+    curl -X PUT http://admin:pass@localhost:5984/dbname/_max_generation -d G
+
+The value ``G`` sets the database's ``max_generation`` number; a value of ``0``
+selects the normal single-file storage model. A value of ``1`` produces a single
+extra generational file, ``2`` produces two extra files, and so on.
+
+When compacting a generational database, you may specify a generation to compact
+via the ``gen`` parameter:
+
+.. code-block:: bash
+
+    curl -X POST http://admin:pass@localhost:5984/mydb/_compact?gen=1
+
+Compaction targets generation 0 by default. CouchDB will only ever compact a
+single generation of a database at a time, and Smoosh monitors the extra
+generation files for when they exceed a configured slack/ratio threshold, just
+like it does when using single-file storage.
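+
+When a database uses generational storage, the ``sizes`` object reported by the
+database information endpoint becomes an array with one entry per generation,
+ordered from generation 0 upwards. The figures below are purely illustrative
+and will differ on your installation:
+
+.. code-block:: bash
+
+    curl http://admin:pass@localhost:5984/dbname
+    # the "sizes" member of the response is now a list with one object per
+    # generation, for example:
+    #   [{"file": 16692, "external": 12,   "active": 150},
+    #    {"file": 33184, "external": 4108, "active": 4346}]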
diff --git a/src/fabric/src/fabric.erl b/src/fabric/src/fabric.erl
index d552a387ddf..fc60af98c2a 100644
--- a/src/fabric/src/fabric.erl
+++ b/src/fabric/src/fabric.erl
@@ -29,6 +29,7 @@
     get_revs_limit/1,
     get_security/1, get_security/2,
     get_all_security/1, get_all_security/2,
+    set_max_generation/3,
     get_purge_infos_limit/1,
     set_purge_infos_limit/3,
     get_purged_infos/1,
@@ -185,6 +186,13 @@ set_security(DbName, SecObj) ->
 set_security(DbName, SecObj, Options) ->
     fabric_db_meta:set_security(dbname(DbName), SecObj, opts(Options)).

+%% @doc sets the upper bound for the number of storage generations
+-spec set_max_generation(dbname(), pos_integer(), [option()]) -> ok.
+set_max_generation(DbName, MaxGen, Options) when
+    is_integer(MaxGen), MaxGen > 0
+->
+    fabric_db_meta:set_max_generation(dbname(DbName), MaxGen, opts(Options)).
+
 %% @doc sets the upper bound for the number of stored purge requests
 -spec set_purge_infos_limit(dbname(), pos_integer(), [option()]) -> ok.
 set_purge_infos_limit(DbName, Limit, Options) when
@@ -237,9 +245,9 @@ get_all_security(DbName) ->
 get_all_security(DbName, Options) ->
     fabric_db_meta:get_all_security(dbname(DbName), opts(Options)).

-compact(DbName) ->
+compact({DbName, SrcGen}) ->
     [
-        rexi:cast(Node, {fabric_rpc, compact, [Name]})
+        rexi:cast(Node, {fabric_rpc, compact, [{Name, SrcGen}]})
      || #shard{node = Node, name = Name} <- mem3:shards(dbname(DbName))
     ],
     ok.
diff --git a/src/fabric/src/fabric_db_info.erl b/src/fabric/src/fabric_db_info.erl
index 5461404c508..33dcd853e38 100644
--- a/src/fabric/src/fabric_db_info.erl
+++ b/src/fabric/src/fabric_db_info.erl
@@ -108,7 +108,7 @@ merge_results(Info) ->
         (compact_running, X, Acc) ->
             [{compact_running, lists:member(true, X)} | Acc];
         (sizes, X, Acc) ->
-            [{sizes, {merge_object(X)}} | Acc];
+            [{sizes, merge_sizes(X)} | Acc];
         (disk_format_version, X, Acc) ->
             [{disk_format_version, lists:max(X)} | Acc];
         (cluster, [X], Acc) ->
@@ -122,6 +122,53 @@ merge_results(Info) ->
         Dict
     ).

+merge_sizes(ByShard) ->
+    ByGen = fill_transpose({[]}, ByShard),
+    Merged = [{merge_object(S)} || S <- ByGen],
+    case Merged of
+        [S] -> S;
+        S -> S
+    end.
+
+fill_transpose(Filler, Lists) ->
+    fill_transpose(Filler, Lists, []).
+
+fill_transpose(_Filler, [], Acc) ->
+    lists:reverse(Acc);
+fill_transpose(Filler, Lists, Acc) ->
+    {Heads, Tails} = lists:unzip(
+        lists:map(
+            fun(List) ->
+                case List of
+                    [] -> {undefined, []};
+                    [H | T] -> {H, T}
+                end
+            end,
+            Lists
+        )
+    ),
+    case lists:all(fun all_undefined/1, Heads) of
+        true ->
+            fill_transpose(Filler, [], Acc);
+        _ ->
+            Filled = fill_list(Filler, Heads),
+            fill_transpose(Filler, Tails, [Filled | Acc])
+    end.
+
+all_undefined(undefined) -> true;
+all_undefined(_) -> false.
+
+fill_list(Filler, List) ->
+    lists:map(
+        fun(Item) ->
+            case Item of
+                undefined -> Filler;
+                X -> X
+            end
+        end,
+        List
+    ).
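+%% Note on the helpers above: fill_transpose/2 turns the per-shard lists of
+%% generation size objects into per-generation lists across the shards, padding
+%% with the supplied filler when a shard reports fewer generations, so that
+%% merge_sizes/1 can merge each generation independently. For example:
+%%   fill_transpose({[]}, [[a, b], [c]]) =:= [[a, c], [b, {[]}]]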
+ merge_object(Objects) -> Dict = lists:foldl( fun({Props}, D) -> diff --git a/src/fabric/src/fabric_db_meta.erl b/src/fabric/src/fabric_db_meta.erl index 1013b958d40..e643013da74 100644 --- a/src/fabric/src/fabric_db_meta.erl +++ b/src/fabric/src/fabric_db_meta.erl @@ -16,6 +16,7 @@ set_revs_limit/3, set_security/3, get_all_security/2, + set_max_generation/3, set_purge_infos_limit/3 ]). @@ -51,6 +52,21 @@ handle_revs_message(ok, Worker, {Workers, Waiting}) -> handle_revs_message(Error, _, _Acc) -> {error, Error}. +set_max_generation(DbName, MaxGen, Options) -> + Shards = mem3:shards(DbName), + Workers = fabric_util:submit_jobs(Shards, set_max_generation, [MaxGen, Options]), + Handler = fun handle_purge_message/3, + Acc0 = {Workers, length(Workers) - 1}, + case fabric_util:recv(Workers, #shard.ref, Handler, Acc0) of + {ok, ok} -> + ok; + {timeout, {DefunctWorkers, _}} -> + fabric_util:log_timeout(DefunctWorkers, "set_max_generation"), + {error, timeout}; + Error -> + Error + end. + set_purge_infos_limit(DbName, Limit, Options) -> Shards = mem3:shards(DbName), Workers = fabric_util:submit_jobs(Shards, set_purge_infos_limit, [Limit, Options]), diff --git a/src/fabric/src/fabric_rpc.erl b/src/fabric/src/fabric_rpc.erl index 67f529e0935..3e846f6556f 100644 --- a/src/fabric/src/fabric_rpc.erl +++ b/src/fabric/src/fabric_rpc.erl @@ -39,7 +39,9 @@ ]). -export([get_all_security/2, open_shard/2]). -export([compact/1, compact/2]). --export([get_purge_seq/2, get_purged_infos/1, purge_docs/3, set_purge_infos_limit/3]). +-export([ + get_purge_seq/2, get_purged_infos/1, purge_docs/3, set_max_generation/3, set_purge_infos_limit/3 +]). -export([ get_db_info/2, @@ -259,6 +261,9 @@ get_all_security(DbName, Options) -> set_revs_limit(DbName, Limit, Options) -> with_db(DbName, Options, {couch_db, set_revs_limit, [Limit]}). +set_max_generation(DbName, MaxGen, Options) -> + with_db(DbName, Options, {couch_db, set_max_generation, [MaxGen]}). + set_purge_infos_limit(DbName, Limit, Options) -> with_db(DbName, Options, {couch_db, set_purge_infos_limit, [Limit]}). @@ -334,8 +339,8 @@ open_shard(Name, Opts) -> couch_stats:increment_counter([fabric, open_shard, timeouts]) end. -compact(DbName) -> - with_db(DbName, [], {couch_db, start_compact, []}). +compact({DbName, SrcGen}) -> + with_db(DbName, [], {couch_db, start_compact, [SrcGen]}). compact(ShardName, DesignName) -> {ok, Pid} = couch_index_server:get_index( diff --git a/src/mem3/test/eunit/mem3_reshard_test.erl b/src/mem3/test/eunit/mem3_reshard_test.erl index 2579649f973..48c1e8d2310 100644 --- a/src/mem3/test/eunit/mem3_reshard_test.erl +++ b/src/mem3/test/eunit/mem3_reshard_test.erl @@ -752,7 +752,7 @@ purge_docs(DbName, DocIdRevs) -> compact(DbName) -> InitFileSize = get_db_file_size(DbName), - ok = with_proc(fun() -> fabric:compact(DbName) end), + ok = with_proc(fun() -> fabric:compact({DbName, 0}) end), test_util:wait( fun() -> case {compact_running(DbName), get_db_file_size(DbName)} of diff --git a/src/smoosh/src/smoosh.erl b/src/smoosh/src/smoosh.erl index 68e8d1828ea..ada07d9cf1f 100644 --- a/src/smoosh/src/smoosh.erl +++ b/src/smoosh/src/smoosh.erl @@ -36,7 +36,7 @@ status() -> enqueue_all_dbs() -> fold_local_shards( fun(#shard{name = Name}, _Acc) -> - sync_enqueue(Name) + sync_enqueue({Name, 0}) end, ok ). 
diff --git a/src/smoosh/src/smoosh_channel.erl b/src/smoosh/src/smoosh_channel.erl index 3cfbcdec69c..5099c822aac 100644 --- a/src/smoosh/src/smoosh_channel.erl +++ b/src/smoosh/src/smoosh_channel.erl @@ -329,21 +329,23 @@ start_compact(#state{} = State, {?INDEX_CLEANUP, DbName} = Key) -> _ -> false end; -start_compact(#state{name = Name} = State, DbName) when is_binary(DbName) -> +start_compact(#state{name = Name} = State, {DbName, SrcGen} = Key) when + is_binary(DbName), is_integer(SrcGen) +-> case couch_db:open_int(DbName, []) of {ok, Db} -> try - start_compact(State, Db) + start_compact(State, {Db, SrcGen}) after couch_db:close(Db) end; Error = {not_found, no_db_file} -> LogMsg = "~s : Error starting compaction for ~p: ~p", - LogArgs = [Name, smoosh_utils:stringify(DbName), Error], + LogArgs = [Name, smoosh_utils:stringify(Key), Error], couch_log:warning(LogMsg, LogArgs), false end; -start_compact(#state{} = State, {Shard, GroupId} = Key) -> +start_compact(#state{} = State, {Shard, GroupId} = Key) when is_binary(Shard), is_binary(GroupId) -> #state{name = Name, starting = Starting} = State, case smoosh_utils:ignore_db({Shard, GroupId}) of false -> @@ -362,16 +364,17 @@ start_compact(#state{} = State, {Shard, GroupId} = Key) -> _ -> false end; -start_compact(#state{} = State, Db) -> +start_compact(#state{} = State, {Db, SrcGen}) when is_integer(SrcGen) -> #state{name = Name, starting = Starting, active = Active} = State, - Key = couch_db:name(Db), - case smoosh_utils:ignore_db(Key) of + DbName = couch_db:name(Db), + Key = {DbName, SrcGen}, + case smoosh_utils:ignore_db(DbName) of false -> case couch_db:get_compactor_pid(Db) of nil -> DbPid = couch_db:get_pid(Db), Ref = erlang:monitor(process, DbPid), - DbPid ! {'$gen_call', {self(), Ref}, start_compact}, + DbPid ! {'$gen_call', {self(), Ref}, {start_compact, SrcGen}}, State#state{starting = Starting#{Ref => Key}}; % Compaction is already running, so monitor existing compaction pid. CPid when is_pid(CPid) -> @@ -532,7 +535,7 @@ teardown_purge_seq({Ctx, DbName}) -> t_start_db_with_missing_db({_, _}) -> State = #state{name = "ratio_dbs"}, meck:reset(couch_log), - try_compact(State, <<"missing_db">>), + try_compact(State, {<<"missing_db">>, 0}), ?assertEqual(1, meck:num_calls(couch_log, warning, 2)). t_start_view_with_missing_db({_, _}) -> diff --git a/src/smoosh/src/smoosh_server.erl b/src/smoosh/src/smoosh_server.erl index 0ae759016e9..a1b46c7c740 100644 --- a/src/smoosh/src/smoosh_server.erl +++ b/src/smoosh/src/smoosh_server.erl @@ -119,10 +119,13 @@ sync_enqueue(Object0) -> end. handle_db_event(DbName, local_updated, St) -> - enqueue(DbName), + enqueue({DbName, 0}), {ok, St}; handle_db_event(DbName, updated, St) -> - enqueue(DbName), + enqueue({DbName, 0}), + {ok, St}; +handle_db_event(DbName, {compacted_into_generation, DstGen}, St) -> + enqueue({DbName, DstGen}), {ok, St}; handle_db_event(DbName, {index_commit, IdxName}, St) -> enqueue({DbName, IdxName}), @@ -313,12 +316,12 @@ enqueue_request(State, Object) -> ok end. +find_channel(#state{} = State, {DbName, SrcGen}) when is_integer(SrcGen) -> + find_channel(State, State#state.db_channels, {DbName, SrcGen}); find_channel(#state{} = State, {?INDEX_CLEANUP, DbName}) -> find_channel(State, State#state.cleanup_channels, {?INDEX_CLEANUP, DbName}); find_channel(#state{} = State, {Shard, GroupId}) when is_binary(Shard) -> - find_channel(State, State#state.view_channels, {Shard, GroupId}); -find_channel(#state{} = State, DbName) -> - find_channel(State, State#state.db_channels, DbName). 
+ find_channel(State, State#state.view_channels, {Shard, GroupId}). find_channel(#state{} = _State, [], _Object) -> false; @@ -381,7 +384,7 @@ get_priority(_Channel, {?INDEX_CLEANUP, DbName}) -> error:database_does_not_exist -> 0 end; -get_priority(Channel, {Shard, GroupId}) -> +get_priority(Channel, {Shard, GroupId}) when is_binary(GroupId) -> try couch_index_server:get_index(couch_mrview_index, Shard, GroupId) of {ok, Pid} -> try @@ -409,11 +412,11 @@ get_priority(Channel, {Shard, GroupId}) -> throw:{not_found, _} -> 0 end; -get_priority(Channel, DbName) when is_binary(DbName) -> +get_priority(Channel, {DbName, SrcGen}) when is_binary(DbName), is_integer(SrcGen) -> case couch_db:open_int(DbName, []) of {ok, Db} -> try - get_priority(Channel, Db) + get_priority(Channel, {Db, SrcGen}) after couch_db:close(Db) end; @@ -421,9 +424,9 @@ get_priority(Channel, DbName) when is_binary(DbName) -> % It's expected that a db might be deleted while waiting in queue 0 end; -get_priority(Channel, Db) -> +get_priority(Channel, {Db, SrcGen}) when is_integer(SrcGen) -> {ok, DocInfo} = couch_db:get_db_info(Db), - {SizeInfo} = couch_util:get_value(sizes, DocInfo), + {SizeInfo} = lists:nth(SrcGen + 1, couch_util:get_value(sizes, DocInfo)), DiskSize = couch_util:get_value(file, SizeInfo), ActiveSize = couch_util:get_value(active, SizeInfo), NeedsUpgrade = needs_upgrade(DocInfo), diff --git a/src/smoosh/src/smoosh_utils.erl b/src/smoosh/src/smoosh_utils.erl index 087b98d2da4..89dd176639b 100644 --- a/src/smoosh/src/smoosh_utils.erl +++ b/src/smoosh/src/smoosh_utils.erl @@ -44,6 +44,8 @@ split(CSV) -> stringify({?INDEX_CLEANUP, DbName}) -> io_lib:format("~s index_cleanup", [DbName]); +stringify({DbName, Gen}) when is_integer(Gen) -> + io_lib:format("~s (gen=~p)", [DbName, Gen]); stringify({DbName, GroupId}) -> io_lib:format("~s ~s", [DbName, GroupId]); stringify(DbName) -> @@ -125,14 +127,14 @@ capacity(ChannelName) -> % validate_arg({?INDEX_CLEANUP, DbName}) when is_list(DbName) -> validate_arg({?INDEX_CLEANUP, ?l2b(DbName)}); -validate_arg(DbName) when is_list(DbName) -> - validate_arg(?l2b(DbName)); +validate_arg({DbName, Gen}) when is_list(DbName), is_integer(Gen) -> + validate_arg({?l2b(DbName), Gen}); validate_arg({DbName, GroupId}) when is_list(DbName) -> validate_arg({?l2b(DbName), GroupId}); validate_arg({DbName, GroupId}) when is_list(GroupId) -> validate_arg({DbName, ?l2b(GroupId)}); -validate_arg(DbName) when is_binary(DbName) -> - DbName; +validate_arg({DbName, Gen}) when is_binary(DbName), is_integer(Gen) -> + {DbName, Gen}; validate_arg({DbName, GroupId}) when is_binary(DbName), is_binary(GroupId) -> {DbName, GroupId}; validate_arg({?INDEX_CLEANUP, DbName}) when is_binary(DbName) -> @@ -145,8 +147,8 @@ validate_arg(_) -> -include_lib("couch/include/couch_eunit.hrl"). 
smoosh_util_validate_test() -> - ?assertEqual(<<"x">>, validate_arg(<<"x">>)), - ?assertEqual(<<"x">>, validate_arg("x")), + ?assertEqual({<<"x">>, 0}, validate_arg({<<"x">>, 0})), + ?assertEqual({<<"x">>, 0}, validate_arg({"x", 0})), ?assertEqual({<<"x">>, <<"y">>}, validate_arg({"x", "y"})), ?assertEqual({<<"x">>, <<"y">>}, validate_arg({<<"x">>, "y"})), ?assertEqual({<<"x">>, <<"y">>}, validate_arg({"x", <<"y">>})), diff --git a/src/smoosh/test/smoosh_tests.erl b/src/smoosh/test/smoosh_tests.erl index 6861db5e188..4b4f972b43c 100644 --- a/src/smoosh/test/smoosh_tests.erl +++ b/src/smoosh/test/smoosh_tests.erl @@ -327,15 +327,15 @@ t_manual_enqueue_api_works(DbName) -> config:set("smoosh.ratio_dbs", "min_priority", "1", false), config:set("smoosh.ratio_views", "min_priority", "1", false), - ?assertEqual(ok, smoosh_server:sync_enqueue(<<"invalid">>)), + ?assertEqual(ok, smoosh_server:sync_enqueue({<<"invalid">>, 0})), ?assertEqual(ok, smoosh_server:sync_enqueue({index_cleanup, <<"invalid">>})), ?assertEqual(ok, smoosh_server:sync_enqueue({Shard, <<"_design/invalid">>})), - ?assertEqual(ok, smoosh_server:sync_enqueue(Shard)), + ?assertEqual(ok, smoosh_server:sync_enqueue({Shard, 0})), ?assertEqual(ok, smoosh_server:sync_enqueue({index_cleanup, Shard})), ?assertEqual(ok, smoosh_server:sync_enqueue({Shard, <<"_design/foo">>})), - ?assertEqual(ok, smoosh:enqueue(Shard)), + ?assertEqual(ok, smoosh:enqueue({Shard, 0})), ?assertEqual(ok, smoosh:enqueue({index_cleanup, Shard})), ?assertEqual(ok, smoosh:enqueue({Shard, <<"_design/foo">>})), @@ -345,14 +345,14 @@ t_manual_enqueue_api_works(DbName) -> % Enqueuing the same items in a loop should work lists:foreach( fun(_) -> - ?assertEqual(ok, smoosh:enqueue(Shard)), + ?assertEqual(ok, smoosh:enqueue({Shard, 0})), ?assertEqual(ok, smoosh:enqueue({index_cleanup, Shard})), ?assertEqual(ok, smoosh:enqueue({Shard, <<"_design/foo">>})) end, lists:seq(1, 1000) ), - ?assertEqual(ok, smoosh_server:sync_enqueue(Shard)), + ?assertEqual(ok, smoosh_server:sync_enqueue({Shard, 0})), ?assertEqual(ok, smoosh_server:sync_enqueue({index_cleanup, Shard})), ?assertEqual(ok, smoosh_server:sync_enqueue({Shard, <<"_design/foo">>})), @@ -454,7 +454,7 @@ wait_for_channels(N) when is_integer(N), N >= 0 -> test_util:wait(WaitFun). 
wait_to_enqueue(DbName) when is_binary(DbName) -> - wait_enqueue(shard_name(DbName)); + wait_enqueue({shard_name(DbName), 0}); wait_to_enqueue({DbName, View}) when is_binary(DbName) -> wait_enqueue({shard_name(DbName), View}); wait_to_enqueue({index_cleanup, DbName}) when is_binary(DbName) -> diff --git a/test/elixir/lib/couch/dbtest.ex b/test/elixir/lib/couch/dbtest.ex index 8a1a32449db..a5f32c54adb 100644 --- a/test/elixir/lib/couch/dbtest.ex +++ b/test/elixir/lib/couch/dbtest.ex @@ -300,8 +300,8 @@ defmodule Couch.DBTest do resp.body end - def compact(db_name) do - resp = Couch.post("/#{db_name}/_compact") + def compact(db_name, gen \\ 0) do + resp = Couch.post("/#{db_name}/_compact?gen=#{gen}") assert resp.status_code == 202 retry_until( diff --git a/test/elixir/test/compact_generation_test.exs b/test/elixir/test/compact_generation_test.exs new file mode 100644 index 00000000000..f3a482d7ee0 --- /dev/null +++ b/test/elixir/test/compact_generation_test.exs @@ -0,0 +1,521 @@ +defmodule CompactGenerationTest do + use CouchTestCase + + @moduletag :compact + + @moduledoc """ + Test generational CouchDB compaction + """ + + @db_name "testdb" + + test "increase the max generation" do + db_path = setup_db(0) + + make_doc("the-doc", %{ value: "DOC BODY" }) + make_att("the-doc", "the-att", "ATT DATA") + + gen_compact(0) + check_files(db_path, "DOC BODY", [1]) + check_files(db_path, "ATT DATA", [1]) + + Couch.put("/#{@db_name}/_max_generation", body: "2") + + gen_compact(0) + check_files(db_path, "DOC BODY", [0, 1, 0]) + check_files(db_path, "ATT DATA", [0, 1, 0]) + + gen_compact(1) + check_files(db_path, "DOC BODY", [0, 0, 1]) + check_files(db_path, "ATT DATA", [0, 0, 1]) + end + + test "do not decrease the max generation" do + db_path = setup_db(0) + + make_doc("the-doc", %{ value: "DOC BODY" }) + make_att("the-doc", "the-att", "ATT DATA") + + gen_compact(0) + check_files(db_path, "DOC BODY", [1]) + check_files(db_path, "ATT DATA", [1]) + + Couch.put("/#{@db_name}/_max_generation", body: "2") + + gen_compact(0) + check_files(db_path, "DOC BODY", [0, 1, 0]) + check_files(db_path, "ATT DATA", [0, 1, 0]) + + Couch.put("/#{@db_name}/_max_generation", body: "1") + + gen_compact(1) + check_files(db_path, "DOC BODY", [0, 0, 1]) + check_files(db_path, "ATT DATA", [0, 0, 1]) + end + + test "promote docs and atts into gen-1" do + db_path = setup_db(1) + + make_doc("the-doc", %{ value: "DOC BODY" }) + make_att("the-doc", "the-att", "ATT DATA") + + check_files(db_path, "DOC BODY", [2, 0]) + check_files(db_path, "ATT DATA", [1, 0]) + + gen_compact(0) + check_files(db_path, "DOC BODY", [0, 1]) + check_files(db_path, "ATT DATA", [0, 1]) + end + + test "read doc and attachment after compaction" do + db_path = setup_db(2) + + make_doc("the-doc", %{ value: "DOC BODY" }) + make_att("the-doc", "the-att", "ATT DATA") + + gen_compact(0) + gen_compact(1) + + %{ "value" => value } = Couch.get("/#{@db_name}/the-doc").body + assert value == "DOC BODY" + + att = Couch.get("/#{@db_name}/the-doc/the-att").body + assert att == "ATT DATA" + end + + test "read doc after open file timeout" do + db_path = setup_db(2) + + make_doc("the-doc", %{ value: "DOC BODY" }) + make_att("the-doc", "the-att", "ATT DATA") + + gen_compact(0) + gen_compact(1) + Process.sleep(70000) + + %{ "value" => value } = Couch.get("/#{@db_name}/the-doc").body + assert value == "DOC BODY" + + att = Couch.get("/#{@db_name}/the-doc/the-att").body + assert att == "ATT DATA" + end + + test "add an attachment to a compacted doc" do + db_path = setup_db(1) + + 
make_doc("the-doc", %{ value: "DOC BODY" }) + gen_compact(0) + make_att("the-doc", "the-att", "ATT DATA") + + check_files(db_path, "DOC BODY", [1, 1]) + check_files(db_path, "ATT DATA", [1, 0]) + end + + test "compact an attachment added to a compacted doc" do + db_path = setup_db(1) + + make_doc("the-doc", %{ value: "DOC BODY" }) + gen_compact(0) + make_att("the-doc", "the-att", "ATT DATA") + gen_compact(0) + + check_files(db_path, "DOC BODY", [0, 2]) + check_files(db_path, "ATT DATA", [0, 1]) + end + + test "compact a doc and att an additional generation" do + db_path = setup_db(2) + + make_doc("the-doc", %{ value: "DOC BODY" }) + gen_compact(0) + make_att("the-doc", "the-att", "ATT DATA") + gen_compact(0) + gen_compact(1) + + check_files(db_path, "DOC BODY", [0, 0, 1]) + check_files(db_path, "ATT DATA", [0, 0, 1]) + end + + test "update a compacted doc" do + db_path = setup_db(1) + + make_doc("the-doc", %{ value: "DOC BODY" }) + make_att("the-doc", "the-att", "ATT DATA") + gen_compact(0) + update_doc("the-doc", %{ value: "NEW TEXT" }) + + check_files(db_path, "DOC BODY", [0, 1]) + check_files(db_path, "ATT DATA", [0, 1]) + check_files(db_path, "NEW TEXT", [1 ,0]) + end + + test "compact an updated doc" do + db_path = setup_db(1) + + make_doc("the-doc", %{ value: "DOC BODY" }) + make_att("the-doc", "the-att", "ATT DATA") + update_doc("the-doc", %{ value: "NEW TEXT" }) + gen_compact(0) + + check_files(db_path, "DOC BODY", [0, 0]) + check_files(db_path, "ATT DATA", [0, 1]) + check_files(db_path, "NEW TEXT", [0, 1]) + end + + test "compact, update and compact a doc" do + db_path = setup_db(1) + + make_doc("the-doc", %{ value: "DOC BODY" }) + make_att("the-doc", "the-att", "ATT DATA") + gen_compact(0) + update_doc("the-doc", %{ value: "NEW TEXT" }) + gen_compact(0) + + check_files(db_path, "DOC BODY", [0, 1]) + check_files(db_path, "ATT DATA", [0, 1]) + check_files(db_path, "NEW TEXT", [0, 1]) + end + + test "compact, update then compact twice" do + db_path = setup_db(2) + + make_doc("the-doc", %{ value: "DOC BODY" }) + make_att("the-doc", "the-att", "ATT DATA") + gen_compact(0) + update_doc("the-doc", %{ value: "NEW TEXT" }) + gen_compact(0) + gen_compact(1) + + check_files(db_path, "DOC BODY", [0, 0, 0]) + check_files(db_path, "ATT DATA", [0, 0, 1]) + check_files(db_path, "NEW TEXT", [0, 0, 1]) + end + + test "compact through multiple generations" do + db_path = setup_db(3) + + make_doc("the-doc", %{ value: "DOC BODY" }) + make_att("the-doc", "the-att", "ATT DATA") + gen_compact(0) + gen_compact(1) + make_att("the-doc", "extra-att", "EXTRA STUFF") + + check_files(db_path, "DOC BODY", [1, 0, 1, 0]) + check_files(db_path, "ATT DATA", [0, 0, 1, 0]) + check_files(db_path, "EXTRA STUFF", [1, 0, 0, 0]) + + gen_compact(0) + + check_files(db_path, "DOC BODY", [0, 1, 1, 0]) + check_files(db_path, "ATT DATA", [0, 0, 1, 0]) + check_files(db_path, "EXTRA STUFF", [0, 1, 0, 0]) + + gen_compact(1) + + check_files(db_path, "DOC BODY", [0, 0, 2, 0]) + check_files(db_path, "ATT DATA", [0, 0, 1, 0]) + check_files(db_path, "EXTRA STUFF", [0, 0, 1, 0]) + + gen_compact(2) + + check_files(db_path, "DOC BODY", [0, 0, 0, 1]) + check_files(db_path, "ATT DATA", [0, 0, 0, 1]) + check_files(db_path, "EXTRA STUFF", [0, 0, 0, 1]) + end + + test "compact max generation (0)" do + db_path = setup_db(0) + + make_doc("the-doc", %{ value: "DOC BODY" }) + make_att("the-doc", "the-att", "ATT DATA") + + check_files(db_path, "DOC BODY", [2]) + check_files(db_path, "ATT DATA", [1]) + + gen_compact(0) + + check_files(db_path, "DOC 
BODY", [1]) + check_files(db_path, "ATT DATA", [1]) + end + + test "compact max generation (1)" do + db_path = setup_db(1) + + make_doc("the-doc", %{ value: "DOC BODY" }) + make_att("the-doc", "the-att", "ATT DATA") + + check_files(db_path, "DOC BODY", [2, 0]) + check_files(db_path, "ATT DATA", [1, 0]) + + gen_compact(0) + + check_files(db_path, "DOC BODY", [0, 1]) + check_files(db_path, "ATT DATA", [0, 1]) + + gen_compact(1) + + check_files(db_path, "DOC BODY", [0, 1]) + check_files(db_path, "ATT DATA", [0, 1]) + end + + test "retention in gen 0" do + db_path = setup_db(2) + + make_doc("doc-1", %{ the: ["first", "doc"] }) + make_att("doc-1", "att-1", "something") + + check_files(db_path, "first", [2, 0, 0]) + check_files(db_path, "something", [1, 0, 0]) + + gen_compact(0) + + check_files(db_path, "first", [0, 1, 0]) + check_files(db_path, "something", [0, 1, 0]) + + make_doc("doc-2", %{ a: ["second", "doc"] }) + make_att("doc-2", "att-2", "anything") + + check_files(db_path, "first", [0, 1, 0]) + check_files(db_path, "something", [0, 1, 0]) + check_files(db_path, "second", [2, 0, 0]) + check_files(db_path, "anything", [1, 0, 0]) + + gen_compact(1) + gen_compact(1) + gen_compact(1) + + check_files(db_path, "first", [0, 0, 1]) + check_files(db_path, "something", [0, 0, 1]) + check_files(db_path, "second", [1, 0, 0]) + check_files(db_path, "anything", [1, 0, 0]) + end + + test "compact final generation" do + db_path = setup_db(3) + make_doc("the-doc", %{ value: "DOC BODY" }) + + gen_compact(0) + gen_compact(1) + gen_compact(2) + + check_files(db_path, "DOC BODY", [0, 0, 0, 1]) + + make_att("the-doc", "the-att", "ATT DATA") + + check_files(db_path, "DOC BODY", [1, 0, 0, 1]) + check_files(db_path, "ATT DATA", [1, 0, 0, 0]) + + gen_compact(0) + + check_files(db_path, "DOC BODY", [0, 1, 0, 1]) + check_files(db_path, "ATT DATA", [0, 1, 0, 0]) + + gen_compact(1) + + check_files(db_path, "DOC BODY", [0, 0, 1, 1]) + check_files(db_path, "ATT DATA", [0, 0, 1, 0]) + + gen_compact(2) + + check_files(db_path, "DOC BODY", [0, 0, 0, 2]) + check_files(db_path, "ATT DATA", [0, 0, 0, 1]) + + gen_compact(3) + + check_files(db_path, "DOC BODY", [0, 0, 0, 1]) + check_files(db_path, "ATT DATA", [0, 0, 0, 1]) + end + + test "copy compacted doc on moving attachment" do + db_path = setup_db(3) + make_doc("the-doc", %{ value: "DOC BODY" }) + + gen_compact(0) + gen_compact(1) + + check_files(db_path, "DOC BODY", [0, 0, 1, 0]) + + make_att("the-doc", "the-att", "ATT DATA") + + check_files(db_path, "DOC BODY", [1, 0, 1, 0]) + check_files(db_path, "ATT DATA", [1, 0, 0, 0]) + + gen_compact(0) + + check_files(db_path, "DOC BODY", [0, 1, 1, 0]) + check_files(db_path, "ATT DATA", [0, 1, 0, 0]) + + update_doc("the-doc", %{ value: "NEW TEXT" }) + + check_files(db_path, "DOC BODY", [0, 1, 1, 0]) + check_files(db_path, "ATT DATA", [0, 1, 0, 0]) + check_files(db_path, "NEW TEXT", [1, 0, 0, 0]) + + gen_compact(1) + + check_files(db_path, "DOC BODY", [0, 0, 1, 0]) + check_files(db_path, "ATT DATA", [0, 0, 1, 0]) + check_files(db_path, "NEW TEXT", [1, 0, 0, 0]) + + gen_compact(0) + + check_files(db_path, "DOC BODY", [0, 0, 1, 0]) + check_files(db_path, "ATT DATA", [0, 0, 1, 0]) + check_files(db_path, "NEW TEXT", [0, 1, 0, 0]) + + gen_compact(2) + + check_files(db_path, "DOC BODY", [0, 0, 0, 0]) + check_files(db_path, "ATT DATA", [0, 0, 0, 1]) + check_files(db_path, "NEW TEXT", [0, 2, 0, 0]) + end + + test "compact with partition limit" do + Couch.put("/_node/_local/_config/couchdb/max_partition_size", body: "\"10240\"") + + 
Couch.delete("/#{@db_name}") + resp = Couch.put("/#{@db_name}?partitioned=true&q=1&gen=2") + assert resp.status_code == 201 + + docs = for x <- 1..15, do: %{ _id: "foo:#{x}", value: String.pad_leading("", 1024, "0") } + resp = Couch.post("/#{@db_name}/_bulk_docs", body: %{ docs: docs }) + assert resp.status_code == 201 + + gen_compact(0) + + resp = Couch.post("/#{@db_name}/_bulk_docs", body: %{ docs: [%{_id: "foo:bar"}, %{_id: "baz:bang"}] }) + assert resp.status_code == 201 + + resp = Couch.get("/#{@db_name}/_all_docs") + ids = for doc <- resp.body["rows"], do: doc["id"] + + assert ids == ["baz:bang", "foo:1", "foo:10", "foo:11", "foo:12", "foo:13", + "foo:14", "foo:15", "foo:2", "foo:3", "foo:4", "foo:5", "foo:6", "foo:7", + "foo:8", "foo:9"] + end + + test "transfer space used from gen 0 to gen 1" do + setup_db(1) + + make_doc("the-doc", %{ value: "DOC BODY" }) + make_att("the-doc", "the-att", String.pad_leading("", 4096, "0")) + + %{ "file" => f, "external" => e, "active" => a } = Couch.get("/#{@db_name}").body["sizes"] + + assert f > 0 + assert e > 0 + assert a > 0 + + gen_compact(0) + + [ + %{ "file" => f0, "external" => e0, "active" => a0 }, + %{ "file" => f1, "external" => e1, "active" => a1 } + ] = Couch.get("/#{@db_name}").body["sizes"] + + assert f0 < f + assert e0 < e + assert a0 < a + + assert f1 > 0 + assert e1 > 0 + assert a1 > 0 + end + + test "transfer space used from gen 1 to gen 2" do + setup_db(2) + + make_doc("the-doc", %{ value: "DOC BODY" }) + make_att("the-doc", "the-att", String.pad_leading("", 4096, "0")) + + gen_compact(0) + + [_, %{ "file" => f, "external" => e, "active" => a }] = Couch.get("/#{@db_name}").body["sizes"] + + assert f > 0 + assert e > 0 + assert a > 0 + + gen_compact(1) + + [ + _, + %{ "file" => f1, "external" => e1, "active" => a1 }, + %{ "file" => f2, "external" => e2, "active" => a2 } + ] = Couch.get("/#{@db_name}").body["sizes"] + + assert f1 == 0 + assert e1 == 0 + assert a1 == 0 + + assert f2 > 0 + assert e2 > 0 + assert a2 > 0 + end + + defp setup_db(max_gen) do + Couch.delete("/#{@db_name}") + resp = Couch.put("/#{@db_name}?q=1&gen=#{max_gen}") + assert resp.status_code == 201 + { + Couch.get("/_node/_local/_config/couchdb").body["database_dir"], + Couch.get("/#{@db_name}").body["instance_start_time"] + } + end + + defp make_doc(doc_id, doc) do + resp = Couch.put("/#{@db_name}/#{doc_id}", body: doc) + assert resp.status_code == 201 + end + + defp update_doc(doc_id, new_doc) do + old_doc = Couch.get("/#{@db_name}/#{doc_id}").body + resp = Couch.put("/#{@db_name}/#{doc_id}", body: Map.merge(old_doc, new_doc)) + assert resp.status_code == 201 + end + + defp make_att(doc_id, att_id, data) do + rev = Couch.get("/#{@db_name}/#{doc_id}").body["_rev"] + headers = ["Content-Type": "application/octet-stream"] + resp = Couch.put("/#{@db_name}/#{doc_id}/#{att_id}?rev=#{rev}", headers: headers, body: data) + assert resp.status_code == 201 + end + + defp gen_compact(gen) do + compact(@db_name, gen) + end + + defp check_files(dir, needle, counts) do + check_files(dir, needle, counts, 0) + end + + defp check_files(_dir, _needle, [], _gen) do + :ok + end + + defp check_files(dir, needle, [count | rest], gen) do + check_file(dir, gen, needle, count) + check_files(dir, needle, rest, gen + 1) + end + + defp check_file(dir, gen, needle, expect_count) do + path = db_path(dir, gen) + {:ok, file_data} = File.read(path) + chunks = file_data |> String.split(needle) |> length() + + assert chunks - 1 == expect_count, + "in file '#{path}', expected '#{needle}' to 
appear #{expect_count} times, but it was found #{chunks - 1} times" + end + + defp db_path({data_dir, suffix}, gen) do + Path.join([data_dir, "shards", "00000000-ffffffff", db_file(suffix, gen)]) + end + + defp db_file(suffix, 0) do + "#{@db_name}.#{suffix}.couch" + end + + defp db_file(suffix, gen) do + "#{@db_name}.#{suffix}.#{gen}.couch" + end +end diff --git a/test/elixir/test/config/suite.elixir b/test/elixir/test/config/suite.elixir index 1d1e6059a36..691c19c3e46 100644 --- a/test/elixir/test/config/suite.elixir +++ b/test/elixir/test/config/suite.elixir @@ -139,6 +139,27 @@ "CompactTest": [ "compaction reduces size of deleted docs" ], + "CompactGenerationTest": [ + "increase the max generation", + "promote docs and atts into gen-1", + "read doc and attachment after compaction", + "read doc after open file timeout", + "add an attachment to a compacted doc", + "compact an attachment added to a compacted doc", + "compact a doc and att an additional generation", + "update a compacted doc", + "compact an updated doc", + "compact, update and compact a doc", + "compact, update then compact twice", + "compact through multiple generations", + "compact max generation (0)", + "compact max generation (1)", + "retention in gen 0", + "compact final generation", + "copy compacted doc on moving attachment", + "compact with partition limit", + "transfer space used from gen 0 to gen 1" + ], "ConfigTest": [ "Atoms, binaries, and strings suffice as whitelist sections and keys.", "Blacklist is functional",