diff --git a/iterator_test.go b/iterator_test.go index 651ef2081b..e4b306fa03 100644 --- a/iterator_test.go +++ b/iterator_test.go @@ -255,22 +255,23 @@ func (c *minSeqNumPropertyCollector) AddRangeKeys(span sstable.Span) error { return nil } -func (c *minSeqNumPropertyCollector) FinishDataBlock(buf []byte) ([]byte, error) { - return nil, nil +func (c *minSeqNumPropertyCollector) Finish(buf []byte) []byte { + return binary.AppendUvarint(buf, uint64(c.minSeqNum)) } -func (c *minSeqNumPropertyCollector) AddPrevDataBlockToIndexBlock() {} - -func (c *minSeqNumPropertyCollector) FinishIndexBlock(buf []byte) ([]byte, error) { - return nil, nil -} - -func (c *minSeqNumPropertyCollector) FinishTable(buf []byte) ([]byte, error) { - return binary.AppendUvarint(buf, uint64(c.minSeqNum)), nil +func (c *minSeqNumPropertyCollector) AddCollected(prop []byte) error { + res, n := binary.Uvarint(prop) + if n <= 0 { + panic("could not decode") + } + if c.minSeqNum == 0 || c.minSeqNum > base.SeqNum(res) { + c.minSeqNum = base.SeqNum(res) + } + return nil } func (c *minSeqNumPropertyCollector) AddCollectedWithSuffixReplacement( - oldProp []byte, oldSuffix, newSuffix []byte, + oldProp []byte, newSuffix []byte, ) error { return errors.Errorf("not implemented") } @@ -279,6 +280,9 @@ func (c *minSeqNumPropertyCollector) SupportsSuffixReplacement() bool { return false } +// Close is part of the BlockPropertyCollector interface. +func (c *minSeqNumPropertyCollector) Close() {} + // minSeqNumFilter is a BlockPropertyFilter that uses the // minSeqNumPropertyCollector data to filter out entire tables. 
type minSeqNumFilter struct { diff --git a/sstable/block_property.go b/sstable/block_property.go index 29b469235f..59349ad0fc 100644 --- a/sstable/block_property.go +++ b/sstable/block_property.go @@ -11,23 +11,26 @@ import ( "sync" "unsafe" + "github.com/cockroachdb/errors" "github.com/cockroachdb/pebble/internal/base" "github.com/cockroachdb/pebble/internal/invariants" "github.com/cockroachdb/pebble/internal/keyspan" ) // Block properties are an optional user-facing feature that can be used to -// filter data blocks (and whole sstables) from an Iterator before they are -// loaded. They do not apply to range delete blocks. These are expected to -// very concisely represent a set of some attribute value contained within the -// key or value, such that the set includes all the attribute values in the -// block. This has some similarities with OLAP pruning approaches that -// maintain min-max attribute values for some column (which concisely -// represent a set), that is then used to prune at query time. In Pebble's -// case, data blocks are small, typically 25-50KB, so these properties should -// reduce their precision in order to be concise -- a good rule of thumb is to -// not consume more than 50-100 bytes across all properties maintained for a -// block, i.e., a 500x reduction compared to loading the data block. +// filter data blocks or index blocks or whole sstables from an Iterator before +// they are loaded. +// +// Block properties are expected to very concisely represent a set of some +// attribute value contained within the key or value, such that the set includes +// all the attribute values in the block. This has some similarities with OLAP +// pruning approaches that maintain min-max attribute values for some column +// (which concisely represent a set), that is then used to prune at query time. 
+// In Pebble's case, data blocks are small, typically 25-50KB, so these +// properties should reduce their precision in order to be concise -- a good +// rule of thumb is to not consume more than 50-100 bytes across all properties +// maintained for a block, i.e., a 500x reduction compared to loading the data +// block. // // A block property must be assigned a unique name, which is encoded and // stored in the sstable. This name must be unique among all user-properties @@ -37,17 +40,12 @@ import ( // considered semantically identical. The caller is free to choose the // semantics of an empty byte slice e.g. they could use it to represent the // empty set or the universal set, whichever they think is more common and -// therefore better to encode more concisely. The serialization of the -// property for the various Finish*() calls in a BlockPropertyCollector -// implementation should be identical, since the corresponding -// BlockPropertyFilter implementation is not told the context in which it is -// deserializing the property. +// therefore better to encode more concisely. // -// Block properties are more general than table properties and should be -// preferred over using table properties. A BlockPropertyCollector can achieve -// identical behavior to table properties by returning the nil slice from -// FinishDataBlock and FinishIndexBlock, and interpret them as the universal -// set in BlockPropertyFilter, and return a non-universal set in FinishTable. +// Block properties are hierarchical: the properties for an index block must be +// derivable just from the properties of the data blocks it contains. Similarly, +// the table properties must be derivable just from the properties of the index +// blocks and range block. // // Block property filtering is nondeterministic because the separation of keys // into blocks is nondeterministic. Clients use block-property filters to @@ -83,19 +81,10 @@ import ( // compactions. 
If Pebble is configured with such value separation, block // properties must only apply to the key, and will be provided a nil value. -// BlockPropertyCollector is used when writing a sstable. -// -// - All calls to Add are included in the next FinishDataBlock, after which -// the next data block is expected to start. -// -// - The index entry generated for the data block, which contains the return -// value from FinishDataBlock, is not immediately included in the current -// index block. It is included when AddPrevDataBlockToIndexBlock is called. -// An alternative would be to return an opaque handle from FinishDataBlock -// and pass it to a new AddToIndexBlock method, which requires more -// plumbing, and passing of an interface{} results in a undesirable heap -// allocation. AddPrevDataBlockToIndexBlock must be called before keys are -// added to the new data block. +// BlockPropertyCollector is used when writing a sstable. Multiple +// BlockPropertyCollector instances are used for each property, according to the +// various levels (data/range blocks, index blocks, table). The lowest levels +// use AddPointKey()/AddRangeKeys() while the other levels use AddCollected(). type BlockPropertyCollector interface { // Name returns the name of the block property collector. Name() string @@ -104,16 +93,20 @@ type BlockPropertyCollector interface { // sstable. The callee can assume that these are in sorted order. AddPointKey(key InternalKey, value []byte) error - // AddRangeKeys is called for each range span added to the sstable. The range - // key properties are stored separately and don't contribute to data block - // properties. They are only used when FinishTable is called. - // TODO(radu): clean up this subtle semantic. + // AddRangeKeys is called for each range span added to a range key block in + // the sstable. The callee can assume these are fragmented and in sorted + // order.
AddRangeKeys(span keyspan.Span) error + // AddCollected adds previously collected property data. For example, when + // calculating properties for index blocks, AddCollected is called with the + // results of Finish for each data block. + AddCollected(prop []byte) error + // AddCollectedWithSuffixReplacement adds previously collected property data - // and updates it to reflect a change of suffix on all keys: the old property - // data is assumed to be constructed from keys that all have the same - // oldSuffix and is recalculated to reflect the same keys but with newSuffix. + // after updating to reflect a change of suffix on all keys: the property data + // is recalculated to reflect the same keys it was computed from but with + // newSuffix. // // A collector which supports this method must be able to derive its updated // value from its old value and the change being made to the suffix, without @@ -129,29 +122,19 @@ type BlockPropertyCollector interface { // This method is optional (if it is not implemented, it always returns an // error). SupportsSuffixReplacement() can be used to check if this method is // implemented. - AddCollectedWithSuffixReplacement(oldProp []byte, oldSuffix, newSuffix []byte) error + AddCollectedWithSuffixReplacement(oldProp []byte, newSuffix []byte) error // SupportsSuffixReplacement returns whether the collector supports the // AddCollectedWithSuffixReplacement method. SupportsSuffixReplacement() bool - // FinishDataBlock is called when all the entries have been added to a - // data block. Subsequent Add calls will be for the next data block. It - // returns the property value for the finished block. - FinishDataBlock(buf []byte) ([]byte, error) - - // AddPrevDataBlockToIndexBlock adds the entry corresponding to the - // previous FinishDataBlock to the current index block. 
- AddPrevDataBlockToIndexBlock() - - // FinishIndexBlock is called when an index block, containing all the - // key-value pairs since the last FinishIndexBlock, will no longer see new - // entries. It returns the property value for the index block. - FinishIndexBlock(buf []byte) ([]byte, error) + // Finish appends the property value to buf and resets the collector to an + // empty state. + Finish(buf []byte) []byte - // FinishTable is called when the sstable is finished, and returns the - // property value for the sstable. - FinishTable(buf []byte) ([]byte, error) + // Close can be used to allow the implementation to be reused in the future. + // Once Close is called, the object must no longer be used. + Close() } // BlockPropertyFilter is used in an Iterator to filter sstables and blocks @@ -233,9 +216,7 @@ type BlockIntervalCollector struct { mapper IntervalMapper suffixReplacer BlockIntervalSuffixReplacer - blockInterval BlockInterval - indexInterval BlockInterval - tableInterval BlockInterval + interval BlockInterval } var _ BlockPropertyCollector = &BlockIntervalCollector{} @@ -273,11 +254,19 @@ func NewBlockIntervalCollector( if mapper == nil { panic("mapper must be provided") } - return &BlockIntervalCollector{ + c := blockIntervalCollectorPool.Get().(*BlockIntervalCollector) + *c = BlockIntervalCollector{ name: name, mapper: mapper, suffixReplacer: suffixReplacer, } + return c +} + +var blockIntervalCollectorPool = sync.Pool{ + New: func() interface{} { + return &BlockIntervalCollector{} + }, } // Name is part of the BlockPropertyCollector interface. @@ -291,7 +280,7 @@ func (b *BlockIntervalCollector) AddPointKey(key InternalKey, value []byte) erro if err != nil { return err } - b.blockInterval.UnionWith(interval) + b.interval.UnionWith(interval) return nil } @@ -306,14 +295,27 @@ func (b *BlockIntervalCollector) AddRangeKeys(span Span) error { } // Range keys are not included in block or index intervals; they just apply // directly to the table interval. 
- b.tableInterval.UnionWith(interval) + b.interval.UnionWith(interval) + return nil +} + +// AddCollected is part of the BlockPropertyCollector interface. +func (b *BlockIntervalCollector) AddCollected(prop []byte) error { + i, err := decodeBlockInterval(prop) + if err != nil { + return err + } + b.interval.UnionWith(i) return nil } // AddCollectedWithSuffixReplacement is part of the BlockPropertyCollector interface. func (b *BlockIntervalCollector) AddCollectedWithSuffixReplacement( - oldProp []byte, oldSuffix, newSuffix []byte, + oldProp []byte, newSuffix []byte, ) error { + if b.suffixReplacer == nil { + return errors.Errorf("%s does not support suffix replacement", b.name) + } i, err := decodeBlockInterval(oldProp) if err != nil { return err @@ -322,7 +324,7 @@ func (b *BlockIntervalCollector) AddCollectedWithSuffixReplacement( if err != nil { return err } - b.blockInterval.UnionWith(i) + b.interval.UnionWith(i) return nil } @@ -331,30 +333,17 @@ func (b *BlockIntervalCollector) SupportsSuffixReplacement() bool { return b.suffixReplacer != nil } -// FinishDataBlock is part of the BlockPropertyCollector interface. -func (b *BlockIntervalCollector) FinishDataBlock(buf []byte) ([]byte, error) { - buf = encodeBlockInterval(b.blockInterval, buf) - b.tableInterval.UnionWith(b.blockInterval) - return buf, nil -} - -// AddPrevDataBlockToIndexBlock implements the BlockPropertyCollector -// interface. -func (b *BlockIntervalCollector) AddPrevDataBlockToIndexBlock() { - b.indexInterval.UnionWith(b.blockInterval) - b.blockInterval = BlockInterval{} -} - -// FinishIndexBlock implements the BlockPropertyCollector interface. -func (b *BlockIntervalCollector) FinishIndexBlock(buf []byte) ([]byte, error) { - buf = encodeBlockInterval(b.indexInterval, buf) - b.indexInterval = BlockInterval{} - return buf, nil +// Finish is part of the BlockPropertyCollector interface. 
+func (b *BlockIntervalCollector) Finish(buf []byte) []byte { + result := encodeBlockInterval(b.interval, buf) + b.interval = BlockInterval{} + return result } -// FinishTable implements the BlockPropertyCollector interface. -func (b *BlockIntervalCollector) FinishTable(buf []byte) ([]byte, error) { - return encodeBlockInterval(b.tableInterval, buf), nil +// Close is part of the BlockPropertyCollector interface. +func (b *BlockIntervalCollector) Close() { + *b = BlockIntervalCollector{} + blockIntervalCollectorPool.Put(b) } // BlockInterval represents the [Lower, Upper) interval of 64-bit values diff --git a/sstable/block_property_obsolete.go b/sstable/block_property_obsolete.go index c89268f127..4dd57ef560 100644 --- a/sstable/block_property_obsolete.go +++ b/sstable/block_property_obsolete.go @@ -16,9 +16,7 @@ import ( // For an explanation of obsolete keys, see the comment for TableFormatPebblev4 // which explains obsolete keys. type obsoleteKeyBlockPropertyCollector struct { - blockIsNonObsolete bool - indexIsNonObsolete bool - tableIsNonObsolete bool + hasNonObsoletePoint bool } var _ BlockPropertyCollector = (*obsoleteKeyBlockPropertyCollector)(nil) @@ -42,36 +40,32 @@ func (o *obsoleteKeyBlockPropertyCollector) AddRangeKeys(span Span) error { // AddPoint is an out-of-band method that is specific to this collector. func (o *obsoleteKeyBlockPropertyCollector) AddPoint(isObsolete bool) { - o.blockIsNonObsolete = o.blockIsNonObsolete || !isObsolete + o.hasNonObsoletePoint = o.hasNonObsoletePoint || !isObsolete } -// FinishDataBlock is part of the BlockPropertyCollector interface. -func (o *obsoleteKeyBlockPropertyCollector) FinishDataBlock(buf []byte) ([]byte, error) { - o.tableIsNonObsolete = o.tableIsNonObsolete || o.blockIsNonObsolete - return obsoleteKeyBlockPropertyEncode(!o.blockIsNonObsolete, buf), nil +// Finish is part of the BlockPropertyCollector interface. 
+func (o *obsoleteKeyBlockPropertyCollector) Finish(buf []byte) []byte { + res := obsoleteKeyBlockPropertyEncode(!o.hasNonObsoletePoint, buf) + o.hasNonObsoletePoint = false + return res } -// AddPrevDataBlockToIndexBlock is part of the BlockPropertyCollector interface. -func (o *obsoleteKeyBlockPropertyCollector) AddPrevDataBlockToIndexBlock() { - o.indexIsNonObsolete = o.indexIsNonObsolete || o.blockIsNonObsolete - o.blockIsNonObsolete = false -} - -// FinishIndexBlock is part of the BlockPropertyCollector interface. -func (o *obsoleteKeyBlockPropertyCollector) FinishIndexBlock(buf []byte) ([]byte, error) { - buf = obsoleteKeyBlockPropertyEncode(!o.indexIsNonObsolete, buf) - o.indexIsNonObsolete = false - return buf, nil -} +// Close is part of the BlockPropertyCollector interface. +func (o *obsoleteKeyBlockPropertyCollector) Close() {} -// FinishTable is part of the BlockPropertyCollector interface. -func (o *obsoleteKeyBlockPropertyCollector) FinishTable(buf []byte) ([]byte, error) { - return obsoleteKeyBlockPropertyEncode(!o.tableIsNonObsolete, buf), nil +// AddCollected is part of the BlockPropertyCollector interface. +func (o *obsoleteKeyBlockPropertyCollector) AddCollected(oldProp []byte) error { + isObsolete, err := obsoleteKeyBlockPropertyDecode(oldProp) + if err != nil { + return err + } + o.hasNonObsoletePoint = o.hasNonObsoletePoint || !isObsolete + return nil } // AddCollectedWithSuffixReplacement is part of the BlockPropertyCollector interface. func (o *obsoleteKeyBlockPropertyCollector) AddCollectedWithSuffixReplacement( - oldProp []byte, oldSuffix, newSuffix []byte, + oldProp []byte, newSuffix []byte, ) error { // Verify the property is valid. _, err := obsoleteKeyBlockPropertyDecode(oldProp) @@ -79,7 +73,7 @@ func (o *obsoleteKeyBlockPropertyCollector) AddCollectedWithSuffixReplacement( return err } // Suffix rewriting currently loses the obsolete bit. 
- o.blockIsNonObsolete = true + o.hasNonObsoletePoint = true return nil } diff --git a/sstable/block_property_test.go b/sstable/block_property_test.go index 67ecbb2bd5..9c4ba89a57 100644 --- a/sstable/block_property_test.go +++ b/sstable/block_property_test.go @@ -221,53 +221,52 @@ func TestBlockIntervalCollector(t *testing.T) { bic := NewBlockIntervalCollector("foo", testIntervalMapper{}, nil /* suffixReplacer */) require.Equal(t, "foo", bic.Name()) - // First data block has empty point key interval. - encoded, err := bic.FinishDataBlock(nil) - require.NoError(t, err) - require.True(t, bytes.Equal(nil, encoded)) - bic.AddPrevDataBlockToIndexBlock() - // Second data block contains a point and range key interval. The latter - // should not contribute to the block interval. + // Empty block. + data1 := bic.Finish(nil) + require.True(t, bytes.Equal(nil, data1)) + decodeAndCheck(t, data1, BlockInterval{}) + + // Second data block with one point key. addTestPointKeys(t, bic, 20, 24) + data2 := bic.Finish(nil) + decodeAndCheck(t, data2, BlockInterval{20, 25}) + addTestRangeKeys(t, bic, 5, 10, 15) addTestRangeKeys(t, bic, 149) - encoded, err = bic.FinishDataBlock(nil) - require.NoError(t, err) - decoded, err := decodeBlockInterval(encoded) - require.NoError(t, err) - require.Equal(t, BlockInterval{20, 25}, decoded) - var encodedIndexBlock []byte - // Finish index block before including second data block. - encodedIndexBlock, err = bic.FinishIndexBlock(nil) - require.NoError(t, err) - require.True(t, bytes.Equal(nil, encodedIndexBlock)) - bic.AddPrevDataBlockToIndexBlock() + rangeBlock := bic.Finish(nil) + decodeAndCheck(t, rangeBlock, BlockInterval{5, 150}) + + require.NoError(t, bic.AddCollected(data1)) + index1 := bic.Finish(nil) + require.True(t, bytes.Equal(nil, data1)) + decodeAndCheck(t, index1, BlockInterval{}) + // Third data block. 
addTestPointKeys(t, bic, 14, 10) - encoded, err = bic.FinishDataBlock(nil) - require.NoError(t, err) - decodeAndCheck(t, encoded, BlockInterval{10, 15}) - bic.AddPrevDataBlockToIndexBlock() + data3 := bic.Finish(nil) + decodeAndCheck(t, data3, BlockInterval{10, 15}) + // Fourth data block. addTestPointKeys(t, bic, 100, 104) - encoded, err = bic.FinishDataBlock(nil) - require.NoError(t, err) - decodeAndCheck(t, encoded, BlockInterval{100, 105}) - // Finish index block before including fourth data block. - encodedIndexBlock, err = bic.FinishIndexBlock(nil) - require.NoError(t, err) - decodeAndCheck(t, encodedIndexBlock, BlockInterval{10, 25}) - bic.AddPrevDataBlockToIndexBlock() - // Finish index block that contains only fourth data block. - encodedIndexBlock, err = bic.FinishIndexBlock(nil) - require.NoError(t, err) - decodeAndCheck(t, encodedIndexBlock, BlockInterval{100, 105}) - var encodedTable []byte - // Finish table. The table interval is the union of the current point key - // table interval [10, 105) and the range key interval [5, 150). - encodedTable, err = bic.FinishTable(nil) - require.NoError(t, err) - decodeAndCheck(t, encodedTable, BlockInterval{5, 150}) + data4 := bic.Finish(nil) + decodeAndCheck(t, data4, BlockInterval{100, 105}) + + require.NoError(t, bic.AddCollected(data2)) + require.NoError(t, bic.AddCollected(data3)) + index2 := bic.Finish(nil) + decodeAndCheck(t, index2, BlockInterval{10, 25}) + + require.NoError(t, bic.AddCollected(data4)) + index3 := bic.Finish(nil) + decodeAndCheck(t, index3, BlockInterval{100, 105}) + + // Table properties. 
+ require.NoError(t, bic.AddCollected(index1)) + require.NoError(t, bic.AddCollected(index2)) + require.NoError(t, bic.AddCollected(index3)) + require.NoError(t, bic.AddCollected(rangeBlock)) + table := bic.Finish(nil) + decodeAndCheck(t, table, BlockInterval{5, 150}) } func TestBlockIntervalFilter(t *testing.T) { @@ -298,12 +297,8 @@ func TestBlockIntervalFilter(t *testing.T) { } for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { - name := "foo" - // The mapper here won't actually be used. - bic := NewBlockIntervalCollector(name, &testIntervalMapper{}, nil) - bif := NewBlockIntervalFilter(name, tc.filter.Lower, tc.filter.Upper, nil) - bic.(*BlockIntervalCollector).blockInterval = tc.prop - prop, _ := bic.FinishDataBlock(nil) + prop := encodeBlockInterval(tc.prop, nil) + bif := NewBlockIntervalFilter("foo", tc.filter.Lower, tc.filter.Upper, nil) intersects, err := bif.Intersects(prop) require.NoError(t, err) require.Equal(t, tc.intersects, intersects) @@ -392,16 +387,11 @@ func TestBlockPropertiesFilterer_IntersectsUserPropsAndFinishInit(t *testing.T) bic10Id := byte(10) addTestPointKeys(t, bic0, 10, 19) - _, err := bic0.FinishDataBlock(nil) - require.NoError(t, err) - prop0, err := bic0.FinishTable([]byte{bic0Id}) - require.NoError(t, err) + prop0 := bic0.Finish([]byte{bic0Id}) addTestPointKeys(t, bic10, 110, 119) - _, err = bic10.FinishDataBlock(nil) - require.NoError(t, err) - prop10, err := bic10.FinishTable([]byte{bic10Id}) - require.NoError(t, err) + prop10 := bic10.Finish([]byte{bic10Id}) + prop0Str := string(prop0) prop10Str := string(prop10) type filter struct { @@ -538,13 +528,11 @@ func TestBlockPropertiesFilterer_Intersects(t *testing.T) { bic10Id := shortID(10) addTestPointKeys(t, bic0, 19, 10, 15) - prop, err := bic0.FinishDataBlock(encoder.getScratchForProp()) - require.NoError(t, err) + prop := bic0.Finish(encoder.getScratchForProp()) encoder.addProp(bic0Id, prop) addTestPointKeys(t, bic10, 110, 119) - prop, err = 
bic10.FinishDataBlock(encoder.getScratchForProp()) - require.NoError(t, err) + prop = bic10.Finish(encoder.getScratchForProp()) encoder.addProp(bic10Id, prop) props0And10 := encoder.props() type filter struct { @@ -1332,8 +1320,8 @@ func runBlockPropsCmd(r *Reader, td *datadriven.TestData) string { } type keyCountCollector struct { - name string - block, index, table int + name string + count int } var _ BlockPropertyCollector = &keyCountCollector{} @@ -1345,55 +1333,45 @@ func keyCountCollectorFn(name string) func() BlockPropertyCollector { func (p *keyCountCollector) Name() string { return p.name } func (p *keyCountCollector) AddPointKey(k InternalKey, _ []byte) error { - p.block++ + p.count++ return nil } func (p *keyCountCollector) AddRangeKeys(span Span) error { return rangekey.Encode(&span, func(k base.InternalKey, v []byte) error { - p.table++ + p.count++ return nil }) } -func (p *keyCountCollector) FinishDataBlock(buf []byte) ([]byte, error) { - buf = append(buf, []byte(strconv.Itoa(int(p.block)))...) - p.table += p.block - return buf, nil +func (p *keyCountCollector) Finish(buf []byte) []byte { + result := append(buf, []byte(strconv.Itoa(int(p.count)))...) + p.count = 0 + return result } -func (p *keyCountCollector) AddPrevDataBlockToIndexBlock() { - p.index += p.block - p.block = 0 -} - -func (p *keyCountCollector) FinishIndexBlock(buf []byte) ([]byte, error) { - buf = append(buf, []byte(strconv.Itoa(int(p.index)))...) - p.index = 0 - return buf, nil -} - -func (p *keyCountCollector) FinishTable(buf []byte) ([]byte, error) { - buf = append(buf, []byte(strconv.Itoa(int(p.table)))...) 
- p.table = 0 - return buf, nil -} - -func (p *keyCountCollector) AddCollectedWithSuffixReplacement( - oldProp []byte, oldSuffix, newSuffix []byte, -) error { - n, err := strconv.Atoi(string(oldProp)) +func (p *keyCountCollector) AddCollected(prop []byte) error { + n, err := strconv.Atoi(string(prop)) if err != nil { return err } - p.block = n + p.count += n return nil } +func (p *keyCountCollector) AddCollectedWithSuffixReplacement( + oldProp []byte, newSuffix []byte, +) error { + return p.AddCollected(oldProp) +} + func (p *keyCountCollector) SupportsSuffixReplacement() bool { return true } +// Close is part of the BlockPropertyCollector interface. +func (p *keyCountCollector) Close() {} + type intSuffixIntervalMapper struct { suffixLen int } diff --git a/sstable/data_test.go b/sstable/data_test.go index 31f9cb42e5..17a766241f 100644 --- a/sstable/data_test.go +++ b/sstable/data_test.go @@ -67,7 +67,10 @@ func optsFromArgs(td *datadriven.TestData, writerOpts *WriterOptions) error { func runBuildCmd( td *datadriven.TestData, writerOpts *WriterOptions, cacheSize int, ) (*WriterMetadata, *Reader, error) { - + comparer := writerOpts.Comparer + if comparer == nil { + comparer = DefaultComparer + } f0 := &objstorage.MemObj{} if err := optsFromArgs(td, writerOpts); err != nil { return nil, nil, err @@ -76,16 +79,16 @@ func runBuildCmd( w := NewWriter(f0, *writerOpts) var rangeDels []keyspan.Span rangeDelFrag := keyspan.Fragmenter{ - Cmp: DefaultComparer.Compare, - Format: DefaultComparer.FormatKey, + Cmp: comparer.Compare, + Format: comparer.FormatKey, Emit: func(s keyspan.Span) { rangeDels = append(rangeDels, s) }, } var rangeKeys []keyspan.Span rangeKeyFrag := keyspan.Fragmenter{ - Cmp: DefaultComparer.Compare, - Format: DefaultComparer.FormatKey, + Cmp: comparer.Compare, + Format: comparer.FormatKey, Emit: func(s keyspan.Span) { rangeKeys = append(rangeKeys, s) }, diff --git a/sstable/suffix_rewriter.go b/sstable/suffix_rewriter.go index 916c1506c8..706468521b 100644 
--- a/sstable/suffix_rewriter.go +++ b/sstable/suffix_rewriter.go @@ -101,9 +101,9 @@ func rewriteKeySuffixesInBlocks( }() for _, c := range w.blockPropCollectors { - if !c.SupportsSuffixReplacement() { + if !c.dataBlock.SupportsSuffixReplacement() { return nil, TableFormatUnspecified, - errors.Errorf("block property collector %s does not support suffix replacement", c.Name()) + errors.Errorf("block property collector %s does not support suffix replacement", c.dataBlock.Name()) } } @@ -329,11 +329,11 @@ func rewriteDataBlocksToWriter( oldShortIDs[i] = invalidShortID } for i, p := range w.blockPropCollectors { - if prop, ok := r.Properties.UserProperties[p.Name()]; ok { + if prop, ok := r.Properties.UserProperties[p.dataBlock.Name()]; ok { was, is := shortID(prop[0]), shortID(i) oldShortIDs[was] = is } else { - return errors.Errorf("sstable does not contain property %s", p.Name()) + return errors.Errorf("sstable does not contain property %s", p.dataBlock.Name()) } } } @@ -361,7 +361,7 @@ func rewriteDataBlocksToWriter( } for i, p := range w.blockPropCollectors { - if err := p.AddCollectedWithSuffixReplacement(oldProps[i], from, to); err != nil { + if err := p.dataBlock.AddCollectedWithSuffixReplacement(oldProps[i], to); err != nil { return err } } diff --git a/sstable/writer.go b/sstable/writer.go index 2fe0544454..a68a27ae63 100644 --- a/sstable/writer.go +++ b/sstable/writer.go @@ -167,9 +167,15 @@ type Writer struct { rangeKeyBlock rowblk.Writer topLevelIndexBlock rowblk.Writer props Properties - blockPropCollectors []BlockPropertyCollector - obsoleteCollector obsoleteKeyBlockPropertyCollector - blockPropsEncoder blockPropertiesEncoder + blockPropCollectors []blockPropertyCollectorSet + // obsoleteCollectors is used to avoid allocating these instances. 
obsoleteCollectors struct { + dataBlock obsoleteKeyBlockPropertyCollector + indexBlock obsoleteKeyBlockPropertyCollector + rangeKeyBlock obsoleteKeyBlockPropertyCollector + table obsoleteKeyBlockPropertyCollector + } + blockPropsEncoder blockPropertiesEncoder // filter accumulates the filter block. If populated, the filter ingests // either the output of w.split (i.e. a prefix extractor) if w.split is not // nil, or the full keys otherwise. @@ -226,6 +232,39 @@ type pointKeyInfo struct { isObsolete bool } +// blockPropertyCollectorSet contains a set of block property collector +// instances of the same type which are used to calculate the properties for +// various types of blocks. +type blockPropertyCollectorSet struct { + // dataBlock is used to calculate properties for the current data block. + dataBlock BlockPropertyCollector + // indexBlock is used to calculate properties for the current index block. + // This accumulates properties calculated by dataBlock. + indexBlock BlockPropertyCollector + // rangeKeyBlock is used to calculate properties for the range key block. + rangeKeyBlock BlockPropertyCollector + // table is used to calculate properties for the table. This accumulates + // properties calculated by indexBlock and rangeKeyBlock. + table BlockPropertyCollector + + // lastDataBlockProps contains the encoded properties of the last data block; + // used to add them to indexBlock when appropriate. + // TODO(radu): finish the index block before finishing the data block that + // didn't fit so we can update indexBlock whenever we finish dataBlock.
+ lastDataBlockProps []byte +} + +func makeBlockPropertyCollectorSet( + constructFn func() BlockPropertyCollector, +) blockPropertyCollectorSet { + return blockPropertyCollectorSet{ + dataBlock: constructFn(), + indexBlock: constructFn(), + rangeKeyBlock: constructFn(), + table: constructFn(), + } +} + type coordinationState struct { parallelismEnabled bool @@ -970,13 +1009,14 @@ func (w *Writer) addPoint(key InternalKey, value []byte, forceObsolete bool) err // property collectors in such Pebble DB's must not look at the value. v = nil } - if err := w.blockPropCollectors[i].AddPointKey(key, v); err != nil { + if err := w.blockPropCollectors[i].dataBlock.AddPointKey(key, v); err != nil { w.err = err return err } } if w.tableFormat >= TableFormatPebblev4 { - w.obsoleteCollector.AddPoint(isObsolete) + // The obsolete collector uses a special, out-of-band method. + w.obsoleteCollectors.dataBlock.AddPoint(isObsolete) } w.maybeAddToFilter(key.UserKey) @@ -1353,8 +1393,6 @@ func (w *Writer) flush(key InternalKey) error { if shouldFlushIndexBlock { flushableIndexBlock = w.indexBlock w.indexBlock = newIndexBlockBuf(w.coordination.parallelismEnabled) - // Call BlockPropertyCollector.FinishIndexBlock, since we've decided to - // flush the index block. indexProps, err = w.finishIndexBlockProps() if err != nil { return err @@ -1365,7 +1403,9 @@ func (w *Writer) flush(key InternalKey) error { // BlockPropertyCollector.FinishIndexBlock. Since we've decided to finish // the data block, we can call // BlockPropertyCollector.AddPrevDataBlockToIndexBlock. - w.addPrevDataBlockToIndexBlockProps() + if err := w.addPrevDataBlockToIndexBlockProps(); err != nil { + return err + } // Schedule a write. 
writeTask := writeTaskPool.Get().(*writeTask) @@ -1415,14 +1455,11 @@ func (w *Writer) finishDataBlockProps(buf *dataBlockBuf) error { if len(w.blockPropCollectors) == 0 { return nil } - var err error buf.blockPropsEncoder.resetProps() for i := range w.blockPropCollectors { - scratch := buf.blockPropsEncoder.getScratchForProp() - if scratch, err = w.blockPropCollectors[i].FinishDataBlock(scratch); err != nil { - return err - } - buf.blockPropsEncoder.addProp(shortID(i), scratch) + c := &w.blockPropCollectors[i] + c.lastDataBlockProps = c.dataBlock.Finish(c.lastDataBlockProps[:0]) + buf.blockPropsEncoder.addProp(shortID(i), c.lastDataBlockProps) } buf.dataBlockProps = buf.blockPropsEncoder.unsafeProps() @@ -1501,10 +1538,14 @@ func (w *Writer) addIndexEntry( return nil } -func (w *Writer) addPrevDataBlockToIndexBlockProps() { +func (w *Writer) addPrevDataBlockToIndexBlockProps() error { for i := range w.blockPropCollectors { - w.blockPropCollectors[i].AddPrevDataBlockToIndexBlock() + c := &w.blockPropCollectors[i] + if err := c.indexBlock.AddCollected(c.lastDataBlockProps); err != nil { + return err + } } + return nil } // addIndexEntrySync adds an index entry for the specified key and block handle. @@ -1533,26 +1574,27 @@ func (w *Writer) addIndexEntrySep( ) var flushableIndexBlock *indexBlockBuf var props []byte - var err error if shouldFlush { flushableIndexBlock = w.indexBlock w.indexBlock = newIndexBlockBuf(w.coordination.parallelismEnabled) w.twoLevelIndex = true // Call BlockPropertyCollector.FinishIndexBlock, since we've decided to // flush the index block. 
+ var err error props, err = w.finishIndexBlockProps() if err != nil { return err } } - err = w.addIndexEntry(sep, bhp, tmp, flushableIndexBlock, w.indexBlock, 0, props) + if err := w.addIndexEntry(sep, bhp, tmp, flushableIndexBlock, w.indexBlock, 0, props); err != nil { + return err + } if flushableIndexBlock != nil { flushableIndexBlock.clear() indexBlockBufPool.Put(flushableIndexBlock) } - w.addPrevDataBlockToIndexBlockProps() - return err + return w.addPrevDataBlockToIndexBlockProps() } func shouldFlushWithHints( @@ -1673,23 +1715,20 @@ func cloneKeyWithBuf(k InternalKey, a bytealloc.A) (bytealloc.A, InternalKey) { return a, InternalKey{UserKey: keyCopy, Trailer: k.Trailer} } -// Invariants: The byte slice returned by finishIndexBlockProps is heap-allocated -// -// and has its own lifetime, independent of the Writer and the blockPropsEncoder, -// -// and it is safe to: -// 1. Reuse w.blockPropsEncoder without first encoding the byte slice returned. -// 2. Store the byte slice in the Writer since it is a copy and not supported by -// an underlying buffer. +// finishIndexBlockProps is used when we are finishing an index block; only used +// when two-level indexes are enabled. +// +// The byte slice returned by finishIndexBlockProps is heap-allocated and has +// its own lifetime, independent of the Writer and the blockPropsEncoder.
func (w *Writer) finishIndexBlockProps() ([]byte, error) { w.blockPropsEncoder.resetProps() - for i := range w.blockPropCollectors { + for i, c := range w.blockPropCollectors { scratch := w.blockPropsEncoder.getScratchForProp() - var err error - if scratch, err = w.blockPropCollectors[i].FinishIndexBlock(scratch); err != nil { + indexBlockProps := c.indexBlock.Finish(scratch) + if err := c.table.AddCollected(indexBlockProps); err != nil { return nil, err } - w.blockPropsEncoder.addProp(shortID(i), scratch) + w.blockPropsEncoder.addProp(shortID(i), indexBlockProps) } return w.blockPropsEncoder.props(), nil } @@ -1722,6 +1761,8 @@ func (w *Writer) finishIndexBlock(indexBuf *indexBlockBuf, props []byte) error { return nil } +// writeTwoLevelIndex writes out all the second-level index blocks and +// constructs and writes the top-level index block. func (w *Writer) writeTwoLevelIndex() (block.Handle, error) { props, err := w.finishIndexBlockProps() if err != nil { @@ -1844,7 +1885,7 @@ func (w *Writer) EncodeSpan(span *keyspan.Span) error { return rangedel.Encode(span, w.Add) } for i := range w.blockPropCollectors { - if err := w.blockPropCollectors[i].AddRangeKeys(*span); err != nil { + if err := w.blockPropCollectors[i].rangeKeyBlock.AddRangeKeys(*span); err != nil { return err } } @@ -1924,7 +1965,6 @@ func (w *Writer) Close() (err error) { if w.twoLevelIndex { w.props.IndexType = twoLevelIndex - // Write the two level index block. if _, err = w.writeTwoLevelIndex(); err != nil { return err } @@ -1992,27 +2032,29 @@ func (w *Writer) Close() (err error) { // Finish and record the prop collectors if props are not yet recorded. // Pre-computed props might have been copied by specialized sst creators // like suffix replacer. 
- if len(w.props.UserProperties) == 0 { - userProps := make(map[string]string) + if len(w.props.UserProperties) == 0 && len(w.blockPropCollectors) > 0 { + w.props.UserProperties = make(map[string]string, len(w.blockPropCollectors)) for i := range w.blockPropCollectors { + tableCollector := w.blockPropCollectors[i].table + if !w.twoLevelIndex { + // If we don't have a two-level index, the index block collector has + // our overall table properties. + tableCollector = w.blockPropCollectors[i].indexBlock + } scratch := w.blockPropsEncoder.getScratchForProp() - // Place the shortID in the first byte. - scratch = append(scratch, byte(i)) - buf, err := w.blockPropCollectors[i].FinishTable(scratch) - if err != nil { + // Add range key properties. + rangeKeyProp := w.blockPropCollectors[i].rangeKeyBlock.Finish(scratch) + if err := tableCollector.AddCollected(rangeKeyProp); err != nil { return err } - var prop string - if len(buf) > 0 { - prop = string(buf) - } + + // Place the shortID in the first byte. + scratch = append(scratch, byte(i)) + prop := tableCollector.Finish(scratch) // NB: The property is populated in the map even if it is the // empty string, since the presence in the map is what indicates // that the block property collector was used when writing. - userProps[w.blockPropCollectors[i].Name()] = prop - } - if len(userProps) > 0 { - w.props.UserProperties = userProps + w.props.UserProperties[tableCollector.Name()] = string(prop) } } @@ -2024,7 +2066,9 @@ func (w *Writer) Close() (err error) { raw.RestartInterval = propertiesBlockRestartInterval w.props.CompressionOptions = rocksDBCompressionOptions w.props.save(w.tableFormat, &raw) - w.layout.WritePropertiesBlock(raw.Finish()) + if _, err := w.layout.WritePropertiesBlock(raw.Finish()); err != nil { + return err + } } // Write the table footer. @@ -2047,6 +2091,16 @@ func (w *Writer) Close() (err error) { indexBlockBufPool.Put(w.indexBlock) w.indexBlock = nil + // Close property collectors. 
Closing is optional, we don't need to do it in + // error paths. + for _, p := range w.blockPropCollectors { + p.dataBlock.Close() + p.indexBlock.Close() + p.rangeKeyBlock.Close() + p.table.Close() + } + w.blockPropCollectors = nil + // Make any future calls to Set or Close return an error. w.err = errWriterClosed return nil @@ -2179,12 +2233,17 @@ func NewWriter(writable objstorage.Writable, o WriterOptions, extraOpts ...Write w.err = errors.New("pebble: too many block property collectors") return w } - w.blockPropCollectors = make([]BlockPropertyCollector, 0, numBlockPropertyCollectors) + w.blockPropCollectors = make([]blockPropertyCollectorSet, 0, numBlockPropertyCollectors) for _, constructFn := range o.BlockPropertyCollectors { - w.blockPropCollectors = append(w.blockPropCollectors, constructFn()) + w.blockPropCollectors = append(w.blockPropCollectors, makeBlockPropertyCollectorSet(constructFn)) } if w.tableFormat >= TableFormatPebblev4 { - w.blockPropCollectors = append(w.blockPropCollectors, &w.obsoleteCollector) + w.blockPropCollectors = append(w.blockPropCollectors, blockPropertyCollectorSet{ + dataBlock: &w.obsoleteCollectors.dataBlock, + indexBlock: &w.obsoleteCollectors.indexBlock, + rangeKeyBlock: &w.obsoleteCollectors.rangeKeyBlock, + table: &w.obsoleteCollectors.table, + }) } var buf bytes.Buffer @@ -2193,7 +2252,7 @@ func NewWriter(writable objstorage.Writable, o WriterOptions, extraOpts ...Write if i > 0 { buf.WriteString(",") } - buf.WriteString(w.blockPropCollectors[i].Name()) + buf.WriteString(w.blockPropCollectors[i].dataBlock.Name()) } buf.WriteString("]") w.props.PropertyCollectorNames = buf.String() diff --git a/sstable/writer_test.go b/sstable/writer_test.go index ef30633281..95dd92e116 100644 --- a/sstable/writer_test.go +++ b/sstable/writer_test.go @@ -14,7 +14,6 @@ import ( "strings" "sync" "testing" - "unsafe" "github.com/cockroachdb/datadriven" "github.com/cockroachdb/errors" @@ -740,147 +739,6 @@ func (f *discardFile) Write(p []byte) 
error { return nil } -type blockPropErrSite uint - -const ( - errSiteAdd blockPropErrSite = iota - errSiteFinishBlock - errSiteFinishIndex - errSiteFinishTable - errSiteNone -) - -type testBlockPropCollector struct { - errSite blockPropErrSite - err error -} - -var _ BlockPropertyCollector = (*testBlockPropCollector)(nil) - -func (c *testBlockPropCollector) Name() string { return "testBlockPropCollector" } - -func (c *testBlockPropCollector) AddPointKey(_ InternalKey, _ []byte) error { - if c.errSite == errSiteAdd { - return c.err - } - return nil -} - -func (c *testBlockPropCollector) AddRangeKeys(_ Span) error { - if c.errSite == errSiteAdd { - return c.err - } - return nil -} - -func (c *testBlockPropCollector) FinishDataBlock(_ []byte) ([]byte, error) { - if c.errSite == errSiteFinishBlock { - return nil, c.err - } - return nil, nil -} - -func (c *testBlockPropCollector) AddPrevDataBlockToIndexBlock() {} - -func (c *testBlockPropCollector) FinishIndexBlock(_ []byte) ([]byte, error) { - if c.errSite == errSiteFinishIndex { - return nil, c.err - } - return nil, nil -} - -func (c *testBlockPropCollector) FinishTable(_ []byte) ([]byte, error) { - if c.errSite == errSiteFinishTable { - return nil, c.err - } - return nil, nil -} - -func (c *testBlockPropCollector) AddCollectedWithSuffixReplacement( - oldProp []byte, oldSuffix, newSuffix []byte, -) error { - return errors.Errorf("not implemented") -} - -func (c *testBlockPropCollector) SupportsSuffixReplacement() bool { - return false -} - -func TestWriterBlockPropertiesErrors(t *testing.T) { - blockPropErr := errors.Newf("block property collector failed") - testCases := []blockPropErrSite{ - errSiteAdd, - errSiteFinishBlock, - errSiteFinishIndex, - errSiteFinishTable, - errSiteNone, - } - - var ( - k1 = base.MakeInternalKey([]byte("a"), 0, base.InternalKeyKindSet) - v1 = []byte("apples") - k2 = base.MakeInternalKey([]byte("b"), 0, base.InternalKeyKindSet) - v2 = []byte("bananas") - k3 = 
base.MakeInternalKey([]byte("c"), 0, base.InternalKeyKindSet) - v3 = []byte("carrots") - ) - - for _, tc := range testCases { - t.Run("", func(t *testing.T) { - fs := vfs.NewMem() - f, err := fs.Create("test", vfs.WriteCategoryUnspecified) - require.NoError(t, err) - - w := NewWriter(objstorageprovider.NewFileWritable(f), WriterOptions{ - BlockSize: 1, - BlockPropertyCollectors: []func() BlockPropertyCollector{ - func() BlockPropertyCollector { - return &testBlockPropCollector{ - errSite: tc, - err: blockPropErr, - } - }, - }, - TableFormat: TableFormatPebblev1, - }) - - err = w.Add(k1, v1) - switch tc { - case errSiteAdd: - require.Error(t, err) - require.Equal(t, blockPropErr, err) - return - case errSiteFinishBlock: - require.NoError(t, err) - // Addition of a second key completes the first block. - err = w.Add(k2, v2) - require.Error(t, err) - require.Equal(t, blockPropErr, err) - return - case errSiteFinishIndex: - require.NoError(t, err) - // Addition of a second key completes the first block. - err = w.Add(k2, v2) - require.NoError(t, err) - // The index entry for the first block is added after the completion of - // the second block, which is triggered by adding a third key. - err = w.Add(k3, v3) - require.Error(t, err) - require.Equal(t, blockPropErr, err) - return - } - - err = w.Close() - if tc == errSiteFinishTable { - require.Error(t, err) - require.Equal(t, blockPropErr, err) - } else { - require.NoError(t, err) - } - }) - } -} - func TestWriter_TableFormatCompatibility(t *testing.T) { testCases := []struct { name string @@ -1005,10 +863,9 @@ func TestObsoleteBlockPropertyCollectorFilter(t *testing.T) { // Data block with 1 obsolete and 1 non-obsolete point. 
c.AddPoint(false) c.AddPoint(true) - finishAndCheck := func(finishFunc func([]byte) ([]byte, error), expectedIntersects bool) { + finishAndCheck := func(expectedIntersects bool) []byte { var buf [1]byte - prop, err := finishFunc(buf[:0:1]) - require.NoError(t, err) + prop := c.Finish(buf[:0:1]) expectedLength := 1 if expectedIntersects { // The common case is encoded in 0 bytes @@ -1016,50 +873,30 @@ func TestObsoleteBlockPropertyCollectorFilter(t *testing.T) { } require.Equal(t, expectedLength, len(prop)) // Confirm that the collector used the slice. - require.Equal(t, unsafe.Pointer(&buf[0]), unsafe.Pointer(&prop[:1][0])) + require.Equal(t, &buf[0], &prop[:1][0]) intersects, err := f.Intersects(prop) require.NoError(t, err) require.Equal(t, expectedIntersects, intersects) + return prop } - finishAndCheck(c.FinishDataBlock, true) - c.AddPrevDataBlockToIndexBlock() + prop1 := finishAndCheck(true) + // Data block with only obsolete points. c.AddPoint(true) c.AddPoint(true) - finishAndCheck(c.FinishDataBlock, false) - c.AddPrevDataBlockToIndexBlock() - // Index block has one obsolete block and one non-obsolete block. - finishAndCheck(c.FinishIndexBlock, true) + prop2 := finishAndCheck(false) // Data block with obsolete point. c.AddPoint(true) - finishAndCheck(c.FinishDataBlock, false) - c.AddPrevDataBlockToIndexBlock() - // Data block with obsolete point. - c.AddPoint(true) - finishAndCheck(c.FinishDataBlock, false) - c.AddPrevDataBlockToIndexBlock() - // Index block has only obsolete blocks. - finishAndCheck(c.FinishIndexBlock, false) - // Table is not obsolete. - finishAndCheck(c.FinishTable, true) + prop3 := finishAndCheck(false) - // Reset the collector state. - c = obsoleteKeyBlockPropertyCollector{} - // Table with only obsolete blocks. + require.NoError(t, c.AddCollected(prop1)) + require.NoError(t, c.AddCollected(prop2)) + finishAndCheck(true) - // Data block with obsolete point. 
- c.AddPoint(true) - finishAndCheck(c.FinishDataBlock, false) - c.AddPrevDataBlockToIndexBlock() - // Data block with obsolete point. - c.AddPoint(true) - finishAndCheck(c.FinishDataBlock, false) - c.AddPrevDataBlockToIndexBlock() - // Index block has only obsolete blocks. - finishAndCheck(c.FinishIndexBlock, false) - // Table is obsolete. - finishAndCheck(c.FinishTable, false) + require.NoError(t, c.AddCollected(prop2)) + require.NoError(t, c.AddCollected(prop3)) + finishAndCheck(false) } func BenchmarkWriter(b *testing.B) {