Skip to content

Commit 1502469

Browse files
authored
fix: fix reclaim_request in SqlRequestQueueClient to correctly update the request state (#1486)
### Description

- Fix `reclaim_request` in `SqlRequestQueueClient` to correctly update the request state.

### Issues

- Closes: #1484

### Testing

- Add a new test for `RequestQueue.reclaim_request` to check state persistence.
1 parent b9d9152 commit 1502469

File tree

2 files changed

+34
-2
lines changed

2 files changed

+34
-2
lines changed

src/crawlee/storage_clients/_sql/_request_queue_client.py

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -546,12 +546,20 @@ async def reclaim_request(
546546
block_until = now + timedelta(seconds=self._BLOCK_REQUEST_TIME)
547547
# Extend blocking for forefront request, it is considered blocked by the current client.
548548
stmt = stmt.values(
549-
sequence_number=new_sequence, time_blocked_until=block_until, client_key=self.client_key
549+
sequence_number=new_sequence,
550+
time_blocked_until=block_until,
551+
client_key=self.client_key,
552+
data=request.model_dump_json(),
550553
)
551554
else:
552555
new_sequence = state.sequence_counter
553556
state.sequence_counter += 1
554-
stmt = stmt.values(sequence_number=new_sequence, time_blocked_until=None, client_key=None)
557+
stmt = stmt.values(
558+
sequence_number=new_sequence,
559+
time_blocked_until=None,
560+
client_key=None,
561+
data=request.model_dump_json(),
562+
)
555563

556564
result = await session.execute(stmt)
557565
result = cast('CursorResult', result) if not isinstance(result, CursorResult) else result

tests/unit/storages/test_request_queue.py

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1291,3 +1291,27 @@ async def test_validate_name(storage_client: StorageClient, name: str, *, is_val
12911291
else:
12921292
with pytest.raises(ValueError, match=rf'Invalid storage name "{name}".*'):
12931293
await RequestQueue.open(name=name, storage_client=storage_client)
1294+
1295+
1296+
async def test_reclaim_request_with_change_state(rq: RequestQueue) -> None:
    """Check that user data modified before reclaiming is returned by the next fetch."""
    url = 'https://example.com/original'

    # Seed the queue with one request carrying the initial state.
    seed = Request.from_url(url, user_data={'state': 'original'})
    await rq.add_request(seed)

    # The first fetch should hand the request back exactly as it was enqueued.
    fetched = await rq.fetch_next_request()
    assert fetched is not None
    assert fetched.url == url
    assert fetched.user_data['state'] == 'original'

    # Mutate the user data, then return the request to the queue.
    fetched.user_data['state'] = 'modified'
    reclaim_result = await rq.reclaim_request(fetched)
    assert reclaim_result is not None
    assert reclaim_result.was_already_handled is False

    # The next fetch should yield the same URL with the updated state.
    refetched = await rq.fetch_next_request()
    assert refetched is not None
    assert refetched.url == url
    assert refetched.user_data['state'] == 'modified'

0 commit comments

Comments
 (0)