@@ -178,132 +178,29 @@ const hash = await beeThreads.worker<typeof hashPassword>('./workers/hash-passwo
178178
179179## ` worker().turbo() ` - File Workers + Parallel Arrays
180180
181- ** The killer feature for data-intensive applications.**
182-
183- When you have thousands (or millions) of records that need to be enriched with data from databases, APIs, or external services β ` worker().turbo() ` distributes the workload across multiple workers, each with its own connection pool.
184-
185- ### Real-World Example: E-commerce Order Enrichment
181+ Process large arrays with ** database access** across multiple workers. Each worker has its own connection pool.
186182
187183``` ts
188- // workers/enrich-orders.ts
189- import { prisma } from ' ../lib/prisma'
190- import { redis } from ' ../lib/redis'
191- import { stripe } from ' ../lib/stripe'
192-
193- interface Order {
194- id: string
195- userId: string
196- productIds: string []
197- paymentIntentId: string
198- }
199-
200- interface EnrichedOrder extends Order {
201- user: { name: string ; email: string ; tier: string }
202- products: { id: string ; name: string ; price: number ; stock: number }[]
203- payment: { status: string ; amount: number ; currency: string }
204- cached: boolean
205- }
184+ // workers/process-users.ts
185+ import { db } from ' ../database'
186+ import { calculateScore } from ' ../scoring'
206187
207- export default async function (orders : Order []): Promise <EnrichedOrder []> {
188+ export default async function (users : User []): Promise <ProcessedUser []> {
208189 return Promise .all (
209- orders .map (async order => {
210- // Check Redis cache first
211- const cached = await redis .get (` order:${order .id }:enriched ` )
212- if (cached ) return { ... JSON .parse (cached ), cached: true }
213-
214- // Parallel fetches for each order
215- const [user, products, payment] = await Promise .all ([
216- prisma .user .findUnique ({
217- where: { id: order .userId },
218- select: { name: true , email: true , tier: true },
219- }),
220- prisma .product .findMany ({
221- where: { id: { in: order .productIds } },
222- select: { id: true , name: true , price: true , stock: true },
223- }),
224- stripe .paymentIntents .retrieve (order .paymentIntentId ),
225- ])
226-
227- const enriched: EnrichedOrder = {
228- ... order ,
229- user: user ! ,
230- products ,
231- payment: {
232- status: payment .status ,
233- amount: payment .amount ,
234- currency: payment .currency ,
235- },
236- cached: false ,
237- }
238-
239- // Cache for 5 minutes
240- await redis .setex (` order:${order .id }:enriched ` , 300 , JSON .stringify (enriched ))
241-
242- return enriched
243- })
190+ users .map (async user => ({
191+ ... user ,
192+ score: await calculateScore (user ),
193+ data: await db .fetch (user .id ),
194+ }))
244195 )
245196}
246- ```
247-
248- ``` ts
249- // main.ts - Enrich 50,000 orders across 8 workers
250- import { beeThreads } from ' bee-threads'
251197
252- const orders = await prisma .order .findMany ({
253- where: { status: ' pending_enrichment' },
254- take: 50_000 ,
255- })
256-
257- // Each worker has its own Prisma, Redis, and Stripe connections
258- // 50,000 orders Γ· 8 workers = ~6,250 orders per worker (in parallel!)
259- const enrichedOrders = await beeThreads .worker (' ./workers/enrich-orders' ).turbo (orders , { workers: 8 })
260-
261- console .log (` Enriched ${enrichedOrders .length } orders ` )
262- // β Enriched 50000 orders (in ~15 seconds instead of ~2 minutes)
263- ```
264-
265- ### How It Works
266-
267- ```
268- βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
269- β 50,000 orders to enrich β
270- βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
271- β
272- ββββββββββββ΄βββββββββββ
273- β SPLIT (8 chunks) β
274- ββββββββββββ¬βββββββββββ
275- β
276- βββββββββββββ¬ββββββββββββ¬ββββββββ΄ββββββββ¬ββββββββββββ¬ββββββββββββ
277- βΌ βΌ βΌ βΌ βΌ βΌ
278- βββββββββ βββββββββ βββββββββ βββββββββ βββββββββ βββββββββ
279- βWorker1β βWorker2β βWorker3β ... βWorker6β βWorker7β βWorker8β
280- β 6,250 β β 6,250 β β 6,250 β β 6,250 β β 6,250 β β 6,250 β
281- βorders β βorders β βorders β βorders β βorders β βorders β
282- βββββββββ€ βββββββββ€ βββββββββ€ βββββββββ€ βββββββββ€ βββββββββ€
283- βPrisma β βPrisma β βPrisma β βPrisma β βPrisma β βPrisma β
284- βRedis β βRedis β βRedis β βRedis β βRedis β βRedis β
285- βStripe β βStripe β βStripe β βStripe β βStripe β βStripe β
286- βββββββββ βββββββββ βββββββββ βββββββββ βββββββββ βββββββββ
287- β β β β β β
288- βββββββββββββ΄ββββββββββββ΄ββββββββ¬ββββββββ΄ββββββββββββ΄ββββββββββββ
289- βΌ
290- ββββββββββββββββββββββββ
291- β MERGE (order kept) β
292- β 50,000 enriched β
293- ββββββββββββββββββββββββ
198+ // main.ts - 10,000 users across 8 workers
199+ const results = await beeThreads .worker (' ./workers/process-users' ).turbo (users , { workers: 8 })
294200```
295201
296202> ** Default workers:** ` os.cpus().length - 1 ` (if not specified)
297203
298- ### Use Cases
299-
300- | Scenario | Without worker().turbo() | With worker().turbo() |
301- | ------------------------------- | ----------------------------- | ------------------------------ |
302- | 50K orders enrichment | ~ 2 min (sequential) | ** ~ 15 sec** (8 workers) |
303- | 100K users + ML scoring | ~ 5 min | ** ~ 40 sec** |
304- | 1M records ETL pipeline | ~ 30 min | ** ~ 4 min** |
305- | Batch payment processing | I/O bound, single connection | ** Parallel connections** |
306-
307204### When to Use
308205
309206| Need | Use |
0 commit comments