@@ -42,6 +42,9 @@ RocksDBKVStore::~RocksDBKVStore() {
4242
4343void RocksDBKVStore::open () {
4444 rdbOptions.create_if_missing = true ;
45+ rdbOptions.create_missing_column_families = true ;
46+
47+ seqnoCFOptions.comparator = &vbidSeqnoComparator;
4548
4649 /* Use a listener to set the appropriate engine in the
4750 * flusher threads RocksDB creates. We need the flusher threads to
@@ -58,8 +61,18 @@ void RocksDBKVStore::open() {
5861 const std::string dbname =
5962 dbdir + " /rocksdb." + std::to_string (configuration.getShardId ());
6063
64+ std::vector<rocksdb::ColumnFamilyDescriptor> families{
65+ rocksdb::ColumnFamilyDescriptor (rocksdb::kDefaultColumnFamilyName ,
66+ rocksdb::ColumnFamilyOptions ()),
67+
68+ rocksdb::ColumnFamilyDescriptor (" vbid_seqno_to_key" ,
69+ seqnoCFOptions)};
70+
71+ std::vector<rocksdb::ColumnFamilyHandle*> handles;
72+
6173 rocksdb::DB* dbPtr;
62- rocksdb::Status s = rocksdb::DB::Open (rdbOptions, dbname, &dbPtr);
74+ rocksdb::Status s =
75+ rocksdb::DB::Open (rdbOptions, dbname, families, &handles, &dbPtr);
6376
6477 if (s.ok ()) {
6578 db.reset (dbPtr);
@@ -68,10 +81,15 @@ void RocksDBKVStore::open() {
6881 " RocksDBKVStore::open: failed to open database '" + dbname +
6982 " ': " + s.ToString ());
7083 }
84+
85+ defaultFamilyHandle.reset (handles[0 ]);
86+ seqnoFamilyHandle.reset (handles[1 ]);
7187}
7288
7389void RocksDBKVStore::close () {
7490 batch.reset ();
91+ defaultFamilyHandle.reset ();
92+ seqnoFamilyHandle.reset ();
7593 db.reset ();
7694}
7795
@@ -136,10 +154,13 @@ void RocksDBKVStore::storeItem(const Item& itm) {
136154 uint16_t vbid = itm.getVBucketId ();
137155 auto k = mkKeyStr (vbid, itm.getKey ());
138156 auto v = mkValSlice (itm);
157+ auto seq = mkSeqnoStr (vbid, itm.getBySeqno ());
139158
140159 // TODO RDB: check status.
141- rocksdb::Status s = batch->Put (k, v);
142- cb_assert (s.ok ());
160+ rocksdb::Status s1 = batch->Put (k, v);
161+ rocksdb::Status s2 = batch->Put (seqnoFamilyHandle.get (), seq, k);
162+ cb_assert (s1.ok ());
163+ cb_assert (s2.ok ());
143164}
144165
145166GetValue RocksDBKVStore::get (const DocKey& key, uint16_t vb, bool fetchDelete) {
@@ -205,15 +226,27 @@ static bool matches_prefix(rocksdb::Slice s, size_t len, const char* p) {
205226
206227void RocksDBKVStore::delVBucket (uint16_t vb, uint64_t vb_version) {
207228 std::lock_guard<std::mutex> lg (writeLock);
208- std::unique_ptr< rocksdb::Iterator> it (
209- db-> NewIterator ( rocksdb::ReadOptions ()));
229+ rocksdb::WriteBatch delBatch;
230+
210231 const char * prefix (reinterpret_cast <const char *>(&vb));
211232 std::string start (prefix, sizeof (vb));
212- rocksdb::WriteBatch delBatch;
213- for (it->Seek (start);
214- it->Valid () && matches_prefix (it->key (), sizeof (vb), prefix);
215- it->Next ()) {
216- delBatch.Delete (it->key ());
233+
234+ // We must delete both all the documents for the VB,
235+ // and all the vbid:seqno=>vbid:key mappings
236+ std::vector<rocksdb::ColumnFamilyHandle*> CFHandles{
237+ defaultFamilyHandle.get (), seqnoFamilyHandle.get ()};
238+
239+ // makes use of the fact that keys in both CFs are vbid prefixed
240+ // if we move to a db per vb this will just be dropping the whole DB.
241+ for (auto * handle : CFHandles) {
242+ std::unique_ptr<rocksdb::Iterator> it (
243+ db->NewIterator (rocksdb::ReadOptions (), handle));
244+
245+ for (it->Seek (start);
246+ it->Valid () && matches_prefix (it->key (), sizeof (vb), prefix);
247+ it->Next ()) {
248+ delBatch.Delete (handle, it->key ());
249+ }
217250 }
218251 rocksdb::Status s = db->Write (writeOptions, &delBatch);
219252 cb_assert (s.ok ());
@@ -268,6 +301,26 @@ void RocksDBKVStore::grokKeySlice(const rocksdb::Slice& s,
268301 k->assign (s.data () + sizeof (uint16_t ), s.size () - sizeof (uint16_t ));
269302}
270303
304+ std::string RocksDBKVStore::mkSeqnoStr (uint16_t vbid, int64_t seqno) {
305+ size_t size = sizeof (uint16_t ) + sizeof (int64_t );
306+
307+ std::string buffer;
308+ buffer.reserve (size);
309+
310+ buffer.append (reinterpret_cast <char *>(&vbid), sizeof (vbid));
311+ buffer.append (reinterpret_cast <char *>(&seqno), sizeof (seqno));
312+
313+ return buffer;
314+ }
315+
316+ void RocksDBKVStore::grokSeqnoSlice (const rocksdb::Slice& s,
317+ uint16_t * vb,
318+ int64_t * seqno) {
319+ assert (s.size () == sizeof (uint16_t ) + sizeof (int64_t ));
320+ std::memcpy (vb, s.data (), sizeof (uint16_t ));
321+ std::memcpy (seqno, s.data () + sizeof (uint16_t ), sizeof (int64_t ));
322+ }
323+
271324rocksdb::Slice RocksDBKVStore::mkValSlice (const Item& item) {
272325 // Serialize an Item to the format to write to RocksDB.
273326 // Using the following layout:
0 commit comments