Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 67 additions & 0 deletions key-value/key-value-aio/tests/stores/wrappers/test_encryption.py
Original file line number Diff line number Diff line change
Expand Up @@ -175,3 +175,70 @@
assert key_str_one != key_str_two
assert key_str_one != key_str_three
assert key_str_two != key_str_three


def test_fernet_with_source_material_and_salt(memory_store: MemoryStore):
"""Test that FernetEncryptionWrapper works with source_material and salt."""
wrapper = FernetEncryptionWrapper(
key_value=memory_store,
source_material="my-secret-key",
salt="my-unique-salt",
)
assert wrapper is not None


def test_fernet_cannot_provide_fernet_with_source_material(memory_store: MemoryStore):
"""Test that providing both fernet and source_material raises ValueError."""
fernet = Fernet(key=Fernet.generate_key())
with pytest.raises(ValueError, match="Cannot provide fernet together with source_material or salt"):
FernetEncryptionWrapper(

Check failure on line 194 in key-value/key-value-aio/tests/stores/wrappers/test_encryption.py

View workflow job for this annotation

GitHub Actions / static_analysis (key-value/key-value-aio)

No overloads for "__init__" match the provided arguments   Argument types: (MemoryStore, Fernet, Literal['test']) (reportCallIssue)
key_value=memory_store,
fernet=fernet,
source_material="test",
)


def test_fernet_cannot_provide_fernet_with_salt(memory_store: MemoryStore):
"""Test that providing both fernet and salt raises ValueError."""
fernet = Fernet(key=Fernet.generate_key())
with pytest.raises(ValueError, match="Cannot provide fernet together with source_material or salt"):
FernetEncryptionWrapper(

Check failure on line 205 in key-value/key-value-aio/tests/stores/wrappers/test_encryption.py

View workflow job for this annotation

GitHub Actions / static_analysis (key-value/key-value-aio)

No overloads for "__init__" match the provided arguments   Argument types: (MemoryStore, Fernet, Literal['test']) (reportCallIssue)
key_value=memory_store,
fernet=fernet,
salt="test",
)
Comment on lines +190 to +209
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

Add type: ignore comments for intentional overload violations.

These tests intentionally pass invalid argument combinations to verify error handling. The type checker correctly flags them as not matching any overload signature. Add type: ignore comments to suppress the warnings.

🔎 Proposed fix
 def test_fernet_cannot_provide_fernet_with_source_material(memory_store: MemoryStore):
     """Test that providing both fernet and source_material raises ValueError."""
     fernet = Fernet(key=Fernet.generate_key())
     with pytest.raises(ValueError, match="Cannot provide fernet together with source_material or salt"):
-        FernetEncryptionWrapper(
+        FernetEncryptionWrapper(  # type: ignore[call-overload]
             key_value=memory_store,
             fernet=fernet,
             source_material="test",
         )


 def test_fernet_cannot_provide_fernet_with_salt(memory_store: MemoryStore):
     """Test that providing both fernet and salt raises ValueError."""
     fernet = Fernet(key=Fernet.generate_key())
     with pytest.raises(ValueError, match="Cannot provide fernet together with source_material or salt"):
-        FernetEncryptionWrapper(
+        FernetEncryptionWrapper(  # type: ignore[call-overload]
             key_value=memory_store,
             fernet=fernet,
             salt="test",
         )
🧰 Tools
🪛 GitHub Actions: Run Tests

[error] 194-194: No overloads for "init" match the provided arguments. Argument types: (MemoryStore, Fernet, Literal['test'])

🪛 GitHub Check: static_analysis (key-value/key-value-aio)

[failure] 205-205:
No overloads for "init" match the provided arguments
  Argument types: (MemoryStore, Fernet, Literal['test']) (reportCallIssue)


[failure] 194-194:
No overloads for "init" match the provided arguments
  Argument types: (MemoryStore, Fernet, Literal['test']) (reportCallIssue)

🤖 Prompt for AI Agents
In key-value/key-value-aio/tests/stores/wrappers/test_encryption.py around lines
190 to 209, the two tests intentionally call FernetEncryptionWrapper with both
fernet and source_material/salt which triggers static type-checker overload
errors; add a trailing "# type: ignore[call-overload]" (or "# type: ignore" if
call-overload isn't available) to each FernetEncryptionWrapper(...) call to
suppress the type-check warnings while keeping the runtime tests unchanged.



def test_fernet_must_provide_source_material(memory_store: MemoryStore):
"""Test that not providing fernet or source_material raises ValueError."""
with pytest.raises(ValueError, match="Must provide either fernet or source_material"):
FernetEncryptionWrapper(key_value=memory_store)

Check failure on line 215 in key-value/key-value-aio/tests/stores/wrappers/test_encryption.py

View workflow job for this annotation

GitHub Actions / static_analysis (key-value/key-value-aio)

No overloads for "__init__" match the provided arguments   Argument types: (MemoryStore) (reportCallIssue)
Comment on lines +212 to +215
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

Add type: ignore comment for intentional overload violation.

This test intentionally omits required parameters to verify error handling. Add a type: ignore comment to suppress the type checker warning.

🔎 Proposed fix
 def test_fernet_must_provide_source_material(memory_store: MemoryStore):
     """Test that not providing fernet or source_material raises ValueError."""
     with pytest.raises(ValueError, match="Must provide either fernet or source_material"):
-        FernetEncryptionWrapper(key_value=memory_store)
+        FernetEncryptionWrapper(key_value=memory_store)  # type: ignore[call-overload]
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
def test_fernet_must_provide_source_material(memory_store: MemoryStore):
"""Test that not providing fernet or source_material raises ValueError."""
with pytest.raises(ValueError, match="Must provide either fernet or source_material"):
FernetEncryptionWrapper(key_value=memory_store)
def test_fernet_must_provide_source_material(memory_store: MemoryStore):
"""Test that not providing fernet or source_material raises ValueError."""
with pytest.raises(ValueError, match="Must provide either fernet or source_material"):
FernetEncryptionWrapper(key_value=memory_store) # type: ignore[call-overload]
🧰 Tools
🪛 GitHub Check: static_analysis (key-value/key-value-aio)

[failure] 215-215:
No overloads for "init" match the provided arguments
  Argument types: (MemoryStore) (reportCallIssue)

🤖 Prompt for AI Agents
In key-value/key-value-aio/tests/stores/wrappers/test_encryption.py around lines
212 to 215, the test intentionally calls FernetEncryptionWrapper without
required params to assert a ValueError but the type checker raises a
call-argument error; suppress this expected type-checker warning by adding a
type ignore comment to the constructor call (e.g. append "# type:
ignore[call-arg]" or "# type: ignore" to that line) so the test still verifies
runtime behavior while avoiding static-check failures.



def test_fernet_must_provide_salt_with_source_material(memory_store: MemoryStore):
"""Test that providing source_material without salt raises ValueError."""
with pytest.raises(ValueError, match="Must provide a salt"):
FernetEncryptionWrapper(

Check failure on line 221 in key-value/key-value-aio/tests/stores/wrappers/test_encryption.py

View workflow job for this annotation

GitHub Actions / static_analysis (key-value/key-value-aio)

No overloads for "__init__" match the provided arguments   Argument types: (MemoryStore, Literal['test-source']) (reportCallIssue)
key_value=memory_store,
source_material="test-source",
)
Comment on lines +218 to +224
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

Add type: ignore comment for intentional overload violation.

This test intentionally omits the required salt parameter to verify error handling. Add a type: ignore comment to suppress the type checker warning.

🔎 Proposed fix
 def test_fernet_must_provide_salt_with_source_material(memory_store: MemoryStore):
     """Test that providing source_material without salt raises ValueError."""
     with pytest.raises(ValueError, match="Must provide a salt"):
-        FernetEncryptionWrapper(
+        FernetEncryptionWrapper(  # type: ignore[call-overload]
             key_value=memory_store,
             source_material="test-source",
         )
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
def test_fernet_must_provide_salt_with_source_material(memory_store: MemoryStore):
"""Test that providing source_material without salt raises ValueError."""
with pytest.raises(ValueError, match="Must provide a salt"):
FernetEncryptionWrapper(
key_value=memory_store,
source_material="test-source",
)
def test_fernet_must_provide_salt_with_source_material(memory_store: MemoryStore):
"""Test that providing source_material without salt raises ValueError."""
with pytest.raises(ValueError, match="Must provide a salt"):
FernetEncryptionWrapper( # type: ignore[call-overload]
key_value=memory_store,
source_material="test-source",
)
🧰 Tools
🪛 GitHub Check: static_analysis (key-value/key-value-aio)

[failure] 221-221:
No overloads for "init" match the provided arguments
  Argument types: (MemoryStore, Literal['test-source']) (reportCallIssue)

🤖 Prompt for AI Agents
In key-value/key-value-aio/tests/stores/wrappers/test_encryption.py around lines
218 to 224, the test intentionally omits the required salt parameter to trigger
a ValueError but triggers a static type-checker warning; add a type ignore
comment to the FernetEncryptionWrapper(...) invocation (e.g. append "# type:
ignore[call-arg]" or "# type: ignore" to that line) so the overload violation is
suppressed while keeping the runtime behavior and assertion unchanged.



def test_fernet_empty_source_material(memory_store: MemoryStore):
"""Test that empty source_material raises ValueError."""
with pytest.raises(ValueError, match="Must provide either fernet or source_material"):
FernetEncryptionWrapper(
key_value=memory_store,
source_material=" ",
salt="test",
)


def test_fernet_empty_salt(memory_store: MemoryStore):
"""Test that empty salt raises ValueError."""
with pytest.raises(ValueError, match="Must provide a salt"):
FernetEncryptionWrapper(
key_value=memory_store,
source_material="test-source",
salt=" ",
)
112 changes: 111 additions & 1 deletion key-value/key-value-aio/tests/stores/wrappers/test_fallback.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from collections.abc import Mapping
from collections.abc import Mapping, Sequence
from typing import Any, SupportsFloat

import pytest
Expand All @@ -17,11 +17,43 @@ async def get(self, key: str, *, collection: str | None = None) -> dict[str, Any
msg = "Primary store unavailable"
raise ConnectionError(msg)

@override
async def get_many(self, keys: Sequence[str], *, collection: str | None = None) -> list[dict[str, Any] | None]:
msg = "Primary store unavailable"
raise ConnectionError(msg)

@override
async def ttl(self, key: str, *, collection: str | None = None) -> tuple[dict[str, Any] | None, float | None]:
msg = "Primary store unavailable"
raise ConnectionError(msg)

@override
async def ttl_many(self, keys: Sequence[str], *, collection: str | None = None) -> list[tuple[dict[str, Any] | None, float | None]]:
msg = "Primary store unavailable"
raise ConnectionError(msg)

@override
async def put(self, key: str, value: Mapping[str, Any], *, collection: str | None = None, ttl: SupportsFloat | None = None):
msg = "Primary store unavailable"
raise ConnectionError(msg)

@override
async def put_many(
self, keys: Sequence[str], values: Sequence[Mapping[str, Any]], *, collection: str | None = None, ttl: SupportsFloat | None = None
) -> None:
msg = "Primary store unavailable"
raise ConnectionError(msg)

@override
async def delete(self, key: str, *, collection: str | None = None) -> bool:
msg = "Primary store unavailable"
raise ConnectionError(msg)

@override
async def delete_many(self, keys: Sequence[str], *, collection: str | None = None) -> int:
msg = "Primary store unavailable"
raise ConnectionError(msg)


class TestFallbackWrapper(BaseStoreTests):
@override
Expand Down Expand Up @@ -77,3 +109,81 @@ async def test_write_to_fallback_enabled(self):
# Verify it was written to fallback
result = await fallback_store.get(collection="test", key="test")
assert result == {"test": "value"}

async def test_fallback_get_many(self):
primary_store = FailingStore()
fallback_store = MemoryStore()
wrapper = FallbackWrapper(primary_key_value=primary_store, fallback_key_value=fallback_store)

# Put data in fallback store
await fallback_store.put(collection="test", key="k1", value={"v": "1"})
await fallback_store.put(collection="test", key="k2", value={"v": "2"})

# Should fall back for get_many
result = await wrapper.get_many(collection="test", keys=["k1", "k2"])
assert result == [{"v": "1"}, {"v": "2"}]

async def test_fallback_ttl(self):
primary_store = FailingStore()
fallback_store = MemoryStore()
wrapper = FallbackWrapper(primary_key_value=primary_store, fallback_key_value=fallback_store)

# Put data in fallback store with TTL
await fallback_store.put(collection="test", key="test", value={"v": "1"}, ttl=100)

# Should fall back for ttl
value, ttl = await wrapper.ttl(collection="test", key="test")
assert value == {"v": "1"}
assert ttl is not None

async def test_fallback_ttl_many(self):
primary_store = FailingStore()
fallback_store = MemoryStore()
wrapper = FallbackWrapper(primary_key_value=primary_store, fallback_key_value=fallback_store)

# Put data in fallback store
await fallback_store.put(collection="test", key="k1", value={"v": "1"}, ttl=100)
await fallback_store.put(collection="test", key="k2", value={"v": "2"}, ttl=200)

# Should fall back for ttl_many
results = await wrapper.ttl_many(collection="test", keys=["k1", "k2"])
assert results[0][0] == {"v": "1"}
assert results[1][0] == {"v": "2"}
Comment on lines +139 to +151
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🧹 Nitpick | 🔵 Trivial

Consider verifying TTL values.

Test correctly validates ttl_many fallback, but only checks values. Optionally assert that results[0][1] and results[1][1] are not None (or check approximate TTL values) for more thorough validation.

🔎 Optional enhancement
     results = await wrapper.ttl_many(collection="test", keys=["k1", "k2"])
     assert results[0][0] == {"v": "1"}
     assert results[1][0] == {"v": "2"}
+    assert results[0][1] is not None
+    assert results[1][1] is not None
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
async def test_fallback_ttl_many(self):
primary_store = FailingStore()
fallback_store = MemoryStore()
wrapper = FallbackWrapper(primary_key_value=primary_store, fallback_key_value=fallback_store)
# Put data in fallback store
await fallback_store.put(collection="test", key="k1", value={"v": "1"}, ttl=100)
await fallback_store.put(collection="test", key="k2", value={"v": "2"}, ttl=200)
# Should fall back for ttl_many
results = await wrapper.ttl_many(collection="test", keys=["k1", "k2"])
assert results[0][0] == {"v": "1"}
assert results[1][0] == {"v": "2"}
async def test_fallback_ttl_many(self):
primary_store = FailingStore()
fallback_store = MemoryStore()
wrapper = FallbackWrapper(primary_key_value=primary_store, fallback_key_value=fallback_store)
# Put data in fallback store
await fallback_store.put(collection="test", key="k1", value={"v": "1"}, ttl=100)
await fallback_store.put(collection="test", key="k2", value={"v": "2"}, ttl=200)
# Should fall back for ttl_many
results = await wrapper.ttl_many(collection="test", keys=["k1", "k2"])
assert results[0][0] == {"v": "1"}
assert results[1][0] == {"v": "2"}
assert results[0][1] is not None
assert results[1][1] is not None
🤖 Prompt for AI Agents
In key-value/key-value-aio/tests/stores/wrappers/test_fallback.py around lines
139 to 151, the test asserts returned values from ttl_many but does not verify
the TTLs; update the test to also assert that results[0][1] and results[1][1]
are not None (or assert they are within expected ranges, e.g. <= 100 and <= 200
and > 0) so the fallback returns both value and a valid TTL; add these
assertions after the existing value checks.


async def test_fallback_put_many_enabled(self):
primary_store = FailingStore()
fallback_store = MemoryStore()
wrapper = FallbackWrapper(primary_key_value=primary_store, fallback_key_value=fallback_store, write_to_fallback=True)

# Should fall back for put_many
await wrapper.put_many(collection="test", keys=["k1", "k2"], values=[{"v": "1"}, {"v": "2"}])

# Verify in fallback
assert await fallback_store.get(collection="test", key="k1") == {"v": "1"}
assert await fallback_store.get(collection="test", key="k2") == {"v": "2"}
Comment on lines +153 to +163
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🧹 Nitpick | 🔵 Trivial

LGTM!

Test correctly validates put_many falls back to secondary store when write_to_fallback is enabled.

Optionally add a corresponding test for write_to_fallback=False with put_many (should raise ConnectionError, similar to test_write_to_fallback_disabled for put).

🤖 Prompt for AI Agents
In key-value/key-value-aio/tests/stores/wrappers/test_fallback.py around lines
153-163, add a complementary test to verify behavior when write_to_fallback is
False: create a FailingStore as primary and a MemoryStore as fallback, construct
the FallbackWrapper with write_to_fallback=False, call wrapper.put_many(...) and
assert that it raises a ConnectionError (or the same exception used by
test_write_to_fallback_disabled for put); also assert the fallback store remains
empty/unmodified for the attempted keys. Ensure the test is async and follows
the same naming and assertion style as existing tests.


async def test_fallback_delete_enabled(self):
primary_store = FailingStore()
fallback_store = MemoryStore()
wrapper = FallbackWrapper(primary_key_value=primary_store, fallback_key_value=fallback_store, write_to_fallback=True)

# Put data in fallback
await fallback_store.put(collection="test", key="test", value={"v": "1"})

# Should fall back for delete
result = await wrapper.delete(collection="test", key="test")
assert result is True
assert await fallback_store.get(collection="test", key="test") is None

async def test_fallback_delete_many_enabled(self):
primary_store = FailingStore()
fallback_store = MemoryStore()
wrapper = FallbackWrapper(primary_key_value=primary_store, fallback_key_value=fallback_store, write_to_fallback=True)

# Put data in fallback
await fallback_store.put(collection="test", key="k1", value={"v": "1"})
await fallback_store.put(collection="test", key="k2", value={"v": "2"})

# Should fall back for delete_many
result = await wrapper.delete_many(collection="test", keys=["k1", "k2"])
assert result == 2
Comment on lines +165 to +189
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🧹 Nitpick | 🔵 Trivial

LGTM!

Both delete tests correctly validate fallback behavior when write_to_fallback is enabled. The tests verify return values and actual deletion in the fallback store.

Optionally add corresponding tests for write_to_fallback=False with delete/delete_many (should raise ConnectionError).

🤖 Prompt for AI Agents
In key-value/key-value-aio/tests/stores/wrappers/test_fallback.py around lines
165 to 189, add tests covering the negative case where write_to_fallback=False:
create a FailingStore as primary and a MemoryStore as fallback, construct
FallbackWrapper with write_to_fallback=False, seed the fallback store with
entries, then assert that calling delete(...) raises ConnectionError and that
delete_many(...) raises ConnectionError (and verify that fallback data remains
unchanged). Ensure both tests use async/await and proper pytest.raises for the
ConnectionError.

Original file line number Diff line number Diff line change
Expand Up @@ -28,3 +28,80 @@ async def cache_store(self, memory_store: MemoryStore) -> MemoryStore:
async def store(self, primary_store: DiskStore, cache_store: MemoryStore) -> PassthroughCacheWrapper:
primary_store._cache.clear() # pyright: ignore[reportPrivateUsage]
return PassthroughCacheWrapper(primary_key_value=primary_store, cache_key_value=cache_store)

async def test_ttl_caches_from_primary(self):
"""Test that ttl retrieves from primary and caches the result."""
primary_store = MemoryStore()
cache_store = MemoryStore()
wrapper = PassthroughCacheWrapper(primary_key_value=primary_store, cache_key_value=cache_store)

# Put data in primary with TTL
await primary_store.put(collection="test", key="test", value={"v": "1"}, ttl=100)

# Call ttl - should get from primary and cache it
value, ttl = await wrapper.ttl(collection="test", key="test")
assert value == {"v": "1"}
assert ttl is not None

# Verify it's now in cache
cached_value = await cache_store.get(collection="test", key="test")
assert cached_value == {"v": "1"}
Comment on lines +32 to +48
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🧹 Nitpick | 🔵 Trivial

Optional: Validate TTL values more precisely.

The test checks that ttl is not None but doesn't verify the value is close to the expected 100 seconds. Consider asserting ttl > 0 or ttl <= 100 to strengthen validation.

🤖 Prompt for AI Agents
In key-value/key-value-aio/tests/stores/wrappers/test_passthrough_cache.py
around lines 32 to 48, the test currently only asserts `ttl is not None`;
tighten this by asserting the TTL is within expected bounds to catch regressions
— replace or add assertions such as `assert ttl > 0` and `assert ttl <= 100` (or
`assert 0 < ttl <= 100`) so the returned TTL is positive and does not exceed the
original 100-second value, allowing for small elapsed time.


async def test_ttl_returns_cached_value(self):
"""Test that ttl returns cached value when available."""
primary_store = MemoryStore()
cache_store = MemoryStore()
wrapper = PassthroughCacheWrapper(primary_key_value=primary_store, cache_key_value=cache_store)

# Put data only in cache
await cache_store.put(collection="test", key="test", value={"v": "cached"}, ttl=100)

# Call ttl - should return cached value
value, ttl = await wrapper.ttl(collection="test", key="test")
assert value == {"v": "cached"}
assert ttl is not None

async def test_ttl_returns_none_for_missing(self):
"""Test that ttl returns (None, None) for missing entries."""
primary_store = MemoryStore()
cache_store = MemoryStore()
wrapper = PassthroughCacheWrapper(primary_key_value=primary_store, cache_key_value=cache_store)

# Call ttl for non-existent key
value, ttl = await wrapper.ttl(collection="test", key="missing")
assert value is None
assert ttl is None

async def test_ttl_many_caches_from_primary(self):
"""Test that ttl_many retrieves from primary and caches results."""
primary_store = MemoryStore()
cache_store = MemoryStore()
wrapper = PassthroughCacheWrapper(primary_key_value=primary_store, cache_key_value=cache_store)

# Put data in primary with TTL
await primary_store.put(collection="test", key="k1", value={"v": "1"}, ttl=100)
await primary_store.put(collection="test", key="k2", value={"v": "2"}, ttl=200)

# Call ttl_many - should get from primary and cache
results = await wrapper.ttl_many(collection="test", keys=["k1", "k2"])
assert results[0][0] == {"v": "1"}
assert results[1][0] == {"v": "2"}

# Verify in cache
assert await cache_store.get(collection="test", key="k1") == {"v": "1"}
assert await cache_store.get(collection="test", key="k2") == {"v": "2"}
Comment on lines +75 to +92
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🧹 Nitpick | 🔵 Trivial

Consider testing partial cache hits.

Current tests cover full primary fetch and full cache hit scenarios. A test where some keys are cached and others require primary fetch would strengthen coverage of ttl_many behavior.

🤖 Prompt for AI Agents
In key-value/key-value-aio/tests/stores/wrappers/test_passthrough_cache.py
around lines 75 to 92, add a test for partial cache hits: seed the cache_store
with one key (e.g., "k1") and the primary_store with both keys ("k1" and "k2")
with TTLs, call wrapper.ttl_many(collection="test", keys=["k1","k2"]), assert
returned values for both keys are correct, assert the cached key remains
unchanged and the missing key ("k2") is fetched from primary and written into
cache, and verify cache_store.get for "k2" now returns the expected value; keep
assertions for TTL behavior consistent with existing tests.


async def test_ttl_many_returns_cached_values(self):
"""Test that ttl_many returns cached values when available."""
primary_store = MemoryStore()
cache_store = MemoryStore()
wrapper = PassthroughCacheWrapper(primary_key_value=primary_store, cache_key_value=cache_store)

# Put data in cache
await cache_store.put(collection="test", key="k1", value={"v": "cached1"}, ttl=100)
await cache_store.put(collection="test", key="k2", value={"v": "cached2"}, ttl=200)

# Call ttl_many - should return cached values
results = await wrapper.ttl_many(collection="test", keys=["k1", "k2"])
assert results[0][0] == {"v": "cached1"}
assert results[1][0] == {"v": "cached2"}
92 changes: 91 additions & 1 deletion key-value/key-value-shared/tests/utils/test_serialization.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@
import pytest
from inline_snapshot import snapshot

from key_value.shared.errors import DeserializationError, SerializationError
from key_value.shared.utils.managed_entry import ManagedEntry
from key_value.shared.utils.serialization import BasicSerializationAdapter
from key_value.shared.utils.serialization import BasicSerializationAdapter, key_must_be, parse_datetime_str

FIXED_DATETIME_ONE = datetime(2025, 1, 1, 0, 0, 0, tzinfo=timezone.utc)
FIXED_DATETIME_ONE_ISOFORMAT = FIXED_DATETIME_ONE.isoformat()
Expand Down Expand Up @@ -80,3 +81,92 @@ def test_entry_two(self, adapter: BasicSerializationAdapter):

assert adapter.load_dict(data=adapter.dump_dict(entry=TEST_ENTRY_TWO)) == snapshot(TEST_ENTRY_TWO)
assert adapter.load_json(json_str=adapter.dump_json(entry=TEST_ENTRY_TWO)) == snapshot(TEST_ENTRY_TWO)

def test_dump_dict_with_key_and_collection(self, adapter: BasicSerializationAdapter):
"""Test dump_dict includes key and collection when provided."""
result = adapter.dump_dict(entry=TEST_ENTRY_ONE, key="my-key", collection="my-collection")
assert result["key"] == "my-key"
assert result["collection"] == "my-collection"

def test_dump_dict_with_datetime_format(self):
"""Test dump_dict with datetime format instead of isoformat."""
adapter = BasicSerializationAdapter(date_format="datetime")
result = adapter.dump_dict(entry=TEST_ENTRY_ONE)
assert result["created_at"] == FIXED_DATETIME_ONE
assert result["expires_at"] == FIXED_DATETIME_ONE_PLUS_10_SECONDS

def test_load_dict_with_datetime_format(self):
"""Test load_dict with datetime format instead of isoformat."""
adapter = BasicSerializationAdapter(date_format="datetime")
data = {
"created_at": FIXED_DATETIME_ONE,
"expires_at": FIXED_DATETIME_ONE_PLUS_10_SECONDS,
"value": TEST_DATA_ONE,
}
result = adapter.load_dict(data=data)
assert result.created_at == FIXED_DATETIME_ONE
assert result.expires_at == FIXED_DATETIME_ONE_PLUS_10_SECONDS

def test_dump_json_with_datetime_format_raises_error(self):
"""Test dump_json raises error when date_format is datetime."""
adapter = BasicSerializationAdapter(date_format="datetime")
with pytest.raises(SerializationError, match="dump_json is incompatible"):
adapter.dump_json(entry=TEST_ENTRY_ONE)

def test_load_dict_with_string_value(self, adapter: BasicSerializationAdapter):
"""Test load_dict with value as JSON string."""
data = {
"created_at": FIXED_DATETIME_ONE_ISOFORMAT,
"expires_at": FIXED_DATETIME_ONE_PLUS_10_SECONDS_ISOFORMAT,
"value": '{"key": "value"}',
}
result = adapter.load_dict(data=data)
assert result.value == {"key": "value"}

def test_load_dict_missing_value_raises_error(self, adapter: BasicSerializationAdapter):
"""Test load_dict raises error when value is missing."""
data = {
"created_at": FIXED_DATETIME_ONE_ISOFORMAT,
"expires_at": FIXED_DATETIME_ONE_PLUS_10_SECONDS_ISOFORMAT,
}
with pytest.raises(DeserializationError, match="Value field not found"):
adapter.load_dict(data=data)

def test_load_dict_invalid_value_type_raises_error(self, adapter: BasicSerializationAdapter):
"""Test load_dict raises error when value is not string or dict."""
data = {
"created_at": FIXED_DATETIME_ONE_ISOFORMAT,
"expires_at": FIXED_DATETIME_ONE_PLUS_10_SECONDS_ISOFORMAT,
"value": 12345,
}
with pytest.raises(DeserializationError, match="Value field is not a string or dictionary"):
adapter.load_dict(data=data)


class TestKeyMustBe:
def test_key_missing(self):
"""Test key_must_be returns None when key is missing."""
result = key_must_be({"other": "value"}, key="missing", expected_type=str)
assert result is None

def test_key_wrong_type(self):
"""Test key_must_be raises TypeError when type is wrong."""
with pytest.raises(TypeError, match="created_at must be a str"):
key_must_be({"created_at": 12345}, key="created_at", expected_type=str)

def test_key_correct_type(self):
"""Test key_must_be returns value when type is correct."""
result = key_must_be({"created_at": "2025-01-01"}, key="created_at", expected_type=str)
assert result == "2025-01-01"


class TestParseDatetimeStr:
def test_valid_datetime(self):
"""Test parse_datetime_str with valid datetime string."""
result = parse_datetime_str("2025-01-01T00:00:00+00:00")
assert result == FIXED_DATETIME_ONE

def test_invalid_datetime(self):
"""Test parse_datetime_str raises error for invalid string."""
with pytest.raises(DeserializationError, match="Invalid datetime string"):
parse_datetime_str("not-a-datetime")
Loading
Loading