Skip to content

Commit def4456

Browse files
nshysergepetrenko
authored and committed
box: handle cancelling fiber waiting in WAL queue
Currently, if a fiber waiting in the WAL queue is cancelled, it will go on writing its request to the WAL, breaking the queue order. As a result the WAL is corrupted, so that even the next Tarantool restart may fail. Unfortunately, we cancel fibers on Tarantool shutdown, so the issue may arise just by terminating a Tarantool instance. Let's instead fail the cancelled request and all the newer requests in the queue. A workaround is to disable the WAL queue with `box.cfg{wal_queue_max_size = 0}` so that only one write request can be in the queue. Closes tarantool#11078 NO_DOC=bugfix
1 parent 413e4e2 commit def4456

File tree

6 files changed

+158
-47
lines changed

6 files changed

+158
-47
lines changed
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
## bugfix/box
2+
3+
* Fixed a bug when cancelling a fiber waiting in WAL queue corrupted the
4+
WAL (gh-11078).

src/box/journal.c

Lines changed: 58 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -40,8 +40,7 @@ struct journal *current_journal = NULL;
4040
struct journal_queue journal_queue = {
4141
.max_size = 16 * 1024 * 1024, /* 16 megabytes */
4242
.size = 0,
43-
.waiters = RLIST_HEAD_INITIALIZER(journal_queue.waiters),
44-
.waiter_count = 0,
43+
.requests = STAILQ_INITIALIZER(journal_queue.requests),
4544
};
4645

4746
void
@@ -94,36 +93,73 @@ journal_entry_fiber_wakeup_cb(struct journal_entry *entry)
9493
void
9594
journal_queue_wakeup(void)
9695
{
97-
struct rlist *list = &journal_queue.waiters;
98-
if (!rlist_empty(list) && !journal_queue_is_full())
99-
fiber_wakeup(rlist_first_entry(list, struct fiber, state));
96+
if (!stailq_empty(&journal_queue.requests) &&
97+
!journal_queue_is_full()) {
98+
struct journal_entry *req =
99+
stailq_first_entry(&journal_queue.requests,
100+
typeof(*req), fifo);
101+
fiber_wakeup(req->fiber);
102+
}
100103
}
101104

102-
void
103-
journal_queue_wait(void)
105+
int
106+
journal_queue_wait(struct journal_entry *entry)
104107
{
105-
if (!journal_queue_is_full() && !journal_queue_has_waiters())
106-
return;
107-
++journal_queue.waiter_count;
108-
rlist_add_tail_entry(&journal_queue.waiters, fiber(), state);
109-
/*
110-
* Is woken up when this position in the queue should go into the next
111-
* journal batch.
112-
*/
108+
if (!journal_queue_is_full() &&
109+
stailq_empty(&journal_queue.requests))
110+
return 0;
111+
int rc = -1;
112+
struct journal_entry *prev_entry =
113+
stailq_last_entry(&journal_queue.requests,
114+
typeof(*prev_entry), fifo);
115+
stailq_add_tail_entry(&journal_queue.requests, entry, fifo);
116+
assert(entry->fiber == NULL);
117+
entry->fiber = fiber();
113118
fiber_yield();
114-
--journal_queue.waiter_count;
115-
journal_queue_wakeup();
119+
if (entry->is_complete) {
120+
/* Already rolled back on cascade rollback. */
121+
diag_set_journal_res(entry->res);
122+
} else if (fiber_is_cancelled()) {
123+
struct stailq rollback;
124+
stailq_cut_tail(&journal_queue.requests, &prev_entry->fifo,
125+
&rollback);
126+
/* Pop this request. */
127+
VERIFY(stailq_shift_entry(&rollback,
128+
typeof(*entry), fifo) == entry);
129+
stailq_reverse(&rollback);
130+
/* Cascade rollback of newer requests. */
131+
struct journal_entry *req;
132+
stailq_foreach_entry(req, &rollback, fifo) {
133+
req->res = JOURNAL_ENTRY_ERR_CASCADE;
134+
req->is_complete = true;
135+
req->write_async_cb(req);
136+
}
137+
/* Rollback this request. */
138+
entry->res = JOURNAL_ENTRY_ERR_CANCELLED;
139+
entry->is_complete = true;
140+
entry->write_async_cb(entry);
141+
diag_set(FiberIsCancelled);
142+
} else {
143+
/* There is space in the queue to handle this request. */
144+
VERIFY(stailq_shift_entry(&journal_queue.requests,
145+
typeof(*entry), fifo) == entry);
146+
journal_queue_wakeup();
147+
rc = 0;
148+
}
149+
entry->fiber = NULL;
150+
return rc;
116151
}
117152

118153
void
119154
journal_queue_flush(void)
120155
{
121-
if (!journal_queue_has_waiters())
156+
if (stailq_empty(&journal_queue.requests))
122157
return;
123-
struct rlist *list = &journal_queue.waiters;
124-
while (!rlist_empty(list))
125-
fiber_wakeup(rlist_first_entry(list, struct fiber, state));
126-
journal_queue_wait();
158+
struct journal_entry *req;
159+
stailq_foreach_entry(req, &journal_queue.requests, fifo)
160+
fiber_wakeup(req->fiber);
161+
/* Schedule after all fibers waiting in journal queue. */
162+
fiber_sleep(0);
127163
}
128164

129165
int

src/box/journal.h

Lines changed: 14 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,8 @@ enum {
5454
* entry.
5555
*/
5656
JOURNAL_ENTRY_ERR_CASCADE = -3,
57+
/** Rollback because the fiber waiting for WAL was cancelled. */
58+
JOURNAL_ENTRY_ERR_CANCELLED = -4,
5759
/**
5860
* Anchor for the structs built on top of journal entry so as they
5961
* could introduce their own unique errors. Set to a big value in
@@ -106,6 +108,10 @@ struct journal_entry {
106108
* journal entry is completed.
107109
*/
108110
bool is_complete;
111+
/**
112+
* Fiber which put the request in journal queue.
113+
*/
114+
struct fiber *fiber;
109115
/**
110116
* The number of rows in the request.
111117
*/
@@ -134,6 +140,7 @@ journal_entry_create(struct journal_entry *entry, size_t n_rows,
134140
entry->res = JOURNAL_ENTRY_ERR_UNKNOWN;
135141
entry->flags = 0;
136142
entry->is_complete = false;
143+
entry->fiber = NULL;
137144
}
138145

139146
/**
@@ -158,14 +165,8 @@ struct journal_queue {
158165
int64_t max_size;
159166
/** Current approximate size of journal queue. */
160167
int64_t size;
161-
/**
162-
* The fibers waiting for some space to free in journal queue.
163-
* Once some space is freed they will be waken up in the same order they
164-
* entered the queue.
165-
*/
166-
struct rlist waiters;
167-
/** How many waiters there are in a queue. */
168-
int waiter_count;
168+
/** The requests waiting in journal queue. */
169+
struct stailq requests;
169170
};
170171

171172
/** A single queue for all journal instances. */
@@ -186,16 +187,6 @@ struct journal {
186187
void
187188
journal_queue_wakeup(void);
188189

189-
/**
190-
* Check whether anyone is waiting for the journal queue to empty. If there are
191-
* other waiters we must go after them to preserve write order.
192-
*/
193-
static inline bool
194-
journal_queue_has_waiters(void)
195-
{
196-
return journal_queue.waiter_count != 0;
197-
}
198-
199190
/**
200191
* Check whether any of the queue size limits is reached.
201192
* If the queue is full, we must wait for some of the entries to be written
@@ -208,10 +199,10 @@ journal_queue_is_full(void)
208199
}
209200

210201
/** Yield until there's some space in the journal queue. */
211-
void
212-
journal_queue_wait(void);
202+
int
203+
journal_queue_wait(struct journal_entry *entry);
213204

214-
/** Empty the queue by waking everyone in it up and put self to queue tail. */
205+
/** Flush journal queue. Next wal_sync() will sync flushed requests. */
215206
void
216207
journal_queue_flush(void);
217208

@@ -271,7 +262,8 @@ journal_write_row(struct xrow_header *row);
271262
static inline int
272263
journal_write_submit(struct journal_entry *entry)
273264
{
274-
journal_queue_wait();
265+
if (journal_queue_wait(entry) != 0)
266+
return -1;
275267
/*
276268
* We cannot account entry after write. If journal is synchronous
277269
* the journal_queue_on_complete() is called in write_async().

src/box/txn.c

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1179,10 +1179,8 @@ txn_commit_impl(struct txn *txn, enum txn_commit_wait_mode wait_mode)
11791179
goto rollback_abort;
11801180
}
11811181
fiber_set_txn(fiber(), NULL);
1182-
if (journal_write_submit(req) != 0) {
1183-
fiber_set_txn(fiber(), txn);
1182+
if (journal_write_submit(req) != 0)
11841183
goto rollback_io;
1185-
}
11861184
if (wait_mode != TXN_COMMIT_WAIT_MODE_COMPLETE) {
11871185
if (txn_has_flag(txn, TXN_IS_DONE))
11881186
goto finish_done;

src/box/txn.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -144,6 +144,7 @@ enum {
144144
TXN_SIGNATURE_UNKNOWN = JOURNAL_ENTRY_ERR_UNKNOWN,
145145
TXN_SIGNATURE_IO = JOURNAL_ENTRY_ERR_IO,
146146
TXN_SIGNATURE_CASCADE = JOURNAL_ENTRY_ERR_CASCADE,
147+
TXN_SIGNATURE_CANCELLED = JOURNAL_ENTRY_ERR_CANCELLED,
147148
/**
148149
* The default signature value for failed transactions.
149150
* Indicates either write failure or any other failure
Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
local server = require('luatest.server')
2+
local t = require('luatest')
3+
4+
local variants = {}
5+
for i = 1, 5 do
6+
table.insert(variants, {pos = i})
7+
end
8+
local g = t.group('wal', variants)
9+
10+
g.before_all(function(cg)
11+
t.tarantool.skip_if_not_debug()
12+
cg.server = server:new()
13+
cg.server:start()
14+
end)
15+
16+
g.after_all(function(cg)
17+
cg.server:drop()
18+
end)
19+
20+
g.after_each(function(cg)
21+
cg.server:exec(function()
22+
box.error.injection.set('ERRINJ_WAL_DELAY', false)
23+
box.space.test:drop()
24+
end)
25+
end)
26+
27+
g.test_wal_queue_fiber_cancel = function(cg)
28+
cg.server:exec(function(pos)
29+
local fiber = require('fiber')
30+
local s = box.schema.create_space('test')
31+
s:create_index('pk')
32+
box.cfg{wal_queue_max_size = 100}
33+
box.error.injection.set('ERRINJ_WAL_DELAY', true)
34+
fiber.create(function()
35+
box.begin()
36+
s:insert({100, string.rep('a', 1000)})
37+
s:insert({0})
38+
box.commit()
39+
end)
40+
local fibers = {}
41+
for i = 1, 5 do
42+
local f = fiber.new(function()
43+
-- Crafted to check that WAL records are in correct order
44+
-- on recovery after restart.
45+
box.begin()
46+
s:delete({i - 1})
47+
s:insert({i})
48+
box.commit()
49+
end)
50+
f:set_joinable(true)
51+
table.insert(fibers, f)
52+
fiber.yield()
53+
end
54+
fibers[pos]:cancel()
55+
box.error.injection.set('ERRINJ_WAL_DELAY', false)
56+
for i = 1, pos - 1 do
57+
t.assert_equals({fibers[i]:join()}, {true})
58+
end
59+
local ok, err = fibers[pos]:join()
60+
t.assert_not(ok)
61+
t.assert_covers(err:unpack(), {
62+
type = 'FiberIsCancelled',
63+
})
64+
for i = pos + 1, 5 do
65+
local ok, err = fibers[i]:join()
66+
t.assert_not(ok)
67+
t.assert_covers(err:unpack(), {
68+
type = 'ClientError',
69+
code = box.error.CASCADE_ROLLBACK,
70+
message = "WAL has a rollback in progress",
71+
})
72+
end
73+
t.assert_equals(s:select({100}, {iterator = 'lt'}), {{pos - 1}})
74+
end, {cg.params.pos})
75+
cg.server:restart()
76+
cg.server:exec(function(pos)
77+
local s = box.space.test
78+
t.assert_equals(s:select({100}, {iterator = 'lt'}), {{pos - 1}})
79+
end, {cg.params.pos})
80+
end

0 commit comments

Comments
 (0)