Skip to content

Commit 64b5a6f

Browse files
Gang Li authored and mingzym committed
TS-2275: fix interim cache losing data if the server process crashes
We had disabled permanent storage on the interim cache device for consistency reasons. That is why I pointed out, in https://blog.zymlinux.net/index.php/archives/555, that the #1 problem of the current implementation is data loss if the server process crashes. After this patch, we can declare the interim cache stable.
1 parent 40deb91 commit 64b5a6f

File tree

6 files changed

+361
-31
lines changed

6 files changed

+361
-31
lines changed

CHANGES

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,9 @@
11
-*- coding: utf-8 -*-
22
Changes with Apache Traffic Server 4.1.0
33

4+
*) [TS-2275] fix interim cache losing data if the server process crashes
5+
Author: Gang Li.
6+
47
*) [TS-2291] Add remap_stats plugin to experimental.
58

69
*) [TS-2242] Update core plugins' support_email and vendor_name for

iocore/cache/Cache.cc

Lines changed: 267 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1163,6 +1163,24 @@ vol_init_dir(Vol *d)
11631163
}
11641164
}
11651165

1166+
#if TS_USE_INTERIM_CACHE == 1
1167+
void
1168+
interimvol_clear_init(InterimCacheVol *d)
1169+
{
1170+
memset(d->header, 0, sizeof(InterimVolHeaderFooter));
1171+
d->header->magic = VOL_MAGIC;
1172+
d->header->version.ink_major = CACHE_DB_MAJOR_VERSION;
1173+
d->header->version.ink_minor = CACHE_DB_MINOR_VERSION;
1174+
d->header->agg_pos = d->header->write_pos = d->start;
1175+
d->header->last_write_pos = d->header->write_pos;
1176+
d->header->phase = 0;
1177+
d->header->cycle = 0;
1178+
d->header->create_time = time(NULL);
1179+
d->header->dirty = 0;
1180+
d->sector_size = d->header->sector_size = d->disk->hw_sector_size;
1181+
}
1182+
#endif
1183+
11661184
void
11671185
vol_clear_init(Vol *d)
11681186
{
@@ -1180,6 +1198,12 @@ vol_clear_init(Vol *d)
11801198
d->header->dirty = 0;
11811199
d->sector_size = d->header->sector_size = d->disk->hw_sector_size;
11821200
*d->footer = *d->header;
1201+
1202+
#if TS_USE_INTERIM_CACHE == 1
1203+
for (int i = 0; i < d->num_interim_vols; i++) {
1204+
interimvol_clear_init(&(d->interim_vols[i]));
1205+
}
1206+
#endif
11831207
}
11841208

11851209
int
@@ -1251,11 +1275,6 @@ Vol::init(char *s, off_t blocks, off_t dir_skip, bool clear)
12511275
header = (VolHeaderFooter *) raw_dir;
12521276
footer = (VolHeaderFooter *) (raw_dir + vol_dirlen(this) - ROUND_TO_STORE_BLOCK(sizeof(VolHeaderFooter)));
12531277

1254-
if (clear) {
1255-
Note("clearing cache directory '%s'", hash_id);
1256-
return clear_dir();
1257-
}
1258-
12591278
#if TS_USE_INTERIM_CACHE == 1
12601279
num_interim_vols = good_interim_disks;
12611280
ink_assert(num_interim_vols >= 0 && num_interim_vols <= 8);
@@ -1264,11 +1283,16 @@ Vol::init(char *s, off_t blocks, off_t dir_skip, bool clear)
12641283
off_t vlen = off_t (r * g_interim_disks[i]->len * STORE_BLOCK_SIZE);
12651284
vlen = (vlen / STORE_BLOCK_SIZE) * STORE_BLOCK_SIZE;
12661285
off_t start = ink_atomic_increment(&g_interim_disks[i]->skip, vlen);
1267-
interim_vols[i].init(start, vlen, g_interim_disks[i], this);
1286+
interim_vols[i].init(start, vlen, g_interim_disks[i], this, &(this->header->interim_header[i]));
12681287
ink_assert(interim_vols[i].start + interim_vols[i].len <= g_interim_disks[i]->len * STORE_BLOCK_SIZE);
12691288
}
12701289
#endif
12711290

1291+
if (clear) {
1292+
Note("clearing cache directory '%s'", hash_id);
1293+
return clear_dir();
1294+
}
1295+
12721296
init_info = new VolInitInfo();
12731297
int footerlen = ROUND_TO_STORE_BLOCK(sizeof(VolHeaderFooter));
12741298
off_t footer_offset = vol_dirlen(this) - footerlen;
@@ -1349,11 +1373,30 @@ Vol::handle_dir_read(int event, void *data)
13491373
return EVENT_DONE;
13501374
}
13511375
CHECK_DIR(this);
1376+
1377+
sector_size = header->sector_size;
1378+
13521379
#if TS_USE_INTERIM_CACHE == 1
1353-
if (gn_interim_disks > 0)
1354-
clear_interim_dir(this);
1380+
if (num_interim_vols > 0) {
1381+
interim_done = 0;
1382+
for (int i = 0; i < num_interim_vols; i++) {
1383+
interim_vols[i].recover_data();
1384+
}
1385+
} else {
13551386
#endif
1356-
sector_size = header->sector_size;
1387+
1388+
return this->recover_data();
1389+
1390+
#if TS_USE_INTERIM_CACHE == 1
1391+
}
1392+
#endif
1393+
1394+
return EVENT_CONT;
1395+
}
1396+
1397+
int
1398+
Vol::recover_data()
1399+
{
13571400
SET_HANDLER(&Vol::handle_recover_from_data);
13581401
return handle_recover_from_data(EVENT_IMMEDIATE, 0);
13591402
}
@@ -1732,6 +1775,221 @@ Vol::dir_init_done(int /* event ATS_UNUSED */, void * /* data ATS_UNUSED */ )
17321775
}
17331776
}
17341777

1778+
#if TS_USE_INTERIM_CACHE == 1
1779+
int
1780+
InterimCacheVol::recover_data()
1781+
{
1782+
io.aiocb.aio_fildes = fd;
1783+
io.action = this;
1784+
io.thread = AIO_CALLBACK_THREAD_ANY;
1785+
io.then = 0;
1786+
1787+
SET_HANDLER(&InterimCacheVol::handle_recover_from_data);
1788+
return handle_recover_from_data(EVENT_IMMEDIATE, 0);
1789+
}
1790+
1791+
// Event handler that drives crash recovery for one interim cache volume.
//
// EVENT_IMMEDIATE: validates the header and sets up the first read, which
//   starts at header->last_write_pos.
// AIO_EVENT_DONE:  inspects the buffer that was just read and walks the Doc
//   records in it, then either schedules the next read or stops.
//
// The scan ends in one of three ways:
//   Ldone   - directory entries between header->write_pos and the final
//             recover_pos are cleaned and header->sync_serial is advanced;
//   Lclear  - the interim volume header is re-initialised and its directory
//             entries are removed;
//   Lfinish - the read buffer is released and the parent Vol is notified.
//
// Returns EVENT_CONT, or the result of vol->recover_data() when this call
// was the one that completed the last outstanding interim volume.
int
1792+
InterimCacheVol::handle_recover_from_data(int event, void *data)
1793+
{
1794+
(void)data;
1795+
uint32_t got_len = 0;
1796+
uint32_t max_sync_serial = header->sync_serial;
1797+
char *s, *e;
1798+
int ndone, offset;
1799+
1800+
if (event == EVENT_IMMEDIATE) {
1801+
// A header with the wrong magic or major version cannot be recovered.
if (header->magic != VOL_MAGIC || header->version.ink_major != CACHE_DB_MAJOR_VERSION) {
1802+
Warning("bad header in cache directory for '%s', clearing", hash_id);
1803+
goto Lclear;
1804+
// sync_serial == 0: skip scanning entirely. No buffer is allocated, so
// aio_buf is set to NULL before Lfinish releases it.
} else if (header->sync_serial == 0) {
1805+
io.aiocb.aio_buf = NULL;
1806+
goto Lfinish;
1807+
}
1808+
1809+
// initialize
1810+
recover_wrapped = 0;
1811+
last_sync_serial = 0;
1812+
last_write_serial = 0;
1813+
recover_pos = header->last_write_pos;
1814+
// Starting position at or past the end of the volume: wrap to the start.
if (recover_pos >= skip + len) {
1815+
recover_wrapped = 1;
1816+
recover_pos = start;
1817+
}
1818+
1819+
// Page-aligned buffer of RECOVERY_SIZE bytes, reused for every read.
io.aiocb.aio_buf = (char *)ats_memalign(sysconf(_SC_PAGESIZE), RECOVERY_SIZE);
1820+
io.aiocb.aio_nbytes = RECOVERY_SIZE;
1821+
// Never read past the end of the volume.
if ((off_t)(recover_pos + io.aiocb.aio_nbytes) > (off_t)(skip + len))
1822+
io.aiocb.aio_nbytes = (skip + len) - recover_pos;
1823+
1824+
} else if (event == AIO_EVENT_DONE) {
1825+
// A short or failed read makes the volume unrecoverable.
if ((size_t) io.aiocb.aio_nbytes != (size_t) io.aio_result) {
1826+
Warning("disk read error on recover '%s', clearing", hash_id);
1827+
goto Lclear;
1828+
}
1829+
1830+
// This read started at last_write_pos: the bytes up to write_pos must be
// a chain of valid Docs whose write_serial does not exceed the header's.
if (io.aiocb.aio_offset == header->last_write_pos) {
1831+
uint32_t to_check = header->write_pos - header->last_write_pos;
1832+
ink_assert(to_check && to_check < (uint32_t)io.aiocb.aio_nbytes);
1833+
uint32_t done = 0;
1834+
s = (char *) io.aiocb.aio_buf;
1835+
while (done < to_check) {
1836+
Doc *doc = (Doc *) (s + done);
1837+
if (doc->magic != DOC_MAGIC || doc->write_serial > header->write_serial) {
1838+
Warning("no valid directory found while recovering '%s', clearing", hash_id);
1839+
goto Lclear;
1840+
}
1841+
done += round_to_approx_size(doc->len);
1842+
// NOTE(review): this compares sync_serial against last_write_serial but
// updates last_sync_serial -- confirm the comparison is intended.
if (doc->sync_serial > last_write_serial)
1843+
last_sync_serial = doc->sync_serial;
1844+
}
1845+
ink_assert(done == to_check);
1846+
1847+
// Whatever follows the validated region is scanned below.
got_len = io.aiocb.aio_nbytes - done;
1848+
recover_pos += io.aiocb.aio_nbytes;
1849+
s = (char *) io.aiocb.aio_buf + done;
1850+
e = s + got_len;
1851+
} else {
1852+
// Any later read: the whole buffer is scanned.
got_len = io.aiocb.aio_nbytes;
1853+
recover_pos += io.aiocb.aio_nbytes;
1854+
s = (char *) io.aiocb.aio_buf;
1855+
e = s + got_len;
1856+
}
1857+
}
1858+
1859+
// examine what we got
1860+
if (got_len) {
1861+
1862+
Doc *doc = NULL;
1863+
1864+
// First read after wrapping to the start of the volume: if the first Doc
// is invalid or has a lower write_serial than the last one seen, stop with
// recover_pos placed EVACUATION_SIZE before the end of the volume.
if (recover_wrapped && start == io.aiocb.aio_offset) {
1865+
doc = (Doc *) s;
1866+
if (doc->magic != DOC_MAGIC || doc->write_serial < last_write_serial) {
1867+
recover_pos = skip + len - EVACUATION_SIZE;
1868+
goto Ldone;
1869+
}
1870+
}
1871+
1872+
// Walk the Docs in the buffer while they belong to the current sync run.
while (s < e) {
1873+
doc = (Doc *) s;
1874+
1875+
if (doc->magic != DOC_MAGIC || doc->sync_serial != last_sync_serial) {
1876+
1877+
if (doc->magic == DOC_MAGIC) {
1878+
// Valid Doc from a different sync run; remember a serial beyond the
// header's so Ldone can pick a serial above it.
if (doc->sync_serial > header->sync_serial)
1879+
max_sync_serial = doc->sync_serial;
1880+
1881+
// A newer run that is at most one past the header's serial is accepted
// as the new current run and scanning continues.
if (doc->sync_serial > last_sync_serial && doc->sync_serial <= header->sync_serial + 1) {
1882+
last_sync_serial = doc->sync_serial;
1883+
s += round_to_approx_size(doc->len);
1884+
continue;
1885+
1886+
// recover_pos - (e - s) is the disk position of this Doc; if it lies in
// the last AGG_SIZE bytes of the volume, restart scanning from the start.
} else if (recover_pos - (e - s) > (skip + len) - AGG_SIZE) {
1887+
recover_wrapped = 1;
1888+
recover_pos = start;
1889+
io.aiocb.aio_nbytes = RECOVERY_SIZE;
1890+
break;
1891+
}
1892+
1893+
// Otherwise the scan ends at this Doc's position.
recover_pos -= e - s;
1894+
goto Ldone;
1895+
1896+
} else {
1897+
// Not a Doc at all: rewind recover_pos to this position, then either
// wrap (when in the last AGG_SIZE bytes) or stop here.
recover_pos -= e - s;
1898+
if (recover_pos > (skip + len) - AGG_SIZE) {
1899+
recover_wrapped = 1;
1900+
recover_pos = start;
1901+
io.aiocb.aio_nbytes = RECOVERY_SIZE;
1902+
break;
1903+
}
1904+
1905+
goto Ldone;
1906+
}
1907+
}
1908+
1909+
last_write_serial = doc->write_serial;
1910+
s += round_to_approx_size(doc->len);
1911+
}
1912+
1913+
// Buffer fully consumed: prepare the next read.
if (s >= e) {
1914+
1915+
// The last Doc runs past the end of the buffer; step back so the next
// read begins at that Doc's first byte.
if (s > e)
1916+
s -= round_to_approx_size(doc->len);
1917+
1918+
recover_pos -= e - s;
1919+
if (recover_pos >= skip + len)
1920+
recover_pos = start;
1921+
1922+
io.aiocb.aio_nbytes = RECOVERY_SIZE;
1923+
if ((off_t)(recover_pos + io.aiocb.aio_nbytes) > (off_t)(skip + len))
1924+
io.aiocb.aio_nbytes = (skip + len) - recover_pos;
1925+
}
1926+
}
1927+
1928+
// No forward progress since the previous pass: give up and clear.
if (recover_pos == prev_recover_pos)
1929+
goto Lclear;
1930+
1931+
prev_recover_pos = recover_pos;
1932+
io.aiocb.aio_offset = recover_pos;
1933+
// NOTE(review): the read is issued inside ink_assert -- confirm ink_assert
// still evaluates its argument in non-debug builds.
ink_assert(ink_aio_read(&io));
1934+
return EVENT_CONT;
1935+
1936+
Ldone: {
1937+
1938+
// Scan wrapped and stopped exactly at write_pos: finish without touching
// the directory.
if (recover_pos == header->write_pos && recover_wrapped) {
1939+
goto Lfinish;
1940+
}
1941+
1942+
// Extend the stopping point by EVACUATION_SIZE; if the result falls within
// EVACUATION_SIZE before write_pos the volume is cleared instead.
recover_pos += EVACUATION_SIZE;
1943+
if (recover_pos < header->write_pos && (recover_pos + EVACUATION_SIZE >= header->write_pos)) {
1944+
Debug("cache_init", "Head Pos: %" PRIu64 ", Rec Pos: %" PRIu64 ", Wrapped:%d", header->write_pos, recover_pos, recover_wrapped);
1945+
Warning("no valid directory found while recovering '%s', clearing", hash_id);
1946+
goto Lclear;
1947+
}
1948+
1949+
// NOTE(review): the wrap adjustment subtracts skip + len rather than len --
// confirm this yields an offset inside the volume.
if (recover_pos > skip + len)
1950+
recover_pos -= skip + len;
1951+
1952+
// Pick a serial above every one seen, with its low bit differing from the
// low bit of the header's current sync_serial.
uint32_t next_sync_serial = max_sync_serial + 1;
1953+
if (!(header->sync_serial & 1) == !(next_sync_serial & 1))
1954+
next_sync_serial++;
1955+
1956+
// Clean the directory entries for the range between write_pos and
// recover_pos, expressed as directory offsets.
off_t clear_start = offset_to_vol_offset(this, header->write_pos);
1957+
off_t clear_end = offset_to_vol_offset(this, recover_pos);
1958+
1959+
if (clear_start <= clear_end)
1960+
dir_clean_range_interimvol(clear_start, clear_end, this);
1961+
else {
1962+
// The range wraps around the end of the volume: clean it in two pieces.
dir_clean_range_interimvol(clear_end, DIR_OFFSET_MAX, this);
1963+
dir_clean_range_interimvol(1, clear_start, this);
1964+
}
1965+
1966+
header->sync_serial = next_sync_serial;
1967+
1968+
goto Lfinish;
1969+
}
1970+
1971+
Lclear:
1972+
1973+
// Reset this interim volume's header; offset is this volume's index in the
// parent Vol's interim_vols array.
interimvol_clear_init(this);
1974+
offset = this - vol->interim_vols;
1975+
clear_interimvol_dir(vol, offset); // remove this interimvol dir
1976+
1977+
Lfinish:
1978+
1979+
// NOTE(review): the buffer comes from ats_memalign but is released with
// free(); also, the bad-header path reaches here without this function
// having assigned aio_buf -- confirm it is NULL or valid in that case.
free((char*)io.aiocb.aio_buf);
1980+
io.aiocb.aio_buf = NULL;
1981+
1982+
set_io_not_in_progress();
1983+
1984+
// Count this interim volume as finished; the comparison below presumably
// relies on ink_atomic_increment returning the pre-increment value.
ndone = ink_atomic_increment(&vol->interim_done, 1);
1985+
if (ndone == vol->num_interim_vols - 1) { // all interim finished
1986+
return vol->recover_data();
1987+
}
1988+
1989+
return EVENT_CONT;
1990+
}
1991+
#endif
1992+
17351993
// explicit pair for random table in build_vol_hash_table
17361994
struct rtable_pair {
17371995
unsigned int rval;

0 commit comments

Comments
 (0)