-
Notifications
You must be signed in to change notification settings - Fork 6.8k
Expand file tree
/
Copy pathblock_based_table_builder.cc
More file actions
3000 lines (2773 loc) · 122 KB
/
block_based_table_builder.cc
File metadata and controls
3000 lines (2773 loc) · 122 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#include "table/block_based/block_based_table_builder.h"
#include <atomic>
#include <cassert>
#include <cstdio>
#include <list>
#include <map>
#include <memory>
#include <numeric>
#include <string>
#include <unordered_map>
#include <utility>
#include "block_cache.h"
#include "cache/cache_entry_roles.h"
#include "cache/cache_helpers.h"
#include "cache/cache_key.h"
#include "cache/cache_reservation_manager.h"
#include "db/dbformat.h"
#include "index_builder.h"
#include "logging/logging.h"
#include "memory/memory_allocator_impl.h"
#include "options/options_helper.h"
#include "rocksdb/cache.h"
#include "rocksdb/comparator.h"
#include "rocksdb/env.h"
#include "rocksdb/filter_policy.h"
#include "rocksdb/flush_block_policy.h"
#include "rocksdb/merge_operator.h"
#include "rocksdb/table.h"
#include "rocksdb/types.h"
#include "table/block_based/block.h"
#include "table/block_based/block_based_table_factory.h"
#include "table/block_based/block_based_table_reader.h"
#include "table/block_based/block_builder.h"
#include "table/block_based/filter_block.h"
#include "table/block_based/filter_policy_internal.h"
#include "table/block_based/full_filter_block.h"
#include "table/block_based/partitioned_filter_block.h"
#include "table/block_based/user_defined_index_wrapper.h"
#include "table/format.h"
#include "table/meta_blocks.h"
#include "table/table_builder.h"
#include "util/bit_fields.h"
#include "util/coding.h"
#include "util/compression.h"
#include "util/defer.h"
#include "util/semaphore.h"
#include "util/stop_watch.h"
#include "util/string_util.h"
namespace ROCKSDB_NAMESPACE {
extern const std::string kHashIndexPrefixesBlock;
extern const std::string kHashIndexPrefixesMetadataBlock;
// Without anonymous namespace here, we fail the warning -Wmissing-prototypes
namespace {
constexpr size_t kBlockTrailerSize = BlockBasedTable::kBlockTrailerSize;
// Create a filter block builder based on its type.
//
// Returns nullptr when the policy produces no bits builder for this context.
// Otherwise returns a partitioned builder (when partition_filters is set,
// requiring a partitioned index builder) or a full-filter builder. Caller
// owns the returned pointer.
FilterBlockBuilder* CreateFilterBlockBuilder(
    const ImmutableCFOptions& /*opt*/, const MutableCFOptions& mopt,
    const FilterBuildingContext& context,
    const bool use_delta_encoding_for_index_values,
    PartitionedIndexBuilder* const p_index_builder, size_t ts_sz,
    const bool persist_user_defined_timestamps) {
  const BlockBasedTableOptions& table_opt = context.table_options;
  assert(table_opt.filter_policy);  // precondition
  FilterBitsBuilder* bits_builder =
      BloomFilterPolicy::GetBuilderFromContext(context);
  if (bits_builder == nullptr) {
    return nullptr;
  }
  if (!table_opt.partition_filters) {
    return new FullFilterBlockBuilder(mopt.prefix_extractor.get(),
                                      table_opt.whole_key_filtering,
                                      bits_builder);
  }
  assert(p_index_builder != nullptr);
  // Since after partition cut request from filter builder it takes time
  // until index builder actully cuts the partition, until the end of a
  // data block potentially with many keys, we take the lower bound as
  // partition size.
  assert(table_opt.block_size_deviation <= 100);
  // Ceiling division by 100 of metadata_block_size * (100 - deviation)
  uint32_t partition_size = static_cast<uint32_t>(
      ((table_opt.metadata_block_size * (100 - table_opt.block_size_deviation)) +
       99) /
      100);
  partition_size = std::max(partition_size, uint32_t{1});
  return new PartitionedFilterBlockBuilder(
      mopt.prefix_extractor.get(), table_opt.whole_key_filtering, bits_builder,
      table_opt.index_block_restart_interval,
      use_delta_encoding_for_index_values, p_index_builder, partition_size,
      ts_sz, persist_user_defined_timestamps,
      table_opt.decouple_partitioned_filters);
}
// A convenience function for populating the Compressor* fields; see ~Rep()
//
// Returns a block-type-specialized clone of `compressor` when one is
// produced (ownership passes to the caller, released from the unique_ptr),
// or the original `compressor` pointer unchanged when no specialization
// is needed.
Compressor* MaybeCloneSpecialized(
    Compressor* compressor, CacheEntryRole block_type,
    Compressor::DictConfigArgs&& dict_config = Compressor::DictDisabled{}) {
  auto cloned =
      compressor->MaybeCloneSpecialized(block_type, std::move(dict_config));
  if (!cloned) {
    // No distinct specialization; keep using the shared compressor
    return compressor;
  }
  // Caller is responsible for freeing when distinct
  return cloned.release();
}
} // namespace
// kBlockBasedTableMagicNumber was picked by running
// echo rocksdb.table.block_based | sha1sum
// and taking the leading 64 bits.
// Please note that kBlockBasedTableMagicNumber may also be accessed by
// other .cc files, so it is declared extern in the header; however, to
// actually allocate storage for it, it must be defined without extern in
// exactly one translation unit (this one).
const uint64_t kBlockBasedTableMagicNumber = 0x88e241b785f4cff7ull;
// A collector that collects properties of interest to block-based table.
// For now this class looks heavy-weight since we only write one additional
// property.
// But in the foreseeable future, we will add more and more properties that are
// specific to block-based table.
class BlockBasedTableBuilder::BlockBasedTablePropertiesCollector
    : public InternalTblPropColl {
 public:
  explicit BlockBasedTablePropertiesCollector(
      BlockBasedTableOptions::IndexType index_type, bool whole_key_filtering,
      bool prefix_filtering, bool decoupled_partitioned_filters)
      : index_type_(index_type),
        whole_key_filtering_(whole_key_filtering),
        prefix_filtering_(prefix_filtering),
        decoupled_partitioned_filters_(decoupled_partitioned_filters) {}

  // No per-key/value statistics are collected by this collector.
  Status InternalAdd(const Slice& /*key*/, const Slice& /*value*/,
                     uint64_t /*file_size*/) override {
    return Status::OK();
  }

  // No per-block statistics are collected by this collector either.
  void BlockAdd(uint64_t /* block_uncomp_bytes */,
                uint64_t /* block_compressed_bytes_fast */,
                uint64_t /* block_compressed_bytes_slow */) override {}

  // Record the table configuration captured at construction time into the
  // user-collected properties map.
  Status Finish(UserCollectedProperties* properties) override {
    std::string encoded_index_type;
    PutFixed32(&encoded_index_type, static_cast<uint32_t>(index_type_));
    properties->emplace(BlockBasedTablePropertyNames::kIndexType,
                        encoded_index_type);
    properties->emplace(BlockBasedTablePropertyNames::kWholeKeyFiltering,
                        whole_key_filtering_ ? kPropTrue : kPropFalse);
    properties->emplace(BlockBasedTablePropertyNames::kPrefixFiltering,
                        prefix_filtering_ ? kPropTrue : kPropFalse);
    if (decoupled_partitioned_filters_) {
      properties->emplace(
          BlockBasedTablePropertyNames::kDecoupledPartitionedFilters,
          kPropTrue);
    }
    return Status::OK();
  }

  // The name of the properties collector can be used for debugging purpose.
  const char* Name() const override {
    return "BlockBasedTablePropertiesCollector";
  }

  // This collector exposes no human-readable properties.
  UserCollectedProperties GetReadableProperties() const override {
    return UserCollectedProperties();
  }

 private:
  BlockBasedTableOptions::IndexType index_type_;
  bool whole_key_filtering_;
  bool prefix_filtering_;
  bool decoupled_partitioned_filters_;
};
// Pairs the per-thread scratch state needed for block compression with the
// scratch state for decompression (presumably used to verify compressed
// output round-trips — confirm against usage in compression paths).
struct BlockBasedTableBuilder::WorkingAreaPair {
  // Scratch working area for the compressor
  Compressor::ManagedWorkingArea compress;
  // Scratch working area for the decompressor used in verification
  Decompressor::ManagedWorkingArea verify;
};
// ParallelCompressionRep essentially defines a framework for parallelizing
// block generation ("emit"), block compression, and block writing to storage.
// The synchronization is lock-free/wait-free, so thread waiting only happens
// when work-order dependencies are unsatisfied, though sleeping/idle threads
// might be kept idle when it seems unlikely they would improve throughput by
// waking them up (essentially auto-tuned parallelism). But because all threads
// are capable of 2 out of 3 kinds of work, in a quasi-work-stealing system,
// running threads can usually expect that compatible work is available.
//
// This is currently activated with CompressionOptions::parallel_threads > 1
// but that is a somewhat crude API that would ideally be adapted along with
// the implementation in the future to allow threads to serve multiple
// flush/compaction jobs, though the available improvement might be small.
// Even within the scope of a single file it might be nice to use a general
// framework for distributing work across threads, but (a) different threads
// are limited to which work they can do because of technical challenges, (b)
// being largely CPU bound on small work units means such a framework would
// likely have big overheads compared to this hand-optimized solution.
struct BlockBasedTableBuilder::ParallelCompressionRep {
// The framework has two kinds of threads: the calling thread from
// flush/compaction/SstFileWriter is called the "emit thread" (kEmitter).
// Other threads cannot generally take over "emit" work because that is
// largely happening up the call stack from BlockBasedTableBuilder.
// The emit thread can also take on compression work in a quasi-work-stealing
// manner when the buffer for emitting new blocks is full.
//
// When parallelism is enabled, there are also "worker" threads that
// can handle compressing blocks and (one worker thread at a time) write them
// to the SST file (and handle other single-threaded wrap-up of each block).
//
// NOTE: when parallelism is enabled, the emit thread is not permitted to
// write to the SST file because that is the potential "output" bottleneck,
// and it's generally bad for parallelism to allow the only thread that can
// serve the "input" bottleneck (emit work) to also spend exclusive time on
// the output bottleneck.
enum class ThreadKind {
  // The calling thread from flush/compaction/SstFileWriter. The only thread
  // that can emit blocks; may also take on compression work when the ring
  // buffer is full, but never writes to the SST file.
  kEmitter,
  // Pool thread that compresses blocks and (one at a time) writes them to
  // the SST file.
  kWorker,
};
// ThreadState allows each thread to track its work assignment. In addition to
// the cases already mentioned, kEmitting, kCompressing, and kWriting to the
// SST file writer,
// * Threads can enter the kIdle state so that they can sleep when no work is
// available for them, to be woken up when appropriate.
// * The kEnd state means the thread is not doing any more work items, which
// for worker threads means they will end soon.
// * The kCompressingAndWriting state means a worker can compress and write a
// block without additional state updates because the same block to be
// compressed is the next to be written.
enum class ThreadState {
  /* BEGIN Emitter only states */
  // Serializing the next uncompressed block into the assigned ring slot
  kEmitting,
  /* END Emitter only states */
  /* BEGIN states for emitter and worker */
  // Sleeping (or about to sleep) until compatible work becomes available
  kIdle,
  // Compressing the block in the assigned ring slot
  kCompressing,
  // No more work items for this thread (workers end soon after)
  kEnd,
  /* END states for emitter and worker */
  /* BEGIN Worker only states */
  // Compress and then immediately write the assigned block, because it is
  // also the next block to be written (no intermediate state update needed)
  kCompressingAndWriting,
  // Writing the assigned, already-compressed block to the SST file
  kWriting,
  /* END Worker only states */
};
// BlockRep instances are used and reused in a ring buffer (below), so that
// many blocks can be in an intermediate state between serialized into
// uncompressed bytes and written to the SST file. Notably, each block is
// "emitted" in uncompressed form into a BlockRep, compressed (at least
// attempted, when configured) for updated BlockRep, and then written from the
// BlockRep to the writer for the SST file bytes.
// Cache-line aligned to avoid false sharing between threads working on
// adjacent ring buffer slots.
struct ALIGN_AS(CACHE_LINE_SIZE) BlockRep {
  // Uncompressed block contents
  std::string uncompressed;
  // Buffer receiving the compressed form (reused as the slot is recycled)
  GrowableBuffer compressed;
  // Outcome of the compression attempt; kNoCompression when not compressed
  CompressionType compression_type = kNoCompression;
  // Index entry prepared when the block is emitted, to be completed when
  // the block is written (NOTE(review): inferred from the type name —
  // confirm against the emit/write paths)
  std::unique_ptr<IndexBuilder::PreparedIndexEntry> prepared_index_entry;
};
// Ring buffer of emitted blocks that may or may not yet be compressed.
std::unique_ptr<BlockRep[]> ring_buffer;
// log_2(ring buffer size), where ring buffer size must be a power of two
const int ring_buffer_nbits;
// ring buffer size - 1, to function as a bit mask for ring buffer positions
// (e.g. given the ordinal number of a block)
const uint32_t ring_buffer_mask;
// Number of threads in worker_threads. (Emit thread doesn't count)
const uint32_t num_worker_threads;
// Rough upper bound on the sst file size contribution from blocks emitted
// into the parallel compression ring buffer but not yet written. Tracks
// uncompressed size, with trailer, until a block is compressed, then
// compressed size until the block is written. (TODO: does not currently
// account for block_align)
RelaxedAtomic<uint64_t> estimated_inflight_size{0};
// Thread objects for worker threads
std::vector<port::Thread> worker_threads;
// Working areas for data_block_compressor for each worker thread
std::vector<WorkingAreaPair> working_areas;
// Semaphores for threads to sleep when there's no available work for them
// and to wake back up when someone determines there is available work (most
// likely). Split between worker threads and emit thread because they can do
// different kinds of work.
CountingSemaphore idle_worker_sem{0};
BinarySemaphore idle_emit_sem{0};
// Primary atomic state of parallel compression, which includes a number of
// state fields that are best updated atomically to avoid locking and/or to
// simplify the interesting interleavings that have to be considered and
// accommodated.
struct State : public BitFields<uint64_t, State> {};
ALIGN_AS(CACHE_LINE_SIZE) BitFieldsAtomic<State> atomic_state;
// The first field is a bit for each ring buffer slot (max 32) for whether
// that slot is ready to be claimed for writing by a worker thread. Because
// compressions might finish out-of-order, we need to track individually
// whether they are finished, though this field doesn't differentiate
// "compression completed" from "compression not started" because that can be
// inferred from NextToCompress. A block might not enter this state, because
// the same thread that compresses it can also immediately write the block if
// it notices that the block is next to write.
using NeedsWriter = UnsignedBitField<State, 32, NoPrevBitField>;
// Track how many worker threads are in an idle state because there was no
// available work and haven't been selected to wake back up.
using IdleWorkerCount = UnsignedBitField<State, 5, NeedsWriter>;
// Track whether the emit thread is an idle state because there was no
// available work and hasn't been triggered to wake back up. The nature of
// available work and atomic CAS assignment of work ensures at least one
// thread is kept out of the idle state.
using IdleEmitFlag = BoolBitField<State, IdleWorkerCount>;
// Track whether threads should end when they finish available work because no
// more blocks will be emitted.
using NoMoreToEmitFlag = BoolBitField<State, IdleEmitFlag>;
// Track whether threads should abort ASAP because of an error.
using AbortFlag = BoolBitField<State, NoMoreToEmitFlag>;
// Track three "NextTo" counters for the positions of the next block to write,
// to start compression, and to emit into the ring buffer. If these counters
// never overflowed / wrapped around, we would have next_to_write <=
// next_to_compress <= next_to_emit because a block must be emitted before
// compressed, and compressed (at least attempted) before writing. We need to
// track more than ring_buffer_nbits of these counters to be able to
// distinguish an empty ring buffer (next_to_write == next_to_emit) from a
// full ring buffer (next_to_write != next_to_emit but equal under
// ring_buffer_mask).
using NextToWrite = UnsignedBitField<State, 8, AbortFlag>;
using NextToCompress = UnsignedBitField<State, 8, NextToWrite>;
using NextToEmit = UnsignedBitField<State, 8, NextToCompress>;
static_assert(NextToEmit::kEndBit == 64);
// BEGIN fields for use by the emit thread only. These can't live on the stack
// because the emit thread frequently returns out of BlockBasedTableBuilder.
ALIGN_AS(CACHE_LINE_SIZE)
ThreadState emit_thread_state = ThreadState::kEmitting;
// Ring buffer index that emit thread is operating on (for emitting and
// compressing states)
uint32_t emit_slot = 0;
// Including some data to inform when to wake up idle worker threads (see
// implementation for details)
int32_t emit_counter_toward_wake_up = 0;
int32_t emit_counter_for_wake_up = 0;
static constexpr int32_t kMaxWakeupInterval = 8;
// END fields for use by the emit thread only
// TSAN on GCC has bugs that report false positives on this watchdog code.
// Other efforts to work around the bug have failed, so to avoid those false
// positive reports, we simply disable the watchdog when running under GCC
// TSAN.
#if !defined(NDEBUG) && !(defined(__GNUC__) && defined(__SANITIZE_THREAD__))
#define BBTB_PC_WATCHDOG 1
#endif
#ifdef BBTB_PC_WATCHDOG
// These are for an extra "watchdog" thread in DEBUG builds that heuristically
// checks for the most likely deadlock conditions. False positives and false
// negatives are technically possible.
std::thread watchdog_thread;
std::mutex watchdog_mutex;
std::condition_variable watchdog_cv;
bool shutdown_watchdog = false;
RelaxedAtomic<uint32_t> live_workers{0};
RelaxedAtomic<uint32_t> idling_workers{0};
RelaxedAtomic<bool> live_emit{0};
RelaxedAtomic<bool> idling_emit{0};
#endif // BBTB_PC_WATCHDOG
int ComputeRingBufferNbits(uint32_t parallel_threads) {
  // Ring buffer size is a power of two not to exceed 32 but otherwise
  // at least twice the number of threads. Requires parallel_threads > 1.
  assert(parallel_threads > 1);
  if (parallel_threads <= 2) {
    return 2;  // 4 slots
  } else if (parallel_threads <= 4) {
    return 3;  // 8 slots
  } else if (parallel_threads <= 8) {
    return 4;  // 16 slots
  } else {
    return 5;  // 32 slots (cap)
  }
}
// Sets up the ring buffer and worker accounting. The worker thread count is
// capped at ring_buffer_mask (= ring buffer size - 1), and must also fit in
// the 5-bit IdleWorkerCount field.
explicit ParallelCompressionRep(uint32_t parallel_threads)
    : ring_buffer_nbits(ComputeRingBufferNbits(parallel_threads)),
      ring_buffer_mask((uint32_t{1} << ring_buffer_nbits) - 1),
      num_worker_threads(std::min(parallel_threads, ring_buffer_mask)) {
  // IdleWorkerCount must be able to represent every worker idling at once
  assert(num_worker_threads <= IdleWorkerCount::kMask);
  ring_buffer = std::make_unique<BlockRep[]>(ring_buffer_mask + 1);
  // Start by aggressively waking up idle workers
  emit_counter_for_wake_up = -static_cast<int32_t>(num_worker_threads);
}
// Debug-only sanity checks that shutdown left everything balanced: on a
// non-aborted path the ring buffer must be fully drained, and in all cases
// the idle-thread accounting and semaphores must have no leftover counts.
~ParallelCompressionRep() {
#ifndef NDEBUG
  auto state = atomic_state.Load();
  if (state.Get<AbortFlag>() == false) {
    // Should be clear / cancelled out with normal shutdown
    assert(state.Get<NeedsWriter>() == 0);
    // Ring buffer reached empty state
    assert(state.Get<NextToWrite>() == state.Get<NextToCompress>());
    assert(state.Get<NextToCompress>() == state.Get<NextToEmit>());
    // Everything cancels out in inflight size
    assert(estimated_inflight_size.LoadRelaxed() == 0);
  }
  // All idling metadata cleaned up, properly tracked
  assert(state.Get<IdleWorkerCount>() == 0);
  assert(state.Get<IdleEmitFlag>() == false);
  // No excess in semaphores
  assert(!idle_emit_sem.TryAcquire());
  assert(!idle_worker_sem.TryAcquire());
#endif  // !NDEBUG
}
// The primary function for a thread transitioning from one state or work
// assignment to the next. `slot` refers to a position in the ring buffer
// for assigned emit, compression, or write work.
//
// Because both the emit thread and worker threads can work on compression,
// this is a quasi-work-stealing parallel algorithm. (Enabling other threads
// to do emit work would be quite challenging, and allowing the emit thread
// to handle writes could create a bottle-neck.)
//
// This function is basically a CAS loop trying to pick the next piece of work
// for this thread and retrying if CAS fails. This function also handles
// thread idling when that's the appropriate assignment, continuing the loop
// looking for productive work when woken from an idle state.
//
// Precondition: thread_state is appropriate for thread_kind and not kEnd. It
// must match the previously returned state for that thread, and is only kIdle
// for the thread on startup (though the kIdle state is used internal to the
// function).
//
// Postcondition: thread_state is appropriate for thread_kind and not kIdle.
// Except for kEnd state, the calling thread has exclusive access to
// ring_buffer[slot] until next StateTransition().
template <ThreadKind thread_kind>
void StateTransition(
    /*in/out*/ ThreadState& thread_state,
    /*in/out*/ uint32_t& slot) {
  assert(slot <= ring_buffer_mask);
  // Last known value for atomic_state
  State seen_state = atomic_state.Load();
  // CAS retry loop: compute a complete proposed transition from seen_state,
  // try to install it atomically, and start over from the freshly observed
  // state if another thread won the race.
  for (;;) {
    if (seen_state.Get<AbortFlag>()) {
      thread_state = ThreadState::kEnd;
      return;
    }
    // Invariant checks: the three counters differ by at most the ring
    // buffer size, modulo 256 (the counters are 8-bit and wrap).
    assert(static_cast<uint8_t>(seen_state.Get<NextToEmit>() -
                                seen_state.Get<NextToCompress>()) <=
           ring_buffer_mask + 1);
    assert(static_cast<uint8_t>(seen_state.Get<NextToCompress>() -
                                seen_state.Get<NextToWrite>()) <=
           ring_buffer_mask + 1);
    assert(static_cast<uint8_t>(seen_state.Get<NextToEmit>() -
                                seen_state.Get<NextToWrite>()) <=
           ring_buffer_mask + 1);
    // Draft of the next proposed atomic_state. Start by marking completion of
    // the current thread's last work.
    State next_state = seen_state;
    bool wake_idle = false;
    switch (thread_state) {
      case ThreadState::kEmitting: {
        assert(thread_kind == ThreadKind::kEmitter);
        assert(slot == (next_state.Get<NextToEmit>() & ring_buffer_mask));
        next_state.Ref<NextToEmit>() += 1;
        // Check whether to wake up idle worker thread
        if (next_state.Get<IdleWorkerCount>() > 0 &&
            // The number of blocks for which compression hasn't started
            // is well over the number of active threads.
            static_cast<uint8_t>(next_state.Get<NextToEmit>() -
                                 next_state.Get<NextToCompress>()) >=
                (ring_buffer_mask + 1) / 4 +
                    (num_worker_threads -
                     next_state.Get<IdleWorkerCount>())) {
          // At first, emit_counter_for_wake_up is negative to aggressively
          // wake up idle worker threads. Then it backs off the interval at
          // which we wake up, up to some maximum that attempts to balance
          // maximum throughput and minimum CPU overhead.
          if (emit_counter_toward_wake_up >= emit_counter_for_wake_up) {
            // We reached a threshold to justify a wake-up.
            wake_idle = true;
            // Adjust idle count assuming we are going to own waking it up,
            // so no one else can duplicate that. (The idle count is really
            // the number idling for which no one yet owns waking them up.)
            next_state.Ref<IdleWorkerCount>() -= 1;
            // Reset the counter toward the threshold for wake-up
            emit_counter_toward_wake_up = 0;
            // Raise the threshold (up to some limit) to stabilize the number
            // of active threads after some ramp-up period.
            emit_counter_for_wake_up =
                std::min(emit_counter_for_wake_up + 1,
                         static_cast<int32_t>(num_worker_threads +
                                              kMaxWakeupInterval));
          } else {
            // Advance closer to the threshold for justifying a wake-up
            emit_counter_toward_wake_up++;
          }
        }
        break;
      }
      case ThreadState::kIdle:
        // NOTE: thread that signalled to wake up already updated idle count
        // or marker. This is required to avoid overflow on the semaphore,
        // especially the binary semaphore for idle_emit_sem, and likely
        // desirable to avoid spurious/extra Release().
        break;
      case ThreadState::kCompressing:
        // Compression of `slot` is finished; mark it ready for a writer.
        next_state.Ref<NeedsWriter>() |= uint32_t{1} << slot;
        if constexpr (thread_kind == ThreadKind::kEmitter) {
          if (next_state.Get<IdleWorkerCount>() == num_worker_threads) {
            // Work is available for a worker thread and none are running
            wake_idle = true;
            // Adjust idle count assuming we are going to own waking it up
            next_state.Ref<IdleWorkerCount>() -= 1;
          }
        }
        break;
      case ThreadState::kEnd:
        // Should have already recognized the end state
        assert(thread_state != ThreadState::kEnd);
        return;
      case ThreadState::kCompressingAndWriting:
      case ThreadState::kWriting:
        // A write just completed; advance the write counter and wake the
        // emit thread if it was idle waiting on ring buffer space.
        assert(thread_kind == ThreadKind::kWorker);
        assert((next_state.Get<NextToWrite>() & ring_buffer_mask) == slot);
        assert(next_state.Get<NextToCompress>() !=
               next_state.Get<NextToWrite>());
        assert(next_state.Get<NextToEmit>() != next_state.Get<NextToWrite>());
        assert((next_state.Get<NeedsWriter>() & (uint32_t{1} << slot)) == 0);
        next_state.Ref<NextToWrite>() += 1;
        if (next_state.Get<IdleEmitFlag>()) {
          wake_idle = true;
          // Clear idle emit flag assuming we are going to own waking it up
          next_state.Set<IdleEmitFlag>(false);
        }
        break;
    }
    // Find the next state, depending on the kind of thread
    ThreadState next_thread_state = ThreadState::kEnd;
    uint32_t next_slot = 0;
    if constexpr (thread_kind == ThreadKind::kEmitter) {
      // First priority is emitting more uncompressed blocks, if there's
      // room in the ring buffer.
      if (static_cast<uint8_t>(next_state.Get<NextToEmit>() -
                               next_state.Get<NextToWrite>()) <=
          ring_buffer_mask) {
        // There is room
        next_thread_state = ThreadState::kEmitting;
        next_slot = next_state.Get<NextToEmit>() & ring_buffer_mask;
      }
    }
    if constexpr (thread_kind == ThreadKind::kWorker) {
      // First priority is writing next block to write, if it needs a writer
      // assigned to it
      uint32_t next_to_write_slot =
          next_state.Get<NextToWrite>() & ring_buffer_mask;
      uint32_t needs_writer_bit = uint32_t{1} << next_to_write_slot;
      if (next_state.Get<NeedsWriter>() & needs_writer_bit) {
        // Clear the "needs writer" marker on the slot
        next_state.Ref<NeedsWriter>() &= ~needs_writer_bit;
        // Take ownership of writing it
        next_thread_state = ThreadState::kWriting;
        next_slot = next_to_write_slot;
      }
    }
    // If didn't find higher priority work
    if (next_thread_state == ThreadState::kEnd) {
      if (next_state.Get<NextToCompress>() != next_state.Get<NextToEmit>()) {
        // Compression work is available, select that
        if (thread_kind == ThreadKind::kWorker &&
            next_state.Get<NextToCompress>() ==
                next_state.Get<NextToWrite>()) {
          next_thread_state = ThreadState::kCompressingAndWriting;
        } else {
          next_thread_state = ThreadState::kCompressing;
        }
        next_slot = next_state.Get<NextToCompress>() & ring_buffer_mask;
        next_state.Ref<NextToCompress>() += 1;
      } else if constexpr (thread_kind == ThreadKind::kEmitter) {
        // Emitter thread goes idle
        next_thread_state = ThreadState::kIdle;
        assert(next_state.Get<IdleEmitFlag>() == false);
        assert(next_state.Get<NoMoreToEmitFlag>() == false);
        next_state.Set<IdleEmitFlag>(true);
      } else if (next_state.Get<NoMoreToEmitFlag>()) {
        // Worker thread shall not idle if we are done emitting. At least
        // one worker will remain unblocked to finish writing
        next_thread_state = ThreadState::kEnd;
      } else {
        // Worker thread goes idle
        next_thread_state = ThreadState::kIdle;
        assert(next_state.Get<IdleWorkerCount>() < IdleWorkerCount::kMask);
        next_state.Ref<IdleWorkerCount>() += 1;
      }
    }
    assert(thread_state != ThreadState::kEnd);
    // Attempt to atomically apply the desired/computed state transition
    if (atomic_state.CasWeak(seen_state, next_state)) {
      // Success
      thread_state = next_thread_state;
      slot = next_slot;
      seen_state = next_state;
      if (wake_idle) {
        // We claimed ownership of exactly one wake-up in the CAS above
        if constexpr (thread_kind == ThreadKind::kEmitter) {
          idle_worker_sem.Release();
        } else {
          idle_emit_sem.Release();
        }
      }
      if (thread_state != ThreadState::kIdle) {
        // Successfully transitioned to another useful state
        return;
      }
      // Handle idle state
      if constexpr (thread_kind == ThreadKind::kEmitter) {
#ifdef BBTB_PC_WATCHDOG
        idling_emit.StoreRelaxed(true);
        Defer decr{[this]() { idling_emit.StoreRelaxed(false); }};
#endif  // BBTB_PC_WATCHDOG
        // Likely go to sleep
        idle_emit_sem.Acquire();
      } else {
#ifdef BBTB_PC_WATCHDOG
        // Tracking for watchdog
        idling_workers.FetchAddRelaxed(1);
        Defer decr{[this]() { idling_workers.FetchSubRelaxed(1); }};
#endif  // BBTB_PC_WATCHDOG
        // Likely go to sleep
        idle_worker_sem.Acquire();
      }
      // Update state after sleep
      seen_state = atomic_state.Load();
    }
    // else loop and try again
  }
}
// Convenience wrapper: StateTransition specialized for the emit thread.
void EmitterStateTransition(
    /*in/out*/ ThreadState& thread_state,
    /*in/out*/ uint32_t& slot) {
  StateTransition<ThreadKind::kEmitter>(thread_state, slot);
}
// Convenience wrapper: StateTransition specialized for worker threads.
void WorkerStateTransition(
    /*in/out*/ ThreadState& thread_state,
    /*in/out*/ uint32_t& slot) {
  StateTransition<ThreadKind::kWorker>(thread_state, slot);
}
// Exactly wake all idling threads (for an end state)
// Exactly wake all idling threads (for an end state). Atomically clears the
// idle flag/count, thereby claiming exclusive ownership of waking each
// currently-idle thread, so no other thread can double-Release a semaphore.
void WakeAllIdle() {
  State old_state, new_state;
  auto transform =
      IdleEmitFlag::ClearTransform() + IdleWorkerCount::ClearTransform();
  atomic_state.Apply(transform, &old_state, &new_state);
  assert(new_state.Get<IdleEmitFlag>() == false);
  assert(new_state.Get<IdleWorkerCount>() == 0);
  if (old_state.Get<IdleEmitFlag>()) {
    idle_emit_sem.Release();
  }
  // One release per worker that was idle at the time of the Apply
  idle_worker_sem.Release(old_state.Get<IdleWorkerCount>());
}
// Called by emit thread if it is decided no more blocks will be emitted into
// this SST file. Sets NoMoreToEmitFlag (must only happen once), moves the
// emit thread to kEnd, and wakes all idle threads so workers can observe the
// flag and finish remaining compression/write work.
void SetNoMoreToEmit(/*in/out*/ ThreadState& thread_state,
                     /*in/out*/ uint32_t& slot) {
  (void)slot;
  State old_state;
  atomic_state.Apply(NoMoreToEmitFlag::SetTransform(), &old_state);
  assert(old_state.Get<NoMoreToEmitFlag>() == false);
  // The emit thread's slot should still be the next emit position
  assert(slot == BitwiseAnd(old_state.Get<NextToEmit>(), ring_buffer_mask));
  assert(thread_state == ThreadState::kEmitting);
  thread_state = ThreadState::kEnd;
  WakeAllIdle();
}
// Invoked by any thread to abort parallel compression (etc.) because of an
// error. Safe to call from multiple threads; only the first caller performs
// the wake-up of idle threads.
void SetAbort(/*in/out*/ ThreadState& thread_state) {
  State prev;
  atomic_state.Apply(AbortFlag::SetTransform(), &prev);
  if (!prev.Get<AbortFlag>()) {
    // This thread won the race to set the abort flag; ensure no worker or
    // emitter stays parked on a semaphore waiting for work.
    WakeAllIdle();
  }
  thread_state = ThreadState::kEnd;
}
#ifdef BBTB_PC_WATCHDOG
// Logic for the extra "watchdog" thread in DEBUG builds that heuristically
// checks for the most likely deadlock conditions.
//
// Some ways to manually validate the watchdog:
// * Insert
// if (Random::GetTLSInstance()->OneIn(100)) {
// sleep(100);
// }
// after either of the calls to semaphore Acquire above.
// * Miss some Release()s in WakeAllIdle()
//
// and run table_test unit tests.
//
// Heuristic: if every live worker and the live emit thread are all flagged
// idle for many consecutive 1-second observations, declare deadlock and
// terminate with a diagnostic. Relaxed loads mean any single observation may
// be stale/transient, hence the long confirmation window.
void BGWatchdog() {
int count_toward_deadlock_judgment = 0;
for (;;) {
// Check for termination condition: All workers and emit thread have
// completed.
if (live_workers.LoadRelaxed() == 0 && live_emit.LoadRelaxed() == false) {
return;
}
// Check for potential deadlock condition
if (idling_workers.LoadRelaxed() < live_workers.LoadRelaxed() ||
(live_emit.LoadRelaxed() && !idling_emit.LoadRelaxed())) {
// Someone is working, all good
count_toward_deadlock_judgment = 0;
} else {
// Could be a deadlock state, but could also be a transient
// state where someone has woken up but not cleared their idling flag.
// Give it plenty of time and watchdog thread wake-ups before
// declaring deadlock.
count_toward_deadlock_judgment++;
// ~70 consecutive all-idle observations at ~1s apart (~70 seconds)
// before declaring deadlock.
if (count_toward_deadlock_judgment >= 70) {
fprintf(stderr,
"Error: apparent deadlock in parallel compression. "
"Aborting. %u / %u, %d / %d, %llx\n",
(unsigned)idling_workers.LoadRelaxed(),
(unsigned)live_workers.LoadRelaxed(),
(int)idling_emit.LoadRelaxed(), (int)live_emit.LoadRelaxed(),
(long long)atomic_state.Load().underlying);
// Crash the process; otherwise the deadlock would hang the test
// run silently.
std::terminate();
}
}
// Sleep for 1s at a time unless we are woken up because other threads
// ended.
// NOTE(review): presumably shutdown_watchdog is set under watchdog_mutex
// elsewhere before notifying watchdog_cv; even if a notification were
// missed, the 1s timeout bounds the delay before re-checking.
std::unique_lock<std::mutex> lock(watchdog_mutex);
if (!shutdown_watchdog) {
watchdog_cv.wait_for(lock, std::chrono::seconds{1});
}
}
}
#endif  // BBTB_PC_WATCHDOG
};
struct WarmCacheConfig {
const bool enabled;
const Cache::Priority priority;
static WarmCacheConfig Compute(
BlockBasedTableOptions::PrepopulateBlockCache mode,
TableFileCreationReason reason) {
bool enabled = false;
Cache::Priority priority = Cache::Priority::LOW;
switch (mode) {
case BlockBasedTableOptions::PrepopulateBlockCache::kFlushOnly:
enabled = (reason == TableFileCreationReason::kFlush);
break;
case BlockBasedTableOptions::PrepopulateBlockCache::kFlushAndCompaction:
enabled = (reason == TableFileCreationReason::kFlush ||
reason == TableFileCreationReason::kCompaction);
if (reason == TableFileCreationReason::kCompaction) {
priority = Cache::Priority::BOTTOM;
}
break;
case BlockBasedTableOptions::PrepopulateBlockCache::kDisable:
break;
default:
assert(false);
}
return {enabled, priority};
}
};
struct BlockBasedTableBuilder::Rep {
const ImmutableOptions ioptions;
// BEGIN from MutableCFOptions
std::shared_ptr<const SliceTransform> prefix_extractor;
// END from MutableCFOptions
const WriteOptions write_options;
const BlockBasedTableOptions table_options;
const InternalKeyComparator& internal_comparator;
// Size in bytes for the user-defined timestamps.
size_t ts_sz;
// When `ts_sz` > 0 and this flag is false, the user-defined timestamp in the
// user key will be stripped when creating the block based table. This
// stripping happens for all user keys, including the keys in data block,
// index block for data block, index block for index block (if index type is
// `kTwoLevelIndexSearch`), index for filter blocks (if using partitioned
// filters), the `first_internal_key` in `IndexValue`, the `end_key` for range
// deletion entries.
// As long as the user keys are sorted when added via `Add` API, their logic
// ordering won't change after timestamps are stripped. However, for each user
// key to be logically equivalent before and after timestamp is stripped, the
// user key should contain the minimum timestamp.
bool persist_user_defined_timestamps;
WritableFileWriter* file;
// The current offset is only written by the current designated writer thread
// but can be read by other threads to estimate current file size
RelaxedAtomic<uint64_t> offset{0};
size_t alignment;
BlockBuilder data_block;
// Buffers uncompressed data blocks to replay later. Needed when
// compression dictionary is enabled so we can finalize the dictionary before
// compressing any data blocks.
std::vector<std::string> data_block_buffers;
BlockBuilder range_del_block;
InternalKeySliceTransform internal_prefix_transform;
std::unique_ptr<IndexBuilder> index_builder;
std::string index_separator_scratch;
PartitionedIndexBuilder* p_index_builder_ = nullptr;
std::string last_ikey; // Internal key or empty (unset)
bool uses_explicit_compression_manager = false;
uint64_t sample_for_compression;
RelaxedAtomic<uint64_t> compressible_input_data_bytes{0};
RelaxedAtomic<uint64_t> uncompressible_input_data_bytes{0};
RelaxedAtomic<uint64_t> sampled_input_data_bytes{0};
RelaxedAtomic<uint64_t> sampled_output_slow_data_bytes{0};
RelaxedAtomic<uint64_t> sampled_output_fast_data_bytes{0};
uint32_t compression_parallel_threads;
int max_compressed_bytes_per_kb;
// Dictionary guidance for data blocks (from GetDictGuidance())
Compressor::DictConfig data_block_dict_guidance;
// *** Compressors & decompressors - Yes, it seems like a lot here but ***
// *** these are distinct fields to minimize extra conditionals and ***
// *** field reads on hot code paths. And to avoid interlocked ***
// *** instructions associated with shared_ptr. ***
// A compressor for blocks in general, without dictionary compression
std::unique_ptr<Compressor> basic_compressor;
// Built-in compressors for compression size sampling
std::unique_ptr<Compressor> fast_sample_compressor;
std::unique_ptr<Compressor> slow_sample_compressor;
// A compressor for data blocks, which might be tuned differently and might
// use dictionary compression (when applicable). See ~Rep() for some details.
UnownedPtr<Compressor> data_block_compressor = nullptr;
// A compressor for index blocks, which might be tuned differently from
// basic_compressor. See ~Rep() for some details.
UnownedPtr<Compressor> index_block_compressor = nullptr;
// A decompressor corresponding to basic_compressor (when non-nullptr).
// Used for verification and cache warming.
std::shared_ptr<Decompressor> basic_decompressor;
// When needed, a decompressor for verifying compression using a
// dictionary sampled/trained from this file.
std::unique_ptr<Decompressor> verify_decompressor_with_dict;
// When non-nullptr, compression should be verified with this corresponding
// decompressor, except for data blocks. (Points to same as basic_decompressor
// when verify_compression is set.)
UnownedPtr<Decompressor> verify_decompressor;
// Once configured/determined, points to one of the above Decompressors to use
// in verifying data blocks.
UnownedPtr<Decompressor> data_block_verify_decompressor;
// Set of compression types used for blocks in this file (mixing compression
// algorithms in a single file is allowed, using a CompressionManager)
SmallEnumSet<CompressionType, kDisableCompressionOption>
compression_types_used;
// Working area for basic_compressor when compression_parallel_threads==1
WorkingAreaPair index_block_working_area;
// Working area for data_block_compressor, for emit/compaction thread
WorkingAreaPair data_block_working_area;
size_t data_begin_offset = 0;
TableProperties props;
// States of the builder.
//
// - `kBuffered`: This is the initial state where zero or more data blocks are
// accumulated uncompressed in-memory. From this state, call
// `EnterUnbuffered()` to finalize the compression dictionary if enabled,
// compress/write out any buffered blocks, and proceed to the `kUnbuffered`
// state.
//
// - `kUnbuffered`: This is the state when compression dictionary is finalized
// either because it wasn't enabled in the first place or it's been created
// from sampling previously buffered data. In this state, blocks are simply
// compressed/written out as they fill up. From this state, call `Finish()`
// to complete the file (write meta-blocks, etc.), or `Abandon()` to delete
// the partially created file.
//
// - `kClosed`: This indicates either `Finish()` or `Abandon()` has been
// called, so the table builder is no longer usable. We must be in this
// state by the time the destructor runs.
enum class State {
kBuffered,
kUnbuffered,
kClosed,
};
State state = State::kUnbuffered;
// `kBuffered` state is allowed only as long as the buffering of uncompressed
// data blocks (see `data_block_buffers`) does not exceed `buffer_limit`.
uint64_t buffer_limit = 0;
std::shared_ptr<CacheReservationManager>
compression_dict_buffer_cache_res_mgr;
const bool use_delta_encoding_for_index_values;
std::unique_ptr<FilterBlockBuilder> filter_builder;
OffsetableCacheKey base_cache_key;
const TableFileCreationReason reason;
const bool target_file_size_is_upper_bound;
BlockHandle pending_handle; // Handle to add to index block
GrowableBuffer single_threaded_compressed_output;
std::unique_ptr<FlushBlockPolicy> flush_block_policy;
std::vector<std::unique_ptr<InternalTblPropColl>> table_properties_collectors;
std::unique_ptr<ParallelCompressionRep> pc_rep;
RelaxedAtomic<uint64_t> worker_cpu_micros{0};
const WarmCacheConfig warm_cache_config;
BlockCreateContext create_context;
// The size of the "tail" part of a SST file. "Tail" refers to
// all blocks after data blocks till the end of the SST file.
uint64_t tail_size;
// The total size of all blocks in this file before they are compressed.
// This is used for logging compaction stats.
uint64_t pre_compression_size = 0;
// See class Footer
uint32_t base_context_checksum;
// Current file offset. Written only by the designated writer thread, but a
// relaxed load is sufficient for other threads estimating current file size.
uint64_t get_offset() { return offset.LoadRelaxed(); }
void set_offset(uint64_t o) { offset.StoreRelaxed(o); }
// True iff parallel compression has been set up for this builder.
bool IsParallelCompressionActive() const { return pc_rep != nullptr; }
// Status view of the internally tracked IOStatus (implicit conversion).
Status GetStatus() { return GetIOStatus(); }
// Fast check of whether the builder's I/O status is still OK, without
// copying a Status object.
bool StatusOk() {
// The OK case is optimized with an atomic. Relaxed is sufficient because
// if a thread other than the emit/compaction thread sets to non-OK it
// will synchronize that in aborting parallel compression.
bool ok = io_status_ok.LoadRelaxed();
#ifdef ROCKSDB_ASSERT_STATUS_CHECKED
if (ok) {
// In ASSERT_STATUS_CHECKED builds, mark io_status as checked so its
// destructor does not fire the "status never checked" assertion.
std::lock_guard<std::mutex> lock(io_status_mutex);
// Double-check under the mutex: another thread may have set a non-OK
// status between the relaxed load above and acquiring the lock.
if (io_status_ok.LoadRelaxed()) {
io_status.PermitUncheckedError();
assert(io_status.ok());
} else {
ok = false;
}
}
#endif  // ROCKSDB_ASSERT_STATUS_CHECKED
return ok;
}
IOStatus GetIOStatus() {
// See StatusOk, which is optimized to avoid Status object copies
if (LIKELY(io_status_ok.LoadRelaxed())) {
#ifdef ROCKSDB_ASSERT_STATUS_CHECKED
std::lock_guard<std::mutex> lock(io_status_mutex);
// Double-check
if (io_status_ok.LoadRelaxed()) {
io_status.PermitUncheckedError();