diff --git a/oxia/batch/batcher.go b/oxia/batch/batcher.go index 671dd7495..889382d90 100644 --- a/oxia/batch/batcher.go +++ b/oxia/batch/batcher.go @@ -19,6 +19,8 @@ import ( "io" "sync/atomic" "time" + + "github.com/oxia-db/oxia/oxia/internal/model" ) var ErrShuttingDown = errors.New("shutting down") @@ -78,13 +80,31 @@ func (b *batcherImpl) Run() { //nolint:revive batch = nil } + var prevCallIsDelete *bool = nil for { select { case call := <-b.callC: + var del = false + switch call.(type) { + case model.DeleteCall: + del = true + case model.DeleteRangeCall: + del = true + } + if prevCallIsDelete == nil { + prevCallIsDelete = &del + } + if batch == nil { newBatch() } canAdd := batch.CanAdd(call) + if canAdd { + if del && !*prevCallIsDelete || !del && *prevCallIsDelete { + canAdd = false + } + } + if !canAdd { completeBatch() newBatch() @@ -93,6 +113,7 @@ func (b *batcherImpl) Run() { //nolint:revive if batch.Size() == b.maxRequestsPerBatch || b.linger == 0 { completeBatch() } + prevCallIsDelete = &del case <-timeout: if batch != nil { diff --git a/oxiad/dataserver/public_rpc_server_test.go b/oxiad/dataserver/public_rpc_server_test.go index 7ddbe1dd0..d19ac01a0 100644 --- a/oxiad/dataserver/public_rpc_server_test.go +++ b/oxiad/dataserver/public_rpc_server_test.go @@ -20,6 +20,7 @@ import ( "testing" "time" + "github.com/oxia-db/oxia/oxia" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "google.golang.org/grpc" @@ -83,3 +84,31 @@ func TestWriteClientClose(t *testing.T) { assert.Error(t, err) assert.ErrorIs(t, err, io.EOF) } + +func TestPutThenDeleteThenPut(t *testing.T) { + standaloneServer, err := NewStandalone(NewTestConfig(t.TempDir())) + assert.NoError(t, err) + defer standaloneServer.Close() + + client, err := oxia.NewAsyncClient(standaloneServer.ServiceAddr(), oxia.WithBatchLinger(1*time.Second), oxia.WithMaxRequestsPerBatch(1000)) + assert.NoError(t, err) + + defer client.Close() + + putResult := client.Put("test-key", []byte("test-value")) + deleteResult := client.Delete("test-key") + putResult2 := client.Put("test-key", []byte("test-value2")) + + r1 := <-putResult + r2 := <-deleteResult + r3 := <-putResult2 + + assert.NoError(t, r1.Err) + assert.NoError(t, r2) + assert.NoError(t, r3.Err) + + v := <-client.Get("test-key") + assert.NoError(t, v.Err) + + assert.Equal(t, []byte("test-value2"), v.Value) +}