Skip to content

Commit 8e2e373

Browse files
committed
worked on the jobs to run parallel
1 parent ccf069d commit 8e2e373

File tree

7 files changed

+75
-38
lines changed

7 files changed

+75
-38
lines changed

app/Http/Controllers/FileController.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ public function store(
7777
'status' => FileStatus::PROCESSING,
7878
]);
7979

80-
ProcessFileJob::dispatch($fileModel);
80+
ProcessFileJob::dispatch($fileModel->id);
8181

8282
return $fileModel;
8383
});

app/Jobs/FinalizeFileJob.php

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ class FinalizeFileJob implements ShouldQueue
1515
/**
1616
* Create a new job instance.
1717
*/
18-
public function __construct(public File $file)
18+
public function __construct(public int $fileId)
1919
{
2020
//
2121
}
@@ -25,18 +25,20 @@ public function __construct(public File $file)
2525
*/
2626
public function handle(): void
2727
{
28-
if ($this->file->processed_chunks > $this->file->total_chunks) {
29-
$this->file->processed_chunks = $this->file->total_chunks;
30-
$this->file->save();
31-
$this->file->refresh();
28+
$file = File::findOrFail($this->fileId);
29+
30+
if ($file->processed_chunks > $file->total_chunks) {
31+
$file->processed_chunks = $file->total_chunks;
32+
$file->save();
33+
$file->refresh();
3234
}
3335

34-
if ($this->file->processed_chunks == $this->file->total_chunks) {
36+
if ($file->processed_chunks == $file->total_chunks) {
3537
/* @phpstan-ignore-next-line */
36-
$this->file->status = FileStatus::COMPLETED;
37-
$this->file->save();
38+
$file->status = FileStatus::COMPLETED;
39+
$file->save();
3840

39-
event(new FilesStatusUpdated($this->file));
41+
event(new FilesStatusUpdated($file));
4042
}
4143
}
4244
}

app/Jobs/ProcessChunkJob.php

Lines changed: 14 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
use App\Facades\VectorDatabase;
1010
use App\Models\File;
1111
use App\Services\VectorDatabase\Data\QdrantUpsertPayload;
12+
use Illuminate\Bus\Batchable;
1213
use Illuminate\Contracts\Queue\ShouldQueue;
1314
use Illuminate\Foundation\Queue\Queueable;
1415
use Illuminate\Queue\Middleware\RateLimited;
@@ -17,7 +18,7 @@
1718

1819
class ProcessChunkJob implements ShouldQueue
1920
{
20-
use Queueable;
21+
use Batchable, Queueable;
2122

2223
public $tries = 3;
2324

@@ -28,7 +29,7 @@ class ProcessChunkJob implements ShouldQueue
2829
*/
2930
public function __construct(
3031
public array $chunk,
31-
public File $file
32+
public int $fileId,
3233
) {}
3334

3435
public function middleware(): array
@@ -43,6 +44,8 @@ public function middleware(): array
4344
*/
4445
public function handle(): void
4546
{
47+
$file = File::findOrFail($this->fileId);
48+
4649
$embedding = Llm::embed($this->chunk['text']);
4750

4851
$uuid = Str::uuid();
@@ -52,7 +55,7 @@ public function handle(): void
5255
'id' => $uuid,
5356
'vector' => $embedding,
5457
'payload' => [
55-
'doc_id' => $this->file->path,
58+
'doc_id' => $file->path,
5659
'page' => $this->chunk['page'] ?? null,
5760
'chunk_index' => $this->chunk['chunk_index'],
5861
'text' => $this->chunk['text'],
@@ -69,26 +72,26 @@ public function handle(): void
6972

7073
Log::info('Chunk '.$this->chunk['chunk_index'].' processed successfully');
7174

72-
/* @phpstan-ignore-next-line */
73-
$this->file->processed_chunks = $this->file->processed_chunks + 1;
74-
$this->file->save();
75+
$file->increment('processed_chunks');
7576

76-
event(new FilesStatusUpdated($this->file));
77+
event(new FilesStatusUpdated($file));
7778

7879
// '595c678e-b6b3-4dac-8a51-b316cf03a50a';
7980
}
8081

8182
public function failed(\Throwable $exception): void
8283
{
84+
$file = File::findOrFail($this->fileId);
85+
8386
Log::error('Chunk failed: '.$exception->getMessage());
8487

8588
/* @phpstan-ignore-next-line */
86-
$this->file->status = FileStatus::FAILED;
89+
$file->status = FileStatus::FAILED;
8790
/* @phpstan-ignore-next-line */
88-
$this->file->embedding_status = FileStatus::FAILED;
89-
$this->file->save();
91+
$file->embedding_status = FileStatus::FAILED;
92+
$file->save();
9093

91-
event(new FileProgressUpdated($this->file));
94+
event(new FileProgressUpdated($file));
9295

9396
$this->release(10);
9497
}

app/Jobs/ProcessDocumentJob.php

Lines changed: 41 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,12 @@
44

55
use App\Models\File;
66
use App\Services\Pdf\PdfService;
7+
use Illuminate\Bus\Batch;
78
use Illuminate\Contracts\Queue\ShouldQueue;
89
use Illuminate\Foundation\Queue\Queueable;
910
use Illuminate\Support\Facades\Bus;
1011
use Illuminate\Support\Facades\Log;
12+
use Throwable;
1113

1214
class ProcessDocumentJob implements ShouldQueue
1315
{
@@ -17,15 +19,19 @@ class ProcessDocumentJob implements ShouldQueue
1719
* Create a new job instance.
1820
*/
1921
public function __construct(
20-
protected File $file,
22+
protected int $fileId,
2123
) {}
2224

2325
/**
2426
* Execute the job.
2527
*/
2628
public function handle(): void
2729
{
28-
$pdfText = app(PdfService::class)->getPdfText($this->file->path);
30+
$file = File::findOrFail($this->fileId);
31+
32+
$fileId = $file->id;
33+
34+
$pdfText = app(PdfService::class)->getPdfText($file->path);
2935

3036
$chunks = $this->chuckText($pdfText, 1500, 500);
3137

@@ -35,20 +41,29 @@ public function handle(): void
3541

3642
$chunks = array_slice($chunks, 0, 2);
3743

38-
foreach ($chunks as $chunk) {
39-
$jobs[] = new ProcessChunkJob($chunk, $this->file);
40-
}
41-
42-
$jobs[] = new FinalizeFileJob($this->file);
43-
4444
// $this->file->total_chunks = $chuckCount;
4545
/* @phpstan-ignore-next-line */
46-
$this->file->total_chunks = 2;
47-
$this->file->save();
46+
$file->total_chunks = 2;
47+
$file->save();
4848

4949
Log::info("Started processing $chuckCount chunks");
5050

51-
Bus::chain($jobs)->dispatch();
51+
$jobs = array_map(
52+
fn ($chunk) => new ProcessChunkJob($chunk, $file->id),
53+
$chunks
54+
);
55+
56+
Bus::batch($jobs)
57+
->then(function (Batch $batch) use ($fileId) {
58+
FinalizeFileJob::dispatch($fileId);
59+
})
60+
->catch(function (Batch $batch, Throwable $e) use ($fileId) {
61+
Log::error("Batch failed for file_id={$fileId}: ".$e->getMessage());
62+
})
63+
->finally(function (Batch $batch) use ($fileId) {
64+
Log::info("Batch finished for file_id={$fileId}. processed={$batch->processedJobs()} failed={$batch->failedJobs}");
65+
})
66+
->dispatch();
5267
}
5368

5469
protected function chuckText(
@@ -87,4 +102,19 @@ protected function chuckText(
87102

88103
return $chunks;
89104
}
105+
106+
protected function sequenceJobs(): void
107+
{
108+
// foreach ($chunks as $chunk) {
109+
// $jobs[] = new ProcessChunkJob($chunk, $this->file);
110+
// }
111+
112+
// $jobs[] = new FinalizeFileJob($this->file);
113+
114+
// $this->file->total_chunks = $chuckCount;
115+
// $this->file->total_chunks = 2;
116+
// $this->file->save();
117+
118+
// Bus::chain($jobs)->dispatch();
119+
}
90120
}

app/Jobs/ProcessFileJob.php

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,18 +17,20 @@ class ProcessFileJob
1717
* Create a new job instance.
1818
*/
1919
public function __construct(
20-
protected File $file,
20+
protected int $fileId,
2121
) {}
2222

2323
/**
2424
* Execute the job.
2525
*/
2626
public function handle(): void
2727
{
28+
$file = File::findOrFail($this->fileId);
29+
2830
$parser = new Parser;
2931

3032
$pdf = $parser->parseFile(
31-
Storage::disk('public')->path($this->file->path)
33+
Storage::disk('public')->path($file->path)
3234
);
3335

3436
$pages = $pdf->getPages();
@@ -39,14 +41,14 @@ public function handle(): void
3941
$author = $details['Author'] ?? null;
4042
$title = $details['Title'] ?? null;
4143

42-
$this->file->update([
44+
$file->update([
4345
'name' => $title,
4446
'author' => $author,
4547
'pages' => $pagesCount,
4648
'status' => FileStatus::PROCESSING,
4749
]);
4850

4951
// event(new FileDetailsUpdated($this->file));
50-
ProcessDocumentJob::dispatch($this->file);
52+
ProcessDocumentJob::dispatch($file->id);
5153
}
5254
}

resources/js/pages/chats/Details.vue

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -155,7 +155,7 @@ onMounted(async () => {
155155
<InputGroupAddon align="block-end" class="flex justify-end">
156156
<InputGroupButton
157157
variant="default"
158-
class="rounded-full"
158+
class="rounded-full cursor-pointer"
159159
size="icon-sm"
160160
type="submit"
161161
:disabled="processing || disabledButton"

resources/js/pages/chats/Index.vue

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,7 @@ const handleSuccess = () => {
9797
<InputGroupAddon align="block-end" class="flex justify-end">
9898
<InputGroupButton
9999
variant="default"
100-
class="rounded-full"
100+
class="rounded-full cursor-pointer"
101101
size="icon-sm"
102102
type="submit"
103103
:disabled="processing"

0 commit comments

Comments
 (0)