Skip to content

Commit 33c6192

Browse files
committed
remove indexing table, add defaults
1 parent 48e953d commit 33c6192

File tree

12 files changed

+249
-311
lines changed

12 files changed

+249
-311
lines changed

.circleci/config.yml

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -144,7 +144,7 @@ jobs:
144144
description: Flags passed to go test.
145145
target:
146146
type: string
147-
default: "./..."
147+
default: "./... | grep -v itests"
148148
description: Import paths of packages to be tested.
149149
proofs-log-test:
150150
type: string
@@ -276,9 +276,8 @@ workflows:
276276
get-params: true
277277
resource_class: 2xlarge
278278
- test:
279-
name: test-idxStore
279+
name: test-all
280280
requires:
281281
- build
282-
suite: idxStore
283-
target: "./lib/indexing/indexstore/indexstore_test.go"
282+
suite: test-all
284283
get-params: true

cmd/curio/market.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -186,7 +186,7 @@ var marketImportdataCmd = &cli.Command{
186186
}
187187

188188
// Insert the offline deal into the deal pipeline
189-
_, err = tx.Exec(`INSERT INTO market_mk12_deal_pipeline (url, file_size)
189+
_, err = tx.Exec(`INSERT INTO market_mk12_deal_pipeline (url, raw_size)
190190
VALUES ($1, $2) WHERE uuid = $3 ON CONFLICT (uuid) DO NOTHING`,
191191
pieceIDUrl, rawSize)
192192
if err != nil {

deps/config/types.go

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,12 @@ func DefaultCurioConfig() *CurioConfig {
7474
InsertBatchSize: 15000,
7575
},
7676
MK12: MK12Config{
77-
Miners: []string{},
77+
Miners: []string{},
78+
PublishMsgPeriod: Duration(30 * time.Minute),
79+
MaxDealsPerPublishMsg: 8,
80+
MaxPublishDealsFee: types.MustParseFIL("5 FIL"),
81+
ExpectedPoRepSealDuration: Duration(8 * time.Hour),
82+
ExpectedSnapSealDuration: Duration(2 * time.Hour),
7883
},
7984
},
8085
},
@@ -551,7 +556,7 @@ type StorageMarketConfig struct {
551556
}
552557

553558
type MK12Config struct {
554-
// Miners is a list of miner to enable MK12 deals(Boost) for
559+
// Miners is a list of miner addresses to enable MK12 deals(Boost) for
555560
Miners []string
556561

557562
// When a deal is ready to publish, the amount of time to wait for more
@@ -567,10 +572,13 @@ type MK12Config struct {
567572
// The maximum fee to pay when sending the PublishStorageDeals message
568573
MaxPublishDealsFee types.FIL
569574

570-
// ExpectedSealDuration is the expected time it would take to seal the deal sector
575+
// ExpectedPoRepSealDuration is the expected time it would take to seal the deal sector
571576
// This will be used to fail the deals which cannot be sealed on time.
572-
// Please make sure to update this to shorter duration for snap deals
573-
ExpectedSealDuration Duration
577+
ExpectedPoRepSealDuration Duration
578+
579+
// ExpectedSnapSealDuration is the expected time it would take to snap the deal sector
580+
// This will be used to fail the deals which cannot be sealed on time.
581+
ExpectedSnapSealDuration Duration
574582

575583
// SkipCommP can be used to skip doing a commP check before PublishDealMessage is sent on chain
576584
// Warning: If this check is skipped and there is a commP mismatch, all deals in the

harmony/harmonydb/sql/20240730-market-migration.sql

Lines changed: 112 additions & 74 deletions
Original file line numberDiff line numberDiff line change
@@ -37,33 +37,14 @@ CREATE TABLE market_mk12_deals (
3737
unique (signed_proposal_cid)
3838
);
3939

40-
-- Table for old lotus market deals. This is just for deal
41-
-- which are still alive. It should not be used for any processing
42-
CREATE TABLE market_legacy_deals (
43-
signed_proposal_cid TEXT,
44-
proposal_signature BYTEA,
45-
proposal jsonb,
46-
piece_cid TEXT,
47-
piece_size BIGINT,
48-
offline BOOLEAN,
49-
verified BOOLEAN,
50-
sp_id BIGINT,
51-
start_epoch BIGINT,
52-
end_epoch BIGINT,
53-
publish_cid TEXT,
54-
fast_retrieval BOOLEAN,
55-
chain_deal_id BIGINT,
56-
created_at TIMESTAMPTZ,
57-
sector_num BIGINT,
58-
59-
primary key (sp_id, piece_cid, signed_proposal_cid)
60-
);
61-
6240
-- This table is used for storing piece metadata (piece indexing)
6341
CREATE TABLE market_piece_metadata (
6442
piece_cid TEXT NOT NULL PRIMARY KEY,
65-
version INT NOT NULL DEFAULT 2,
43+
44+
version INT NOT NULL DEFAULT 2
45+
6646
created_at TIMESTAMPTZ NOT NULL DEFAULT TIMEZONE('UTC', NOW()),
47+
6748
indexed BOOLEAN NOT NULL DEFAULT FALSE,
6849
indexed_at TIMESTAMPTZ NOT NULL DEFAULT TIMEZONE('UTC', NOW()),
6950

@@ -103,21 +84,30 @@ CREATE OR REPLACE FUNCTION process_piece_deal(
10384
_piece_offset BIGINT,
10485
_piece_length BIGINT,
10586
_raw_size BIGINT,
87+
_indexed BOOLEAN,
10688
_legacy_deal BOOLEAN DEFAULT FALSE,
10789
_chain_deal_id BIGINT DEFAULT 0
10890
)
10991
RETURNS VOID AS $$
11092
BEGIN
111-
INSERT INTO market_piece_metadata (piece_cid, indexed) VALUES (_piece_cid, TRUE)
112-
ON CONFLICT (piece_cid) DO UPDATE SET indexed = TRUE;
93+
-- Insert or update the market_piece_metadata table
94+
INSERT INTO market_piece_metadata (piece_cid, indexed)
95+
VALUES (_piece_cid, _indexed)
96+
ON CONFLICT (piece_cid) DO UPDATE SET
97+
indexed = CASE
98+
WHEN market_piece_metadata.indexed = FALSE THEN EXCLUDED.indexed
99+
ELSE market_piece_metadata.indexed
100+
END;
113101

102+
-- Insert into the market_piece_deal table
114103
INSERT INTO market_piece_deal (
115104
id, piece_cid, boost_deal, legacy_deal, chain_deal_id,
116105
sp_id, sector_num, piece_offset, piece_length, raw_size
117-
) VALUES (
118-
_id, _piece_cid, _boost_deal, _legacy_deal, _chain_deal_id,
119-
_sp_id, _sector_num, _piece_offset, _piece_length, _raw_size
120-
) ON CONFLICT (sp_id, piece_cid, id) DO NOTHING;
106+
) VALUES (
107+
_id, _piece_cid, _boost_deal, _legacy_deal, _chain_deal_id,
108+
_sp_id, _sector_num, _piece_offset, _piece_length, _raw_size
109+
) ON CONFLICT (sp_id, piece_cid, id) DO NOTHING;
110+
121111
END;
122112
$$ LANGUAGE plpgsql;
123113

@@ -147,7 +137,7 @@ CREATE TABLE market_mk12_deal_pipeline (
147137

148138
piece_cid TEXT NOT NULL,
149139
piece_size BIGINT NOT NULL,
150-
file_size BIGINT DEFAULT NULL, -- raw piece size
140+
raw_size BIGINT DEFAULT NULL,
151141

152142
offline BOOLEAN NOT NULL,
153143

@@ -166,14 +156,78 @@ CREATE TABLE market_mk12_deal_pipeline (
166156
after_find_deal BOOLEAN DEFAULT FALSE,
167157

168158
sector BIGINT,
159+
reg_seal_proof INT NOT NULL,
169160
sector_offset BIGINT,
170161

171162
sealed BOOLEAN DEFAULT FALSE,
163+
164+
should_index BOOLEAN DEFAULT FALSE,
165+
indexing_created_at TIMESTAMPTZ,
166+
indexing_task_id BIGINT DEFAULT NULL,
172167
indexed BOOLEAN DEFAULT FALSE,
173168

169+
complete BOOLEAN NOT NULL DEFAULT FALSE,
170+
174171
constraint market_mk12_deal_pipeline_identity_key unique (uuid)
175172
);
176173

174+
-- This function creates indexing task based from move_storage tasks
175+
CREATE OR REPLACE FUNCTION create_indexing_task(task_id BIGINT, sealing_table TEXT)
176+
RETURNS VOID AS $$
177+
DECLARE
178+
query TEXT; -- Holds the dynamic SQL query
179+
pms RECORD; -- Holds each row returned by the query in the loop
180+
BEGIN
181+
-- Construct the dynamic SQL query based on the sealing_table
182+
IF sealing_table = 'sectors_sdr_pipeline' THEN
183+
query := format(
184+
'SELECT
185+
dp.uuid,
186+
ssp.reg_seal_proof
187+
FROM
188+
%I ssp
189+
JOIN
190+
market_mk12_deal_pipeline dp ON ssp.sp_id = dp.sp_id AND ssp.sector_num = dp.sector
191+
WHERE
192+
ssp.task_id_move_storage = $1', sealing_table);
193+
ELSIF sealing_table = 'sectors_snap_pipeline' THEN
194+
query := format(
195+
'SELECT
196+
dp.uuid,
197+
(SELECT reg_seal_proof FROM sectors_meta WHERE sp_id = ssp.sp_id AND sector_num = ssp.sector_num) AS reg_seal_proof
198+
FROM
199+
%I ssp
200+
JOIN
201+
market_mk12_deal_pipeline dp ON ssp.sp_id = dp.sp_id AND ssp.sector_num = dp.sector
202+
WHERE
203+
ssp.task_id_move_storage = $1', sealing_table);
204+
ELSE
205+
RAISE EXCEPTION 'Invalid sealing_table name: %', sealing_table;
206+
END IF;
207+
208+
-- Execute the dynamic SQL query with the task_id parameter
209+
FOR pms IN EXECUTE query USING task_id
210+
LOOP
211+
-- Update the market_mk12_deal_pipeline table with the reg_seal_proof and indexing_created_at values
212+
UPDATE market_mk12_deal_pipeline
213+
SET
214+
reg_seal_proof = pms.reg_seal_proof,
215+
indexing_created_at = NOW() AT TIME ZONE 'UTC'
216+
WHERE
217+
uuid = pms.uuid;
218+
END LOOP;
219+
220+
-- If everything is successful, simply exit
221+
RETURN;
222+
223+
EXCEPTION
224+
WHEN OTHERS THEN
225+
-- Rollback the transaction and raise the exception for Go to catch
226+
ROLLBACK;
227+
RAISE EXCEPTION 'Failed to create indexing task: %', SQLERRM;
228+
END;
229+
$$ LANGUAGE plpgsql;
230+
177231
-- This table can be used to track remote piece for offline deals
178232
-- The entries must be created by users
179233
CREATE TABLE market_offline_urls (
@@ -187,27 +241,6 @@ CREATE TABLE market_offline_urls (
187241
unique (piece_cid)
188242
);
189243

190-
-- indexing tracker is separate from
191-
CREATE TABLE market_indexing_tasks (
192-
uuid TEXT NOT NULL,
193-
194-
sp_id BIGINT NOT NULL,
195-
sector_number BIGINT NOT NULL,
196-
reg_seal_proof INT NOT NULL,
197-
198-
piece_offset BIGINT NOT NULL,
199-
piece_size BIGINT NOT NULL,
200-
raw_size BIGINT NOT NULL,
201-
piece_cid TEXT NOT NULL,
202-
203-
created_at TIMESTAMPTZ NOT NULL DEFAULT TIMEZONE('UTC', NOW()),
204-
205-
task_id BIGINT DEFAULT NULL,
206-
207-
constraint market_indexing_tasks_identity_key
208-
unique (id, sp_id, sector_number, piece_offset, piece_size, piece_cid, reg_seal_proof)
209-
);
210-
211244
CREATE TABLE libp2p_keys (
212245
sp_id BIGINT NOT NULL,
213246
priv_key BYTEA NOT NULL,
@@ -216,33 +249,38 @@ CREATE TABLE libp2p_keys (
216249
no_announce_address TEXT NOT NULL
217250
);
218251

252+
-- Add host column to allow local file based
253+
-- piece park
219254
ALTER TABLE parked_piece_refs
220255
ADD COLUMN host text;
221256

222-
create table file_parked_pieces (
223-
id bigserial primary key,
224-
created_at timestamp default current_timestamp,
225-
piece_cid text not null,
226-
piece_padded_size bigint not null,
227-
piece_raw_size bigint not null,
228-
complete boolean not null default false,
229-
task_id bigint default null,
230-
cleanup_task_id bigint default null,
231-
unique (piece_cid)
232-
);
257+
-- Table for old lotus market deals. This is just for deal
258+
-- which are still alive. It should not be used for any processing
259+
CREATE TABLE market_legacy_deals (
260+
signed_proposal_cid TEXT,
261+
sp_id BIGINT,
233262

234-
/*
235-
* This table is used to keep track of the references to the file parked pieces
236-
* so that we can delete them when they are no longer needed.
237-
*
238-
* All references into the file_parked_pieces table should be done through this table.
239-
*/
240-
create table file_parked_piece_refs (
241-
ref_id bigserial primary key,
242-
piece_id bigint not null,
243-
data_url text not null,
244-
node text not null,
245-
foreign key (piece_id) references file_parked_pieces(id) on delete cascade
263+
proposal_signature BYTEA,
264+
proposal jsonb,
265+
266+
piece_cid TEXT,
267+
piece_size BIGINT,
268+
269+
offline BOOLEAN,
270+
verified BOOLEAN,
271+
272+
start_epoch BIGINT,
273+
end_epoch BIGINT,
274+
275+
publish_cid TEXT,
276+
chain_deal_id BIGINT,
277+
278+
fast_retrieval BOOLEAN,
279+
280+
created_at TIMESTAMPTZ,
281+
sector_num BIGINT,
282+
283+
primary key (sp_id, piece_cid, signed_proposal_cid)
246284
);
247285

248286

market/mk12/mk12.go

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -436,18 +436,19 @@ func (m *MK12) processDeal(ctx context.Context, deal *ProviderDealState) (*Provi
436436
Opaque: fmt.Sprintf("%d", refID),
437437
}
438438

439-
_, err = tx.Exec(`INSERT INTO market_mk12_deal_pipeline (uuid, sp_id, piece_cid, piece_size, offline, url, file_size)
440-
VALUES ($1, $2, $3, $4, $5, $6, $7) ON CONFLICT (uuid) DO NOTHING`,
441-
deal.DealUuid.String(), mid, prop.PieceCID.String(), prop.PieceSize, deal.IsOffline, pieceIDUrl, deal.Transfer.Size)
439+
_, err = tx.Exec(`INSERT INTO market_mk12_deal_pipeline (uuid, sp_id, piece_cid, piece_size, offline, url, raw_size, should_index)
440+
VALUES ($1, $2, $3, $4, $5, $6, $7, $8) ON CONFLICT (uuid) DO NOTHING`,
441+
deal.DealUuid.String(), mid, prop.PieceCID.String(), prop.PieceSize, deal.IsOffline, pieceIDUrl, deal.Transfer.Size,
442+
deal.FastRetrieval)
442443
if err != nil {
443444
return false, xerrors.Errorf("inserting deal into deal pipeline: %w", err)
444445
}
445446

446447
} else {
447448
// Insert the offline deal into the deal pipeline
448-
_, err = tx.Exec(`INSERT INTO market_mk12_deal_pipeline (uuid, sp_id, piece_cid, piece_size, offline)
449-
VALUES ($1, $2, $3, $4, $5) ON CONFLICT (uuid) DO NOTHING`,
450-
deal.DealUuid.String(), mid, prop.PieceCID.String(), prop.PieceSize, deal.IsOffline)
449+
_, err = tx.Exec(`INSERT INTO market_mk12_deal_pipeline (uuid, sp_id, piece_cid, piece_size, offline, should_index)
450+
VALUES ($1, $2, $3, $4, $5, $6) ON CONFLICT (uuid) DO NOTHING`,
451+
deal.DealUuid.String(), mid, prop.PieceCID.String(), prop.PieceSize, deal.IsOffline, deal.FastRetrieval)
451452
if err != nil {
452453
return false, xerrors.Errorf("inserting deal into deal pipeline: %w", err)
453454
}

0 commit comments

Comments
 (0)