Skip to content

Commit 198e262

Browse files
committed
save prog
1 parent 627d410 commit 198e262

File tree

5 files changed

+177
-70
lines changed

5 files changed

+177
-70
lines changed

src/replicator/interface.ts

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,17 @@
1-
import type { GossipHelia, ComponentProtocol } from '@/interface.js'
1+
import type { ComponentProtocol } from '@/interface.js'
22
import type { Replica } from '@/replica/index.js'
33
import { HLDB_PREFIX } from '@/utils/constants.js'
44
import type { Playable } from '@/utils/playable.js'
55
import type { Datastore } from 'interface-datastore'
66
import type { Blockstore } from 'interface-blockstore'
77
import type { Ed25519PeerId } from '@libp2p/interface/peer-id'
8+
import type { PubSub } from '@libp2p/interface/pubsub'
9+
import type { DualKadDHT } from '@libp2p/kad-dht'
810

911
export interface Config {
10-
ipfs: GossipHelia
12+
peerId: Ed25519PeerId
13+
pubsub: PubSub
14+
dht?: DualKadDHT
1115
replica: Replica
1216
datastore: Datastore
1317
blockstore: Blockstore

src/replicator/live/index.ts

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,8 @@
11
import all from 'it-all'
22
import { start, stop } from '@libp2p/interface/startable'
33
import { base32 } from 'multiformats/bases/base32'
4-
import type { GossipHelia } from '@/interface'
54
import type { CID } from 'multiformats/cid'
6-
import type { SignedMessage, PublishResult } from '@libp2p/interface/pubsub'
5+
import type { SignedMessage, PublishResult, PubSub } from '@libp2p/interface/pubsub'
76

87
import { dagLinks, loadEntry, traverser } from '@/replica/traversal.js'
98
import { cidstring, parsedcid } from '@/utils/index.js'
@@ -19,11 +18,13 @@ import type { Config, ReplicatorModule } from '../interface.js'
1918
import * as Advert from './message.js'
2019
import protocol from './protocol.js'
2120
import type { Blockstore } from 'interface-blockstore'
21+
import type { Ed25519PeerId } from '@libp2p/interface/dist/src/peer-id/index.js'
2222

2323
const getSharedChannelTopic = (manifest: Manifest): string => `${protocol}${cidstring(manifest.address.cid)}`
2424

2525
export class LiveReplicator extends Playable {
26-
readonly ipfs: GossipHelia
26+
readonly localPeerId: Ed25519PeerId
27+
readonly pubsub: PubSub
2728
readonly blockstore: Blockstore
2829
readonly manifest: Manifest
2930
readonly replica: Replica
@@ -40,7 +41,8 @@ export class LiveReplicator extends Playable {
4041
readonly _onHeadsMessage: typeof onHeadsMessage
4142

4243
constructor ({
43-
ipfs,
44+
peerId,
45+
pubsub,
4446
replica,
4547
blockstore
4648
}: Config) {
@@ -72,7 +74,8 @@ export class LiveReplicator extends Playable {
7274
}
7375
super({ starting, stopping })
7476

75-
this.ipfs = ipfs
77+
this.localPeerId = peerId
78+
this.pubsub = pubsub
7679
this.blockstore = blockstore
7780
this.replica = replica
7881
this.manifest = replica.manifest
@@ -85,7 +88,7 @@ export class LiveReplicator extends Playable {
8588
this.#onReplicaHeadsUpdate = onReplicaHeadsUpdate.bind(this)
8689
this._onHeadsMessage = onHeadsMessage.bind(this)
8790

88-
this.shared = new Monitor(this.ipfs.libp2p, getSharedChannelTopic(this.manifest))
91+
this.shared = new Monitor(this.pubsub, getSharedChannelTopic(this.manifest))
8992
this.directs = new Map()
9093
}
9194

@@ -142,7 +145,7 @@ function onPeerJoin (
142145
evt: CustomEvent<PeerStatusChangeData>
143146
): void {
144147
const { peerId: remotePeerId } = evt.detail
145-
const direct = new Direct(this.ipfs.libp2p, remotePeerId)
148+
const direct = new Direct(this.pubsub, this.localPeerId, remotePeerId)
146149
direct.addEventListener(
147150
'peered',
148151
() => {
Lines changed: 38 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ import { start, stop } from '@libp2p/interface/startable'
33
import { Key } from 'interface-datastore'
44
import { MemoryDatastore, NamespaceDatastore } from 'datastore-core'
55

6-
import { LiveReplicator as Replicator } from '@/replicator/live/index.js'
6+
import { LiveReplicator } from '@/replicator/live/index.js'
77
import { Replica } from '@/replica/index.js'
88
import { StaticAccess as Access } from '@/access/static/index.js'
99
import staticAccessProtocol from '@/access/static/protocol.js'
@@ -14,39 +14,40 @@ import { getTestManifest } from '../test-utils/manifest.js'
1414
import { getTestIdentities, getTestIdentity } from '../test-utils/identities.js'
1515
import { basalEntry } from '@/entry/basal/index.js'
1616
import { basalIdentity } from '@/identity/basal/index.js'
17-
import type { Multiaddr } from '@multiformats/multiaddr'
18-
import { getDhtService, getIdentifyService, getPubsubService, type UsedServices } from '../test-utils/libp2p/services.js'
19-
import type { Helia } from '@helia/interface'
20-
import { createLibp2p, Libp2pOptions, type Libp2p } from 'libp2p'
21-
import { getLibp2pDefaults } from '../test-utils/libp2p/defaults.js'
22-
import { getPeerDiscovery } from '../test-utils/libp2p/peerDiscovery.js'
23-
import { createHelia } from 'helia'
24-
import { waitForMultiaddrs } from '../test-utils/network.js'
17+
import { getTestKeyChain } from 'test/test-utils/keychain.js'
18+
import { MemoryBlockstore } from 'blockstore-core'
19+
import type { Ed25519PeerId } from '@libp2p/interface/dist/src/peer-id/index.js'
20+
import { createEd25519PeerId } from '@libp2p/peer-id-factory'
21+
import { getTestPubSubNetwork } from 'test/test-mocks/pubsub.js'
22+
import type { PubSub } from '@libp2p/interface/pubsub'
2523

26-
const testName = 'live-replicator'
27-
28-
type TestServices = UsedServices<'identify' | 'pubsub' | 'dht'>
24+
const testName = 'replicator/live'
2925

3026
describe(testName, () => {
3127
let
32-
helia1: Helia<Libp2p<TestServices>>,
33-
helia2: Helia<Libp2p<TestServices>>,
34-
libp2p1: Libp2p<TestServices>,
35-
libp2p2: Libp2p<TestServices>,
36-
addr2: Multiaddr[],
28+
id1: Ed25519PeerId,
29+
id2: Ed25519PeerId,
30+
pubsub1: PubSub,
31+
pubsub2: PubSub,
3732
replica1: Replica,
3833
replica2: Replica,
39-
replicator1: Replicator,
40-
replicator2: Replicator,
34+
replicator1: LiveReplicator,
35+
replicator2: LiveReplicator,
4136
testPaths1: TestPaths,
4237
testPaths2: TestPaths,
4338
access: Access,
4439
datastore: MemoryDatastore,
4540
datastore1: NamespaceDatastore,
46-
datastore2: NamespaceDatastore
41+
datastore2: NamespaceDatastore,
42+
blockstore: MemoryBlockstore
4743

4844
before(async () => {
49-
// debug.enable('libp2p:*')
45+
id1 = await createEd25519PeerId()
46+
id2 = await createEd25519PeerId()
47+
48+
const { createPubSubPeer } = getTestPubSubNetwork()
49+
pubsub1 = createPubSubPeer(id1)
50+
pubsub2 = createPubSubPeer(id2)
5051

5152
testPaths1 = getTestPaths(tempPath, testName + '/1')
5253
testPaths2 = getTestPaths(tempPath, testName + '/2')
@@ -55,43 +56,24 @@ describe(testName, () => {
5556
datastore1 = new NamespaceDatastore(datastore, new Key(testPaths1.replica))
5657
datastore2 = new NamespaceDatastore(datastore, new Key(testPaths2.replica))
5758

58-
const createLibp2pOptions = async (): Promise<Libp2pOptions<TestServices>> => ({
59-
...(await getLibp2pDefaults()),
60-
peerDiscovery: await getPeerDiscovery(),
61-
services: {
62-
identify: getIdentifyService(),
63-
pubsub: getPubsubService(),
64-
dht: getDhtService(true)
65-
}
66-
})
67-
68-
libp2p1 = await createLibp2p(await createLibp2pOptions())
69-
libp2p2 = await createLibp2p(await createLibp2pOptions())
70-
71-
await Promise.all([
72-
waitForMultiaddrs(libp2p1),
73-
waitForMultiaddrs(libp2p2)
74-
])
75-
76-
helia1 = await createHelia({ libp2p: libp2p1 })
77-
helia2 = await createHelia({ libp2p: libp2p2 })
78-
79-
addr2 = libp2p2.getMultiaddrs()
80-
8159
const identities1 = await getTestIdentities(testPaths1)
8260
const identities2 = await getTestIdentities(testPaths2)
61+
const keychain1 = getTestKeyChain()
62+
const keychain2 = getTestKeyChain()
8363

8464
const identity1 = await getTestIdentity(
8565
identities1,
86-
libp2p1.keychain,
66+
keychain1,
8767
testName
8868
)
8969
const identity2 = await getTestIdentity(
9070
identities2,
91-
libp2p2.keychain,
71+
keychain2,
9272
testName
9373
)
9474

75+
blockstore = new MemoryBlockstore()
76+
9577
const write = [identity1.id, identity2.id]
9678
const accessConfig = {
9779
access: { protocol: staticAccessProtocol, config: { write } }
@@ -104,7 +86,7 @@ describe(testName, () => {
10486
replica1 = new Replica({
10587
manifest,
10688
datastore: datastore1,
107-
blockstore: helia1.blockstore,
89+
blockstore,
10890
access,
10991
identity: identity1,
11092
components: {
@@ -115,7 +97,7 @@ describe(testName, () => {
11597
replica2 = new Replica({
11698
manifest,
11799
datastore: datastore2,
118-
blockstore: helia2.blockstore,
100+
blockstore,
119101
access,
120102
identity: identity2,
121103
components: {
@@ -125,35 +107,31 @@ describe(testName, () => {
125107
})
126108
await start(replica1, replica2)
127109

128-
replicator1 = new Replicator({
129-
ipfs: helia1,
110+
replicator1 = new LiveReplicator({
111+
peerId: id1,
112+
pubsub: pubsub1,
130113
replica: replica1,
131114
datastore: datastore1,
132-
blockstore: helia1.blockstore
115+
blockstore
133116
})
134-
replicator2 = new Replicator({
135-
ipfs: helia2,
117+
replicator2 = new LiveReplicator({
118+
peerId: id2,
119+
pubsub: pubsub2,
136120
replica: replica2,
137121
datastore: datastore2,
138-
blockstore: helia2.blockstore
122+
blockstore
139123
})
140124
})
141125

142126
after(async () => {
143127
await stop(access)
144128
await stop(replicator1, replicator2)
145129
await stop(replica1, replica2)
146-
await stop(helia1)
147-
await stop(helia2)
148130
})
149131

150132
describe('instance', () => {
151133
before(async () => {
152134
await start(replicator1, replicator2)
153-
await Promise.all([
154-
libp2p1.dial(addr2),
155-
new Promise(resolve => { libp2p2.addEventListener('peer:connect', resolve, { once: true }) })
156-
])
157135
})
158136

159137
it('exposes instance properties', () => {

test/replicator/zzzync.spec.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ import { createHelia } from 'helia'
3030
import { getPeerDiscovery } from '../test-utils/libp2p/peerDiscovery.js'
3131
import { waitForMultiaddrs } from '../test-utils/network.js'
3232

33-
const testName = 'zzzync-replicator'
33+
const testName = 'replicator/zzzync'
3434
const token = process.env.W3_TOKEN as string
3535

3636
const noToken = token == null

test/test-mocks/dht.ts

Lines changed: 122 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,122 @@
1+
import { EventTypes, type KadDHT, type QueryEvent, type ProviderEvent, type QueryOptions } from '@libp2p/kad-dht'
2+
import type { Ed25519PeerId, PeerId } from '@libp2p/interface/peer-id'
3+
import type { CID } from 'multiformats'
4+
import { base32 } from 'multiformats/dist/types/src/bases/base32'
5+
import { peerIdFromString } from '@libp2p/peer-id'
6+
7+
type Mode = 'client' | 'server'
8+
9+
const cid2key = (cid: CID): Uint8Array => cid.multihash.bytes
10+
const key2str = (key: Uint8Array): string => base32.encode(key)
11+
const cid2str = (cid: CID): string => key2str(cid2key(cid))
12+
// const str2key = (str: string): Uint8Array => base32.decode(str)
13+
14+
class TestKadDht implements KadDHT {
15+
mode: 'client' | 'server'
16+
17+
constructor (
18+
readonly peerId: Ed25519PeerId,
19+
readonly peers: Set<string>,
20+
readonly providers: Map<string, Set<string>>,
21+
mode: Mode
22+
) {
23+
this.mode = mode
24+
}
25+
26+
/**
27+
* Get a value from the DHT, the final ValueEvent will be the best value
28+
*/
29+
async * get (key: Uint8Array, options?: QueryOptions): AsyncIterable<QueryEvent> {}
30+
31+
/**
32+
* Find providers of a given CID
33+
*/
34+
async * findProviders (key: CID, options?: QueryOptions): AsyncIterable<QueryEvent> {
35+
const providers = this.providers.get(cid2str(key))
36+
37+
if (providers == null) {
38+
return
39+
}
40+
41+
const providerEvent: ProviderEvent = {
42+
from: this.peerId,
43+
type: EventTypes.PROVIDER,
44+
name: 'PROVIDER',
45+
providers: [...providers].map(peerId => ({ id: peerIdFromString(peerId), multiaddrs: [], protocols: [] }))
46+
}
47+
48+
yield providerEvent
49+
}
50+
51+
/**
52+
* Find a peer on the DHT
53+
*/
54+
async * findPeer (id: PeerId, options?: QueryOptions): AsyncIterable<QueryEvent> {}
55+
56+
/**
57+
* Find the closest peers to the passed key
58+
*/
59+
async * getClosestPeers (key: Uint8Array, options?: QueryOptions): AsyncIterable<QueryEvent> {}
60+
61+
/**
62+
* Store provider records for the passed CID on the DHT pointing to us
63+
*/
64+
async * provide (key: CID, options?: QueryOptions): AsyncIterable<QueryEvent> {
65+
const providers = this.providers.get(cid2str(key)) ?? new Set()
66+
67+
const providerEvent: ProviderEvent = {
68+
from: this.peerId,
69+
type: EventTypes.PROVIDER,
70+
name: 'PROVIDER',
71+
providers: [...providers].map(provider => ({
72+
id: peerIdFromString(provider),
73+
multiaddrs: [],
74+
protocols: []
75+
}))
76+
}
77+
78+
providers.add(this.peerId.toString())
79+
this.providers.set(cid2str(key), providers)
80+
81+
yield providerEvent
82+
}
83+
84+
/**
85+
* Store the passed value under the passed key on the DHT
86+
*/
87+
async * put (key: Uint8Array, value: Uint8Array, options?: QueryOptions): AsyncIterable<QueryEvent> {}
88+
89+
/**
90+
* Returns the mode this node is in
91+
*/
92+
async getMode (): Promise<Mode> {
93+
return this.mode
94+
}
95+
96+
/**
97+
* If 'server' this node will respond to DHT queries, if 'client' this node will not
98+
*/
99+
async setMode (mode: Mode): Promise<void> {
100+
this.mode = mode
101+
}
102+
103+
/**
104+
* Force a routing table refresh
105+
*/
106+
async refreshRoutingTable (): Promise<void> {}
107+
}
108+
109+
interface TestDhtNetwork {
110+
createDhtPeer: (peerId: Ed25519PeerId, mode: Mode) => TestKadDht
111+
}
112+
113+
export const getTestDhtNetwork = (): TestDhtNetwork => {
114+
const peers: Set<string> = new Set()
115+
const providers: Map<string, Set<string>> = new Map()
116+
117+
return {
118+
createDhtPeer (peerId: Ed25519PeerId, mode: Mode) {
119+
return new TestKadDht(peerId, peers, providers, mode)
120+
}
121+
}
122+
}

0 commit comments

Comments
 (0)