Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
237 changes: 230 additions & 7 deletions driver/etcd/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,13 @@ package etcd

import (
"context"
"errors"
"fmt"

etcd "go.etcd.io/etcd/client/v3"

"github.com/tarantool/go-storage/driver"
"github.com/tarantool/go-storage/kv"
"github.com/tarantool/go-storage/operation"
"github.com/tarantool/go-storage/predicate"
"github.com/tarantool/go-storage/tx"
Expand All @@ -23,6 +25,14 @@ type Driver struct {

var (
_ driver.Driver = &Driver{} //nolint:exhaustruct

// Static error definitions to avoid dynamic errors.
errUnsupportedPredicateTarget = errors.New("unsupported predicate target")
errValuePredicateRequiresBytes = errors.New("value predicate requires []byte value")
errUnsupportedValueOperation = errors.New("unsupported operation for value predicate")
errVersionPredicateRequiresInt = errors.New("version predicate requires int64 value")
errUnsupportedVersionOperation = errors.New("unsupported operation for version predicate")
errUnsupportedOperationType = errors.New("unsupported operation type")
)

// New creates a new etcd driver instance.
Expand Down Expand Up @@ -59,16 +69,229 @@ func New(ctx context.Context, endpoints []string) (*Driver, error) {
// Execute executes a transactional operation with conditional logic.
// It processes predicates to determine whether to execute thenOps or elseOps.
func (d Driver) Execute(
_ context.Context,
_ []predicate.Predicate,
_ []operation.Operation,
_ []operation.Operation,
ctx context.Context,
predicates []predicate.Predicate,
thenOps []operation.Operation,
elseOps []operation.Operation,
) (tx.Response, error) {
panic("implement me")
txn := d.client.Txn(ctx)

for _, p := range predicates {
cmp, err := predicateToCmp(p)
if err != nil {
return tx.Response{}, fmt.Errorf("failed to convert predicate: %w", err)
}

txn = txn.If(cmp)
}

thenEtcdOps, err := operationsToEtcdOps(thenOps)
if err != nil {
return tx.Response{}, fmt.Errorf("failed to convert then operations: %w", err)
}

txn = txn.Then(thenEtcdOps...)

elseEtcdOps, err := operationsToEtcdOps(elseOps)
if err != nil {
return tx.Response{}, fmt.Errorf("failed to convert else operations: %w", err)
}

txn = txn.Else(elseEtcdOps...)

resp, err := txn.Commit()
if err != nil {
return tx.Response{}, fmt.Errorf("transaction failed: %w", err)
}

return etcdResponseToTxResponse(resp), nil
}

// Watch monitors changes to a specific key and returns a stream of events.
// It supports optional watch configuration through the opts parameter.
func (d Driver) Watch(_ context.Context, _ []byte, _ ...watch.Option) <-chan watch.Event {
panic("implement me")
func (d Driver) Watch(ctx context.Context, key []byte, _ ...watch.Option) <-chan watch.Event {
// TODO: Apply watch options when implemented.
eventCh := make(chan watch.Event, 100) //nolint:mnd

go func() {
defer close(eventCh)

watchChan := d.client.Watch(ctx, string(key))

for {
select {
case <-ctx.Done():
return
case watchResp, ok := <-watchChan:
if !ok {
return
}

if watchResp.Err() != nil {
continue
}

for _, etcdEvent := range watchResp.Events {
event := etcdEventToWatchEvent(etcdEvent)
select {
case eventCh <- event:
case <-ctx.Done():
return
}
}
}
}
}()

return eventCh
}

// predicateToCmp converts a predicate to an etcd comparison.
func predicateToCmp(pred predicate.Predicate) (etcd.Cmp, error) {
switch pred.Target() {
case predicate.TargetValue:
return valuePredicateToCmp(pred)
case predicate.TargetVersion:
return versionPredicateToCmp(pred)
default:
return etcd.Cmp{}, fmt.Errorf("%w: %v", errUnsupportedPredicateTarget, pred.Target())
}
}

// valuePredicateToCmp converts a value predicate to an etcd comparison.
func valuePredicateToCmp(pred predicate.Predicate) (etcd.Cmp, error) {
key := string(pred.Key())
value, ok := pred.Value().([]byte)

if !ok {
return etcd.Cmp{}, errValuePredicateRequiresBytes
}

switch pred.Operation() {
case predicate.OpEqual:
return etcd.Compare(etcd.Value(key), "=", value), nil
case predicate.OpNotEqual:
return etcd.Compare(etcd.Value(key), "!=", value), nil
case predicate.OpGreater:
return etcd.Cmp{}, fmt.Errorf("%w: %v", errUnsupportedValueOperation, pred.Operation())
case predicate.OpLess:
return etcd.Cmp{}, fmt.Errorf("%w: %v", errUnsupportedValueOperation, pred.Operation())
default:
return etcd.Cmp{}, fmt.Errorf("%w: %v", errUnsupportedValueOperation, pred.Operation())
}
}

// versionPredicateToCmp converts a version predicate to an etcd comparison.
func versionPredicateToCmp(pred predicate.Predicate) (etcd.Cmp, error) {
key := string(pred.Key())
version, ok := pred.Value().(int64)

if !ok {
return etcd.Cmp{}, errVersionPredicateRequiresInt
}

switch pred.Operation() {
case predicate.OpEqual:
return etcd.Compare(etcd.Version(key), "=", version), nil
case predicate.OpNotEqual:
return etcd.Compare(etcd.Version(key), "!=", version), nil
case predicate.OpGreater:
return etcd.Compare(etcd.Version(key), ">", version), nil
case predicate.OpLess:
return etcd.Compare(etcd.Version(key), "<", version), nil
default:
return etcd.Cmp{}, fmt.Errorf("%w: %v", errUnsupportedVersionOperation, pred.Operation())
}
}

// operationsToEtcdOps converts operations to etcd operations.
func operationsToEtcdOps(ops []operation.Operation) ([]etcd.Op, error) {
etcdOps := make([]etcd.Op, 0, len(ops))
for _, op := range ops {
etcdOp, err := operationToEtcdOp(op)
if err != nil {
return nil, err
}

etcdOps = append(etcdOps, etcdOp)
}

return etcdOps, nil
}

// operationToEtcdOp converts an operation to an etcd operation.
func operationToEtcdOp(storageOperation operation.Operation) (etcd.Op, error) {
key := string(storageOperation.Key())

switch storageOperation.Type() {
case operation.TypeGet:
return etcd.OpGet(key), nil
case operation.TypePut:
return etcd.OpPut(key, string(storageOperation.Value())), nil
case operation.TypeDelete:
return etcd.OpDelete(key), nil
default:
return etcd.Op{}, fmt.Errorf("%w: %v", errUnsupportedOperationType, storageOperation.Type())
}
}

// etcdResponseToTxResponse converts an etcd transaction response to tx.Response.
func etcdResponseToTxResponse(resp *etcd.TxnResponse) tx.Response {
results := make([]tx.RequestResponse, 0, len(resp.Responses))

for _, etcdResp := range resp.Responses {
reqResp := tx.RequestResponse{
Success: true,
KeyValue: nil,
Error: nil,
}

switch {
case etcdResp.GetResponseRange() != nil:
getResp := etcdResp.GetResponseRange()
if len(getResp.Kvs) > 0 {
etcdKv := getResp.Kvs[0]

reqResp.KeyValue = &kv.KeyValue{
Key: etcdKv.Key,
Value: etcdKv.Value,
CreateRevision: etcdKv.CreateRevision,
ModRevision: etcdKv.ModRevision,
Version: etcdKv.Version,
}
}
case etcdResp.GetResponsePut() != nil:
// Put operations don't return data.
case etcdResp.GetResponseDeleteRange() != nil:
// Delete operations don't return data.
}

results = append(results, reqResp)
}

return tx.Response{
Succeeded: resp.Succeeded,
Results: results,
}
}

// etcdEventToWatchEvent converts an etcd event to a watch event.
func etcdEventToWatchEvent(etcdEvent *etcd.Event) watch.Event {
event := watch.Event{
Type: watch.EventType(0), // Will be set below.
Key: etcdEvent.Kv.Key,
Value: nil, // Will be set below.
Rev: etcdEvent.Kv.ModRevision,
}

switch etcdEvent.Type {
case etcd.EventTypePut:
event.Type = watch.EventPut
event.Value = etcdEvent.Kv.Value
case etcd.EventTypeDelete:
event.Type = watch.EventDelete
event.Value = nil
}

return event
}
Loading