diff --git a/.github/actions/prepare-ee-test-env/action.yml b/.github/actions/prepare-ee-test-env/action.yml new file mode 100644 index 0000000..84dc54e --- /dev/null +++ b/.github/actions/prepare-ee-test-env/action.yml @@ -0,0 +1,72 @@ +name: "Prepare test environment with Tarantool EE" +description: "Prepares test environment with Tarantool EE" + +inputs: + sdk-version: + required: true + type: string + sdk-build: + required: false + type: string + default: release + sdk-gc: + required: false + type: string + default: gc64 + sdk-download-token: + required: true + type: string + skip-etcd-install: + description: Whether to skip etcd installation + type: boolean + required: false + default: false + +env: + # Note: Use exactly match version of tool, to avoid unexpected issues with test on CI. + GO_VERSION: '1.23.8' + +runs: + using: "composite" + steps: + - name: Setup Go + uses: actions/setup-go@v4 + with: + go-version: '${{ env.GO_VERSION }}' + + - name: Setup python + uses: actions/setup-python@v4 + with: + python-version: '${{ env.PYTHON_VERSION }}' + + - name: Setup Mage + run: | + git clone https://github.com/magefile/mage + cd mage + go run bootstrap.go + shell: bash + + - name: Install build requirements + run: | + sudo apt -y update + sudo apt -y install git gcc make cmake unzip zip fish zsh + shell: bash + + - name: Cache Tarantool SDK + id: cache-sdk + uses: actions/cache@v3 + with: + path: tarantool-enterprise + key: ${{ matrix.sdk-version }} + + - name: Download Tarantool SDK + run: | + ARCHIVE_NAME=tarantool-enterprise-sdk-${{ inputs.sdk-gc }}-${{ inputs.sdk-version }}.tar.gz + ARCHIVE_PATH=$(echo ${{ inputs.sdk-version }} | sed -rn \ + 's/^([0-9]+)\.([0-9]+)\.([0-9]+-){2}([a-z0-9]+-)?r[0-9]+\.([a-z]+)\.([a-z0-9_]+)$/${{ inputs.sdk-build }}\/\5\/\6\/\1\.\2/p') + curl -O -L -v \ + https://tarantool:${{ inputs.sdk-download-token }}@download.tarantool.io/enterprise/${ARCHIVE_PATH}/${ARCHIVE_NAME} + tar -xzf ${ARCHIVE_NAME} + rm -f ${ARCHIVE_NAME} + source tarantool-enterprise/env.sh + shell: bash \ No newline at end of file diff --git a/.github/actions/setup-etcd/action.yml b/.github/actions/setup-etcd/action.yml new file mode 100644 index 0000000..1aef486 --- /dev/null +++ b/.github/actions/setup-etcd/action.yml @@ -0,0 +1,47 @@ +name: 'Setup etcd' +description: 'Download and extract etcd release archive' +inputs: + etcd-version: + description: 'Release name from https://github.com/etcd-io/etcd/releases' + required: false + default: v3.5.9 + install-prefix: + description: 'Where to extract the archive' + default: ${{ github.workspace }}/.etcd/bin/ +runs: + using: 'composite' + steps: + - shell: bash + env: + BASE_URL: "https://github.com/etcd-io/etcd/releases/download" + ETCD_VER: ${{ inputs.etcd-version }} + INSTALL_PREFIX: ${{ inputs.install-prefix }} + run: | + set -eux + rm -rf ${INSTALL_PREFIX} && mkdir -p ${INSTALL_PREFIX} + + OS_NAME="$(uname | tr '[:upper:]' '[:lower:]')" + ARCH=$(uname -m | awk '{print ($0 == "x86_64")?"amd64":"arm64"}') + FILENAME="etcd-${ETCD_VER}-${OS_NAME}-${ARCH}" + if [ "${OS_NAME}" == "linux" ]; then + curl -L "${BASE_URL}/${ETCD_VER}/${FILENAME}.tar.gz" -o "${INSTALL_PREFIX}/${FILENAME}.tar.gz" + tar xvzf "${INSTALL_PREFIX}/${FILENAME}.tar.gz" -C ${INSTALL_PREFIX} --strip-components=1 + elif [[ "${OS_NAME}" == "darwin" ]]; then + curl -L "${BASE_URL}/${ETCD_VER}/${FILENAME}.zip" -o "${INSTALL_PREFIX}/${FILENAME}.zip" + unzip "${INSTALL_PREFIX}/${FILENAME}.zip" -d ${INSTALL_PREFIX} + ln -s ${INSTALL_PREFIX}/${FILENAME}/etcd ${INSTALL_PREFIX}/etcd + ln -s ${INSTALL_PREFIX}/${FILENAME}/etcdctl ${INSTALL_PREFIX}/etcdctl + else + echo "Unsupported OS: ${OS_NAME}" + exit 1 + fi + + - shell: bash + env: + INSTALL_PREFIX: ${{ inputs.install-prefix }} + run: | + set -eux + ${INSTALL_PREFIX}/etcd --version + ${INSTALL_PREFIX}/etcdctl version 2>/dev/null || ${INSTALL_PREFIX}/etcdctl --version + echo "ETCD_PATH=$(echo $INSTALL_PREFIX)" >> "$GITHUB_ENV" + echo "${INSTALL_PREFIX}" >> "$GITHUB_PATH" \ No newline at end of file diff --git a/.github/workflows/tkv.yaml b/.github/workflows/tkv.yaml new file mode 100644 index 0000000..fed0ba2 --- /dev/null +++ b/.github/workflows/tkv.yaml @@ -0,0 +1,54 @@ +name: tkv.yaml +on: + pull_request_target: + types: [ labeled ] + +env: + # Note: Use exactly match version of tool, to avoid unexpected issues with test on CI. + GO_VERSION: '1.24.9' + + +jobs: + full-ci-ee: + # Tests will run only when the pull request is labeled with `full-ci`. To + # avoid security problems, the label must be reset manually for every run. + # + # We need to use `pull_request_target` because it has access to base + # repository secrets unlike `pull_request`. + if: (github.event_name == 'push') || + (github.event_name == 'pull_request_target' && + github.event.action == 'labeled' && + github.event.label.name == 'full-ci') || + (github.event_name == 'pull_request' && + github.event.action == 'synchronize' && + github.event.pull_request.head.repo.full_name == github.repository && + contains(github.event.pull_request.labels.*.name, 'full-ci')) + runs-on: ubuntu-22.04 + strategy: + matrix: + sdk-version: + - "3.5.0-0-r70.linux.x86_64" + fail-fast: false + steps: + # `ref` as merge request is needed for pull_request_target because this + # target runs in the context of the base commit of the pull request. + - uses: actions/checkout@v4 + if: github.event_name == 'pull_request_target' + with: + fetch-depth: 0 + ref: ${{ github.event.pull_request.head.sha }} + + - uses: actions/checkout@v4 + if: github.event_name != 'pull_request_target' + with: + fetch-depth: 0 + + - name: Prepare EE env + uses: ./.github/actions/prepare-ee-test-env + with: + sdk-version: '${{ matrix.sdk-version }}' + sdk-download-token: '${{ secrets.SDK_DOWNLOAD_TOKEN }}' + + - name: Integration tests + run: + go test ./... -count=1 -v diff --git a/.golangci.yml b/.golangci.yml index 1dac47c..bd43e73 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -50,6 +50,7 @@ linters: - "go.etcd.io/etcd/client/v3" - "github.com/tarantool/go-tarantool/v2" - "github.com/tarantool/go-option" + - "github.com/vmihailenco/msgpack/v5" test: files: - "$test" @@ -57,3 +58,4 @@ linters: - $gostd - "github.com/tarantool/go-storage" - "github.com/stretchr/testify" + - "github.com/tarantool/go-tarantool/v2" diff --git a/driver/driver.go b/driver/driver.go index dc49a03..3ed9188 100644 --- a/driver/driver.go +++ b/driver/driver.go @@ -26,5 +26,7 @@ type Driver interface { // Watch establishes a watch stream for changes to a specific key or prefix. // The returned channel will receive events as changes occur. - Watch(ctx context.Context, key []byte, opts ...watch.Option) <-chan watch.Event + // The returned cleanup function should be called to stop the watch and release resources. + // An error is returned if the watch could not be established. + Watch(ctx context.Context, key []byte, opts ...watch.Option) (<-chan watch.Event, func(), error) } diff --git a/driver/etcd/etcd.go b/driver/etcd/etcd.go index 91ed8fc..152b556 100644 --- a/driver/etcd/etcd.go +++ b/driver/etcd/etcd.go @@ -69,6 +69,6 @@ func (d Driver) Execute( // Watch monitors changes to a specific key and returns a stream of events. // It supports optional watch configuration through the opts parameter. -func (d Driver) Watch(_ context.Context, _ []byte, _ ...watch.Option) <-chan watch.Event { +func (d Driver) Watch(_ context.Context, _ []byte, _ ...watch.Option) (<-chan watch.Event, func(), error) { panic("implement me") } diff --git a/driver/tcs/tcs.go b/driver/tcs/tcs.go deleted file mode 100644 index 2bab56c..0000000 --- a/driver/tcs/tcs.go +++ /dev/null @@ -1,84 +0,0 @@ -// Package tcs provides a Tarantool Cartridge storage driver implementation. -// It enables using Tarantool as a distributed key-value storage backend. -package tcs - -import ( - "context" - "fmt" - - "github.com/tarantool/go-tarantool/v2" - "github.com/tarantool/go-tarantool/v2/pool" - - "github.com/tarantool/go-storage/driver" - "github.com/tarantool/go-storage/operation" - "github.com/tarantool/go-storage/predicate" - "github.com/tarantool/go-storage/tx" - "github.com/tarantool/go-storage/watch" -) - -// Driver is a Tarantool implementation of the storage driver interface. -// It uses TCS as the underlying key-value storage backend. -type Driver struct { - conn *pool.ConnectionPool // Tarantool connection pool. -} - -var ( - _ driver.Driver = &Driver{} //nolint:exhaustruct -) - -// New creates a new Tarantool driver instance. -// It establishes connections to Tarantool instances using the provided addresses. -func New(ctx context.Context, addrs []string) (*Driver, error) { - instances := make([]pool.Instance, 0, len(addrs)) - for i, addr := range addrs { - instances = append(instances, pool.Instance{ - Name: fmt.Sprintf("instance-%d", i), - Dialer: &tarantool.NetDialer{ - Address: addr, - User: "user", - Password: "password", - RequiredProtocolInfo: tarantool.ProtocolInfo{ - Auth: tarantool.AutoAuth, - Version: tarantool.ProtocolVersion(0), - Features: nil, - }, - }, - Opts: tarantool.Opts{ - Timeout: 0, - Reconnect: 0, - MaxReconnects: 0, - RateLimit: 0, - RLimitAction: tarantool.RLimitAction(0), - Concurrency: 0, - SkipSchema: false, - Notify: nil, - Handle: nil, - Logger: nil, - }, - }) - } - - conn, err := pool.Connect(ctx, instances) - if err != nil { - return nil, fmt.Errorf("failed to connect to tarantool pool: %w", err) - } - - return &Driver{conn: conn}, nil -} - -// Execute executes a transactional operation with conditional logic. -// It processes predicates to determine whether to execute thenOps or elseOps. -func (d Driver) Execute( - _ context.Context, - _ []predicate.Predicate, - _ []operation.Operation, - _ []operation.Operation, -) (tx.Response, error) { - panic("implement me") -} - -// Watch monitors changes to a specific key and returns a stream of events. -// It supports optional watch configuration through the opts parameter. -func (d Driver) Watch(_ context.Context, _ []byte, _ ...watch.Option) <-chan watch.Event { - panic("implement me") -} diff --git a/driver/tkv/integration_test.go b/driver/tkv/integration_test.go new file mode 100644 index 0000000..9ddbb32 --- /dev/null +++ b/driver/tkv/integration_test.go @@ -0,0 +1,771 @@ +package tkv_test + +import ( + "context" + "errors" + "flag" + "fmt" + "os" + "strings" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/tarantool/go-tarantool/v2" + "github.com/tarantool/go-tarantool/v2/pool" + tcshelper "github.com/tarantool/go-tarantool/v2/test_helpers/tcs" + + "github.com/tarantool/go-storage/driver/tkv" + "github.com/tarantool/go-storage/operation" + "github.com/tarantool/go-storage/predicate" + "github.com/tarantool/go-storage/tx" +) + +// skipIfNoTarantool skips the test if no Tarantool instance is available. +func skipIfNoTarantool(t *testing.T) { + t.Helper() + + if os.Getenv("TARANTOOL_ADDR") == "" { + t.Skip("Skipping test: TARANTOOL_ADDR environment variable not set") + } +} + +// createTestDriver creates a TKV driver for testing. +// It skips the test if no Tarantool instance is available. +func createTestDriver(ctx context.Context, t *testing.T) *tkv.Driver { + t.Helper() + + skipIfNoTarantool(t) + + addrs := []string{} + + // Parse comma-separated addresses. + addr := os.Getenv("TARANTOOL_ADDR") + if addr != "" { + // Split by comma and trim spaces. + for _, a := range strings.Split(addr, ",") { + addrs = append(addrs, strings.TrimSpace(a)) + } + } + + // Create connection pool. + instances := make([]pool.Instance, 0, len(addrs)) + for i, addr := range addrs { + instances = append(instances, pool.Instance{ + Name: string(rune('a' + i)), + Dialer: &tarantool.NetDialer{ + Address: addr, + User: "client", + Password: "secret", + RequiredProtocolInfo: tarantool.ProtocolInfo{ + Auth: 0, + Version: 0, + Features: nil, + }, + }, + Opts: tarantool.Opts{ + Timeout: 0, + Reconnect: 0, + MaxReconnects: 0, + RateLimit: 0, + RLimitAction: 0, + Concurrency: 0, + SkipSchema: false, + Notify: nil, + Handle: nil, + Logger: nil, + }, + }) + } + + conn, err := pool.Connect(ctx, instances) + require.NoError(t, err, "Failed to connect to Tarantool pool") + + // Wrap the pool connection to implement DoerWatcher. + wrapper := pool.NewConnectorAdapter(conn, pool.RW) + + return tkv.New(wrapper) +} + +// cleanupTestKey deletes a test key to ensure clean state. +func cleanupTestKey(ctx context.Context, driver *tkv.Driver, key []byte) { + _, _ = driver.Execute(ctx, nil, []operation.Operation{ + operation.Delete(key), + }, nil) +} + +// testKey generates a unique test key to avoid conflicts between tests. +func testKey(t *testing.T, prefix string) []byte { + t.Helper() + + return []byte("/test/" + prefix + "/" + t.Name()) +} + +// TestTKVDriver_Put tests basic Put operation. +func TestTKVDriver_Put(t *testing.T) { + t.Parallel() + + skipIfNoTarantool(t) + + ctx := context.Background() + driver := createTestDriver(ctx, t) + + key := testKey(t, "put") + defer cleanupTestKey(ctx, driver, key) + + value := []byte("put-test-value") + + // Put operation. + response, err := driver.Execute(ctx, nil, []operation.Operation{ + operation.Put(key, value), + }, nil) + require.NoError(t, err) + assert.True(t, response.Succeeded) + + require.Len(t, response.Results, 1, "TX should return one result") + assert.Empty(t, response.Results[0].Values, "Put operation should not return any values in response") +} + +// TestTKVDriver_Get tests basic Get operation. +func TestTKVDriver_Get(t *testing.T) { + t.Parallel() + + skipIfNoTarantool(t) + + ctx := context.Background() + driver := createTestDriver(ctx, t) + + key := testKey(t, "get") + defer cleanupTestKey(ctx, driver, key) + + value := []byte("get-test-value") + + // First put a value. + _, err := driver.Execute(ctx, nil, []operation.Operation{ + operation.Put(key, value), + }, nil) + require.NoError(t, err) + + // Get operation. + response, err := driver.Execute(ctx, nil, []operation.Operation{ + operation.Get(key), + }, nil) + require.NoError(t, err) + assert.True(t, response.Succeeded) + + require.Len(t, response.Results, 1, "Get operation should return one result") + assert.Len(t, response.Results[0].Values, 1, "Get operation should return one value in response") + assert.Equal(t, key, response.Results[0].Values[0].Key, "Returned key should match requested key") + assert.Equal(t, value, response.Results[0].Values[0].Value, "Returned value should match stored value") + assert.Positive(t, response.Results[0].Values[0].ModRevision, "ModRevision should be greater than 0") +} + +// TestTKVDriver_Delete tests basic Delete operation. +func TestTKVDriver_Delete(t *testing.T) { + t.Parallel() + + skipIfNoTarantool(t) + + ctx := context.Background() + driver := createTestDriver(ctx, t) + + key := testKey(t, "delete") + defer cleanupTestKey(ctx, driver, key) + + value := []byte("delete-test-value") + + // First put a value. + _, err := driver.Execute(ctx, nil, []operation.Operation{ + operation.Put(key, value), + }, nil) + require.NoError(t, err) + + // Delete operation. + response, err := driver.Execute(ctx, nil, []operation.Operation{ + operation.Delete(key), + }, nil) + require.NoError(t, err) + assert.True(t, response.Succeeded) + + require.Len(t, response.Results, 1, "TX should return one result") + assert.Equal(t, key, response.Results[0].Values[0].Key, "Returned key should match deleted key") + assert.Equal(t, value, response.Results[0].Values[0].Value, "Returned value should match deleted value") + assert.Positive(t, response.Results[0].Values[0].ModRevision, "ModRevision should be greater than 0") +} + +// TestTKVDriver_GetAfterDelete tests Get operation after Delete. +func TestTKVDriver_GetAfterDelete(t *testing.T) { + t.Parallel() + + skipIfNoTarantool(t) + + ctx := context.Background() + driver := createTestDriver(ctx, t) + + key := testKey(t, "get-after-delete") + defer cleanupTestKey(ctx, driver, key) + + value := []byte("get-after-delete-test-value") + + // Put a value. + _, err := driver.Execute(ctx, nil, []operation.Operation{ + operation.Put(key, value), + }, nil) + require.NoError(t, err) + + // Delete the value. + response, err := driver.Execute(ctx, nil, []operation.Operation{ + operation.Delete(key), + }, nil) + require.NoError(t, err) + assert.True(t, response.Succeeded) + + require.Len(t, response.Results, 1, "TX should return one result") + assert.Len(t, response.Results[0].Values, 1, "Delete operation should return one value in response") + + // Get after delete - should succeed but return empty data. + response, err = driver.Execute(ctx, nil, []operation.Operation{ + operation.Get(key), + }, nil) + require.NoError(t, err) + assert.True(t, response.Succeeded) + + require.Len(t, response.Results, 1, "TX should return one result") + assert.Empty(t, response.Results[0].Values, "Get operation on deleted key should return empty values") +} + +// TestTKVDriver_ValueEqualPredicate tests ValueEqual predicate functionality. +func TestTKVDriver_ValueEqualPredicate(t *testing.T) { + t.Parallel() + + skipIfNoTarantool(t) + + ctx := context.Background() + driver := createTestDriver(ctx, t) + + key := testKey(t, "value-equal") + defer cleanupTestKey(ctx, driver, key) + + initialValue := []byte("initial") + updatedValue := []byte("updated") + + _, err := driver.Execute(ctx, nil, []operation.Operation{ + operation.Put(key, initialValue), + }, nil) + require.NoError(t, err) + + // Test ValueEqual predicate - should succeed when value matches. + response, err := driver.Execute(ctx, []predicate.Predicate{ + predicate.ValueEqual(key, "initial"), + }, []operation.Operation{ + operation.Put(key, updatedValue), + }, nil) + require.NoError(t, err) + assert.True(t, response.Succeeded, "Should succeed when value matches") + + require.Len(t, response.Results, 1, "TX should return one result") + assert.Empty(t, response.Results[0].Values, "Put operation should not return any values in response") +} + +// TestTKVDriver_ValueNotEqualPredicate tests ValueNotEqual predicate functionality. +func TestTKVDriver_ValueNotEqualPredicate(t *testing.T) { + t.Parallel() + skipIfNoTarantool(t) + + ctx := context.Background() + driver := createTestDriver(ctx, t) + + key := testKey(t, "value-not-equal") + defer cleanupTestKey(ctx, driver, key) + + initialValue := []byte("initial") + + // Setup: put initial value. + _, err := driver.Execute(ctx, nil, []operation.Operation{ + operation.Put(key, initialValue), + }, nil) + require.NoError(t, err) + + // Test ValueNotEqual predicate - should succeed when value doesn't match. + response, err := driver.Execute(ctx, []predicate.Predicate{ + predicate.ValueNotEqual(key, "different"), + }, []operation.Operation{ + operation.Put(key, []byte("new-value")), + }, nil) + require.NoError(t, err) + assert.True(t, response.Succeeded, "Should succeed when value doesn't match") + + require.Len(t, response.Results, 1, "TX should return one result") + assert.Empty(t, response.Results[0].Values, "Put operation should not return any values in response") +} + +// TestTKVDriver_VersionEqualPredicate tests VersionEqual predicate functionality. +func TestTKVDriver_VersionEqualPredicate(t *testing.T) { + t.Parallel() + + skipIfNoTarantool(t) + + ctx := context.Background() + driver := createTestDriver(ctx, t) + + key := testKey(t, "version-equal") + defer cleanupTestKey(ctx, driver, key) + + value := []byte("version-test") + + // Put initial value and get its revision. + _, err := driver.Execute(ctx, nil, []operation.Operation{ + operation.Put(key, value), + }, nil) + require.NoError(t, err) + + response, err := driver.Execute(ctx, nil, []operation.Operation{ + operation.Get(key), + }, nil) + require.NoError(t, err) + + initialRevision := getRevisionFromResponse(t, response, 0) + + response, err = driver.Execute(ctx, []predicate.Predicate{ + predicate.VersionEqual(key, initialRevision), + }, []operation.Operation{ + operation.Put(key, []byte("updated")), + }, nil) + require.NoError(t, err) + assert.True(t, response.Succeeded, "Should succeed when version matches") +} + +// TestTKVDriver_VersionGreaterPredicate tests VersionGreater predicate functionality. +func TestTKVDriver_VersionGreaterPredicate(t *testing.T) { + t.Parallel() + + skipIfNoTarantool(t) + + ctx := context.Background() + driver := createTestDriver(ctx, t) + + key := testKey(t, "version-greater") + defer cleanupTestKey(ctx, driver, key) + + value := []byte("version-test") + + // Put initial value and get its revision. + _, err := driver.Execute(ctx, nil, []operation.Operation{ + operation.Put(key, value), + }, nil) + require.NoError(t, err) + + response, err := driver.Execute(ctx, nil, []operation.Operation{ + operation.Get(key), + }, nil) + require.NoError(t, err) + + initialRevision := getRevisionFromResponse(t, response, 0) + + response, err = driver.Execute(ctx, []predicate.Predicate{ + predicate.VersionGreater(key, initialRevision-1), + }, []operation.Operation{ + operation.Put(key, []byte("new-value")), + }, nil) + require.NoError(t, err) + assert.True(t, response.Succeeded, "Should succeed when version is greater than specified version") +} + +// TestTKVDriver_MultipleKeysPut tests putting multiple keys in one operation. +func TestTKVDriver_MultipleKeysPut(t *testing.T) { + t.Parallel() + + skipIfNoTarantool(t) + + ctx := context.Background() + driver := createTestDriver(ctx, t) + + key1 := testKey(t, "multi-put1") + defer cleanupTestKey(ctx, driver, key1) + + key2 := testKey(t, "multi-put2") + defer cleanupTestKey(ctx, driver, key2) + + value1 := []byte("value1") + value2 := []byte("value2") + + // Put multiple keys in one operation. + response, err := driver.Execute(ctx, nil, []operation.Operation{ + operation.Put(key1, value1), + operation.Put(key2, value2), + }, nil) + require.NoError(t, err) + assert.True(t, response.Succeeded) + + require.Len(t, response.Results, 2, "TX should return two results") + + for i := range response.Results { + assert.Empty(t, response.Results[i].Values, "Put operation %d should not return any values in response", i) + } +} + +// TestTKVDriver_MultiplePredicates tests transaction with multiple predicates. +func TestTKVDriver_MultiplePredicates(t *testing.T) { + t.Parallel() + + skipIfNoTarantool(t) + + ctx := context.Background() + driver := createTestDriver(ctx, t) + + key1 := testKey(t, "multi-pred1") + defer cleanupTestKey(ctx, driver, key1) + + key2 := testKey(t, "multi-pred2") + defer cleanupTestKey(ctx, driver, key2) + + value1 := []byte("value1") + value2 := []byte("value2") + + // Setup: put initial values. + _, err := driver.Execute(ctx, nil, []operation.Operation{ + operation.Put(key1, value1), + operation.Put(key2, value2), + }, nil) + require.NoError(t, err) + + // Transaction with multiple predicates. + response, err := driver.Execute(ctx, []predicate.Predicate{ + predicate.ValueEqual(key1, "value1"), + predicate.ValueEqual(key2, "value2"), + }, []operation.Operation{ + operation.Put(key1, []byte("updated1")), + operation.Put(key2, []byte("updated2")), + }, nil) + require.NoError(t, err) + assert.True(t, response.Succeeded, "Transaction with multiple predicates should succeed") + + require.Len(t, response.Results, 2, "Transaction with multiple predicates should return two results") + + for i := range response.Results { + assert.Empty(t, response.Results[i].Values, "Operation %d should not return any values in response", i) + } +} + +// TestTKVDriver_MultipleOperations tests transaction with multiple operations. +func TestTKVDriver_MultipleOperations(t *testing.T) { + t.Parallel() + + skipIfNoTarantool(t) + + ctx := context.Background() + driver := createTestDriver(ctx, t) + + key1 := testKey(t, "multi-op1") + defer cleanupTestKey(ctx, driver, key1) + + key2 := testKey(t, "multi-op2") + defer cleanupTestKey(ctx, driver, key2) + + value1 := []byte("value1") + value2 := []byte("value2") + + // Setup: put initial values. + _, err := driver.Execute(ctx, nil, []operation.Operation{ + operation.Put(key1, value1), + operation.Put(key2, value2), + }, nil) + require.NoError(t, err) + + // Transaction with multiple operations. + response, err := driver.Execute(ctx, nil, []operation.Operation{ + operation.Put(key1, []byte("updated1")), + operation.Put(key2, []byte("updated2")), + operation.Get(key1), + }, nil) + require.NoError(t, err) + assert.True(t, response.Succeeded, "Transaction with multiple operations should succeed") + + require.Len(t, response.Results, 3, "Transaction with multiple operations should return three results") + assert.Equal(t, key1, response.Results[2].Values[0].Key, "Get operation should return the correct key") + assert.Equal(t, []byte("updated1"), response.Results[2].Values[0].Value, + "Get operation should return the updated value") + assert.Positive(t, response.Results[2].Values[0].ModRevision, + "Get operation should return a valid mod_revision") +} + +// TestTKVDriver_ElseOperations tests transaction with else operations. +func TestTKVDriver_ElseOperations(t *testing.T) { + t.Parallel() + + skipIfNoTarantool(t) + + ctx := context.Background() + driver := createTestDriver(ctx, t) + + key1 := testKey(t, "else-op1") + defer cleanupTestKey(ctx, driver, key1) + + key2 := testKey(t, "else-op2") + defer cleanupTestKey(ctx, driver, key2) + + value1 := []byte("value1") + value2 := []byte("value2") + + // Setup: put initial values. + _, err := driver.Execute(ctx, nil, []operation.Operation{ + operation.Put(key1, value1), + operation.Put(key2, value2), + }, nil) + require.NoError(t, err) + + // Transaction with predicates that should fail, triggering else operations. + response, err := driver.Execute(ctx, []predicate.Predicate{ + predicate.ValueEqual(key1, "wrong-value"), + }, []operation.Operation{ + operation.Put(key1, []byte("should-not-execute")), + }, []operation.Operation{ + operation.Delete(key1), + operation.Delete(key2), + }) + require.NoError(t, err) + assert.False(t, response.Succeeded, "Transaction should fail when predicates don't match") + + require.Len(t, response.Results, 2, "Transaction with else operations should return two results") + assert.Equal(t, key1, response.Results[0].Values[0].Key, "Delete operation should delete key1") + assert.Equal(t, key2, response.Results[1].Values[0].Key, "Delete operation should delete key2") +} + +// TestTKVDriver_WatchPutEvent tests watch functionality for Put events. +func TestTKVDriver_WatchPutEvent(t *testing.T) { + t.Parallel() + + skipIfNoTarantool(t) + + ctx := context.Background() + driver := createTestDriver(ctx, t) + + key := testKey(t, "watch-put") + defer cleanupTestKey(ctx, driver, key) + + value := []byte("watch-test-value") + + // Create a context with timeout. + watchCtx, cancel := context.WithTimeout(ctx, 10*time.Second) + defer cancel() + + // Start watching before making changes. + eventCh, stopWatch, err := driver.Watch(watchCtx, key) + require.NoError(t, err) + + defer stopWatch() + + // Give watcher time to register. + time.Sleep(500 * time.Millisecond) + + // Trigger put event. + _, err = driver.Execute(ctx, nil, []operation.Operation{ + operation.Put(key, value), + }, nil) + require.NoError(t, err) + + // Wait for put event. + select { + case event := <-eventCh: + assert.Equal(t, key, event.Prefix) + case <-watchCtx.Done(): + t.Fatal("Timeout waiting for put event") + } +} + +// TestTKVDriver_WatchDeleteEvent tests watch functionality for Delete events. +func TestTKVDriver_WatchDeleteEvent(t *testing.T) { + t.Parallel() + + skipIfNoTarantool(t) + + ctx := context.Background() + driver := createTestDriver(ctx, t) + + key := testKey(t, "watch-delete") + defer cleanupTestKey(ctx, driver, key) + + value := []byte("watch-test-value") + + // Create a context with timeout. + watchCtx, cancel := context.WithTimeout(ctx, 10*time.Second) + defer cancel() + + // Start watching before making changes. + eventCh, stopWatch, err := driver.Watch(watchCtx, key) + require.NoError(t, err) + + defer stopWatch() + + // Give watcher time to register. + time.Sleep(500 * time.Millisecond) + + // Put initial value first. + _, err = driver.Execute(ctx, nil, []operation.Operation{ + operation.Put(key, value), + }, nil) + require.NoError(t, err) + + // Wait for initial put event. + select { + case <-eventCh: + case <-watchCtx.Done(): + t.Fatal("Timeout waiting for initial put event") + } + + // Trigger delete event. + _, err = driver.Execute(ctx, nil, []operation.Operation{ + operation.Delete(key), + }, nil) + require.NoError(t, err) + + // Wait for delete event. + select { + case event := <-eventCh: + assert.Equal(t, key, event.Prefix) + case <-watchCtx.Done(): + t.Fatal("Timeout waiting for delete event") + } +} + +// TestTKVDriver_GetByPrefix tests Get operation with prefix to retrieve multiple values. +func TestTKVDriver_GetByPrefix(t *testing.T) { + t.Parallel() + + skipIfNoTarantool(t) + + ctx := context.Background() + driver := createTestDriver(ctx, t) + + // Create a common prefix for all test keys. + basePrefix := "/test/prefix-test/" + t.Name() + "/" + + // Create multiple keys with the same prefix. + key1 := []byte(basePrefix + "key1") + key2 := []byte(basePrefix + "key2") + key3 := []byte(basePrefix + "key3") + key4 := []byte("/test/other-prefix/other-key") // Different prefix for comparison. + + defer cleanupTestKey(ctx, driver, key1) + defer cleanupTestKey(ctx, driver, key2) + defer cleanupTestKey(ctx, driver, key3) + defer cleanupTestKey(ctx, driver, key4) + + value1 := []byte("prefix-value1") + value2 := []byte("prefix-value2") + value3 := []byte("prefix-value3") + value4 := []byte("other-value") + + // Put multiple values with the same prefix and one with different prefix. + _, err := driver.Execute(ctx, nil, []operation.Operation{ + operation.Put(key1, value1), + operation.Put(key2, value2), + operation.Put(key3, value3), + operation.Put(key4, value4), + }, nil) + require.NoError(t, err) + + // Get operation with prefix - should return all keys matching the prefix. + response, err := driver.Execute(ctx, nil, []operation.Operation{ + operation.Get([]byte(basePrefix)), + }, nil) + require.NoError(t, err) + assert.True(t, response.Succeeded) + + require.Len(t, response.Results, 1, "Get operation should return one result") + + // Should return 3 values matching the prefix. + assert.Len(t, response.Results[0].Values, 3, "Get by prefix should return three values matching the prefix") + + // Verify all returned keys have the correct prefix. + for _, kv := range response.Results[0].Values { + assert.True(t, strings.HasPrefix(string(kv.Key), basePrefix), + "Returned key %s should have prefix %s", string(kv.Key), basePrefix) + assert.Positive(t, kv.ModRevision, "ModRevision should be greater than 0") + } + + // Verify specific values are present. + foundKeys := make(map[string][]byte) + for _, kv := range response.Results[0].Values { + foundKeys[string(kv.Key)] = kv.Value + } + + assert.Equal(t, value1, foundKeys[string(key1)], "Should find value1 for key1") + assert.Equal(t, value2, foundKeys[string(key2)], "Should find value2 for key2") + assert.Equal(t, value3, foundKeys[string(key3)], "Should find value3 for key3") + + // Verify key with different prefix is not included. + _, exists := foundKeys[string(key4)] + assert.False(t, exists, "Key with different prefix should not be included in results") +} + +// TestTKVDriver_GetNonExistentKey tests Get operation on non-existent key. +func TestTKVDriver_GetNonExistentKey(t *testing.T) { + t.Parallel() + + ctx := context.Background() + driver := createTestDriver(ctx, t) + + key := []byte("/test/nonexistent/key") + + // Test Get operation on non-existent key. + response, err := driver.Execute(ctx, nil, []operation.Operation{ + operation.Get(key), + }, nil) + require.NoError(t, err, "Get operation should not fail for non-existent key") + assert.True(t, response.Succeeded, "Get operation should succeed") +} + +// TestTKVDriver_GetRoot tests getting all keys (root path). +func TestTKVDriver_GetRoot(t *testing.T) { + t.Parallel() + + ctx := context.Background() + driver := createTestDriver(ctx, t) + + // Test getting all keys (root path). + response, err := driver.Execute(ctx, nil, []operation.Operation{ + operation.Get([]byte("/")), + }, nil) + + require.NoError(t, err, "Get root operation should not fail") + assert.True(t, response.Succeeded, "Get root operation should succeed") +} + +// getRevisionFromResponse extracts revision from transaction response. +// This extracts the revision from the tx.Response structure. +func getRevisionFromResponse(t *testing.T, response tx.Response, position int) int64 { + t.Helper() + + require.Greater(t, len(response.Results), position, "expected at least %d results", position+1) + require.NotEmpty(t, response.Results[position].Values, "expected at least %d values", position+1) + + return response.Results[position].Values[0].ModRevision +} + +func TestMain(m *testing.M) { + flag.Parse() + + tcs, err := tcshelper.Start(0) + switch { + case errors.Is(err, tcshelper.ErrNotSupported): + fmt.Println("TcS is not supported:", err) //nolint:forbidigo + case err != nil: + fmt.Println("Failed to start TCS:", err) //nolint:forbidigo + os.Exit(1) + default: + os.Exit(func() int { + defer tcs.Stop() + + // os.Setenv is a temporary hack to set the TARANTOOL_ADDR environment variable. + err := os.Setenv("TARANTOOL_ADDR", strings.Join(tcs.Endpoints(), ",")) + if err != nil { + fmt.Println("Failed to set TARANTOOL_ADDR:", err) //nolint:forbidigo + return 1 + } + + return m.Run() + }()) + } +} diff --git a/driver/tkv/operations.go b/driver/tkv/operations.go new file mode 100644 index 0000000..8b02898 --- /dev/null +++ b/driver/tkv/operations.go @@ -0,0 +1,116 @@ +package tkv + +import ( + "errors" + "fmt" + + "github.com/vmihailenco/msgpack/v5" + + "github.com/tarantool/go-storage/operation" +) + +// Error definitions for err113 compliance. +var ( + // ErrUnknownOperation is returned when the operation is unknown. + ErrUnknownOperation = errors.New("unknown operation") +) + +// FailedToEncodeTkvOperationError is returned when we failed to encode tkvOperation. +type FailedToEncodeTkvOperationError struct { + Text string + Err error +} + +// Error returns the error message. +func (e FailedToEncodeTkvOperationError) Error() string { + return fmt.Sprintf("failed to encode tkvOperation, %s: %s", e.Text, e.Err) +} + +const ( + // putOperationArrayLen is the length of the array that is used to encode a put operation. + putOperationArrayLen = 3 + // otherOperationArrayLen is the length of the array that is used to encode a delete operation. + otherOperationArrayLen = 2 +) + +var ( + //nolint: gochecknoglobals + ops = map[operation.Type]string{ + operation.TypeGet: "get", + operation.TypePut: "put", + operation.TypeDelete: "delete", + } +) + +// getOperation returns the TKV operation string for an operation type. +func getOperation(opType operation.Type) (string, bool) { + result, ok := ops[opType] + return result, ok +} + +type tkvOperation struct { + operation.Operation +} + +// newTKVOperations returns a slice of TKV operations from a slice of operations. +func newTKVOperations(operations []operation.Operation) []tkvOperation { + tkvOperations := make([]tkvOperation, 0, len(operations)) + for _, o := range operations { + tkvOperations = append(tkvOperations, tkvOperation{o}) + } + + return tkvOperations +} + +func (o tkvOperation) EncodeMsgpack(encoder *msgpack.Encoder) error { + op, ok := getOperation(o.Type()) //nolint:varnamelen + if !ok { + return ErrUnknownOperation + } + + switch { + case o.Type() == operation.TypePut: + err := encoder.EncodeArrayLen(putOperationArrayLen) + if err != nil { + return FailedToEncodeTkvOperationError{Text: "encode put operation array length", Err: err} + } + + err = encoder.EncodeString(op) + if err != nil { + return FailedToEncodeTkvOperationError{Text: "encode put operation", Err: err} + } + + // We're deliberately using here conversion from byte to string, since MsgPack API doesn't have a way to + // write byte array as string. + err = encoder.EncodeString(string(o.Key())) + if err != nil { + return FailedToEncodeTkvOperationError{Text: "encode put operation key", Err: err} + } + + // We're deliberately using here conversion from byte to string, since MsgPack API doesn't have a way to + // write byte array as string. + err = encoder.EncodeString(string(o.Value())) + if err != nil { + return FailedToEncodeTkvOperationError{Text: "encode put operation value", Err: err} + } + default: + err := encoder.EncodeArrayLen(otherOperationArrayLen) + if err != nil { + return FailedToEncodeTkvOperationError{Text: "encode operation array length", Err: err} + } + + err = encoder.EncodeString(op) + if err != nil { + return FailedToEncodeTkvOperationError{Text: "encode operation", Err: err} + } + + // We're deliberately using here conversion from byte to string, since MsgPack API doesn't have a way to + // write byte array as string. + err = encoder.EncodeString(string(o.Key())) + if err != nil { + return FailedToEncodeTkvOperationError{Text: "encode operation key", Err: err} + } + } + + return nil +} diff --git a/driver/tkv/predicate.go b/driver/tkv/predicate.go new file mode 100644 index 0000000..44e1087 --- /dev/null +++ b/driver/tkv/predicate.go @@ -0,0 +1,114 @@ +package tkv + +import ( + "errors" + "fmt" + + "github.com/vmihailenco/msgpack/v5" + + "github.com/tarantool/go-storage/predicate" +) + +var ( + // ErrUnknownOperator is returned when the operator is unknown. + ErrUnknownOperator = errors.New("unknown operator") + // ErrUnknownTarget is returned when the target is unknown. + ErrUnknownTarget = errors.New("unknown target") + + _ msgpack.CustomEncoder = tkvPredicate{Predicate: nil} + + //nolint: gochecknoglobals + operators = map[predicate.Op]string{ + predicate.OpEqual: "==", + predicate.OpNotEqual: "!=", + predicate.OpGreater: ">", + predicate.OpLess: "<", + } + + //nolint: gochecknoglobals + targets = map[predicate.Target]string{ + predicate.TargetValue: "value", + predicate.TargetVersion: "mod_revision", + } +) + +// FailedToEncodeTkvPredicateError is returned when we failed to encode tkvPredicate. +type FailedToEncodeTkvPredicateError struct { + Text string + Err error +} + +// Error returns the error message. +func (e FailedToEncodeTkvPredicateError) Error() string { + return fmt.Sprintf("failed to encode tkvPredicate, %s: %s", e.Text, e.Err) +} + +// getOperator returns the TKV operator string for a predicate operation. +func getOperator(op predicate.Op) (string, bool) { + result, ok := operators[op] + return result, ok +} + +// getTarget returns the TKV target string for a predicate target. +func getTarget(target predicate.Target) (string, bool) { + result, ok := targets[target] + return result, ok +} + +type tkvPredicate struct { + predicate.Predicate +} + +func newTKVPredicates(predicates []predicate.Predicate) []tkvPredicate { + tkvPredicates := make([]tkvPredicate, 0, len(predicates)) + for _, p := range predicates { + tkvPredicates = append(tkvPredicates, tkvPredicate{p}) + } + + return tkvPredicates +} + +const ( + defaultPredicateArrayLen = 4 +) + +func (p tkvPredicate) EncodeMsgpack(encoder *msgpack.Encoder) error { + op, ok := getOperator(p.Operation()) //nolint:varnamelen + if !ok { + return ErrUnknownOperator + } + + target, ok := getTarget(p.Target()) + if !ok { + return ErrUnknownTarget + } + + err := encoder.EncodeArrayLen(defaultPredicateArrayLen) + if err != nil { + return FailedToEncodeTkvPredicateError{Text: "encode array length", Err: err} + } + + err = encoder.EncodeString(target) + if err != nil { + return FailedToEncodeTkvPredicateError{Text: "encode target", Err: err} + } + + err = encoder.EncodeString(op) + if err != nil { + return FailedToEncodeTkvPredicateError{Text: "encode operator", Err: err} + } + + err = encoder.Encode(p.Value()) + if err != nil { + return FailedToEncodeTkvPredicateError{Text: "encode value", Err: err} + } + + // We're deliberately using here conversion from byte to string, since MsgPack API doesn't have a way to + // write byte array as string. + err = encoder.EncodeString(string(p.Key())) + if err != nil { + return FailedToEncodeTkvPredicateError{Text: "encode key", Err: err} + } + + return nil +} diff --git a/driver/tkv/tkv.go b/driver/tkv/tkv.go new file mode 100644 index 0000000..1689e35 --- /dev/null +++ b/driver/tkv/tkv.go @@ -0,0 +1,105 @@ +// Package tkv provides a Tarantool Cartridge storage driver implementation. +// It enables using Tarantool as a distributed key-value storage backend. +package tkv + +import ( + "context" + "errors" + "fmt" + + "github.com/tarantool/go-tarantool/v2" + + "github.com/tarantool/go-storage/driver" + "github.com/tarantool/go-storage/operation" + "github.com/tarantool/go-storage/predicate" + "github.com/tarantool/go-storage/tx" + "github.com/tarantool/go-storage/watch" +) + +// DoerWatcher is an interface that combines tarantool.Doer and NewWatcher method. +// tarantool.Connection and pool.ConnectionAdapter implement this interface. +type DoerWatcher interface { + tarantool.Doer + + NewWatcher(key string, callback tarantool.WatchCallback) (tarantool.Watcher, error) +} + +// Driver is a Tarantool implementation of the storage driver interface. +// It uses TKV as the underlying key-value storage backend. +type Driver struct { + conn DoerWatcher // Tarantool connection pool. +} + +var ( + _ driver.Driver = &Driver{} //nolint:exhaustruct + + // ErrUnexpectedResponse is returned when the response from tarantool has unexpected format. + ErrUnexpectedResponse = errors.New("unexpected response from tarantool") +) + +// New creates a new Tarantool driver instance. +// It establishes connections to Tarantool instances using the provided addresses. +func New(doer DoerWatcher) *Driver { + return &Driver{conn: doer} +} + +// Execute executes a transactional operation with conditional logic. +// It processes predicates to determine whether to execute thenOps or elseOps. +func (d Driver) Execute( + ctx context.Context, + predicates []predicate.Predicate, + thenOps []operation.Operation, + elseOps []operation.Operation, +) (tx.Response, error) { + txnArg := newTxnRequest(predicates, thenOps, elseOps) + + req := tarantool.NewCallRequest("config.storage.txn"). + Args([]any{txnArg}).Context(ctx) + + var result []txnResponse + + switch err := d.conn.Do(req).GetTyped(&result); { + case err != nil: + return tx.Response{}, fmt.Errorf("failed to execute transaction: %w", err) + case len(result) != 1: + return tx.Response{}, fmt.Errorf("%w: expected 1 response, got %d", ErrUnexpectedResponse, len(result)) + } + + return result[0].asTxnResponse(), nil +} + +// Watch monitors changes to a specific key and returns a stream of events. +// It supports optional watch configuration through the opts parameter. +// To watch for config storage key "config.storage:" prefix should be used. +func (d Driver) Watch(ctx context.Context, key []byte, _ ...watch.Option) (<-chan watch.Event, func(), error) { + rvChan := make(chan watch.Event, 1) + + watcher, err := d.conn.NewWatcher("config.storage:"+string(key), func(_ tarantool.WatchEvent) { + select { + case rvChan <- watch.Event{Prefix: key}: + default: + } + }) + if err != nil { + close(rvChan) + return nil, nil, fmt.Errorf("failed to create watcher: %w", err) + } + + isStopped := make(chan struct{}) + + go func() { + defer func() { + // When watcher.Unregister() will finish it's execution - means watcher won't call any more callbacks, + // that will write messages to rvChan, so we can close it. + watcher.Unregister() + close(rvChan) + }() + + select { + case <-ctx.Done(): + case <-isStopped: + } + }() + + return rvChan, func() { close(isStopped) }, nil +} diff --git a/driver/tkv/txn.go b/driver/tkv/txn.go new file mode 100644 index 0000000..6680ba3 --- /dev/null +++ b/driver/tkv/txn.go @@ -0,0 +1,99 @@ +package tkv + +import ( + "fmt" + + "github.com/vmihailenco/msgpack/v5" + + "github.com/tarantool/go-storage/kv" + "github.com/tarantool/go-storage/operation" + "github.com/tarantool/go-storage/predicate" + "github.com/tarantool/go-storage/tx" +) + +// FailedToDecodeTxnResponseDataSingleError is returned when we failed to decode txnResponseDataSingle. +type FailedToDecodeTxnResponseDataSingleError struct { + Text string + Err error +} + +// Error returns the error message. +func (e FailedToDecodeTxnResponseDataSingleError) Error() string { + return fmt.Sprintf("failed to decode txnResponseDataSingle, %s: %s", e.Text, e.Err) +} + +type txnResponseDataSingle struct { + Response []struct { + Path []byte `msgpack:"path"` + ModRevision int64 `msgpack:"mod_revision"` + Value []byte `msgpack:"value"` + } +} + +func (t *txnResponseDataSingle) DecodeMsgpack(decoder *msgpack.Decoder) error { + err := decoder.Decode(&t.Response) + if err != nil { + return FailedToDecodeTxnResponseDataSingleError{Text: "decode response", Err: err} + } + + return nil +} + +type txnResponseData struct { + IsSuccess bool `msgpack:"is_success"` + Responses []txnResponseDataSingle `msgpack:"responses"` +} + +type txnResponse struct { + Data txnResponseData `msgpack:"data"` + Revision int64 `msgpack:"revision"` +} + +func (r txnResponse) asTxnResponse() tx.Response { + results := make([]tx.RequestResponse, 0, len(r.Data.Responses)) + for _, val := range r.Data.Responses { + keyValues := make([]kv.KeyValue, 0, len(val.Response)) + for _, resp := range val.Response { + modRevision := resp.ModRevision + if modRevision == 0 && r.Revision != 0 { + modRevision = r.Revision + } + + keyValues = append(keyValues, kv.KeyValue{ + Key: resp.Path, + Value: resp.Value, + ModRevision: modRevision, + }) + } + + results = append(results, tx.RequestResponse{ + Values: keyValues, + }) + } + + return tx.Response{ + Succeeded: r.Data.IsSuccess, + Results: results, + } +} + +type txnRequest struct { + _msgpack struct{} `msgpack:",omitempty"` + + Predicates []tkvPredicate `msgpack:"predicates"` + OnSuccess []tkvOperation `msgpack:"on_success"` + OnFailure []tkvOperation `msgpack:"on_failure"` +} + +func newTxnRequest( + predicates []predicate.Predicate, + onSuccess []operation.Operation, + onFailure []operation.Operation, +) txnRequest { + return txnRequest{ + _msgpack: struct{}{}, + Predicates: newTKVPredicates(predicates), + OnSuccess: newTKVOperations(onSuccess), + OnFailure: newTKVOperations(onFailure), + } +} diff --git a/go.mod b/go.mod index e6dacfb..490d788 100644 --- a/go.mod +++ b/go.mod @@ -7,6 +7,7 @@ require ( github.com/stretchr/testify v1.11.1 github.com/tarantool/go-option v1.0.0 github.com/tarantool/go-tarantool/v2 v2.4.0 + github.com/vmihailenco/msgpack/v5 v5.4.1 go.etcd.io/etcd/client/v3 v3.6.5 ) @@ -21,7 +22,6 @@ require ( github.com/pkg/errors v0.9.1 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/tarantool/go-iproto v1.1.0 // indirect - github.com/vmihailenco/msgpack/v5 v5.4.1 // indirect github.com/vmihailenco/tagparser/v2 v2.0.0 // indirect go.etcd.io/etcd/api/v3 v3.6.5 // indirect go.etcd.io/etcd/client/pkg/v3 v3.6.5 // indirect diff --git a/internal/mocks/driver_mock.go b/internal/mocks/driver_mock.go index bd6a2f1..4136fc6 100644 --- a/internal/mocks/driver_mock.go +++ b/internal/mocks/driver_mock.go @@ -27,7 +27,7 @@ type DriverMock struct { beforeExecuteCounter uint64 ExecuteMock mDriverMockExecute - funcWatch func(ctx context.Context, key []byte, opts ...watch.Option) (ch1 <-chan watch.Event) + funcWatch func(ctx context.Context, key []byte, opts ...watch.Option) (ch1 <-chan watch.Event, f1 func(), err error) funcWatchOrigin string inspectFuncWatch func(ctx context.Context, key []byte, opts ...watch.Option) afterWatchCounter uint64 @@ -500,6 +500,8 @@ type DriverMockWatchParamPtrs struct { // DriverMockWatchResults contains results of the Driver.Watch type DriverMockWatchResults struct { ch1 <-chan watch.Event + f1 func() + err error } // DriverMockWatchOrigins contains origins of expectations of the Driver.Watch @@ -626,7 +628,7 @@ func (mmWatch *mDriverMockWatch) Inspect(f func(ctx context.Context, key []byte, } // Return sets up results that will be returned by Driver.Watch -func (mmWatch *mDriverMockWatch) Return(ch1 <-chan watch.Event) *DriverMock { +func (mmWatch *mDriverMockWatch) Return(ch1 <-chan watch.Event, f1 func(), err error) *DriverMock { if mmWatch.mock.funcWatch != nil { mmWatch.mock.t.Fatalf("DriverMock.Watch mock is already set by Set") } @@ -634,13 +636,13 @@ func (mmWatch *mDriverMockWatch) Return(ch1 <-chan watch.Event) *DriverMock { if mmWatch.defaultExpectation == nil { mmWatch.defaultExpectation = &DriverMockWatchExpectation{mock: mmWatch.mock} } - mmWatch.defaultExpectation.results = &DriverMockWatchResults{ch1} + mmWatch.defaultExpectation.results = &DriverMockWatchResults{ch1, f1, err} mmWatch.defaultExpectation.returnOrigin = minimock.CallerInfo(1) return mmWatch.mock } // Set uses given function f to mock the Driver.Watch method -func (mmWatch *mDriverMockWatch) Set(f func(ctx context.Context, key []byte, opts ...watch.Option) (ch1 <-chan watch.Event)) *DriverMock { +func (mmWatch *mDriverMockWatch) Set(f func(ctx context.Context, key []byte, opts ...watch.Option) (ch1 <-chan watch.Event, f1 func(), err error)) *DriverMock { if mmWatch.defaultExpectation != nil { mmWatch.mock.t.Fatalf("Default expectation is already set for the Driver.Watch method") } @@ -671,8 +673,8 @@ func (mmWatch *mDriverMockWatch) When(ctx context.Context, key []byte, opts ...w } // Then sets up Driver.Watch return parameters for the expectation previously defined by the When method -func (e *DriverMockWatchExpectation) Then(ch1 <-chan watch.Event) *DriverMock { - e.results = &DriverMockWatchResults{ch1} +func (e *DriverMockWatchExpectation) Then(ch1 <-chan watch.Event, f1 func(), err error) *DriverMock { + e.results = &DriverMockWatchResults{ch1, f1, err} return e.mock } @@ -698,7 +700,7 @@ func (mmWatch *mDriverMockWatch) invocationsDone() bool { } // Watch implements mm_driver.Driver -func (mmWatch *DriverMock) Watch(ctx context.Context, key []byte, opts ...watch.Option) (ch1 <-chan watch.Event) { +func (mmWatch *DriverMock) Watch(ctx context.Context, key []byte, opts ...watch.Option) (ch1 <-chan watch.Event, f1 func(), err error) { mm_atomic.AddUint64(&mmWatch.beforeWatchCounter, 1) defer mm_atomic.AddUint64(&mmWatch.afterWatchCounter, 1) @@ -718,7 +720,7 @@ func (mmWatch *DriverMock) Watch(ctx context.Context, key []byte, opts ...watch. for _, e := range mmWatch.WatchMock.expectations { if minimock.Equal(*e.params, mm_params) { mm_atomic.AddUint64(&e.Counter, 1) - return e.results.ch1 + return e.results.ch1, e.results.f1, e.results.err } } @@ -755,7 +757,7 @@ func (mmWatch *DriverMock) Watch(ctx context.Context, key []byte, opts ...watch. if mm_results == nil { mmWatch.t.Fatal("No results are set for the DriverMock.Watch") } - return (*mm_results).ch1 + return (*mm_results).ch1, (*mm_results).f1, (*mm_results).err } if mmWatch.funcWatch != nil { return mmWatch.funcWatch(ctx, key, opts...) diff --git a/kv/kv.go b/kv/kv.go index 85f6ed9..178d9a2 100644 --- a/kv/kv.go +++ b/kv/kv.go @@ -10,10 +10,6 @@ type KeyValue struct { // Value is the serialized representation of the value. Value []byte - // CreateRevision is the revision number when this key was created. - CreateRevision int64 // ModRevision is the revision number of the last modification to this key. ModRevision int64 - // Version is the version counter for key modifications. - Version int64 } diff --git a/tx/requestresponse.go b/tx/requestresponse.go index 3c25f0b..ee3c04d 100644 --- a/tx/requestresponse.go +++ b/tx/requestresponse.go @@ -4,12 +4,6 @@ import "github.com/tarantool/go-storage/kv" // RequestResponse represents the response for an individual transaction operation. type RequestResponse struct { - // KeyValue contains the result data for Get operations. - KeyValue *kv.KeyValue - - // Success indicates whether the operation was successful. - Success bool - - // Error contains any error that occurred during the operation. - Error error + // Values contains the result data for Get operations. + Values []kv.KeyValue } diff --git a/watch/event.go b/watch/event.go index 97a50a6..762fb3b 100644 --- a/watch/event.go +++ b/watch/event.go @@ -2,51 +2,8 @@ // It enables real-time monitoring of key changes through event streams. package watch -import ( - "github.com/tarantool/go-storage/kv" -) - -// EventType represents the type of watch event. -type EventType int - -const ( - // EventPut indicates a key was created or updated. - EventPut EventType = iota - // EventDelete indicates a key was deleted. - EventDelete -) - -func (t EventType) String() string { - switch t { - case EventPut: - return "Put" - case EventDelete: - return "Delete" - default: - return "Unknown" - } -} - // Event represents a change notification from the watch stream. type Event struct { - // Type indicates whether this is a put or delete event. - Type EventType - // Key is the key that was changed. - Key []byte - // Value contains the new value for put events, nil for delete events. - Value []byte - // Rev is the revision number of the event. - Rev int64 -} - -// AsKeyValue converts the Event to a KeyValue structure. -// For delete events, the Value field will be nil. -func (e *Event) AsKeyValue() kv.KeyValue { - return kv.KeyValue{ - Key: e.Key, - Value: e.Value, - CreateRevision: 0, - ModRevision: 0, - Version: 0, - } + // Prefix indicates key/prefix of what was changed. + Prefix []byte }