Merged
Changes from 4 commits
95 changes: 72 additions & 23 deletions sources/platform/storage/request_queue.md
@@ -407,7 +407,20 @@ You can lock a request so that no other clients receive it when they fetch the q
This feature is seamlessly integrated into Crawlee, requiring minimal extra setup. By default, requests are locked for the same duration as the timeout for processing requests in the crawler ([`requestHandlerTimeoutSecs`](https://crawlee.dev/api/next/basic-crawler/interface/BasicCrawlerOptions#requestHandlerTimeoutSecs)).
If the Actor processing the request fails, the lock expires, and the request is processed again eventually. For more details, refer to the [Crawlee documentation](https://crawlee.dev/docs/next/experiments/experiments-request-locking).
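
To make the expiry behavior above concrete, here is a minimal in-memory sketch. It is only a toy model, not the Apify API: the `InMemoryLockQueue` class and its fields are hypothetical, and timestamps are passed in explicitly so the result does not depend on the clock. It assumes that a locked request becomes available to other clients again once its lock expires.

```js
// Illustrative in-memory model only; this is NOT the Apify client or platform code.
class InMemoryLockQueue {
    constructor() {
        // Each entry: { id, lockExpiresAt } where lockExpiresAt is a ms timestamp or null.
        this.requests = [];
    }

    add(id) {
        this.requests.push({ id, lockExpiresAt: null });
    }

    // Returns up to `limit` request IDs that are unlocked (or whose lock expired) and locks them.
    listAndLockHead({ limit, lockSecs }, now) {
        const available = this.requests
            .filter((request) => request.lockExpiresAt === null || request.lockExpiresAt <= now)
            .slice(0, limit);
        for (const request of available) {
            request.lockExpiresAt = now + lockSecs * 1000;
        }
        return available.map((request) => request.id);
    }
}

const queue = new InMemoryLockQueue();
['a', 'b', 'c'].forEach((id) => queue.add(id));

// The first caller locks 'a' and 'b' for 60 seconds.
const first = queue.listAndLockHead({ limit: 2, lockSecs: 60 }, 0);
// One second later, only 'c' is still available.
const second = queue.listAndLockHead({ limit: 2, lockSecs: 60 }, 1000);
// After the first locks expire, 'a' and 'b' are handed out again.
const afterExpiry = queue.listAndLockHead({ limit: 2, lockSecs: 60 }, 61000);

console.log(first, second, afterExpiry); // [ 'a', 'b' ] [ 'c' ] [ 'a', 'b' ]
```

In this model, a caller that crashes simply never releases its locks, and the requests are picked up again after `lockSecs` has elapsed.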

In the following example, we demonstrate how we can use locking mechanisms to avoid concurrent processing of the same request.
In the following example, we demonstrate how we can use locking mechanisms to avoid concurrent processing of the same request across multiple Actor runs.

:::info
The lock mechanism works on the client level, as well as the run level, when running the Actor on the Apify platform.

This means you can unlock a locked request or prolong its lock only if:

1. You are using the same client key, or
2. The operation is being called from the same Actor run.

:::
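
The two conditions in the note above can be read as a simple ownership check. The sketch below is only a model of that rule: `canModifyLock`, `clientKey`, and `actorRunId` as used here are hypothetical names for illustration, not fields or functions of the Apify API.

```js
// Illustrative model only; the function and field names are hypothetical.
function canModifyLock(lock, caller) {
    // Condition 1: the caller uses the same client key that created the lock.
    const sameClientKey = caller.clientKey !== undefined && caller.clientKey === lock.clientKey;
    // Condition 2: the caller operates from the same Actor run that created the lock.
    const sameRun = caller.actorRunId !== undefined && caller.actorRunId === lock.actorRunId;
    return sameClientKey || sameRun;
}

const lock = { clientKey: 'requestqueueone', actorRunId: 'run-1' };

console.log(canModifyLock(lock, { clientKey: 'requestqueueone', actorRunId: 'run-2' })); // true
console.log(canModifyLock(lock, { clientKey: 'requestqueuetwo', actorRunId: 'run-1' })); // true
console.log(canModifyLock(lock, { clientKey: 'requestqueuetwo', actorRunId: 'run-2' })); // false
```

Either condition is sufficient on its own, which is why a different client key inside the same run can still modify the lock in this model.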

<Tabs groupId="main">
<TabItem value="Actor 1" label="Actor 1">

```js
import { Actor, ApifyClient } from 'apify';
@@ -425,9 +438,6 @@ const requestQueue = await client.requestQueues().getOrCreate('example-queue');
const requestQueueClientOne = client.requestQueue(requestQueue.id, {
clientKey: 'requestqueueone',
});
const requestQueueClientTwo = client.requestQueue(requestQueue.id, {
clientKey: 'requestqueuetwo',
});

// Adds multiple requests to the queue.
await requestQueueClientOne.batchAddRequests([
@@ -457,23 +467,71 @@ await requestQueueClientOne.batchAddRequests([
const processingRequestsClientOne = await requestQueueClientOne.listAndLockHead(
{
limit: 2,
lockSecs: 60,
lockSecs: 120,
},
);

// Checks when the lock will expire. The locked request will have a lockExpiresAt attribute.
const theFirstRequestLockedByClientOne = processingRequestsClientOne.items[0];
const requestLockedByClientOne = await requestQueueClientOne.getRequest(
// Review comment (Member): The naming of these two variables is pretty confusing.
// I don't really know what the difference is between theFirstRequestLockedByClientOne
// and requestLockedByClientOne, especially by name. Could you come up with some better names?
theFirstRequestLockedByClientOne.id,
);
console.log(`Request locked until ${requestLockedByClientOne?.lockExpiresAt}`);

// Prolongs the lock of the first request or unlocks it.
await requestQueueClientOne.prolongRequestLock(
theFirstRequestLockedByClientOne.id,
{ lockSecs: 120 },
);
await requestQueueClientOne.deleteRequestLock(
theFirstRequestLockedByClientOne.id,
);

// Cleans up the queue.
await requestQueueClientOne.delete();

await Actor.exit();
```

</TabItem>
<TabItem value="Actor 2" label="Actor 2">

```js
import { Actor, ApifyClient } from 'apify';

await Actor.init();

const client = new ApifyClient({
token: 'MY-APIFY-TOKEN',
});

// Waits for the first Actor to lock the requests.
await new Promise((resolve) => setTimeout(resolve, 5000));

// Creates a new request queue.
const requestQueue = await client.requestQueues().getOrCreate('example-queue');

const requestQueueClientTwo = client.requestQueue(requestQueue.id, {
clientKey: 'requestqueuetwo',
});

// Gets all requests from the queue and picks one locked by the first Actor.
const requests = await requestQueueClientTwo.listRequests();
const requestLockedByClientOne = requests.items.filter((request) => request.lockedByClientKey === 'requestqueueone');
const theFirstRequestLockedByClientOne = requestLockedByClientOne[0];

// Other clients cannot list and lock these requests; the listAndLockHead call returns other requests from the queue.
const processingRequestsClientTwo = await requestQueueClientTwo.listAndLockHead(
{
limit: 2,
limit: 10,
lockSecs: 60,
},
);

// Checks when the lock will expire. The locked request will have a lockExpiresAt attribute.
const theFirstRequestLockedByClientOne = processingRequestsClientOne.items[0];
const requestLockedByClientOne = await requestQueueClientOne.getRequest(
theFirstRequestLockedByClientOne.id,
const wasTheClientTwoLockedSameRequest = !!processingRequestsClientTwo.items.find(
// Review comment (Member): Here I'm also not sure what the variable means.
// Could you rename it, or explain it better in the comment?
(request) => request.id === theFirstRequestLockedByClientOne.id,
);

console.log(`Did the second client lock the request already locked by the first client? ${wasTheClientTwoLockedSameRequest}`);
console.log(`Request locked until ${requestLockedByClientOne?.lockExpiresAt}`);

// Other clients cannot modify the lock; attempting to do so will throw an error.
@@ -486,21 +544,12 @@ try {
// This will throw an error.
}

// Prolongs the lock of the first request or unlocks it.
await requestQueueClientOne.prolongRequestLock(
theFirstRequestLockedByClientOne.id,
{ lockSecs: 60 },
);
await requestQueueClientOne.deleteRequestLock(
theFirstRequestLockedByClientOne.id,
);

// Cleans up the queue.
await requestQueueClientOne.delete();

await Actor.exit();
```

</TabItem>
</Tabs>

A detailed tutorial on how to process one request queue with multiple Actor runs can be found in the [Academy tutorials](https://docs.apify.com/academy/node-js/multiple-runs-scrape).

## Sharing