Skip to content

Commit dfd4bb1

Browse files
jaffeech7ck
authored andcommitted
Expose translation mvcc (FeatureBaseDB#2381)
* expose Transaction on TranslateStore for DAX Snapshotting * try to fix ramdisk nonsense apparently, we were running in either a shell env or docker env randomly, so this could sometimes pass and sometimes fail since the shell env had the ramdisk set up and docker didn't. Now we force to run in docker always and set up ramdisk explicitly. * debug ramdisk issue? * fix tests... and a buncha other stuff The executor test I modified failed when I changed DefaultPartitionN to 8, but just because stuff was out of order so I made it more robust. I edited some data gen stuff to make shorter lines because it was making grep results unusable. the actual fix is in translate_boltdb_test.go * clean up, fix code review feedback (cherry picked from commit 3693b99)
1 parent a6c165d commit dfd4bb1

File tree

7 files changed

+97
-61
lines changed

7 files changed

+97
-61
lines changed

Makefile

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -77,9 +77,9 @@ testvsub:
7777
echo; echo "999 done testing subpkg $$pkg"; \
7878
done
7979

80-
# make a 2GB RAMDisk. Speed up tests by running them with RAMDISK=/mnt/ramfs
80+
# make a 2GB RAMDisk. Speed up tests by running them with RAMDISK=/mnt/ramdisk
8181
ramdisk-linux:
82-
mount -o size=2G -t tmpfs none /mnt/ramfs
82+
mount -o size=2G -t tmpfs none /mnt/ramdisk
8383

8484
# make a 2GB RAMDisk. Speed up tests by running them with RAMDISK=/Volumes/RAMDisk
8585
ramdisk-osx:

api.go

Lines changed: 19 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -967,7 +967,7 @@ func (r RedirectError) Error() string {
967967
}
968968

969969
// TranslateData returns all translation data in the specified partition.
970-
func (api *API) TranslateData(ctx context.Context, indexName string, partition int) (io.WriterTo, error) {
970+
func (api *API) TranslateData(ctx context.Context, indexName string, partition int) (TranslateStore, error) {
971971
span, _ := tracing.StartSpanFromContext(ctx, "API.TranslateData")
972972
defer span.Finish()
973973

@@ -1022,7 +1022,7 @@ func (api *API) TranslateData(ctx context.Context, indexName string, partition i
10221022
}
10231023

10241024
// FieldTranslateData returns all translation data in the specified field.
1025-
func (api *API) FieldTranslateData(ctx context.Context, indexName, fieldName string) (io.WriterTo, error) {
1025+
func (api *API) FieldTranslateData(ctx context.Context, indexName, fieldName string) (TranslateStore, error) {
10261026
span, _ := tracing.StartSpanFromContext(ctx, "API.FieldTranslateData")
10271027
defer span.Finish()
10281028
if err := api.validate(apiFieldTranslateData); err != nil {
@@ -3136,18 +3136,22 @@ func (api *API) SnapshotTableKeys(ctx context.Context, req *dax.SnapshotTableKey
31363136
qtid := req.TableKey.QualifiedTableID()
31373137

31383138
// Create the snapshot for the current version.
3139-
wrTo, err := api.TranslateData(ctx, string(req.TableKey), int(req.PartitionNum))
3139+
trans, err := api.TranslateData(ctx, string(req.TableKey), int(req.PartitionNum))
31403140
if err != nil {
3141-
return errors.Wrapf(err, "getting index/partition writeto: %s/%d", req.TableKey, req.PartitionNum)
3141+
return errors.Wrapf(err, "getting index/partition translate store: %s/%d", req.TableKey, req.PartitionNum)
31423142
}
3143+
// get a write tx to ensure no other writes while incrementing WL version.
3144+
wrTo, err := trans.Begin(true)
3145+
if err != nil {
3146+
return errors.Wrap(err, "beginning table translate write tx")
3147+
}
3148+
defer wrTo.Rollback()
31433149

3144-
// TODO(jaffee) need to ensure writes to translation data can't
3145-
// occur while this is happening.
31463150
resource := api.serverlessStorage.GetTableKeyResource(qtid, req.PartitionNum)
31473151
if err := resource.IncrementWLVersion(); err != nil {
31483152
return errors.Wrap(err, "incrementing write log version")
31493153
}
3150-
// TODO(jaffee) downgrade (currently non-existent) lock to read-only
3154+
// TODO(jaffee) downgrade write tx to read-only
31513155
err = resource.SnapshotTo(wrTo)
31523156
return errors.Wrap(err, "snapshotting table keys")
31533157
}
@@ -3158,17 +3162,22 @@ func (api *API) SnapshotFieldKeys(ctx context.Context, req *dax.SnapshotFieldKey
31583162
qtid := req.TableKey.QualifiedTableID()
31593163

31603164
// Create the snapshot for the current version.
3161-
// TODO(jaffee) change this to get write lock
3162-
wrTo, err := api.FieldTranslateData(ctx, string(req.TableKey), string(req.Field))
3165+
trans, err := api.FieldTranslateData(ctx, string(req.TableKey), string(req.Field))
31633166
if err != nil {
31643167
return errors.Wrap(err, "getting index/field writeto")
31653168
}
3169+
// get a write tx to ensure no other writes while incrementing WL version.
3170+
wrTo, err := trans.Begin(true)
3171+
if err != nil {
3172+
return errors.Wrap(err, "beginning field translate write tx")
3173+
}
3174+
defer wrTo.Rollback()
31663175

31673176
resource := api.serverlessStorage.GetFieldKeyResource(qtid, req.Field)
31683177
if err := resource.IncrementWLVersion(); err != nil {
31693178
return errors.Wrap(err, "incrementing writelog version")
31703179
}
3171-
// TODO(jaffee) downgrade to read lock
3180+
// TODO(jaffee) downgrade to read tx
31723181
err = resource.SnapshotTo(wrTo)
31733182
return errors.Wrap(err, "snapshotTo in FieldKeys")
31743183
}

executor_test.go

Lines changed: 34 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -5132,47 +5132,50 @@ func TestExecutor_Execute_Extract_Keyed(t *testing.T) {
51325132
`)
51335133

51345134
resp := c.Query(t, c.Idx(), `Extract(All(), Rows(set))`)
5135-
expect := []interface{}{
5136-
pilosa.ExtractedTable{
5137-
Fields: []pilosa.ExtractedTableField{
5138-
{
5139-
Name: "set",
5140-
Type: "[]uint64",
5141-
},
5135+
expect := pilosa.ExtractedTable{
5136+
Fields: []pilosa.ExtractedTableField{
5137+
{
5138+
Name: "set",
5139+
Type: "[]uint64",
51425140
},
5143-
// The order of these probably shouldn't matter, but currently depends indirectly on the
5144-
// index.
5145-
Columns: []pilosa.ExtractedTableColumn{
5146-
{
5147-
Column: pilosa.KeyOrID{Keyed: true, Key: "h"},
5148-
Rows: []interface{}{
5149-
[]uint64{
5150-
1,
5151-
2,
5152-
},
5141+
},
5142+
// The order of these probably shouldn't matter, but currently depends indirectly on the
5143+
// index.
5144+
Columns: []pilosa.ExtractedTableColumn{
5145+
{
5146+
Column: pilosa.KeyOrID{Keyed: true, Key: "h"},
5147+
Rows: []interface{}{
5148+
[]uint64{
5149+
1,
5150+
2,
51535151
},
51545152
},
5155-
{
5156-
Column: pilosa.KeyOrID{Keyed: true, Key: "xyzzy"},
5157-
Rows: []interface{}{
5158-
[]uint64{
5159-
2,
5160-
},
5153+
},
5154+
{
5155+
Column: pilosa.KeyOrID{Keyed: true, Key: "xyzzy"},
5156+
Rows: []interface{}{
5157+
[]uint64{
5158+
2,
51615159
},
51625160
},
5163-
{
5164-
Column: pilosa.KeyOrID{Keyed: true, Key: "plugh"},
5165-
Rows: []interface{}{
5166-
[]uint64{},
5167-
},
5161+
},
5162+
{
5163+
Column: pilosa.KeyOrID{Keyed: true, Key: "plugh"},
5164+
Rows: []interface{}{
5165+
[]uint64{},
51685166
},
51695167
},
51705168
},
51715169
}
51725170

5173-
if !reflect.DeepEqual(expect, resp.Results) {
5174-
t.Errorf("expected %v but got %v", expect, resp.Results)
5171+
if len(resp.Results) != 1 {
5172+
t.Fail()
51755173
}
5174+
res := resp.Results[0].(pilosa.ExtractedTable)
5175+
if !reflect.DeepEqual(expect.Fields, res.Fields) {
5176+
t.Errorf("expected:\n%v\nbut got:\n%v", expect, resp.Results)
5177+
}
5178+
assert.ElementsMatch(t, expect.Columns, res.Columns)
51765179
}
51775180

51785181
func TestExecutor_Execute_MaxMemory(t *testing.T) {
@@ -7429,6 +7432,7 @@ func backupCluster(t *testing.T, c *test.Cluster, index string) (backupDir strin
74297432

74307433
buf := &bytes.Buffer{}
74317434
backupLog := logger.NewStandardLogger(buf)
7435+
74327436
backupCommand := ctl.NewBackupCommand(backupLog)
74337437
backupCommand.Host = c.Nodes[len(c.Nodes)-1].URL() // don't pick node 0 so we don't always get primary (better code coverage)
74347438
backupCommand.Index = index

http_handler.go

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2790,8 +2790,15 @@ func (h *Handler) handleGetTranslateData(w http.ResponseWriter, r *http.Request)
27902790
http.Error(w, err.Error(), http.StatusNotFound)
27912791
return
27922792
}
2793+
tx, err := p.Begin(false)
2794+
if err != nil {
2795+
http.Error(w, err.Error(), http.StatusInternalServerError)
2796+
return
2797+
}
2798+
defer tx.Rollback()
2799+
27932800
// Stream translate data to response body.
2794-
if _, err := p.WriteTo(w); err != nil {
2801+
if _, err := tx.WriteTo(w); err != nil {
27952802
h.logger.Errorf("error streaming translation data: %s", err)
27962803
}
27972804
return
@@ -2816,8 +2823,14 @@ func (h *Handler) handleGetTranslateData(w http.ResponseWriter, r *http.Request)
28162823
http.Error(w, err.Error(), http.StatusNotFound)
28172824
return
28182825
}
2826+
tx, err := p.Begin(false)
2827+
if err != nil {
2828+
http.Error(w, err.Error(), http.StatusInternalServerError)
2829+
return
2830+
}
2831+
defer tx.Rollback()
28192832
// Stream translate partition to response body.
2820-
if _, err := p.WriteTo(w); err != nil {
2833+
if _, err := tx.WriteTo(w); err != nil {
28212834
h.logger.Errorf("error streaming translation data: %s", err)
28222835
}
28232836
}

translate.go

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -76,9 +76,7 @@ type TranslateStore interface { // TODO: refactor this interface; readonly shoul
7676
// Returns a reader from the given ID offset.
7777
EntryReader(ctx context.Context, offset uint64) (TranslateEntryReader, error)
7878

79-
// WriteTo ensures that the TranslateStore implements io.WriterTo.
80-
// It should write the contents of the store to the writer.
81-
WriteTo(io.Writer) (int64, error)
79+
Begin(write bool) (TranslatorTx, error)
8280

8381
// ReadFrom ensures that the TranslateStore implements io.ReaderFrom.
8482
// It should read from the reader and replace the data store with
@@ -88,6 +86,16 @@ type TranslateStore interface { // TODO: refactor this interface; readonly shoul
8886
Delete(records *roaring.Bitmap) (Commitor, error)
8987
}
9088

89+
// TranslatorTx reproduces a subset of the methods on the BoltDB Tx
90+
// object. Others may be needed in the future and we should just add
91+
// them here. The idea is not to scatter direct references to bolt
92+
// stuff throughout the whole codebase.
93+
type TranslatorTx interface {
94+
WriteTo(io.Writer) (int64, error)
95+
Rollback() error
96+
// e.g. Commit() error
97+
}
98+
9199
// OpenTranslateStoreFunc represents a function for instantiating and opening a TranslateStore.
92100
type OpenTranslateStoreFunc func(path, index, field string, partitionID, partitionN int, fsyncEnabled bool) (TranslateStore, error)
93101

translate_boltdb.go

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -396,14 +396,9 @@ func (s *BoltTranslateStore) MaxID() (max uint64, err error) {
396396
return max, nil
397397
}
398398

399-
// WriteTo writes the contents of the store to the writer.
400-
func (s *BoltTranslateStore) WriteTo(w io.Writer) (int64, error) {
401-
tx, err := s.db.Begin(false)
402-
if err != nil {
403-
return 0, err
404-
}
405-
defer func() { _ = tx.Rollback() }()
406-
return tx.WriteTo(w)
399+
// Begin starts and returns a transaction on the underlying store.
400+
func (s *BoltTranslateStore) Begin(write bool) (TranslatorTx, error) {
401+
return s.db.Begin(write)
407402
}
408403

409404
// ReadFrom reads the content and overwrites the existing store.

translate_boltdb_test.go

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -458,12 +458,19 @@ func TestTranslateStore_ReadWrite(t *testing.T) {
458458
buf := bytes.NewBuffer(nil)
459459
expN := s.Size()
460460

461-
// After this, the buffer should contain batch0.
462-
if n, err := s.WriteTo(buf); err != nil {
463-
t.Fatalf("writing to buffer: %s", err)
464-
} else if n != expN {
465-
t.Fatalf("expected buffer size: %d, but got: %d", expN, n)
466-
}
461+
// wrap in a func so we can defer rollback. Need rollback to
462+
// happen before the end of the test. I'm not entirely sure
463+
// why, but it hangs if you don't.
464+
func() {
465+
tx, err := s.Begin(false)
466+
require.NoError(t, err)
467+
defer tx.Rollback()
468+
469+
// After this, the buffer should contain batch0.
470+
n, err := tx.WriteTo(buf)
471+
require.NoError(t, err)
472+
require.Equal(t, expN, n)
473+
}()
467474

468475
// Populate the store with the keys in batch1.
469476
batch1IDs, err := s.CreateKeys(batch1...)

0 commit comments

Comments
 (0)