Skip to content
This repository was archived by the owner on Jun 16, 2025. It is now read-only.

Commit 005c779

Browse files
committed
Refactor BackgroundLogger and ProcessPool to use TaskGroup for improved task management; update RabbitMQListener and WebScraper for consistency
1 parent 4589b11 commit 005c779

File tree

8 files changed

+47
-34
lines changed

8 files changed

+47
-34
lines changed

examples/BackgroundLogger/BackgroundLogger.php

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,19 +5,20 @@
55
namespace BackgroundLogger;
66

77
use Async\Scope;
8+
use Async\TaskGroup;
89

910
class BackgroundLogger
1011
{
11-
private Scope $scope;
12+
private TaskGroup $taskGroup;
1213

1314
public function __construct()
1415
{
15-
$this->scope = new Scope();
16+
$this->taskGroup = new TaskGroup(new Scope());
1617
}
1718

1819
public function logAsync(string $message): void
1920
{
20-
spawn with $this->scope static use($message) {
21+
$this->taskGroup->spawn(static function() use($message) {
2122
try {
2223
file_put_contents(
2324
'app.log',
@@ -27,12 +28,12 @@ public function logAsync(string $message): void
2728
} catch (\Throwable $e) {
2829
error_log("Async log failed: " . $e->getMessage());
2930
}
30-
};
31+
});
3132
}
3233

3334
public function __destruct()
3435
{
35-
$this->scope->dispose();
36+
$this->taskGroup->dispose();
3637
}
3738
}
3839

examples/ChatServer/ChatServer.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ function startChatServer(string $host, int $port): void
5151
} finally {
5252
// Gracefully handle connection cancellation
5353
try {
54-
await $clientScope->allTask() until Async\timeout(2000);
54+
await $clientScope until Async\timeout(2000);
5555
} finally {
5656
// Clean up when client disconnects
5757
socket_close($client);

examples/DistributedTaskOrchestration/DistributedTaskOrchestration.php

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
<?php
22

3+
use function Async\currentContext;
4+
35
function orchestrateDistributedProcess(array $nodes, array $taskConfig): array
46
{
57
async $orchestratorScope {

examples/FanOut.php

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -13,17 +13,24 @@ function fetchUrl(string $url): string {
1313
function fetchAllUrls(array $urls): array
1414
{
1515
async bounded $scope {
16+
17+
$tasks = new \Async\TaskGroup();
18+
1619
foreach ($urls as $url) {
17-
spawn fetchUrl($url);
20+
$tasks->add(spawn fetchUrl($url));
1821
}
1922

2023
$results = [];
2124

22-
foreach (await $scope->directTasks() as $url => $future) {
23-
$results[$url] = $future->getResult();
24-
}
25+
try {
26+
foreach (await $tasks as $url => $future) {
27+
$results[$url] = $future->getResult();
28+
}
2529

26-
return $results;
30+
return $results;
31+
} finally {
32+
$tasks->dispose();
33+
}
2734
}
2835
}
2936

examples/ProcessPool/ProcessPool.php

Lines changed: 16 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -6,12 +6,13 @@
66

77
use Async\CancellationException;
88
use Async\Scope;
9+
use Async\TaskGroup;
910

1011
final class ProcessPool
1112
{
1213
private Scope $watcherScope;
13-
private Scope $poolScope;
14-
private Scope $jobsScope;
14+
private TaskGroup $poolTasks;
15+
private TaskGroup $jobsTasks;
1516
/**
1617
* List of pipes for each process.
1718
* @var array
@@ -27,32 +28,32 @@ final class ProcessPool
2728
public function __construct(readonly public string $entryPoint, readonly public int $max, readonly public int $min)
2829
{
2930
// Define the coroutine scopes for the pool, watcher, and jobs
30-
$this->poolScope = new Scope();
31+
$this->poolTasks = new TaskGroup(new Scope());
32+
$this->jobsTasks = new TaskGroup(new Scope());
3133
$this->watcherScope = new Scope();
32-
$this->jobsScope = new Scope();
3334
}
3435

3536
public function __destruct()
3637
{
3738
$this->watcherScope->dispose();
38-
$this->poolScope->dispose();
39-
$this->jobsScope->dispose();
39+
$this->poolTasks->dispose();
40+
$this->jobsTasks->dispose();
4041
}
4142

4243
public function start(): void
4344
{
4445
spawn with $this->watcherScope $this->processWatcher();
4546

4647
for ($i = 0; $i < $this->min; $i++) {
47-
spawn with $this->poolScope $this->startProcess();
48+
$this->poolTasks->spawn($this->startProcess(...));
4849
}
4950
}
5051

5152
public function stop(): void
5253
{
5354
$this->watcherScope->cancel();
54-
$this->poolScope->cancel();
55-
$this->jobsScope->cancel();
55+
$this->poolTasks->dispose();
56+
$this->jobsTasks->dispose();
5657
}
5758

5859
/**
@@ -67,10 +68,10 @@ public function executeJob(mixed $job, callable $resultHandle): void
6768
$pid = array_search(true, $this->descriptors, true);
6869

6970
if ($pid === false && count($this->descriptors) < $this->max) {
70-
spawn with $this->poolScope $this->startProcess();
71+
$this->poolTasks->spawn($this->startProcess(...));
7172

7273
// Try to find a free process again after a short delay
73-
spawn with $this->jobsScope use($job, $resultHandle) {
74+
$this->jobsTasks->spawn(function() use($job, $resultHandle) {
7475
usleep(100);
7576

7677
$pid = array_search(true, $this->descriptors, true);
@@ -80,14 +81,14 @@ public function executeJob(mixed $job, callable $resultHandle): void
8081
}
8182

8283
$this->sendJobAndReceiveResult($pid, $job, $resultHandle);
83-
};
84+
});
8485

8586
return;
8687
} elseif ($pid === false) {
8788
$resultHandle(new \Exception('No free process'));
8889
} else {
8990
$this->descriptors[$pid] = false;
90-
spawn with $this->jobsScope $this->sendJobAndReceiveResult($pid, $job, $resultHandle);
91+
$this->jobsTasks->spawn($this->sendJobAndReceiveResult(...), $pid, $job, $resultHandle);
9192
}
9293
}
9394

@@ -96,12 +97,12 @@ private function processWatcher(): void
9697
while (true) {
9798

9899
try {
99-
await $this->poolScope->directTasks();
100+
await $this->poolTasks;
100101
} catch (StopProcessException $exception) {
101102
echo "Process was stopped with message: {$exception->getMessage()}\n";
102103

103104
if($exception->getCode() !== 0 || count($this->descriptors) < $this->min) {
104-
spawn with $this->poolScope $this->startProcess();
105+
$this->poolTasks->spawn($this->startProcess(...));
105106
}
106107
}
107108
}

examples/Rabbitmq/RabbitMQListener.php

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,8 +71,9 @@ private function handleMessage(AMQPMessage $msg, string $queue): void
7171
public function stop(): void
7272
{
7373
$this->scope->cancel();
74+
7475
try {
75-
await $this->scope->allTasks();
76+
await $this->scope;
7677
} finally {
7778
$this->connection->close();
7879
}

examples/TaskGroup.php

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,6 @@
66

77
function taskGroup(array $files): void
88
{
9-
$scope = Scope::inherit();
10-
119
$fileReadTask = function (string $fileName): string {
1210

1311
if(!is_file($fileName)) {
@@ -27,17 +25,20 @@ function taskGroup(array $files): void
2725
return $result;
2826
};
2927

30-
$tasks = [];
28+
$scope = Scope::inherit();
29+
$tasks = new \Async\TaskGroup($scope);
3130

3231
foreach ($files as $file) {
33-
$tasks[$file] = spawn in $scope $fileReadTask($file);
32+
$tasks->add(spawn in $scope $fileReadTask($file));
3433
}
3534

3635
try {
37-
foreach (await Async\all($tasks) as $file => $result) {
38-
echo "File $file: $result\n";
36+
foreach (await $tasks as $result) {
37+
echo "File $result\n";
3938
}
4039
} catch (Exception $e) {
4140
echo "Caught exception: ", $e->getMessage();
41+
} finally {
42+
$tasks->dispose();
4243
}
4344
}

examples/WebScraper/WebScraper.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ function scrapeWebsites(array $urls, int $concurrency = 5): array
3939
};
4040
}
4141

42-
$scraperScope->awaitAll();
42+
await $scraperScope;
4343
return $results;
4444
}
4545
}

0 commit comments

Comments
 (0)