Skip to content

Commit 01991d1

Browse files
authored
fix backpressure with DoSnap (#168)
1 parent 5dd0706 commit 01991d1

File tree

5 files changed

+168
-58
lines changed

5 files changed

+168
-58
lines changed

deps/config/doc_gen.go

Lines changed: 24 additions & 3 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

deps/config/types.go

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,11 @@ func DefaultCurioConfig() *CurioConfig {
5555
MaxQueueSDR: 8, // default to 8 (will cause backpressure even if deal sectors are 0)
5656
MaxQueueTrees: 0, // default don't use this limit
5757
MaxQueuePoRep: 0, // default don't use this limit
58-
MaxDealWaitTime: Duration(1 * time.Hour),
58+
59+
MaxQueueSnapEncode: 16,
60+
MaxQueueSnapProve: 0,
61+
62+
MaxDealWaitTime: Duration(1 * time.Hour),
5963
},
6064
Alerting: CurioAlertingConfig{
6165
MinimumWalletBalance: types.MustParseFIL("5"),
@@ -424,22 +428,37 @@ type CurioIngestConfig struct {
424428
// The SDR queue includes deals which are in the process of entering the sealing pipeline. In case of the SDR tasks it is
425429
// possible that this queue grows more than this limit(CC sectors), the backpressure is only applied to sectors
426430
// entering the pipeline.
431+
// Only applies to PoRep pipeline (DoSnap = false)
427432
MaxQueueSDR int
428433

429434
// Maximum number of sectors that can be queued waiting for SDRTrees to start processing.
430435
// 0 = unlimited
431436
// Note: This mechanism will delay taking deal data from markets, providing backpressure to the market subsystem.
432437
// In case of the trees tasks it is possible that this queue grows more than this limit, the backpressure is only
433438
// applied to sectors entering the pipeline.
439+
// Only applies to PoRep pipeline (DoSnap = false)
434440
MaxQueueTrees int
435441

436442
// Maximum number of sectors that can be queued waiting for PoRep to start processing.
437443
// 0 = unlimited
438444
// Note: This mechanism will delay taking deal data from markets, providing backpressure to the market subsystem.
439445
// Like with the trees tasks, it is possible that this queue grows more than this limit, the backpressure is only
440446
// applied to sectors entering the pipeline.
447+
// Only applies to PoRep pipeline (DoSnap = false)
441448
MaxQueuePoRep int
442449

450+
// MaxQueueSnapEncode is the maximum number of sectors that can be queued waiting for UpdateEncode to start processing.
451+
// 0 means unlimited.
452+
// This applies backpressure to the market subsystem by delaying the ingestion of deal data.
453+
// Only applies to the Snap Deals pipeline (DoSnap = true).
454+
MaxQueueSnapEncode int
455+
456+
// MaxQueueSnapProve is the maximum number of sectors that can be queued waiting for UpdateProve to start processing.
457+
// 0 means unlimited.
458+
// This applies backpressure to the market subsystem by delaying the ingestion of deal data.
459+
// Only applies to the Snap Deals pipeline (DoSnap = true).
460+
MaxQueueSnapProve int
461+
443462
	// Maximum time an open deal sector should wait for more deals before it starts sealing
444463
MaxDealWaitTime Duration
445464

documentation/en/configuration/default-curio-configuration.md

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -417,6 +417,7 @@ description: The default curio configuration
417417
# The SDR queue includes deals which are in the process of entering the sealing pipeline. In case of the SDR tasks it is
418418
# possible that this queue grows more than this limit(CC sectors), the backpressure is only applied to sectors
419419
# entering the pipeline.
420+
# Only applies to PoRep pipeline (DoSnap = false)
420421
#
421422
# type: int
422423
#MaxQueueSDR = 8
@@ -426,6 +427,7 @@ description: The default curio configuration
426427
# Note: This mechanism will delay taking deal data from markets, providing backpressure to the market subsystem.
427428
# In case of the trees tasks it is possible that this queue grows more than this limit, the backpressure is only
428429
# applied to sectors entering the pipeline.
430+
# Only applies to PoRep pipeline (DoSnap = false)
429431
#
430432
# type: int
431433
#MaxQueueTrees = 0
@@ -435,10 +437,27 @@ description: The default curio configuration
435437
# Note: This mechanism will delay taking deal data from markets, providing backpressure to the market subsystem.
436438
# Like with the trees tasks, it is possible that this queue grows more than this limit, the backpressure is only
437439
# applied to sectors entering the pipeline.
440+
# Only applies to PoRep pipeline (DoSnap = false)
438441
#
439442
# type: int
440443
#MaxQueuePoRep = 0
441444

445+
# MaxQueueSnapEncode is the maximum number of sectors that can be queued waiting for UpdateEncode to start processing.
446+
# 0 means unlimited.
447+
# This applies backpressure to the market subsystem by delaying the ingestion of deal data.
448+
# Only applies to the Snap Deals pipeline (DoSnap = true).
449+
#
450+
# type: int
451+
#MaxQueueSnapEncode = 16
452+
453+
# MaxQueueSnapProve is the maximum number of sectors that can be queued waiting for UpdateProve to start processing.
454+
# 0 means unlimited.
455+
# This applies backpressure to the market subsystem by delaying the ingestion of deal data.
456+
# Only applies to the Snap Deals pipeline (DoSnap = true).
457+
#
458+
# type: int
459+
#MaxQueueSnapProve = 0
460+
442461
# Maximum time an open deal sector should wait for more deals before it starts sealing
443462
#
444463
# type: Duration

market/lmrpc/lmrpc.go

Lines changed: 98 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -542,66 +542,112 @@ func closeDataReader(pieceData storiface.Data) {
542542
}
543543

544544
func maybeApplyBackpressure(tx *harmonydb.Tx, cfg config.CurioIngestConfig, ssize abi.SectorSize) (wait bool, err error) {
545-
var bufferedSDR, bufferedTrees, bufferedPoRep, waitDealSectors int
546-
err = tx.QueryRow(`
547-
WITH BufferedSDR AS (
548-
SELECT COUNT(p.task_id_sdr) - COUNT(t.owner_id) AS buffered_sdr_count
549-
FROM sectors_sdr_pipeline p
550-
LEFT JOIN harmony_task t ON p.task_id_sdr = t.id
551-
WHERE p.after_sdr = false
552-
),
553-
BufferedTrees AS (
554-
SELECT COUNT(p.task_id_tree_r) - COUNT(t.owner_id) AS buffered_trees_count
555-
FROM sectors_sdr_pipeline p
556-
LEFT JOIN harmony_task t ON p.task_id_tree_r = t.id
557-
WHERE p.after_sdr = true AND p.after_tree_r = false
558-
),
559-
BufferedPoRep AS (
560-
SELECT COUNT(p.task_id_porep) - COUNT(t.owner_id) AS buffered_porep_count
561-
FROM sectors_sdr_pipeline p
562-
LEFT JOIN harmony_task t ON p.task_id_porep = t.id
563-
WHERE p.after_tree_r = true AND p.after_porep = false
564-
),
565-
WaitDealSectors AS (
566-
SELECT COUNT(DISTINCT sip.sector_number) AS wait_deal_sectors_count
567-
FROM sectors_sdr_initial_pieces sip
568-
LEFT JOIN sectors_sdr_pipeline sp ON sip.sp_id = sp.sp_id AND sip.sector_number = sp.sector_number
569-
WHERE sp.sector_number IS NULL
570-
)
571-
SELECT
572-
(SELECT buffered_sdr_count FROM BufferedSDR) AS total_buffered_sdr,
573-
(SELECT buffered_trees_count FROM BufferedTrees) AS buffered_trees_count,
574-
(SELECT buffered_porep_count FROM BufferedPoRep) AS buffered_porep_count,
575-
(SELECT wait_deal_sectors_count FROM WaitDealSectors) AS wait_deal_sectors_count
576-
`).Scan(&bufferedSDR, &bufferedTrees, &bufferedPoRep, &waitDealSectors)
577-
if err != nil {
578-
return false, xerrors.Errorf("counting buffered sectors: %w", err)
579-
}
580-
581545
var pieceSizes []abi.PaddedPieceSize
582546

583547
err = tx.Select(&pieceSizes, `SELECT piece_padded_size FROM parked_pieces WHERE complete = false;`)
584548
if err != nil {
585549
return false, xerrors.Errorf("getting in-process pieces: %w", err)
586550
}
587-
588551
sectors := sectorCount(pieceSizes, abi.PaddedPieceSize(ssize))
589-
if cfg.MaxQueueDealSector != 0 && waitDealSectors+sectors > cfg.MaxQueueDealSector {
590-
log.Debugw("backpressure", "reason", "too many wait deal sectors", "wait_deal_sectors", waitDealSectors, "max", cfg.MaxQueueDealSector)
591-
return true, nil
592-
}
593552

594-
if bufferedSDR > cfg.MaxQueueSDR {
595-
log.Debugw("backpressure", "reason", "too many SDR tasks", "buffered", bufferedSDR, "max", cfg.MaxQueueSDR)
596-
return true, nil
597-
}
598-
if cfg.MaxQueueTrees != 0 && bufferedTrees > cfg.MaxQueueTrees {
599-
log.Debugw("backpressure", "reason", "too many tree tasks", "buffered", bufferedTrees, "max", cfg.MaxQueueTrees)
600-
return true, nil
601-
}
602-
if cfg.MaxQueuePoRep != 0 && bufferedPoRep > cfg.MaxQueuePoRep {
603-
log.Debugw("backpressure", "reason", "too many PoRep tasks", "buffered", bufferedPoRep, "max", cfg.MaxQueuePoRep)
604-
return true, nil
553+
if cfg.DoSnap {
554+
var bufferedEncode, bufferedProve, waitDealSectors int
555+
err = tx.QueryRow(`
556+
WITH BufferedEncode AS (
557+
SELECT COUNT(p.task_id_encode) - COUNT(t.owner_id) AS buffered_encode
558+
FROM sectors_snap_pipeline p
559+
LEFT JOIN harmony_task t ON p.task_id_encode = t.id
560+
WHERE p.after_encode = false
561+
),
562+
BufferedProve AS (
563+
SELECT COUNT(p.task_id_prove) - COUNT(t.owner_id) AS buffered_prove
564+
FROM sectors_snap_pipeline p
565+
LEFT JOIN harmony_task t ON p.task_id_prove = t.id
566+
				WHERE p.after_encode = true AND p.after_prove = false
567+
),
568+
WaitDealSectors AS (
569+
SELECT COUNT(DISTINCT sip.sector_number) AS wait_deal_sectors_count
570+
FROM sectors_snap_initial_pieces sip
571+
				LEFT JOIN sectors_snap_pipeline sp ON sip.sp_id = sp.sp_id AND sip.sector_number = sp.sector_number
572+
WHERE sp.sector_number IS NULL
573+
)
574+
SELECT
575+
(SELECT buffered_encode FROM BufferedEncode) AS total_encode,
576+
(SELECT buffered_prove FROM BufferedProve) AS buffered_prove,
577+
(SELECT wait_deal_sectors_count FROM WaitDealSectors) AS wait_deal_sectors_count
578+
`).Scan(&bufferedEncode, &bufferedProve, &waitDealSectors)
579+
if err != nil {
580+
return false, xerrors.Errorf("counting buffered sectors: %w", err)
581+
}
582+
583+
if cfg.MaxQueueDealSector != 0 && waitDealSectors+sectors > cfg.MaxQueueDealSector {
584+
log.Infow("backpressure", "reason", "too many wait deal sectors", "wait_deal_sectors", waitDealSectors, "max", cfg.MaxQueueDealSector)
585+
return true, nil
586+
}
587+
588+
if cfg.MaxQueueSnapEncode != 0 && bufferedEncode > cfg.MaxQueueSnapEncode {
589+
log.Infow("backpressure", "reason", "too many encode tasks", "buffered", bufferedEncode, "max", cfg.MaxQueueSnapEncode)
590+
return true, nil
591+
}
592+
593+
if cfg.MaxQueueSnapProve != 0 && bufferedProve > cfg.MaxQueueSnapProve {
594+
log.Infow("backpressure", "reason", "too many prove tasks", "buffered", bufferedProve, "max", cfg.MaxQueueSnapProve)
595+
			return true, nil
596+
}
597+
} else {
598+
var bufferedSDR, bufferedTrees, bufferedPoRep, waitDealSectors int
599+
err = tx.QueryRow(`
600+
WITH BufferedSDR AS (
601+
SELECT COUNT(p.task_id_sdr) - COUNT(t.owner_id) AS buffered_sdr_count
602+
FROM sectors_sdr_pipeline p
603+
LEFT JOIN harmony_task t ON p.task_id_sdr = t.id
604+
WHERE p.after_sdr = false
605+
),
606+
BufferedTrees AS (
607+
SELECT COUNT(p.task_id_tree_r) - COUNT(t.owner_id) AS buffered_trees_count
608+
FROM sectors_sdr_pipeline p
609+
LEFT JOIN harmony_task t ON p.task_id_tree_r = t.id
610+
WHERE p.after_sdr = true AND p.after_tree_r = false
611+
),
612+
BufferedPoRep AS (
613+
SELECT COUNT(p.task_id_porep) - COUNT(t.owner_id) AS buffered_porep_count
614+
FROM sectors_sdr_pipeline p
615+
LEFT JOIN harmony_task t ON p.task_id_porep = t.id
616+
WHERE p.after_tree_r = true AND p.after_porep = false
617+
),
618+
WaitDealSectors AS (
619+
SELECT COUNT(DISTINCT sip.sector_number) AS wait_deal_sectors_count
620+
FROM sectors_sdr_initial_pieces sip
621+
LEFT JOIN sectors_sdr_pipeline sp ON sip.sp_id = sp.sp_id AND sip.sector_number = sp.sector_number
622+
WHERE sp.sector_number IS NULL
623+
)
624+
SELECT
625+
(SELECT buffered_sdr_count FROM BufferedSDR) AS total_buffered_sdr,
626+
(SELECT buffered_trees_count FROM BufferedTrees) AS buffered_trees_count,
627+
(SELECT buffered_porep_count FROM BufferedPoRep) AS buffered_porep_count,
628+
(SELECT wait_deal_sectors_count FROM WaitDealSectors) AS wait_deal_sectors_count
629+
`).Scan(&bufferedSDR, &bufferedTrees, &bufferedPoRep, &waitDealSectors)
630+
if err != nil {
631+
return false, xerrors.Errorf("counting buffered sectors: %w", err)
632+
}
633+
634+
if cfg.MaxQueueDealSector != 0 && waitDealSectors+sectors > cfg.MaxQueueDealSector {
635+
log.Infow("backpressure", "reason", "too many wait deal sectors", "wait_deal_sectors", waitDealSectors, "max", cfg.MaxQueueDealSector)
636+
return true, nil
637+
}
638+
639+
if bufferedSDR > cfg.MaxQueueSDR {
640+
log.Infow("backpressure", "reason", "too many SDR tasks", "buffered", bufferedSDR, "max", cfg.MaxQueueSDR)
641+
return true, nil
642+
}
643+
if cfg.MaxQueueTrees != 0 && bufferedTrees > cfg.MaxQueueTrees {
644+
log.Infow("backpressure", "reason", "too many tree tasks", "buffered", bufferedTrees, "max", cfg.MaxQueueTrees)
645+
return true, nil
646+
}
647+
if cfg.MaxQueuePoRep != 0 && bufferedPoRep > cfg.MaxQueuePoRep {
648+
log.Infow("backpressure", "reason", "too many PoRep tasks", "buffered", bufferedPoRep, "max", cfg.MaxQueuePoRep)
649+
return true, nil
650+
}
605651
}
606652

607653
return false, nil

web/api/webrpc/sector.go

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -419,9 +419,14 @@ func (a *WebRPC) SectorResume(ctx context.Context, spid, id int) error {
419419
}
420420

421421
func (a *WebRPC) SectorRemove(ctx context.Context, spid, id int) error {
422-
_, err := a.deps.DB.Exec(ctx, `DELETE FROM sectors_sdr_pipeline WHERE sp_id = $1 AND sector_number = $2`, spid, id)
422+
_, err := a.deps.DB.Exec(ctx, `DELETE FROM batch_sector_refs WHERE sp_id = $1 AND sector_number = $2`, spid, id)
423423
if err != nil {
424-
return xerrors.Errorf("failed to resume sector: %w", err)
424+
return xerrors.Errorf("failed to remove sector batch refs: %w", err)
425+
}
426+
427+
_, err = a.deps.DB.Exec(ctx, `DELETE FROM sectors_sdr_pipeline WHERE sp_id = $1 AND sector_number = $2`, spid, id)
428+
if err != nil {
429+
return xerrors.Errorf("failed to remove sector: %w", err)
425430
}
426431

427432
_, err = a.deps.DB.Exec(ctx, `INSERT INTO storage_removal_marks (sp_id, sector_num, sector_filetype, storage_id, created_at, approved, approved_at)

0 commit comments

Comments
 (0)