Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .github/workflows/golang.yml
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,14 @@ jobs:
uses: actions/setup-go@v2
with:
go-version: ${{ matrix.gover }}
- name: Set up Python
uses: actions/setup-python@v2
with:
python-version: 3.13
- name: Start memcached servers
run: ./misc/memcached_server start
- name: Start slow memcached server
run: ./misc/slow_server.sh
- name: Run gotest
run: |
if [[ ${{ matrix.compiler }} = "gcc" ]]; then export CC=gcc CXX=g++; fi
Expand Down
4 changes: 2 additions & 2 deletions libmc/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@
__file__ as _libmc_so_file
)

__VERSION__ = "1.4.14"
__version__ = "1.4.14"
__VERSION__ = "1.4.15"
__version__ = "1.4.15"
__author__ = "mckelvin"
__email__ = "mckelvin@users.noreply.github.com"
__date__ = "Fri Jun 7 06:16:00 2024 +0800"
Expand Down
7 changes: 7 additions & 0 deletions misc/slow_server.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
#!/bin/sh
set -ex

python tests/shabby/slow_memcached_server.py &
pid=$!
echo "pid of slow memcached server: $pid"

96 changes: 49 additions & 47 deletions src/golibmc.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ type Client struct {
connectTimeout C.int
pollTimeout C.int
retryTimeout C.int
maxRetries C.int // maximum amount of retries. maxRetries <= 0 means unlimited. default is -1.
maxRetries C.int // maximum amount of retries. maxRetries <= 0 means unlimited. default is -1.

lk sync.Mutex // protects following fields
freeConns []*conn
Expand Down Expand Up @@ -649,6 +649,7 @@ func (cn *conn) configHashFunction(val int) {
}

// ConfigTimeout Keys:
//
// PollTimeout
// ConnectTimeout
// RetryTimeout
Expand All @@ -670,14 +671,14 @@ func (client *Client) ConfigTimeout(cCfgKey C.config_options_t, timeout time.Dur
// GetServerAddressByKey will return the address of the memcached
// server where a key is stored (assume all memcached servers are
// accessiable and wonot establish any connections. )
func (client *Client) GetServerAddressByKey(key string) string {
func (client *Client) GetServerAddressByKey(ctx context.Context, key string) string {
rawKey := client.addPrefix(key)

cKey := C.CString(rawKey)
defer C.free(unsafe.Pointer(cKey))
cKeyLen := C.size_t(len(rawKey))

cn, err := client.conn(context.Background())
cn, err := client.conn(ctx)
if err != nil {
return ""
}
Expand All @@ -692,13 +693,13 @@ func (client *Client) GetServerAddressByKey(key string) string {
// server where a key is stored. (Will try to connect to
// corresponding memcached server and may failover accordingly. )
// if no server is avaiable, an empty string will be returned.
func (client *Client) GetRealtimeServerAddressByKey(key string) string {
func (client *Client) GetRealtimeServerAddressByKey(ctx context.Context, key string) string {
rawKey := client.addPrefix(key)

cKey := C.CString(rawKey)
defer C.free(unsafe.Pointer(cKey))
cKeyLen := C.size_t(len(rawKey))
cn, err := client.conn(context.Background())
cn, err := client.conn(ctx)
if err != nil {
return ""
}
Expand Down Expand Up @@ -732,7 +733,7 @@ func (client *Client) addPrefix(key string) string {
return strings.Join([]string{client.prefix, key}, "")
}

func (client *Client) store(cmd string, item *Item) error {
func (client *Client) store(ctx context.Context, cmd string, item *Item) error {
key := client.addPrefix(item.Key)

cKey := C.CString(key)
Expand All @@ -750,7 +751,7 @@ func (client *Client) store(cmd string, item *Item) error {

var errCode C.err_code_t

cn, err := client.conn(context.Background())
cn, err := client.conn(ctx)
if err != nil {
return err
}
Expand Down Expand Up @@ -816,32 +817,32 @@ func (client *Client) store(cmd string, item *Item) error {
}

// Add is a storage command, return without error only when the key is empty
func (client *Client) Add(item *Item) error {
return client.store("add", item)
func (client *Client) Add(ctx context.Context, item *Item) error {
return client.store(ctx, "add", item)
}

// Replace is a storage command, return without error only when the key is not empty
func (client *Client) Replace(item *Item) error {
return client.store("replace", item)
func (client *Client) Replace(ctx context.Context, item *Item) error {
return client.store(ctx, "replace", item)
}

// Prepend value to an existed key
func (client *Client) Prepend(item *Item) error {
return client.store("prepend", item)
func (client *Client) Prepend(ctx context.Context, item *Item) error {
return client.store(ctx, "prepend", item)
}

// Append value to an existed key
func (client *Client) Append(item *Item) error {
return client.store("append", item)
func (client *Client) Append(ctx context.Context, item *Item) error {
return client.store(ctx, "append", item)
}

// Set value to a key
func (client *Client) Set(item *Item) error {
return client.store("set", item)
func (client *Client) Set(ctx context.Context, item *Item) error {
return client.store(ctx, "set", item)
}

// SetMulti will set multi values at once
func (client *Client) SetMulti(items []*Item) (failedKeys []string, err error) {
func (client *Client) SetMulti(ctx context.Context, items []*Item) (failedKeys []string, err error) {
nItems := len(items)
cKeys := make([]*C.char, nItems)
cKeyLens := make([]C.size_t, nItems)
Expand Down Expand Up @@ -877,7 +878,7 @@ func (client *Client) SetMulti(items []*Item) (failedKeys []string, err error) {
var results **C.message_result_t
var n C.size_t

cn, err := client.conn(context.Background())
cn, err := client.conn(ctx)
if err != nil {
return []string{}, err
}
Expand Down Expand Up @@ -934,12 +935,12 @@ func (client *Client) SetMulti(items []*Item) (failedKeys []string, err error) {
}

// Cas is short for Compare And Swap
func (client *Client) Cas(item *Item) error {
return client.store("cas", item)
func (client *Client) Cas(ctx context.Context, item *Item) error {
return client.store(ctx, "cas", item)
}

// Delete a key
func (client *Client) Delete(key string) error {
func (client *Client) Delete(ctx context.Context, key string) error {
rawKey := client.addPrefix(key)

cKey := C.CString(rawKey)
Expand All @@ -950,7 +951,7 @@ func (client *Client) Delete(key string) error {
var rst **C.message_result_t
var n C.size_t

cn, err := client.conn(context.Background())
cn, err := client.conn(ctx)
if err != nil {
return err
}
Expand Down Expand Up @@ -981,7 +982,7 @@ func (client *Client) Delete(key string) error {
}

// DeleteMulti will delete multi keys at once
func (client *Client) DeleteMulti(keys []string) (failedKeys []string, err error) {
func (client *Client) DeleteMulti(ctx context.Context, keys []string) (failedKeys []string, err error) {
var rawKeys []string
if len(client.prefix) == 0 {
rawKeys = keys
Expand Down Expand Up @@ -1009,7 +1010,7 @@ func (client *Client) DeleteMulti(keys []string) (failedKeys []string, err error
cKeyLen := C.size_t(len(key))
cKeyLens[i] = cKeyLen
}
cn, err := client.conn(context.Background())
cn, err := client.conn(ctx)
if err != nil {
return []string{}, err
}
Expand Down Expand Up @@ -1065,8 +1066,8 @@ func (client *Client) DeleteMulti(keys []string) (failedKeys []string, err error
return
}

func (client *Client) getOrGets(cmd string, key string) (item *Item, err error) {
cn, err := client.conn(context.Background())
func (client *Client) getOrGets(ctx context.Context, cmd string, key string) (item *Item, err error) {
cn, err := client.conn(ctx)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -1119,17 +1120,17 @@ func (client *Client) getOrGets(cmd string, key string) (item *Item, err error)
}

// Get is a retrieval command. It will return Item or nil
func (client *Client) Get(key string) (*Item, error) {
return client.getOrGets("get", key)
func (client *Client) Get(ctx context.Context, key string) (*Item, error) {
return client.getOrGets(ctx, "get", key)
}

// Gets is a retrieval command. It will return Item(with casid) or nil
func (client *Client) Gets(key string) (*Item, error) {
return client.getOrGets("gets", key)
func (client *Client) Gets(ctx context.Context, key string) (*Item, error) {
return client.getOrGets(ctx, "gets", key)
}

// GetMulti will return a map of multi values
func (client *Client) GetMulti(keys []string) (rv map[string]*Item, err error) {
func (client *Client) GetMulti(ctx context.Context, keys []string) (rv map[string]*Item, err error) {
nKeys := len(keys)
var rawKeys []string
if len(client.prefix) == 0 {
Expand All @@ -1156,7 +1157,7 @@ func (client *Client) GetMulti(keys []string) (rv map[string]*Item, err error) {
var rst **C.retrieval_result_t
var n C.size_t

cn, err1 := client.conn(context.Background())
cn, err1 := client.conn(ctx)
if err1 != nil {
err = err1
return
Expand Down Expand Up @@ -1200,7 +1201,7 @@ func (client *Client) GetMulti(keys []string) (rv map[string]*Item, err error) {
}

// Touch command
func (client *Client) Touch(key string, expiration int64) error {
func (client *Client) Touch(ctx context.Context, key string, expiration int64) error {
rawKey := client.addPrefix(key)

cKey := C.CString(rawKey)
Expand All @@ -1212,7 +1213,7 @@ func (client *Client) Touch(key string, expiration int64) error {
var rst **C.message_result_t
var n C.size_t

cn, err := client.conn(context.Background())
cn, err := client.conn(ctx)
if err != nil {
return err
}
Expand Down Expand Up @@ -1242,7 +1243,7 @@ func (client *Client) Touch(key string, expiration int64) error {
return networkError(errorMessage(errCode))
}

func (client *Client) incrOrDecr(cmd string, key string, delta uint64) (uint64, error) {
func (client *Client) incrOrDecr(ctx context.Context, cmd string, key string, delta uint64) (uint64, error) {
rawKey := client.addPrefix(key)
cKey := C.CString(rawKey)
defer C.free(unsafe.Pointer(cKey))
Expand All @@ -1255,7 +1256,7 @@ func (client *Client) incrOrDecr(cmd string, key string, delta uint64) (uint64,

var errCode C.err_code_t

cn, err := client.conn(context.Background())
cn, err := client.conn(ctx)
if err != nil {
return 0, err
}
Expand Down Expand Up @@ -1296,22 +1297,22 @@ func (client *Client) incrOrDecr(cmd string, key string, delta uint64) (uint64,
}

// Incr will increase the value in key by delta
func (client *Client) Incr(key string, delta uint64) (uint64, error) {
return client.incrOrDecr("incr", key, delta)
func (client *Client) Incr(ctx context.Context, key string, delta uint64) (uint64, error) {
return client.incrOrDecr(ctx, "incr", key, delta)
}

// Decr will decrease the value in key by delta
func (client *Client) Decr(key string, delta uint64) (uint64, error) {
return client.incrOrDecr("decr", key, delta)
func (client *Client) Decr(ctx context.Context, key string, delta uint64) (uint64, error) {
return client.incrOrDecr(ctx, "decr", key, delta)
}

// Version will return a map reflecting versions of each memcached server
func (client *Client) Version() (map[string]string, error) {
func (client *Client) Version(ctx context.Context) (map[string]string, error) {
var rst *C.broadcast_result_t
var n C.size_t
rv := make(map[string]string)

cn, err := client.conn(context.Background())
cn, err := client.conn(ctx)
if err != nil {
return rv, err
}
Expand Down Expand Up @@ -1342,13 +1343,13 @@ func (client *Client) Version() (map[string]string, error) {
}

// Stats will return a map reflecting stats map of each memcached server
func (client *Client) Stats() (map[string](map[string]string), error) {
func (client *Client) Stats(ctx context.Context) (map[string](map[string]string), error) {
var rst *C.broadcast_result_t
var n C.size_t

rv := make(map[string](map[string]string))

cn, err := client.conn(context.Background())
cn, err := client.conn(ctx)
if err != nil {
return rv, err
}
Expand Down Expand Up @@ -1401,12 +1402,12 @@ func (client *Client) ToggleFlushAllFeature(enabled bool) {
// FlushAll will flush all memcached servers
// You must call ToggleFlushAllFeature(True) first to
// enable this feature.
func (client *Client) FlushAll() ([]string, error) {
func (client *Client) FlushAll(ctx context.Context) ([]string, error) {
var rst *C.broadcast_result_t
var n C.size_t
flushedHosts := []string{}

cn, err := client.conn(context.Background())
cn, err := client.conn(ctx)
C.client_toggle_flush_all_feature(
cn._imp, C.bool(client.flushAllEnabled),
)
Expand Down Expand Up @@ -1484,6 +1485,7 @@ func (cn *conn) expired(timeout time.Duration) bool {
}

// FIXME(Harry): We are using quit everywhere when the conn is failed,
//
// I think we can just close socket instead of send quit, it should save some time.
// So don't mix up Quit and Close, implement a Close function plz.
func (cn *conn) quit() error {
Expand Down
Loading