@@ -156,13 +156,27 @@ protected static function getShardingFn(): Closure {
156156 /** @var array{0:array{data?:array{0:array{value:string}}}} $result */
157157 $ value = simdjson_decode ($ result [0 ]['data ' ][0 ]['value ' ] ?? '[] ' , true );
158158
159- /** @var array{result:string,status?:string,type?:string} $value */
159+ // FLOW EXPLANATION:
160+ // 1. Table->shard() creates initial state with status='processing', result=null
161+ // 2. Operator->checkTableStatus() monitors queue completion
162+ // 3. When all queue items processed, sets status='done' and result=response_body
163+ // 4. We wait for BOTH status completion AND result to be set
164+ // 5. Only then we can safely call Response::fromBody() with non-null result
165+
166+ /** @var array{result:?string,status?:string,type?:string} $value */
160167 $ type = $ value ['type ' ] ?? 'unknown ' ;
161168 $ status = $ value ['status ' ] ?? 'processing ' ;
162- if ($ type === 'create ' && $ status !== 'processing ' ) {
163- return TaskResult::fromResponse (Response::fromBody ($ value ['result ' ]));
169+ $ result = $ value ['result ' ] ?? null ;
170+
171+ // Only proceed when:
172+ // - Type is 'create' (table creation)
173+ // - Status is not 'processing' (operation completed)
174+ // - Result is not null (response body is available)
175+ if ($ type === 'create ' && $ status !== 'processing ' && $ result !== null ) {
176+ return TaskResult::fromResponse (Response::fromBody ($ result ));
164177 }
165178 if ((time () - $ ts ) > $ timeout ) {
179+ Buddy::debugvv ("Sharding: CreateHandler timeout exceeded for table {$ payload ->table }" );
166180 break ;
167181 }
168182 Coroutine::sleep (1 );
0 commit comments