Skip to content

Commit adae78e

Browse files
committed
Squashed 'src/leveldb/' changes from aca1ffc..ae6c262
ae6c262 Merge branch 'leveldb' into ripple-fork 28fa222 Looks like a bit more delay is needed to smooth the latency. a18f3e6 Tidy up JobQueue, add ripple_core module ab82e57 Release leveldb 1.12 02c6259 Release leveldb 1.11 5bbb544 Rate limit compactions with a 25ms pause after each complete file. 8c29c47 LevelDB issue 178 fix: cannot resize a level 0 compaction set 18b245c Added GNU/kFreeBSD kernel name (TARGET_OS) 8be9d12 CondVar::SignalAll was broken, leading to deadlocks on Windows builds. http://code.google.com/p/leveldb/issues/detail?id=149 c9fc070 Upgrade LevelDB to 1.10.0, mostly for better write stall logging. 8215b15 Tweak to variable name to support unity build git-subtree-dir: src/leveldb git-subtree-split: ae6c2620b2ef3d5c69e63dc0eda865d6a39fa061
1 parent c25e981 commit adae78e

19 files changed

+196
-59
lines changed

AUTHORS

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,3 +6,6 @@ Google Inc.
66
# Initial version authors:
77
Jeffrey Dean <[email protected]>
88
Sanjay Ghemawat <[email protected]>
9+
10+
# Partial list of contributors:
11+
Kevin Regan <[email protected]>

Makefile

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ TESTS = \
4242
env_test \
4343
filename_test \
4444
filter_block_test \
45+
issue178_test \
4546
log_test \
4647
memenv_test \
4748
skiplist_test \
@@ -69,7 +70,7 @@ SHARED = $(SHARED1)
6970
else
7071
# Update db.h if you change these.
7172
SHARED_MAJOR = 1
72-
SHARED_MINOR = 9
73+
SHARED_MINOR = 12
7374
SHARED1 = libleveldb.$(PLATFORM_SHARED_EXT)
7475
SHARED2 = $(SHARED1).$(SHARED_MAJOR)
7576
SHARED3 = $(SHARED1).$(SHARED_MAJOR).$(SHARED_MINOR)
@@ -146,6 +147,9 @@ filename_test: db/filename_test.o $(LIBOBJECTS) $(TESTHARNESS)
146147
filter_block_test: table/filter_block_test.o $(LIBOBJECTS) $(TESTHARNESS)
147148
$(CXX) $(LDFLAGS) table/filter_block_test.o $(LIBOBJECTS) $(TESTHARNESS) -o $@ $(LIBS)
148149

150+
issue178_test: issues/issue178_test.o $(LIBOBJECTS) $(TESTHARNESS)
151+
$(CXX) $(LDFLAGS) issues/issue178_test.o $(LIBOBJECTS) $(TESTHARNESS) -o $@ $(LIBS)
152+
149153
log_test: db/log_test.o $(LIBOBJECTS) $(TESTHARNESS)
150154
$(CXX) $(LDFLAGS) db/log_test.o $(LIBOBJECTS) $(TESTHARNESS) -o $@ $(LIBS)
151155

build_detect_platform

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,12 @@ case "$TARGET_OS" in
9494
PLATFORM_LIBS="-lpthread"
9595
PORT_FILE=port/port_posix.cc
9696
;;
97+
GNU/kFreeBSD)
98+
PLATFORM=OS_KFREEBSD
99+
COMMON_FLAGS="$MEMCMP_FLAG -D_REENTRANT -DOS_KFREEBSD"
100+
PLATFORM_LIBS="-lpthread"
101+
PORT_FILE=port/port_posix.cc
102+
;;
97103
NetBSD)
98104
PLATFORM=OS_NETBSD
99105
COMMON_FLAGS="$MEMCMP_FLAG -D_REENTRANT -DOS_NETBSD"

db/db_impl.cc

Lines changed: 31 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,8 @@
3535

3636
namespace leveldb {
3737

38+
const int kNumNonTableCacheFiles = 10;
39+
3840
// Information kept for every waiting writer
3941
struct DBImpl::Writer {
4042
Status status;
@@ -92,9 +94,9 @@ Options SanitizeOptions(const std::string& dbname,
9294
Options result = src;
9395
result.comparator = icmp;
9496
result.filter_policy = (src.filter_policy != NULL) ? ipolicy : NULL;
95-
ClipToRange(&result.max_open_files, 20, 50000);
96-
ClipToRange(&result.write_buffer_size, 64<<10, 1<<30);
97-
ClipToRange(&result.block_size, 1<<10, 4<<20);
97+
ClipToRange(&result.max_open_files, 64 + kNumNonTableCacheFiles, 50000);
98+
ClipToRange(&result.write_buffer_size, 64<<10, 1<<30);
99+
ClipToRange(&result.block_size, 1<<10, 4<<20);
98100
if (result.info_log == NULL) {
99101
// Open a log file in the same directory as the db
100102
src.env->CreateDir(dbname); // In case it does not exist
@@ -130,12 +132,13 @@ DBImpl::DBImpl(const Options& options, const std::string& dbname)
130132
log_(NULL),
131133
tmp_batch_(new WriteBatch),
132134
bg_compaction_scheduled_(false),
133-
manual_compaction_(NULL) {
135+
manual_compaction_(NULL),
136+
consecutive_compaction_errors_(0) {
134137
mem_->Ref();
135138
has_imm_.Release_Store(NULL);
136139

137140
// Reserve ten files or so for other uses and give the rest to TableCache.
138-
const int table_cache_size = options.max_open_files - 10;
141+
const int table_cache_size = options.max_open_files - kNumNonTableCacheFiles;
139142
table_cache_ = new TableCache(dbname_, &options_, table_cache_size);
140143

141144
versions_ = new VersionSet(dbname_, &options_, table_cache_,
@@ -310,16 +313,24 @@ Status DBImpl::Recover(VersionEdit* edit) {
310313
if (!s.ok()) {
311314
return s;
312315
}
316+
std::set<uint64_t> expected;
317+
versions_->AddLiveFiles(&expected);
313318
uint64_t number;
314319
FileType type;
315320
std::vector<uint64_t> logs;
316321
for (size_t i = 0; i < filenames.size(); i++) {
317-
if (ParseFileName(filenames[i], &number, &type)
318-
&& type == kLogFile
319-
&& ((number >= min_log) || (number == prev_log))) {
322+
if (ParseFileName(filenames[i], &number, &type)) {
323+
expected.erase(number);
324+
if (type == kLogFile && ((number >= min_log) || (number == prev_log)))
320325
logs.push_back(number);
321326
}
322327
}
328+
if (!expected.empty()) {
329+
char buf[50];
330+
snprintf(buf, sizeof(buf), "%d missing files; e.g.",
331+
static_cast<int>(expected.size()));
332+
return Status::Corruption(buf, TableFileName(dbname_, *(expected.begin())));
333+
}
323334

324335
// Recover in the order in which the logs were generated
325336
std::sort(logs.begin(), logs.end());
@@ -611,6 +622,7 @@ void DBImpl::BackgroundCall() {
611622
Status s = BackgroundCompaction();
612623
if (s.ok()) {
613624
// Success
625+
consecutive_compaction_errors_ = 0;
614626
} else if (shutting_down_.Acquire_Load()) {
615627
// Error most likely due to shutdown; do not wait
616628
} else {
@@ -622,7 +634,12 @@ void DBImpl::BackgroundCall() {
622634
Log(options_.info_log, "Waiting after background compaction error: %s",
623635
s.ToString().c_str());
624636
mutex_.Unlock();
625-
env_->SleepForMicroseconds(1000000);
637+
++consecutive_compaction_errors_;
638+
int seconds_to_sleep = 1;
639+
for (int i = 0; i < 3 && i < consecutive_compaction_errors_ - 1; ++i) {
640+
seconds_to_sleep *= 2;
641+
}
642+
env_->SleepForMicroseconds(seconds_to_sleep * 1000000);
626643
mutex_.Lock();
627644
}
628645
}
@@ -805,6 +822,9 @@ Status DBImpl::FinishCompactionOutputFile(CompactionState* compact,
805822
(unsigned long long) output_number,
806823
(unsigned long long) current_entries,
807824
(unsigned long long) current_bytes);
825+
826+
// rate-limit compaction file creation with a 100ms pause
827+
env_->SleepForMicroseconds(100000);
808828
}
809829
}
810830
return s;
@@ -1268,10 +1288,11 @@ Status DBImpl::MakeRoomForWrite(bool force) {
12681288
} else if (imm_ != NULL) {
12691289
// We have filled up the current memtable, but the previous
12701290
// one is still being compacted, so we wait.
1291+
Log(options_.info_log, "Current memtable full; waiting...\n");
12711292
bg_cv_.Wait();
12721293
} else if (versions_->NumLevelFiles(0) >= config::kL0_StopWritesTrigger) {
12731294
// There are too many level-0 files.
1274-
Log(options_.info_log, "waiting...\n");
1295+
Log(options_.info_log, "Too many L0 files; waiting...\n");
12751296
bg_cv_.Wait();
12761297
} else {
12771298
// Attempt to switch to a new memtable and trigger compaction of old

db/db_impl.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -163,6 +163,7 @@ class DBImpl : public DB {
163163

164164
// Have we encountered a background error in paranoid mode?
165165
Status bg_error_;
166+
int consecutive_compaction_errors_;
166167

167168
// Per level compaction stats. stats_[level] stores the stats for
168169
// compactions that produced data for the specified "level".

db/db_test.cc

Lines changed: 75 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -33,8 +33,11 @@ class AtomicCounter {
3333
public:
3434
AtomicCounter() : count_(0) { }
3535
void Increment() {
36+
IncrementBy(1);
37+
}
38+
void IncrementBy(int count) {
3639
MutexLock l(&mu_);
37-
count_++;
40+
count_ += count;
3841
}
3942
int Read() {
4043
MutexLock l(&mu_);
@@ -45,6 +48,10 @@ class AtomicCounter {
4548
count_ = 0;
4649
}
4750
};
51+
52+
void DelayMilliseconds(int millis) {
53+
Env::Default()->SleepForMicroseconds(millis * 1000);
54+
}
4855
}
4956

5057
// Special Env used to delay background operations
@@ -69,6 +76,7 @@ class SpecialEnv : public EnvWrapper {
6976
AtomicCounter random_read_counter_;
7077

7178
AtomicCounter sleep_counter_;
79+
AtomicCounter sleep_time_counter_;
7280

7381
explicit SpecialEnv(Env* base) : EnvWrapper(base) {
7482
delay_sstable_sync_.Release_Store(NULL);
@@ -103,7 +111,7 @@ class SpecialEnv : public EnvWrapper {
103111
Status Flush() { return base_->Flush(); }
104112
Status Sync() {
105113
while (env_->delay_sstable_sync_.Acquire_Load() != NULL) {
106-
env_->SleepForMicroseconds(100000);
114+
DelayMilliseconds(100);
107115
}
108116
return base_->Sync();
109117
}
@@ -174,8 +182,9 @@ class SpecialEnv : public EnvWrapper {
174182

175183
virtual void SleepForMicroseconds(int micros) {
176184
sleep_counter_.Increment();
177-
target()->SleepForMicroseconds(micros);
185+
sleep_time_counter_.IncrementBy(micros);
178186
}
187+
179188
};
180189

181190
class DBTest {
@@ -461,6 +470,20 @@ class DBTest {
461470
}
462471
return result;
463472
}
473+
474+
bool DeleteAnSSTFile() {
475+
std::vector<std::string> filenames;
476+
ASSERT_OK(env_->GetChildren(dbname_, &filenames));
477+
uint64_t number;
478+
FileType type;
479+
for (size_t i = 0; i < filenames.size(); i++) {
480+
if (ParseFileName(filenames[i], &number, &type) && type == kTableFile) {
481+
ASSERT_OK(env_->DeleteFile(TableFileName(dbname_, number)));
482+
return true;
483+
}
484+
}
485+
return false;
486+
}
464487
};
465488

466489
TEST(DBTest, Empty) {
@@ -611,7 +634,7 @@ TEST(DBTest, GetEncountersEmptyLevel) {
611634
}
612635

613636
// Step 4: Wait for compaction to finish
614-
env_->SleepForMicroseconds(1000000);
637+
DelayMilliseconds(1000);
615638

616639
ASSERT_EQ(NumTableFilesAtLevel(0), 0);
617640
} while (ChangeOptions());
@@ -1295,7 +1318,7 @@ TEST(DBTest, L0_CompactionBug_Issue44_a) {
12951318
Reopen();
12961319
Reopen();
12971320
ASSERT_EQ("(a->v)", Contents());
1298-
env_->SleepForMicroseconds(1000000); // Wait for compaction to finish
1321+
DelayMilliseconds(1000); // Wait for compaction to finish
12991322
ASSERT_EQ("(a->v)", Contents());
13001323
}
13011324

@@ -1311,7 +1334,7 @@ TEST(DBTest, L0_CompactionBug_Issue44_b) {
13111334
Put("","");
13121335
Reopen();
13131336
Put("","");
1314-
env_->SleepForMicroseconds(1000000); // Wait for compaction to finish
1337+
DelayMilliseconds(1000); // Wait for compaction to finish
13151338
Reopen();
13161339
Put("d","dv");
13171340
Reopen();
@@ -1321,7 +1344,7 @@ TEST(DBTest, L0_CompactionBug_Issue44_b) {
13211344
Delete("b");
13221345
Reopen();
13231346
ASSERT_EQ("(->)(c->cv)", Contents());
1324-
env_->SleepForMicroseconds(1000000); // Wait for compaction to finish
1347+
DelayMilliseconds(1000); // Wait for compaction to finish
13251348
ASSERT_EQ("(->)(c->cv)", Contents());
13261349
}
13271350

@@ -1506,6 +1529,30 @@ TEST(DBTest, NoSpace) {
15061529
ASSERT_GE(env_->sleep_counter_.Read(), 5);
15071530
}
15081531

1532+
TEST(DBTest, ExponentialBackoff) {
1533+
Options options = CurrentOptions();
1534+
options.env = env_;
1535+
Reopen(&options);
1536+
1537+
ASSERT_OK(Put("foo", "v1"));
1538+
ASSERT_EQ("v1", Get("foo"));
1539+
Compact("a", "z");
1540+
env_->non_writable_.Release_Store(env_); // Force errors for new files
1541+
env_->sleep_counter_.Reset();
1542+
env_->sleep_time_counter_.Reset();
1543+
for (int i = 0; i < 5; i++) {
1544+
dbfull()->TEST_CompactRange(2, NULL, NULL);
1545+
}
1546+
env_->non_writable_.Release_Store(NULL);
1547+
1548+
// Wait for compaction to finish
1549+
DelayMilliseconds(1000);
1550+
1551+
ASSERT_GE(env_->sleep_counter_.Read(), 5);
1552+
ASSERT_LT(env_->sleep_counter_.Read(), 10);
1553+
ASSERT_GE(env_->sleep_time_counter_.Read(), 10e6);
1554+
}
1555+
15091556
TEST(DBTest, NonWritableFileSystem) {
15101557
Options options = CurrentOptions();
15111558
options.write_buffer_size = 1000;
@@ -1519,7 +1566,7 @@ TEST(DBTest, NonWritableFileSystem) {
15191566
fprintf(stderr, "iter %d; errors %d\n", i, errors);
15201567
if (!Put("foo", big).ok()) {
15211568
errors++;
1522-
env_->SleepForMicroseconds(100000);
1569+
DelayMilliseconds(100);
15231570
}
15241571
}
15251572
ASSERT_GT(errors, 0);
@@ -1567,6 +1614,24 @@ TEST(DBTest, ManifestWriteError) {
15671614
}
15681615
}
15691616

1617+
TEST(DBTest, MissingSSTFile) {
1618+
ASSERT_OK(Put("foo", "bar"));
1619+
ASSERT_EQ("bar", Get("foo"));
1620+
1621+
// Dump the memtable to disk.
1622+
dbfull()->TEST_CompactMemTable();
1623+
ASSERT_EQ("bar", Get("foo"));
1624+
1625+
Close();
1626+
ASSERT_TRUE(DeleteAnSSTFile());
1627+
Options options = CurrentOptions();
1628+
options.paranoid_checks = true;
1629+
Status s = TryReopen(&options);
1630+
ASSERT_TRUE(!s.ok());
1631+
ASSERT_TRUE(s.ToString().find("issing") != std::string::npos)
1632+
<< s.ToString();
1633+
}
1634+
15701635
TEST(DBTest, FilesDeletedAfterCompaction) {
15711636
ASSERT_OK(Put("foo", "v2"));
15721637
Compact("a", "z");
@@ -1711,13 +1776,13 @@ TEST(DBTest, MultiThreaded) {
17111776
}
17121777

17131778
// Let them run for a while
1714-
env_->SleepForMicroseconds(kTestSeconds * 1000000);
1779+
DelayMilliseconds(kTestSeconds * 1000);
17151780

17161781
// Stop the threads and wait for them to finish
17171782
mt.stop.Release_Store(&mt);
17181783
for (int id = 0; id < kNumThreads; id++) {
17191784
while (mt.thread_done[id].Acquire_Load() == NULL) {
1720-
env_->SleepForMicroseconds(100000);
1785+
DelayMilliseconds(100);
17211786
}
17221787
}
17231788
} while (ChangeOptions());

db/dbformat.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ std::string ParsedInternalKey::DebugString() const {
2626
(unsigned long long) sequence,
2727
int(type));
2828
std::string result = "'";
29-
result += user_key.ToString();
29+
result += EscapeString(user_key.ToString());
3030
result += buf;
3131
return result;
3232
}

db/filename_test.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ TEST(FileNameTest, Parse) {
7070
for (int i = 0; i < sizeof(errors) / sizeof(errors[0]); i++) {
7171
std::string f = errors[i];
7272
ASSERT_TRUE(!ParseFileName(f, &number, &type)) << f;
73-
};
73+
}
7474
}
7575

7676
TEST(FileNameTest, Construction) {

0 commit comments

Comments
 (0)