Skip to content

Commit e27b019

Browse files
committed
finish libp2p init, fix tests
1 parent 55fcc28 commit e27b019

File tree

15 files changed

+670
-531
lines changed

15 files changed

+670
-531
lines changed

Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,7 @@ debug: build
118118
all: build
119119
.PHONY: all
120120

121-
build: curio
121+
build: curio sptool
122122
@[[ $$(type -P "curio") ]] && echo "Caution: you have \
123123
an existing curio binary in your PATH. This may cause problems if you don't run 'sudo make install'" || true
124124

cmd/curio/rpc/rpc.go

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -250,7 +250,6 @@ func (p *CurioAPI) LogSetLevel(ctx context.Context, subsystem, level string) err
250250
}
251251

252252
func ListenAndServe(ctx context.Context, dependencies *deps.Deps, shutdownChan chan struct{}) error {
253-
log.Errorf("ENTERED RPC SERVER")
254253
fh := &paths.FetchHandler{Local: dependencies.LocalStore, PfHandler: &paths.DefaultPartialFileHandler{}}
255254
remoteHandler := func(w http.ResponseWriter, r *http.Request) {
256255
if !auth.HasPerm(r.Context(), nil, lapi.PermAdmin) {
@@ -262,8 +261,6 @@ func ListenAndServe(ctx context.Context, dependencies *deps.Deps, shutdownChan c
262261
fh.ServeHTTP(w, r)
263262
}
264263

265-
log.Errorf("CREATED STORAGE HANDLER")
266-
267264
var authVerify func(context.Context, string) ([]auth.Permission, error)
268265
{
269266
privateKey, err := base64.StdEncoding.DecodeString(dependencies.Cfg.Apis.StorageRPCSecret)

cmd/curio/run.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -130,7 +130,6 @@ var runCmd = &cli.Command{
130130
}
131131
defer taskEngine.GracefullyTerminate()
132132

133-
log.Infof("WILL START RPC SERVER NOW")
134133
err = rpc.ListenAndServe(ctx, dependencies, shutdownChan) // Monitor for shutdown.
135134
if err != nil {
136135
return err

cmd/curio/tasks/tasks.go

Lines changed: 6 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ import (
2727
"github.com/filecoin-project/curio/lib/curiochain"
2828
"github.com/filecoin-project/curio/lib/fastparamfetch"
2929
"github.com/filecoin-project/curio/lib/ffi"
30+
"github.com/filecoin-project/curio/lib/libp2p"
3031
"github.com/filecoin-project/curio/lib/multictladdr"
3132
"github.com/filecoin-project/curio/lib/paths"
3233
"github.com/filecoin-project/curio/lib/slotmgr"
@@ -221,15 +222,11 @@ func StartTasks(ctx context.Context, dependencies *deps.Deps) (*harmonytask.Task
221222
dealFindTask := storage_market.NewFindDealTask(dm, db, full, &cfg.Market.StorageMarketConfig.MK12)
222223
activeTasks = append(activeTasks, psdTask, dealFindTask)
223224

224-
//p2pMap, err := libp2p.NewLibp2pHost(ctx, db, cfg)
225-
//if err != nil {
226-
// return nil, err
227-
//}
228-
//
229-
//for _, h := range p2pMap {
230-
// dp := libp2pimpl.NewDealProvider(h, db, dm.MK12Handler, full)
231-
// go dp.Start(ctx)
232-
//}
225+
// Start libp2p hosts and handle streams
226+
err = libp2p.NewDealProvider(ctx, db, cfg, dm.MK12Handler, full, machine)
227+
if err != nil {
228+
return nil, err
229+
}
233230

234231
indexingTask := indexing.NewIndexingTask(db, sc, iStore, pp, cfg)
235232
activeTasks = append(activeTasks, indexingTask)

deps/config/doc_gen.go

Lines changed: 34 additions & 3 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

deps/config/types.go

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ func DefaultCurioConfig() *CurioConfig {
7878
InsertBatchSize: 15000,
7979
},
8080
MK12: MK12Config{
81-
Miners: []string{},
81+
Libp2p: []Libp2pConfig{},
8282
PublishMsgPeriod: Duration(30 * time.Minute),
8383
MaxDealsPerPublishMsg: 8,
8484
MaxPublishDealsFee: types.MustParseFIL("5 FIL"),
@@ -575,8 +575,9 @@ type StorageMarketConfig struct {
575575
}
576576

577577
type MK12Config struct {
578-
// Miners is a list of miner addresses to enable MK12 deals(Boost) for
579-
Miners []string
578+
// Libp2p is a list of libp2p config for each miner ID. These values must be set explicitly
579+
// for each miner ID.
580+
Libp2p []Libp2pConfig
580581

581582
// When a deal is ready to publish, the amount of time to wait for more
582583
// deals to be ready to publish before publishing them all as a batch
@@ -617,3 +618,18 @@ type IndexingConfig struct {
617618
// Number of concurrent inserts to split AddIndex calls to
618619
InsertConcurrency int
619620
}
621+
622+
type Libp2pConfig struct {
623+
// Miners ID for which MK12 deals (boosts) should be enabled and associated with this libp2p configuration.
624+
Miner string
625+
// Binding address for the libp2p host - 0 means random port.
626+
// Format: multiaddress; see https://multiformats.io/multiaddr/
627+
ListenAddresses []string
628+
// Addresses to explicitally announce to other peers. If not specified,
629+
// all interface addresses are announced
630+
// Format: multiaddress
631+
AnnounceAddresses []string
632+
// Addresses to not announce
633+
// Format: multiaddress
634+
NoAnnounceAddresses []string
635+
}

documentation/en/configuration/default-curio-configuration.md

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -412,10 +412,11 @@ description: The default curio configuration
412412
#InsertConcurrency = 8
413413

414414
[Market.StorageMarketConfig.MK12]
415-
# Miners is a list of miner addresses to enable MK12 deals(Boost) for
415+
# Libp2p is a list of libp2p config for each miner ID. These values must be set explicitly
416+
# for each miner ID.
416417
#
417-
# type: []string
418-
#Miners = []
418+
# type: []Libp2pConfig
419+
#Libp2p = []
419420

420421
# When a deal is ready to publish, the amount of time to wait for more
421422
# deals to be ready to publish before publishing them all as a batch

harmony/harmonydb/sql/20240228-piece-park.sql

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ create table parked_piece_refs (
3434
data_headers jsonb not null default '{}',
3535

3636
-- host Added in 202240730-market-migrations.sql
37-
host text,
37+
-- host text,
3838

3939
foreign key (piece_id) references parked_pieces(id) on delete cascade
4040
);

harmony/harmonydb/sql/20240730-market-migration.sql

Lines changed: 65 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -240,14 +240,75 @@ CREATE TABLE market_offline_urls (
240240
unique (piece_cid)
241241
);
242242

243-
CREATE TABLE libp2p_keys (
243+
-- This table is used for coordinating libp2p nodes
244+
CREATE TABLE libp2p (
244245
sp_id BIGINT NOT NULL,
245246
priv_key BYTEA NOT NULL,
246-
listen_address TEXT NOT NULL,
247-
announce_address TEXT NOT NULL,
248-
no_announce_address TEXT NOT NULL
247+
listen_address TEXT DEFAULT NULL,
248+
announce_address TEXT DEFAULT NULL,
249+
no_announce_address TEXT DEFAULT NULL,
250+
running_on TEXT DEFAULT NULL,
251+
updated_at TIMESTAMPTZ DEFAULT NULL,
252+
253+
-- sp_id, priv_key must be unique for miner<>peerID combo uniqueness
254+
-- announce address should be unique to avoid having 2 peerID reachable on same address
255+
constraint market_libp2p_identity_key unique (sp_id, priv_key, announce_address)
249256
);
250257

258+
-- -- Function used to update the libp2p table
259+
CREATE OR REPLACE FUNCTION insert_or_update_libp2p(
260+
_sp_id BIGINT,
261+
_listen_address TEXT,
262+
_announce_address TEXT,
263+
_no_announce_address TEXT,
264+
_running_on TEXT
265+
)
266+
RETURNS BYTEA AS $$
267+
DECLARE
268+
_priv_key BYTEA;
269+
_current_running_on TEXT;
270+
_current_updated_at TIMESTAMPTZ;
271+
BEGIN
272+
-- Check if the sp_id exists and retrieve the current values
273+
SELECT priv_key, running_on, updated_at INTO _priv_key, _current_running_on, _current_updated_at
274+
FROM libp2p
275+
WHERE sp_id = _sp_id;
276+
277+
-- Raise an exception if no row was found
278+
IF NOT FOUND THEN
279+
RAISE EXCEPTION 'libp2p key for sp_id "%" does not exist', _sp_id;
280+
END IF;
281+
282+
-- If the sp_id exists and running_on is NULL or matches _running_on
283+
IF _current_running_on IS NULL OR _current_running_on = _running_on THEN
284+
-- Update the record with the provided values and set updated_at to NOW
285+
UPDATE libp2p
286+
SET
287+
listen_address = _listen_address,
288+
announce_address = _announce_address,
289+
no_announce_address = _no_announce_address,
290+
running_on = _running_on,
291+
updated_at = NOW() AT TIME ZONE 'UTC'
292+
WHERE sp_id = _sp_id;
293+
ELSIF _current_updated_at > NOW() - INTERVAL '10 seconds' THEN
294+
-- Raise an exception if running_on is different and updated_at is recent
295+
RAISE EXCEPTION 'Libp2p node already running on "%"', _current_running_on;
296+
ELSE
297+
-- Update running_on and other columns if updated_at is older than 10 seconds
298+
UPDATE libp2p
299+
SET
300+
listen_address = _listen_address,
301+
announce_address = _announce_address,
302+
no_announce_address = _no_announce_address,
303+
running_on = _running_on,
304+
updated_at = NOW() AT TIME ZONE 'UTC'
305+
WHERE sp_id = _sp_id;
306+
END IF;
307+
308+
RETURN _priv_key;
309+
END;
310+
$$ LANGUAGE plpgsql;
311+
251312
-- Add host column to allow local file based
252313
-- piece park
253314
ALTER TABLE parked_piece_refs
@@ -275,9 +336,6 @@ CREATE TABLE market_legacy_deals (
275336
publish_cid TEXT NOT NULL,
276337
chain_deal_id BIGINT NOT NULL,
277338

278-
piece_cid TEXT NOT NULL,
279-
piece_size BIGINT NOT NULL,
280-
281339
fast_retrieval BOOLEAN NOT NULL,
282340

283341
created_at TIMESTAMPTZ NOT NULL,

itests/alertnow_test.go

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,9 @@ func TestAlertNow(t *testing.T) {
2020
tp,
2121
}
2222
// Create dependencies
23-
db, err := harmonydb.NewFromConfigWithITestID(t, "alertnow")
23+
sharedITestID := harmonydb.ITestNewID()
24+
db, err := harmonydb.NewFromConfigWithITestID(t, sharedITestID)
25+
2426
require.NoError(t, err)
2527

2628
an := alertmanager.NewAlertNow(db, "alertNowMachine")
@@ -33,7 +35,7 @@ func TestAlertNow(t *testing.T) {
3335
done, err := at.Do(123, func() bool { return true })
3436
require.NoError(t, err)
3537
require.True(t, done)
36-
require.Equal(t, "alertNowMachine: testMessage", tp.output)
38+
require.Equal(t, "Machine alertNowMachine: testMessage", tp.output)
3739
}
3840

3941
// testPlugin is a test plugin
@@ -42,6 +44,6 @@ type testPlugin struct {
4244
}
4345

4446
func (tp *testPlugin) SendAlert(data *plugin.AlertPayload) error {
45-
tp.output = data.Summary
47+
tp.output = data.Details["NowCheck"].(string)
4648
return nil
4749
}

0 commit comments

Comments
 (0)