Skip to content

Commit 781971e

Browse files
authored
Update Eventloop + CoroutineGen!
1 parent 13ab195 commit 781971e

File tree

8 files changed

+133
-73
lines changed

8 files changed

+133
-73
lines changed

src/vennv/vapm/AwaitGroup.php

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,12 @@
2525

2626
use Generator;
2727

28+
/**
29+
* @author VennDev <[email protected]>
30+
* @package vennv\vapm
31+
*
32+
* This interface is used to create a await group object that can be used to wait for a group of coroutines to complete.
33+
*/
2834
interface AwaitGroupInterface
2935
{
3036

src/vennv/vapm/CoroutineGen.php

Lines changed: 20 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@
2828
use SplQueue;
2929
use Generator;
3030
use Throwable;
31-
use function call_user_func;
3231

3332
interface CoroutineGenInterface
3433
{
@@ -102,13 +101,12 @@ public static function runNonBlocking(mixed ...$coroutines): void
102101
System::init();
103102
self::$taskQueue ??= new SplQueue();
104103
foreach ($coroutines as $coroutine) {
105-
if (is_callable($coroutine)) $coroutine = call_user_func($coroutine);
106-
if ($coroutine instanceof Generator) {
107-
self::schedule(new ChildCoroutine($coroutine));
108-
} else {
109-
call_user_func(fn() => $coroutine);
110-
}
104+
$result = is_callable($coroutine) ? $coroutine() : $coroutine;
105+
$result instanceof Generator
106+
? self::schedule(new ChildCoroutine($result))
107+
: $result;
111108
}
109+
self::run();
112110
}
113111

114112
/**
@@ -119,7 +117,9 @@ public static function runNonBlocking(mixed ...$coroutines): void
119117
public static function runBlocking(mixed ...$coroutines): void
120118
{
121119
self::runNonBlocking(...$coroutines);
122-
while (self::$taskQueue?->isEmpty() === false) self::run();
120+
while (!self::$taskQueue?->isEmpty()) {
121+
self::run();
122+
}
123123
}
124124

125125
/**
@@ -130,18 +130,23 @@ private static function processCoroutine(mixed ...$coroutines): Closure
130130
{
131131
return function () use ($coroutines): void {
132132
foreach ($coroutines as $coroutine) {
133-
if (is_callable($coroutine)) {
134-
$coroutine = call_user_func($coroutine);
135-
}
136-
!$coroutine instanceof Generator ? call_user_func(fn() => $coroutine) : self::schedule(new ChildCoroutine($coroutine));
133+
$result = is_callable($coroutine) ? $coroutine() : $coroutine;
134+
$result instanceof Generator
135+
? self::schedule(new ChildCoroutine($result))
136+
: $result;
137137
}
138138
self::run();
139139
};
140140
}
141141

142142
public static function repeat(callable $callback, int $times): Closure
143143
{
144-
for ($i = 0; $i <= $times; $i++) if (call_user_func($callback) instanceof Generator) $callback = self::processCoroutine($callback);
144+
for ($i = 0; $i < $times; $i++) {
145+
$result = $callback();
146+
if ($result instanceof Generator) {
147+
$callback = self::processCoroutine($result);
148+
}
149+
}
145150
return fn() => null;
146151
}
147152

@@ -161,8 +166,8 @@ private static function schedule(ChildCoroutine $childCoroutine): void
161166
*/
162167
public static function run(): void
163168
{
164-
if (self::$taskQueue?->isEmpty() === false) {
165-
$coroutine = self::$taskQueue->dequeue();
169+
if (!self::$taskQueue?->isEmpty()) {
170+
$coroutine = self::$taskQueue?->dequeue();
166171
if ($coroutine instanceof ChildCoroutine && !$coroutine->isFinished()) {
167172
self::schedule($coroutine->run());
168173
}

src/vennv/vapm/Deferred.php

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,12 @@
2525

2626
use Generator;
2727

28+
/**
29+
* @author VennDev <[email protected]>
30+
* @package vennv\vapm
31+
*
32+
* This interface is used to create a deferred object that can be used to get the result of a coroutine.
33+
*/
2834
interface DeferredInterface
2935
{
3036

src/vennv/vapm/EventLoop.php

Lines changed: 10 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@
2626
use SplQueue;
2727
use Generator;
2828
use Throwable;
29-
use function count;
3029
use const PHP_INT_MAX;
3130

3231
interface EventLoopInterface
@@ -58,12 +57,12 @@ public static function getReturns(): Generator;
5857
class EventLoop implements EventLoopInterface
5958
{
6059

61-
protected const LIMIT = 20;
60+
protected const LIMIT = 20; // 20 times run
6261

6362
protected static int $nextId = 0;
6463

6564
/**
66-
* @var SplQueue
65+
* @var SplQueue<Promise>
6766
*/
6867
protected static SplQueue $queues;
6968

@@ -74,7 +73,7 @@ class EventLoop implements EventLoopInterface
7473

7574
public static function init(): void
7675
{
77-
if (!isset(self::$queues)) self::$queues = new SplQueue();
76+
self::$queues ??= new SplQueue();
7877
}
7978

8079
public static function generateId(): int
@@ -142,34 +141,28 @@ private static function clearGarbage(): void
142141
*/
143142
protected static function run(): void
144143
{
145-
CoroutineGen::run(); // Run CoroutineGen
144+
CoroutineGen::run();
146145

147146
$i = 0;
148-
while (!self::$queues->isEmpty()) {
149-
if ($i++ >= self::LIMIT) break;
150-
/**
151-
* @var Promise $promise
152-
*/
147+
while (!self::$queues->isEmpty() && $i++ < self::LIMIT) {
148+
/** @var Promise $promise */
153149
$promise = self::$queues->dequeue();
154-
155-
$id = $promise->getId();
156150
$fiber = $promise->getFiber();
157-
158151
if ($fiber->isSuspended()) $fiber->resume();
159152
if ($fiber->isTerminated() && ($promise->getStatus() !== StatusPromise::PENDING || $promise->isJustGetResult())) {
160153
try {
161-
if ($promise->isJustGetResult()) $promise->setResult($fiber->getReturn());
154+
$promise->isJustGetResult() && $promise->setResult($fiber->getReturn());
162155
} catch (Throwable $e) {
163156
echo $e->getMessage();
164157
}
165-
MicroTask::addTask($id, $promise);
158+
MicroTask::addTask($promise->getId(), $promise);
166159
} else {
167160
self::$queues->enqueue($promise);
168161
}
169162
}
170163

171-
if (MicroTask::isPrepare()) MicroTask::run();
172-
if (MacroTask::isPrepare()) MacroTask::run();
164+
MicroTask::isPrepare() && MicroTask::run();
165+
MacroTask::isPrepare() && MacroTask::run();
173166

174167
self::clearGarbage();
175168
}

src/vennv/vapm/Mutex.php

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,13 @@
2525

2626
use Generator;
2727

28+
/**
29+
* @author VennDev <[email protected]>
30+
* @package vennv\vapm
31+
*
32+
* This class is used to create a mutex object that can be used to synchronize access to shared resources.
33+
* Note: this just for coroutine, if you want to use it in other places, you need to implement it yourself.
34+
*/
2835
interface MutexInterface
2936
{
3037

src/vennv/vapm/PHPUtils.php

Lines changed: 66 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@
2525

2626
use Throwable;
2727

28-
final class PHPUtils
28+
interface PHPUtilsInterface
2929
{
3030

3131
/**
@@ -35,7 +35,72 @@ final class PHPUtils
3535
*
3636
* @phpstan-param array<int|float|string|object> $array
3737
* @throws Throwable
38+
*
39+
* This function is used to iterate over an array and call a callback function for each element.
3840
*/
41+
public static function forEach(array $array, callable $callback): Async;
42+
43+
/**
44+
* @param array<int|float|string|object> $array
45+
* @param callable $callback
46+
* @return Async
47+
*
48+
* @phpstan-param array<int|float|string|object> $array
49+
* @throws Throwable
50+
*
51+
* This function is used to map over an array and apply a callback function to each element.
52+
*/
53+
public static function arrayMap(array $array, callable $callback): Async;
54+
55+
/**
56+
* @param array<int|float|string|object> $array
57+
* @param callable $callback
58+
* @return Async
59+
*
60+
* @phpstan-param array<int|float|string|object> $array
61+
* @throws Throwable
62+
*/
63+
public static function arrayFilter(array $array, callable $callback): Async;
64+
65+
/**
66+
* @param array<int|float|string|object> $array
67+
* @param callable $callback
68+
* @param mixed $initialValue
69+
* @return Async
70+
*
71+
* @throws Throwable
72+
*
73+
* This function is used to reduce an array to a single value by applying a callback function to each element.
74+
*/
75+
public static function arrayReduce(array $array, callable $callback, mixed $initialValue): Async;
76+
77+
/**
78+
* @param array<int|float|string|object> $array
79+
* @param string $className
80+
* @return Async
81+
*
82+
* @throws Throwable
83+
*
84+
* This function is used to check if all elements in an array are instances of a specific class.
85+
*/
86+
public static function instanceOfAll(array $array, string $className): Async;
87+
88+
/**
89+
* @param array<int|float|string|object> $array
90+
* @param string $className
91+
* @return Async
92+
*
93+
* @throws Throwable
94+
*
95+
* This function is used to check if any element in an array is an instance of a specific class.
96+
*/
97+
public static function instanceOfAny(array $array, string $className): Async;
98+
99+
}
100+
101+
final class PHPUtils implements PHPUtilsInterface
102+
{
103+
39104
public static function forEach(array $array, callable $callback): Async
40105
{
41106
return new Async(function () use ($array, $callback) {
@@ -46,14 +111,6 @@ public static function forEach(array $array, callable $callback): Async
46111
});
47112
}
48113

49-
/**
50-
* @param array<int|float|string|object> $array
51-
* @param callable $callback
52-
* @return Async
53-
*
54-
* @phpstan-param array<int|float|string|object> $array
55-
* @throws Throwable
56-
*/
57114
public static function arrayMap(array $array, callable $callback): Async
58115
{
59116
return new Async(function () use ($array, $callback) {
@@ -66,14 +123,6 @@ public static function arrayMap(array $array, callable $callback): Async
66123
});
67124
}
68125

69-
/**
70-
* @param array<int|float|string|object> $array
71-
* @param callable $callback
72-
* @return Async
73-
*
74-
* @phpstan-param array<int|float|string|object> $array
75-
* @throws Throwable
76-
*/
77126
public static function arrayFilter(array $array, callable $callback): Async
78127
{
79128
return new Async(function () use ($array, $callback) {
@@ -88,14 +137,6 @@ public static function arrayFilter(array $array, callable $callback): Async
88137
});
89138
}
90139

91-
/**
92-
* @param array<int|float|string|object> $array
93-
* @param callable $callback
94-
* @param mixed $initialValue
95-
* @return Async
96-
*
97-
* @throws Throwable
98-
*/
99140
public static function arrayReduce(array $array, callable $callback, mixed $initialValue): Async
100141
{
101142
return new Async(function () use ($array, $callback, $initialValue) {
@@ -108,13 +149,6 @@ public static function arrayReduce(array $array, callable $callback, mixed $init
108149
});
109150
}
110151

111-
/**
112-
* @param array<int|float|string|object> $array
113-
* @param string $className
114-
* @return Async
115-
*
116-
* @throws Throwable
117-
*/
118152
public static function instanceOfAll(array $array, string $className): Async
119153
{
120154
return new Async(function () use ($array, $className) {
@@ -126,13 +160,6 @@ public static function instanceOfAll(array $array, string $className): Async
126160
});
127161
}
128162

129-
/**
130-
* @param array<int|float|string|object> $array
131-
* @param string $className
132-
* @return Async
133-
*
134-
* @throws Throwable
135-
*/
136163
public static function instanceOfAny(array $array, string $className): Async
137164
{
138165
return new Async(function () use ($array, $className) {

src/vennv/vapm/Promise.php

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,12 @@
3131
use function is_callable;
3232
use function microtime;
3333

34+
/**
35+
* @author VennDev <[email protected]>
36+
* @package vennv\vapm
37+
*
38+
* This interface is used to create a promise object that can be used to get the result of a coroutine.
39+
*/
3440
interface PromiseInterface
3541
{
3642

0 commit comments

Comments
 (0)