Skip to content

Commit 5f548df

Browse files
authored
rabbit feedback (#55)
1 parent 446df29 commit 5f548df

File tree

2 files changed

+132
-66
lines changed

2 files changed

+132
-66
lines changed

kinesis/dynamodb.py

Lines changed: 81 additions & 66 deletions
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,7 @@ async def _initialize(self):
112112
if self.create_table:
113113
await self._create_table(dynamodb)
114114
else:
115-
raise Exception(f"DynamoDB table {self.table_name} does not exist")
115+
raise Exception(f"DynamoDB table {self.table_name} does not exist") from e
116116
else:
117117
raise
118118

@@ -255,7 +255,7 @@ async def _checkpoint(self, shard_id: str, sequence: str) -> None:
255255

256256
except ClientError as e:
257257
if e.response["Error"]["Code"] == "ConditionalCheckFailedException":
258-
raise Exception(f"{self.get_ref()} tried to checkpoint {shard_id} but does not own it")
258+
raise Exception(f"{self.get_ref()} tried to checkpoint {shard_id} but does not own it") from e
259259
else:
260260
raise
261261

@@ -296,83 +296,98 @@ async def allocate(self, shard_id: str) -> Tuple[bool, Optional[str]]:
296296
await self._initialize()
297297

298298
key = self.get_key(shard_id)
299-
ts = self.get_ts()
299+
max_retries = 3
300300

301-
async with self._session.resource("dynamodb", region_name=self.region_name) as dynamodb:
302-
table = await dynamodb.Table(self.table_name)
301+
for retry in range(max_retries):
302+
ts = self.get_ts()
303303

304-
# First, try to create a new record
305-
try:
306-
await table.put_item(
307-
Item={
308-
"shard_id": key,
309-
"ref": self.get_ref(),
310-
"ts": ts,
311-
"sequence": None,
312-
"ttl": self.get_ttl(),
313-
},
314-
ConditionExpression="attribute_not_exists(shard_id)",
315-
)
304+
async with self._session.resource("dynamodb", region_name=self.region_name) as dynamodb:
305+
table = await dynamodb.Table(self.table_name)
316306

317-
log.info(f"{self.get_ref()} allocated {shard_id} (new checkpoint)")
318-
self._items[shard_id] = None
319-
return True, None
307+
# First, try to create a new record
308+
try:
309+
await table.put_item(
310+
Item={
311+
"shard_id": key,
312+
"ref": self.get_ref(),
313+
"ts": ts,
314+
"sequence": None,
315+
"ttl": self.get_ttl(),
316+
},
317+
ConditionExpression="attribute_not_exists(shard_id)",
318+
)
319+
320+
log.info(f"{self.get_ref()} allocated {shard_id} (new checkpoint)")
321+
self._items[shard_id] = None
322+
return True, None
320323

321-
except ClientError as e:
322-
if e.response["Error"]["Code"] != "ConditionalCheckFailedException":
323-
raise
324+
except ClientError as e:
325+
if e.response["Error"]["Code"] != "ConditionalCheckFailedException":
326+
raise
324327

325-
# Record exists, check if we can take it
326-
response = await table.get_item(Key={"shard_id": key})
328+
# Record exists, check if we can take it
329+
response = await table.get_item(Key={"shard_id": key})
327330

328-
if "Item" not in response:
329-
# Race condition - someone deleted it, try again
330-
return await self.allocate(shard_id)
331+
if "Item" not in response:
332+
# Race condition - someone deleted it, retry
333+
if retry < max_retries - 1:
334+
log.debug(f"Race condition detected for {shard_id}, retrying...")
335+
await asyncio.sleep(0.1) # Small delay before retry
336+
continue
337+
else:
338+
log.warning(f"Failed to allocate {shard_id} after {max_retries} retries due to race conditions")
339+
return False, None
331340

332-
item = response["Item"]
341+
item = response["Item"]
333342

334-
if item.get("ts") and item.get("ref"):
335-
# Check if current owner is still alive
336-
age = ts - item["ts"]
343+
if item.get("ts") and item.get("ref"):
344+
# Check if current owner is still alive
345+
age = ts - item["ts"]
337346

338-
if age < self.session_timeout:
339-
log.info(f"{self.get_ref()} could not allocate {shard_id}, still in use by {item['ref']}")
340-
await asyncio.sleep(1) # Avoid spamming
341-
return False, None
347+
if age < self.session_timeout:
348+
log.info(f"{self.get_ref()} could not allocate {shard_id}, still in use by {item['ref']}")
349+
await asyncio.sleep(1) # Avoid spamming
350+
return False, None
342351

343-
log.info(f"Attempting to take lock as {item['ref']} is {age - self.session_timeout} seconds overdue")
352+
log.info(
353+
f"Attempting to take lock as {item['ref']} is {age - self.session_timeout} seconds overdue"
354+
)
344355

345-
# Try to take ownership
346-
try:
347-
await table.update_item(
348-
Key={"shard_id": key},
349-
UpdateExpression="SET #ref = :ref, #ts = :ts, #ttl = :ttl",
350-
ExpressionAttributeNames={
351-
"#ref": "ref",
352-
"#ts": "ts",
353-
"#ttl": "ttl",
354-
},
355-
ExpressionAttributeValues={
356-
":ref": self.get_ref(),
357-
":ts": ts,
358-
":ttl": self.get_ttl(),
359-
":old_ts": item.get("ts"),
360-
},
361-
ConditionExpression="#ts = :old_ts OR attribute_not_exists(#ts)",
362-
ReturnValues="ALL_NEW",
363-
)
356+
# Try to take ownership
357+
try:
358+
await table.update_item(
359+
Key={"shard_id": key},
360+
UpdateExpression="SET #ref = :ref, #ts = :ts, #ttl = :ttl",
361+
ExpressionAttributeNames={
362+
"#ref": "ref",
363+
"#ts": "ts",
364+
"#ttl": "ttl",
365+
},
366+
ExpressionAttributeValues={
367+
":ref": self.get_ref(),
368+
":ts": ts,
369+
":ttl": self.get_ttl(),
370+
":old_ts": item.get("ts"),
371+
},
372+
ConditionExpression="#ts = :old_ts OR attribute_not_exists(#ts)",
373+
ReturnValues="ALL_NEW",
374+
)
375+
376+
sequence = item.get("sequence")
377+
log.info(f"{self.get_ref()} allocated {shard_id}@{sequence}")
378+
self._items[shard_id] = sequence
379+
return True, sequence
364380

365-
sequence = item.get("sequence")
366-
log.info(f"{self.get_ref()} allocated {shard_id}@{sequence}")
367-
self._items[shard_id] = sequence
368-
return True, sequence
381+
except ClientError as e:
382+
if e.response["Error"]["Code"] == "ConditionalCheckFailedException":
383+
log.info(f"Someone else beat us to allocating {shard_id}")
384+
return False, None
385+
else:
386+
raise
369387

370-
except ClientError as e:
371-
if e.response["Error"]["Code"] == "ConditionalCheckFailedException":
372-
log.info(f"Someone else beat us to allocating {shard_id}")
373-
return False, None
374-
else:
375-
raise
388+
# Should not reach here, but just in case
389+
log.warning(f"Failed to allocate {shard_id} after {max_retries} retries")
390+
return False, None
376391

377392
async def close(self) -> None:
378393
"""Clean up resources."""

tests/test_dynamodb_checkpointer.py

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -338,3 +338,54 @@ async def test_no_create_table_error(self, mock_dynamodb):
338338

339339
with pytest.raises(Exception, match="does not exist"):
340340
await checkpointer._initialize()
341+
342+
@pytest.mark.asyncio
343+
async def test_allocate_race_condition_retry(self, mock_dynamodb):
344+
"""Test allocation retries on race condition."""
345+
checkpointer = DynamoDBCheckPointer("test-app")
346+
await checkpointer._initialize()
347+
348+
# Mock shard already exists
349+
mock_dynamodb["table"].put_item.side_effect = ClientError(
350+
{"Error": {"Code": "ConditionalCheckFailedException"}}, "PutItem"
351+
)
352+
353+
# Mock get_item returns empty on first two calls (race condition), then returns item
354+
mock_dynamodb["table"].get_item.side_effect = [
355+
{}, # First call - no item (race condition)
356+
{}, # Second call - no item (race condition)
357+
{ # Third call - item exists
358+
"Item": {"shard_id": "shard-001", "ref": None, "ts": None, "sequence": "seq-123"}
359+
},
360+
]
361+
362+
# Mock successful update
363+
mock_dynamodb["table"].update_item.return_value = {}
364+
365+
success, sequence = await checkpointer.allocate("shard-001")
366+
367+
assert success is True
368+
assert sequence == "seq-123"
369+
# Verify get_item was called 3 times
370+
assert mock_dynamodb["table"].get_item.call_count == 3
371+
372+
@pytest.mark.asyncio
373+
async def test_allocate_race_condition_max_retries(self, mock_dynamodb):
374+
"""Test allocation fails after max retries on persistent race condition."""
375+
checkpointer = DynamoDBCheckPointer("test-app")
376+
await checkpointer._initialize()
377+
378+
# Mock shard already exists
379+
mock_dynamodb["table"].put_item.side_effect = ClientError(
380+
{"Error": {"Code": "ConditionalCheckFailedException"}}, "PutItem"
381+
)
382+
383+
# Mock get_item always returns empty (persistent race condition)
384+
mock_dynamodb["table"].get_item.return_value = {}
385+
386+
success, sequence = await checkpointer.allocate("shard-001")
387+
388+
assert success is False
389+
assert sequence is None
390+
# Verify get_item was called 3 times (max retries)
391+
assert mock_dynamodb["table"].get_item.call_count == 3

0 commit comments

Comments
 (0)