Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 0 additions & 10 deletions tests/dragonfly/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
from time import sleep
from typing import Dict, List, Union

import pymemcache
import pytest
import pytest_asyncio
import redis
Expand Down Expand Up @@ -314,15 +313,6 @@ def port_picker():
yield PortPicker()


@pytest.fixture(scope="function")
def memcached_client(df_server: DflyInstance):
client = pymemcache.Client(f"127.0.0.1:{df_server.mc_port}", default_noreply=False)

yield client

client.flush_all()


@pytest.fixture(scope="session")
def with_tls_ca_cert_args(tmp_dir):
ca_key = os.path.join(tmp_dir, "ca-key.pem")
Expand Down
22 changes: 0 additions & 22 deletions tests/dragonfly/list_family_test.py

This file was deleted.

323 changes: 149 additions & 174 deletions tests/dragonfly/pymemcached_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,180 +3,161 @@
import socket
import ssl
import time
import pytest

import pymemcache
from pymemcache.client.base import Client as MCClient

from . import dfly_args
from .instance import DflyInstance

DEFAULT_ARGS = {"memcached_port": 11211, "proactor_threads": 4}


# Generic basic tests


@dfly_args(DEFAULT_ARGS)
def test_basic(memcached_client: MCClient):
assert not memcached_client.default_noreply

# set -> replace -> add -> get
assert memcached_client.set("key1", "value1")
assert memcached_client.replace("key1", "value2")
assert not memcached_client.add("key1", "value3")
assert memcached_client.get("key1") == b"value2"

# add -> get
assert memcached_client.add("key2", "value1")
assert memcached_client.get("key2") == b"value1"

# delete
assert memcached_client.delete("key1")
assert not memcached_client.delete("key3")
assert memcached_client.get("key1") is None

# prepend append
assert memcached_client.set("key4", "B")
assert memcached_client.prepend("key4", "A")
assert memcached_client.append("key4", "C")
assert memcached_client.get("key4") == b"ABC"

# incr
memcached_client.set("key5", 0)
assert memcached_client.incr("key5", 1) == 1
assert memcached_client.incr("key5", 1) == 2
assert memcached_client.decr("key5", 1) == 1

assert memcached_client.gets("key5") == (b"1", b"0")


# Noreply (and pipeline) tests


@dfly_args(DEFAULT_ARGS)
async def test_noreply_pipeline(df_server: DflyInstance, memcached_client: MCClient):
"""
With the noreply option the python client doesn't wait for replies,
so all the commands are pipelined. Assert pipelines work correctly and the
succeeding regular command receives a reply (it should join the pipeline as last).
"""

client = df_server.client()
for attempts in range(2):
keys = [f"k{i}" for i in range(1000)]
values = [f"d{i}" for i in range(len(keys))]

for k, v in zip(keys, values):
memcached_client.set(k, v, noreply=True)

# quick follow up before the pipeline finishes
assert memcached_client.get("k10") == b"d10"
# check all commands were executed
assert memcached_client.get_many(keys) == {k: v.encode() for k, v in zip(keys, values)}

info = await client.info()
if info["total_pipelined_commands"] > 100:
return
logging.warning(
f"Have not identified pipelining at attempt {attempts} Info: \n" + str(info)
)
await client.flushall()

assert False, "Pipelining not detected"


@dfly_args(DEFAULT_ARGS)
def test_noreply_alternating(memcached_client: MCClient):
"""
Assert alternating noreply works correctly, will cause many dispatch queue emptyings.
"""
for i in range(200):
if i % 2 == 0:
memcached_client.set(f"k{i}", "D1", noreply=True)
memcached_client.set(f"k{i}", "D2", noreply=True)
memcached_client.set(f"k{i}", "D3", noreply=True)
assert memcached_client.add(f"k{i}", "DX", noreply=False) == (i % 2 != 0)


# Raw connection tests


@dfly_args(DEFAULT_ARGS)
def test_length_in_set_command(df_server: DflyInstance, memcached_client: MCClient):
"""
Test parser correctly reads value based on length and complains about bad chunks
"""
client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
client.connect(("127.0.0.1", int(df_server["memcached_port"])))

cases = [b"NOTFOUR", b"FOUR", b"F4\r\n", b"\r\n\r\n"]

# TODO: \r\n hangs

for case in cases:
print("case", case)
client.sendall(b"set foo 0 0 4\r\n" + case + b"\r\n")
response = client.recv(256)
if len(case) == 4:
assert response == b"STORED\r\n"
else:
assert response == b"CLIENT_ERROR bad data chunk\r\n"

client.close()


# Auxiliary tests


@dfly_args(DEFAULT_ARGS)
def test_large_request(memcached_client):
assert memcached_client.set(b"key1", b"d" * 4096, noreply=False)
assert memcached_client.set(b"key2", b"d" * 4096 * 2, noreply=False)


@dfly_args(DEFAULT_ARGS)
def test_version(memcached_client: MCClient):
"""
php-memcached client expects version to be in the format of "n.n.n", so we return 1.5.0 emulating an old memcached server.
Our real version is being returned in the stats command.
Also verified manually that php client parses correctly the version string that ends with "DF".
"""
assert b"1.6.0 DF" == memcached_client.version()
stats = memcached_client.stats()
version = stats[b"version"].decode("utf-8")
assert version.startswith("v") or version == "dev"


@dfly_args(DEFAULT_ARGS)
def test_flags(memcached_client: MCClient):
for i in range(1, 20):
flags = random.randrange(50, 1000)
memcached_client.set("a", "real-value", flags=flags, noreply=True)

res = memcached_client.raw_command("get a", "END\r\n").split()
# workaround sometimes memcached_client.raw_command returns empty str
if len(res) > 0:
assert res[2].decode() == str(flags)


@dfly_args(DEFAULT_ARGS)
def test_expiration(memcached_client: MCClient):
assert not memcached_client.default_noreply

assert memcached_client.set("key1", "value1", 2)
assert memcached_client.set("key2", "value2", int(time.time()) + 2)
assert memcached_client.set("key3", "value3", int(time.time()) + 200)
assert memcached_client.get("key1") == b"value1"
assert memcached_client.get("key2") == b"value2"
assert memcached_client.get("key3") == b"value3"
assert memcached_client.set("key3", "value3", int(time.time()) - 200)
assert memcached_client.get("key3") == None
time.sleep(2)
assert memcached_client.get("key1") == None
assert memcached_client.get("key2") == None
assert memcached_client.get("key3") == None


@dfly_args(DEFAULT_ARGS)
@dfly_args({"memcached_port": 11211, "proactor_threads": 4})
class TestBasic:
@pytest.fixture(scope="function")
def memcached_client(self, df_server: DflyInstance):
client = pymemcache.Client(f"127.0.0.1:{df_server.mc_port}", default_noreply=False)
yield client
client.flush_all()

def test_getset(self, memcached_client: MCClient):
assert not memcached_client.default_noreply

# set -> replace -> add -> get
assert memcached_client.set("key1", "value1")
assert memcached_client.replace("key1", "value2")
assert not memcached_client.add("key1", "value3")
assert memcached_client.get("key1") == b"value2"

# add -> get
assert memcached_client.add("key2", "value1")
assert memcached_client.get("key2") == b"value1"

# delete
assert memcached_client.delete("key1")
assert not memcached_client.delete("key3")
assert memcached_client.get("key1") is None

# prepend append
assert memcached_client.set("key4", "B")
assert memcached_client.prepend("key4", "A")
assert memcached_client.append("key4", "C")
assert memcached_client.get("key4") == b"ABC"

# incr
memcached_client.set("key5", 0)
assert memcached_client.incr("key5", 1) == 1
assert memcached_client.incr("key5", 1) == 2
assert memcached_client.decr("key5", 1) == 1

assert memcached_client.gets("key5") == (b"1", b"0")

async def test_noreply_pipeline(self, df_server: DflyInstance, memcached_client: MCClient):
"""
With the noreply option the python client doesn't wait for replies,
so all the commands are pipelined. Assert pipelines work correctly and the
succeeding regular command receives a reply (it should join the pipeline as last).
"""

client = df_server.client()
for attempts in range(2):
keys = [f"k{i}" for i in range(1000)]
values = [f"d{i}" for i in range(len(keys))]

for k, v in zip(keys, values):
memcached_client.set(k, v, noreply=True)

# quick follow up before the pipeline finishes
assert memcached_client.get("k10") == b"d10"
# check all commands were executed
assert memcached_client.get_many(keys) == {k: v.encode() for k, v in zip(keys, values)}

info = await client.info()
if info["total_pipelined_commands"] > 100:
return
logging.warning(
f"Have not identified pipelining at attempt {attempts} Info: \n" + str(info)
)
await client.flushall()

assert False, "Pipelining not detected"

def test_noreply_alternating(self, memcached_client: MCClient):
"""
Assert alternating noreply works correctly, will cause many dispatch queue emptyings.
"""
for i in range(200):
if i % 2 == 0:
memcached_client.set(f"k{i}", "D1", noreply=True)
memcached_client.set(f"k{i}", "D2", noreply=True)
memcached_client.set(f"k{i}", "D3", noreply=True)
assert memcached_client.add(f"k{i}", "DX", noreply=False) == (i % 2 != 0)

def test_length_in_set_command(self, df_server: DflyInstance, memcached_client: MCClient):
"""
Test parser correctly reads value based on length and complains about bad chunks
"""
client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
client.connect(("127.0.0.1", int(df_server["memcached_port"])))

cases = [b"NOTFOUR", b"FOUR", b"F4\r\n", b"\r\n\r\n"]

# TODO: \r\n hangs

for case in cases:
print("case", case)
client.sendall(b"set foo 0 0 4\r\n" + case + b"\r\n")
response = client.recv(256)
if len(case) == 4:
assert response == b"STORED\r\n"
else:
assert response == b"CLIENT_ERROR bad data chunk\r\n"

client.close()

def test_large_request(self, memcached_client):
assert memcached_client.set(b"key1", b"d" * 4096, noreply=False)
assert memcached_client.set(b"key2", b"d" * 4096 * 2, noreply=False)

def test_version(self, memcached_client: MCClient):
"""
php-memcached client expects version to be in the format of "n.n.n", so we return 1.5.0 emulating an old memcached server.
Our real version is being returned in the stats command.
Also verified manually that php client parses correctly the version string that ends with "DF".
"""
assert b"1.6.0 DF" == memcached_client.version()
stats = memcached_client.stats()
version = stats[b"version"].decode("utf-8")
assert version.startswith("v") or version == "dev"

def test_flags(self, memcached_client: MCClient):
for i in range(1, 20):
flags = random.randrange(50, 1000)
memcached_client.set("a", "real-value", flags=flags, noreply=True)

res = memcached_client.raw_command("get a", "END\r\n").split()
# workaround sometimes memcached_client.raw_command returns empty str
if len(res) > 0:
assert res[2].decode() == str(flags)

def test_expiration(self, memcached_client: MCClient):
assert not memcached_client.default_noreply

assert memcached_client.set("key1", "value1", 2)
assert memcached_client.set("key2", "value2", int(time.time()) + 2)
assert memcached_client.set("key3", "value3", int(time.time()) + 200)
assert memcached_client.get("key1") == b"value1"
assert memcached_client.get("key2") == b"value2"
assert memcached_client.get("key3") == b"value3"
assert memcached_client.set("key3", "value3", int(time.time()) - 200)
assert memcached_client.get("key3") == None
time.sleep(2)
assert memcached_client.get("key1") == None
assert memcached_client.get("key2") == None
assert memcached_client.get("key3") == None


@dfly_args({"memcached_port": 11211, "proactor_threads": 4})
def test_memcached_tls_no_requirepass(df_factory, with_tls_server_args, with_tls_ca_cert_args):
"""
Test for issue #5084: ability to use TLS for Memcached without requirepass.
Expand All @@ -185,16 +166,10 @@ def test_memcached_tls_no_requirepass(df_factory, with_tls_server_args, with_tls
does not support password authentication. This test verifies that we can start
the server with TLS enabled but without specifying requirepass and with the Memcached port.
"""
# Create arguments for TLS without specifying requirepass
server_args = {**DEFAULT_ARGS, **with_tls_server_args, "requirepass": "test_password"}

# Create and start the server - it should not crash
server = df_factory.create(**server_args)
server = df_factory.create(**{**with_tls_server_args, "requirepass": "test_password"})
server.start()

# Give the server time to start
time.sleep(1)

# Create SSL context for client
ssl_context = ssl.create_default_context()
ssl_context.load_verify_locations(with_tls_ca_cert_args["ca_cert"])
Expand Down
Loading
Loading