Skip to content

Commit 297f7e5

Browse files
authored
Merge pull request ceph#62469 from igomon-bloomberg/wip-timestamp-ver-epoch
rgw: implements timestamp-based epochs for versioned objects Reviewed-by: Casey Bodley <[email protected]>
2 parents 10f802a + 4195486 commit 297f7e5

File tree

9 files changed

+382
-155
lines changed

9 files changed

+382
-155
lines changed

src/cls/rgw/cls_rgw.cc

Lines changed: 169 additions & 135 deletions
Original file line number | Diff line number | Diff line change
@@ -321,19 +321,25 @@ static int get_obj_vals(cls_method_context_t hctx,
321321
*/
322322
static void decreasing_str(uint64_t num, string *str)
323323
{
324+
// This buffer must be big enough to hold the string representation of
325+
// the largest unsigned 64-bit integer value (20 digits), plus the 1-char range prefix and the terminating NUL.
324326
char buf[32];
325327
if (num < 0x10) { /* 16 */
326-
snprintf(buf, sizeof(buf), "9%02lld", 15 - (long long)num);
328+
snprintf(buf, sizeof(buf), "9%02" PRIu64, 0xF - num);
327329
} else if (num < 0x100) { /* 256 */
328-
snprintf(buf, sizeof(buf), "8%03lld", 255 - (long long)num);
330+
snprintf(buf, sizeof(buf), "8%03" PRIu64, 0xFF - num);
329331
} else if (num < 0x1000) /* 4096 */ {
330-
snprintf(buf, sizeof(buf), "7%04lld", 4095 - (long long)num);
332+
snprintf(buf, sizeof(buf), "7%04" PRIu64, 0xFFF - num);
331333
} else if (num < 0x10000) /* 65536 */ {
332-
snprintf(buf, sizeof(buf), "6%05lld", 65535 - (long long)num);
334+
snprintf(buf, sizeof(buf), "6%05" PRIu64, 0xFFFF - num);
333335
} else if (num < 0x100000000) /* 4G */ {
334-
snprintf(buf, sizeof(buf), "5%010lld", 0xFFFFFFFF - (long long)num);
336+
snprintf(buf, sizeof(buf), "5%010" PRIu64, 0xFFFFFFFF - num);
337+
} else if (num < 0x10000000000) /* 1T */ {
338+
snprintf(buf, sizeof(buf), "4%015" PRIu64, 0xFFFFFFFFFF - num);
339+
} else if (num < 0x1000000000000) /* 281T */ {
340+
snprintf(buf, sizeof(buf), "3%018" PRIu64, 0xFFFFFFFFFFFF - num);
335341
} else {
336-
snprintf(buf, sizeof(buf), "4%020lld", (long long)-num);
342+
snprintf(buf, sizeof(buf), "2%020" PRIu64, std::numeric_limits<uint64_t>::max() - num);
337343
}
338344

339345
*str = buf;
@@ -498,11 +504,21 @@ static int decode_list_index_key(const string& index_key, cls_rgw_obj_key *key,
498504
if (val[0] == 'i') {
499505
key->instance = val.substr(1);
500506
} else if (val[0] == 'v') {
507+
// what we are dealing with here is the string representation of the versioned epoch (as produced by the
508+
// decreasing_str() func); the first char is always 'v' to indicate that it is the versioned epoch; the
509+
// second char is a digit in [9-2] range that is used to separate value ranges - in order to make
510+
// string representation sort in the opposite direction and to decrease string length - to speed up
511+
// the lexicographical comparison; hence +2 (one for the value indicator and one for the range prefix);
501512
string err;
502-
const char *s = val.c_str() + 1;
503-
*ver = strict_strtoll(s, 10, &err);
504-
if (!err.empty()) {
505-
CLS_LOG(0, "ERROR: %s: bad index_key (%s): could not parse val (v=%s)", __func__, escape_str(index_key).c_str(), s);
513+
if (val.size() > 2) {
514+
const char *s = val.c_str() + 2;
515+
*ver = strict_strtoull(s, 10, &err);
516+
if (!err.empty()) {
517+
CLS_LOG(0, "ERROR: %s: bad index_key (%s): could not parse val (v=%s)", __func__, escape_str(index_key).c_str(), s);
518+
return -EIO;
519+
}
520+
} else {
521+
CLS_LOG(0, "ERROR: %s: bad index_key (%s): empty val", __func__, escape_str(index_key).c_str());
506522
return -EIO;
507523
}
508524
}
@@ -1627,19 +1643,20 @@ class BIOLHEntry {
16271643
return 0;
16281644
}
16291645

1630-
bool start_modify(uint64_t candidate_epoch) {
1631-
if (candidate_epoch) {
1632-
if (candidate_epoch < olh_data_entry.epoch) {
1633-
return false; /* olh cannot be modified, old epoch */
1634-
}
1635-
olh_data_entry.epoch = candidate_epoch;
1636-
} else {
1637-
if (olh_data_entry.epoch == 0) {
1638-
olh_data_entry.epoch = 2; /* versioned epoch should start with 2, 1 is reserved to converted plain entries */
1639-
} else {
1640-
olh_data_entry.epoch++;
1641-
}
1646+
/**
1647+
* This is called when a new instance of an object (in a versioned bucket) is added (via PUT) or an existing instance is removed.
1648+
* A part of that process is to update the OLH entry (in the bucket index) with the correct modification timestamp (epoch).
1649+
* This timestamp is then used later on to guard against OLH updates for add/remove instance ops that happened *before*
1650+
* the latest op that updated the OLH entry.
1651+
* @param candidate_epoch - the epoch to apply; callers pass the remote epoch when one arrives via multisite sync (op.olh_epoch > 0), otherwise a locally generated nanosecond timestamp.
1652+
*/
1653+
bool start_modify (uint64_t candidate_epoch) {
1654+
// only update olh.epoch if the candidate is not older than the current one (an equal epoch is accepted).
1655+
if (candidate_epoch < olh_data_entry.epoch) {
1656+
return false; /* olh cannot be modified, old epoch */
16421657
}
1658+
1659+
olh_data_entry.epoch = candidate_epoch;
16431660
return true;
16441661
}
16451662

@@ -1889,81 +1906,95 @@ static int rgw_bucket_link_olh(cls_method_context_t hctx, bufferlist *in, buffer
18891906

18901907
const uint64_t prev_epoch = olh.get_epoch();
18911908

1892-
if (!olh.start_modify(op.olh_epoch)) {
1893-
ret = obj.write(op.olh_epoch, false, header);
1894-
if (ret < 0) {
1895-
return ret;
1896-
}
1897-
if (removing) {
1898-
olh.update_log(CLS_RGW_OLH_OP_REMOVE_INSTANCE, op.op_tag, op.key, false, op.olh_epoch);
1899-
}
1900-
return write_header_while_logrecord(hctx, header);
1901-
}
1902-
1903-
// promote this version to current if it's a newer epoch, or if it matches the
1904-
// current epoch and sorts after the current instance
1905-
const bool promote = (olh.get_epoch() > prev_epoch) ||
1906-
(olh.get_epoch() == prev_epoch &&
1907-
olh.get_entry().key.instance >= op.key.instance);
1909+
// op.olh_epoch is provided (> 0) in the case when a remote epoch is coming in as the result of multisite sync;
1910+
uint64_t candidate_epoch = op.olh_epoch ? op.olh_epoch :
1911+
duration_cast<std::chrono::nanoseconds>(obj.mtime().time_since_epoch()).count();
1912+
if (olh.start_modify(candidate_epoch)) {
1913+
// promote this version to current if it's a newer epoch, or if it matches the
1914+
// current epoch and sorts after the current instance
1915+
const bool promote = (olh.get_epoch() > prev_epoch) ||
1916+
(olh.get_epoch() == prev_epoch &&
1917+
olh.get_entry().key.instance >= op.key.instance);
1918+
const bool epoch_collision = olh.get_epoch() == prev_epoch;
1919+
1920+
if (olh_found) {
1921+
const string &olh_tag = olh.get_tag();
1922+
if (op.olh_tag != olh_tag) {
1923+
if (!olh.pending_removal()) {
1924+
CLS_LOG(5, "NOTICE: op.olh_tag (%s) != olh.tag (%s)", op.olh_tag.c_str(), olh_tag.c_str());
1925+
return -ECANCELED;
1926+
}
1927+
/* if pending removal, this is a new olh instance */
1928+
olh.set_tag(op.olh_tag);
1929+
}
1930+
if (epoch_collision) {
1931+
auto const &s_key = op.key.to_string();
1932+
CLS_LOG(1, "NOTICE: versioned epoch collision (%lu) for object %s", prev_epoch, s_key.c_str());
1933+
}
1934+
if (promote && olh.exists()) {
1935+
rgw_bucket_olh_entry &olh_entry = olh.get_entry();
1936+
/* found olh, previous instance is no longer the latest, need to update */
1937+
if (!(olh_entry.key == op.key)) {
1938+
BIVerObjEntry old_obj(hctx, olh_entry.key);
19081939

1909-
if (olh_found) {
1910-
const string& olh_tag = olh.get_tag();
1911-
if (op.olh_tag != olh_tag) {
1912-
if (!olh.pending_removal()) {
1913-
CLS_LOG(5, "NOTICE: op.olh_tag (%s) != olh.tag (%s)", op.olh_tag.c_str(), olh_tag.c_str());
1914-
return -ECANCELED;
1940+
ret = old_obj.demote_current(header);
1941+
if (ret < 0) {
1942+
CLS_LOG(0, "ERROR: could not demote current on previous key ret=%d", ret);
1943+
return ret;
1944+
}
1945+
}
1946+
}
1947+
olh.set_pending_removal(false);
1948+
} else {
1949+
bool instance_only = (op.key.instance.empty() && op.delete_marker);
1950+
cls_rgw_obj_key key(op.key.name);
1951+
ret = convert_plain_entry_to_versioned(hctx, key, promote, instance_only, header);
1952+
if (ret < 0) {
1953+
CLS_LOG(0, "ERROR: convert_plain_entry_to_versioned ret=%d", ret);
1954+
return ret;
19151955
}
1916-
/* if pending removal, this is a new olh instance */
19171956
olh.set_tag(op.olh_tag);
1957+
if (op.key.instance.empty()) {
1958+
obj.set_epoch(1);
1959+
}
19181960
}
1919-
if (promote && olh.exists()) {
1920-
rgw_bucket_olh_entry& olh_entry = olh.get_entry();
1921-
/* found olh, previous instance is no longer the latest, need to update */
1922-
if (!(olh_entry.key == op.key)) {
1923-
BIVerObjEntry old_obj(hctx, olh_entry.key);
19241961

1925-
ret = old_obj.demote_current(header);
1926-
if (ret < 0) {
1927-
CLS_LOG(0, "ERROR: could not demote current on previous key ret=%d", ret);
1928-
return ret;
1929-
}
1930-
}
1962+
/* update the olh log */
1963+
olh.update_log(CLS_RGW_OLH_OP_LINK_OLH, op.op_tag, op.key, op.delete_marker);
1964+
if (removing) {
1965+
olh.update_log(CLS_RGW_OLH_OP_REMOVE_INSTANCE, op.op_tag, op.key, false);
19311966
}
1932-
olh.set_pending_removal(false);
1933-
} else {
1934-
bool instance_only = (op.key.instance.empty() && op.delete_marker);
1935-
cls_rgw_obj_key key(op.key.name);
1936-
ret = convert_plain_entry_to_versioned(hctx, key, promote, instance_only, header);
1967+
1968+
if (promote) {
1969+
olh.update(op.key, op.delete_marker);
1970+
}
1971+
olh.set_exists(true);
1972+
1973+
/* write the instance and list entries */
1974+
ret = obj.write(olh.get_epoch(), promote, header);
19371975
if (ret < 0) {
1938-
CLS_LOG(0, "ERROR: convert_plain_entry_to_versioned ret=%d", ret);
19391976
return ret;
19401977
}
1941-
olh.set_tag(op.olh_tag);
1942-
if (op.key.instance.empty()){
1943-
obj.set_epoch(1);
1944-
}
1945-
}
19461978

1947-
/* update the olh log */
1948-
olh.update_log(CLS_RGW_OLH_OP_LINK_OLH, op.op_tag, op.key, op.delete_marker);
1949-
if (removing) {
1950-
olh.update_log(CLS_RGW_OLH_OP_REMOVE_INSTANCE, op.op_tag, op.key, false);
1979+
ret = olh.write(header);
19511980
}
1981+
else {
1982+
ret = obj.write(candidate_epoch, false, header);
1983+
if (ret < 0) {
1984+
return ret;
1985+
}
19521986

1953-
if (promote) {
1954-
olh.update(op.key, op.delete_marker);
1955-
}
1956-
olh.set_exists(true);
1987+
// no point here in adding CLS_RGW_OLH_OP_LINK_OLH to the pending log as we know that
1988+
// the epoch is already stale compared to the current - so no point in applying it;
19571989

1958-
ret = olh.write(header);
1959-
if (ret < 0) {
1960-
CLS_LOG(0, "ERROR: failed to update olh ret=%d", ret);
1961-
return ret;
1990+
if (removing) {
1991+
olh.update_log(CLS_RGW_OLH_OP_REMOVE_INSTANCE, op.op_tag, op.key, false, candidate_epoch);
1992+
ret = olh.write(header);
1993+
}
19621994
}
19631995

1964-
/* write the instance and list entries */
1965-
ret = obj.write(olh.get_epoch(), promote, header);
19661996
if (ret < 0) {
1997+
CLS_LOG(0, "ERROR: failed to update olh ret=%d", ret);
19671998
return ret;
19681999
}
19692000

@@ -1978,7 +2009,7 @@ static int rgw_bucket_link_olh(cls_method_context_t hctx, bufferlist *in, buffer
19782009
rgw_bucket_dir_entry& entry = obj.get_dir_entry();
19792010

19802011
rgw_bucket_entry_ver ver;
1981-
ver.epoch = (op.olh_epoch ? op.olh_epoch : olh.get_epoch());
2012+
ver.epoch = candidate_epoch;
19822013

19832014
string *powner = NULL;
19842015
string *powner_display_name = NULL;
@@ -2061,73 +2092,76 @@ static int rgw_bucket_unlink_instance(cls_method_context_t hctx, bufferlist *in,
20612092
obj.set_epoch(1);
20622093
}
20632094

2064-
if (!olh.start_modify(op.olh_epoch)) {
2065-
ret = obj.unlink_list_entry(header);
2066-
if (ret < 0) {
2067-
return ret;
2068-
}
2069-
2070-
if (obj.is_delete_marker()) {
2071-
return 0;
2072-
}
2095+
// op.olh_epoch is provided (> 0) in the case when a remote epoch is coming in as the result of multisite sync;
2096+
uint64_t candidate_epoch = op.olh_epoch ? op.olh_epoch :
2097+
duration_cast<std::chrono::nanoseconds>(real_clock::now().time_since_epoch()).count();
2098+
if (olh.start_modify(candidate_epoch)) {
2099+
rgw_bucket_olh_entry &olh_entry = olh.get_entry();
2100+
cls_rgw_obj_key &olh_key = olh_entry.key;
2101+
CLS_LOG(20, "%s: updating olh log: existing olh entry: %s[%s] (delete_marker=%d)", __func__,
2102+
olh_key.name.c_str(), olh_key.instance.c_str(), olh_entry.delete_marker);
2103+
2104+
if (olh_key == dest_key) {
2105+
/* this is the current head, need to update the OLH! */
2106+
cls_rgw_obj_key next_key;
2107+
bool found = false;
2108+
ret = obj.find_next_key(&next_key, &found);
2109+
if (ret < 0) {
2110+
CLS_LOG(0, "ERROR: obj.find_next_key() returned ret=%d", ret);
2111+
return ret;
2112+
}
20732113

2074-
olh.update_log(CLS_RGW_OLH_OP_REMOVE_INSTANCE, op.op_tag, op.key, false, op.olh_epoch);
2075-
return olh.write(header);
2076-
}
2114+
if (found) {
2115+
BIVerObjEntry next(hctx, next_key);
2116+
ret = next.write(olh.get_epoch(), true, header);
2117+
if (ret < 0) {
2118+
CLS_LOG(0, "ERROR: next.write() returned ret=%d", ret);
2119+
return ret;
2120+
}
20772121

2078-
rgw_bucket_olh_entry& olh_entry = olh.get_entry();
2079-
cls_rgw_obj_key& olh_key = olh_entry.key;
2080-
CLS_LOG(20, "%s: updating olh log: existing olh entry: %s[%s] (delete_marker=%d)", __func__,
2081-
olh_key.name.c_str(), olh_key.instance.c_str(), olh_entry.delete_marker);
2122+
CLS_LOG(20, "%s: updating olh log: link olh -> %s[%s] (is_delete=%d)", __func__,
2123+
next_key.name.c_str(), next_key.instance.c_str(), (int) next.is_delete_marker());
20822124

2083-
if (olh_key == dest_key) {
2084-
/* this is the current head, need to update the OLH! */
2085-
cls_rgw_obj_key next_key;
2086-
bool found = false;
2087-
ret = obj.find_next_key(&next_key, &found);
2088-
if (ret < 0) {
2089-
CLS_LOG(0, "ERROR: obj.find_next_key() returned ret=%d", ret);
2090-
return ret;
2125+
olh.update(next_key, next.is_delete_marker());
2126+
olh.update_log(CLS_RGW_OLH_OP_LINK_OLH, op.op_tag, next_key, next.is_delete_marker());
2127+
} else {
2128+
// next_key is empty, but we need to preserve its name in case this entry
2129+
// gets resharded, because this key is used for hash placement
2130+
next_key.name = dest_key.name;
2131+
olh.update(next_key, false);
2132+
olh.update_log(CLS_RGW_OLH_OP_UNLINK_OLH, op.op_tag, next_key, false);
2133+
olh.set_exists(false);
2134+
olh.set_pending_removal(true);
2135+
}
20912136
}
20922137

2093-
if (found) {
2094-
BIVerObjEntry next(hctx, next_key);
2095-
ret = next.write(olh.get_epoch(), true, header);
2138+
if (!obj.is_delete_marker()) {
2139+
olh.update_log(CLS_RGW_OLH_OP_REMOVE_INSTANCE, op.op_tag, op.key, false);
2140+
} else {
2141+
/* this is a delete marker, it's our responsibility to remove its
2142+
* instance entry */
2143+
ret = obj.unlink(header, op.key);
20962144
if (ret < 0) {
2097-
CLS_LOG(0, "ERROR: next.write() returned ret=%d", ret);
20982145
return ret;
20992146
}
2100-
2101-
CLS_LOG(20, "%s: updating olh log: link olh -> %s[%s] (is_delete=%d)", __func__,
2102-
next_key.name.c_str(), next_key.instance.c_str(), (int)next.is_delete_marker());
2103-
2104-
olh.update(next_key, next.is_delete_marker());
2105-
olh.update_log(CLS_RGW_OLH_OP_LINK_OLH, op.op_tag, next_key, next.is_delete_marker());
2106-
} else {
2107-
// next_key is empty, but we need to preserve its name in case this entry
2108-
// gets resharded, because this key is used for hash placement
2109-
next_key.name = dest_key.name;
2110-
olh.update(next_key, false);
2111-
olh.update_log(CLS_RGW_OLH_OP_UNLINK_OLH, op.op_tag, next_key, false);
2112-
olh.set_exists(false);
2113-
olh.set_pending_removal(true);
21142147
}
2115-
}
21162148

2117-
if (!obj.is_delete_marker()) {
2118-
olh.update_log(CLS_RGW_OLH_OP_REMOVE_INSTANCE, op.op_tag, op.key, false);
2119-
} else {
2120-
/* this is a delete marker, it's our responsibility to remove its
2121-
* instance entry */
2122-
ret = obj.unlink(header, op.key);
2149+
ret = obj.unlink_list_entry(header);
21232150
if (ret < 0) {
21242151
return ret;
21252152
}
21262153
}
2154+
else {
2155+
ret = obj.unlink_list_entry(header);
2156+
if (ret < 0) {
2157+
return ret;
2158+
}
21272159

2128-
ret = obj.unlink_list_entry(header);
2129-
if (ret < 0) {
2130-
return ret;
2160+
if (obj.is_delete_marker()) {
2161+
return 0;
2162+
}
2163+
2164+
olh.update_log(CLS_RGW_OLH_OP_REMOVE_INSTANCE, op.op_tag, op.key, false, candidate_epoch);
21312165
}
21322166

21332167
ret = olh.write(header);
@@ -2144,7 +2178,7 @@ static int rgw_bucket_unlink_instance(cls_method_context_t hctx, bufferlist *in,
21442178
}
21452179

21462180
rgw_bucket_entry_ver ver;
2147-
ver.epoch = (op.olh_epoch ? op.olh_epoch : olh.get_epoch());
2181+
ver.epoch = candidate_epoch;
21482182

21492183
real_time mtime = obj.mtime(); /* mtime has no real meaning in
21502184
* instance removal context */

0 commit comments

Comments
 (0)