diff --git a/index.js b/index.js index 6b6d421..12b950c 100644 --- a/index.js +++ b/index.js @@ -6,6 +6,7 @@ const sub = require('subleveldown') const grpc = require('@grpc/grpc-js') const bjson = require('buffer-json-encoding') const processTop = require('process-top') +const varint = require('varint') const Corestore = require('corestore') const HypercoreCache = require('hypercore-cache') const SwarmNetworker = require('corestore-swarm-networking') @@ -22,6 +23,7 @@ const PeersManager = require('./lib/peers') const DebugManager = require('./lib/debug') const FuseManager = require('./lib/fuse') const { serverError } = require('./lib/errors') +const { getHandlers } = require('./lib/common') const log = require('./lib/log').child({ component: 'server' }) @@ -139,7 +141,7 @@ class HyperdriveDaemon extends EventEmitter { const dbs = { fuse: sub(this.db, 'fuse', { valueEncoding: bjson }), drives: sub(this.db, 'drives', { valueEncoding: bjson }), - profiles: sub(this.db, 'profiles', { valueEncoding: 'json' }) + network: sub(this.db, 'network', { valueEncoding: 'json'}) } this._dbs = dbs @@ -261,51 +263,52 @@ class HyperdriveDaemon extends EventEmitter { }) } - createMainHandlers () { - return { - status: async call => { - const rsp = new rpc.main.messages.StatusResponse() - rsp.setApiversion(apiVersion) - rsp.setUptime(Date.now() - this._startTime) - if (this._versions) { - rsp.setDaemonversion(this._versions.daemon) - rsp.setClientversion(this._versions.client) - rsp.setSchemaversion(this._versions.schema) - rsp.setHyperdriveversion(this._versions.hyperdrive) - rsp.setNoisekey(this.noiseKeyPair.publicKey) - - const swarm = this.networking && this.networking.swarm - if (swarm) { - const remoteAddress = swarm.remoteAddress() - rsp.setHolepunchable(swarm.holepunchable()) - rsp.setRemoteaddress(remoteAddress ? remoteAddress.host + ':' + remoteAddress.port : '') - } + // RPC Methods + + async _rpcStatus (call) { + const rsp = new rpc.main.messages.StatusResponse() + rsp.setApiversion(apiVersion) + rsp.setUptime(Date.now() - this._startTime) + if (this._versions) { + rsp.setDaemonversion(this._versions.daemon) + rsp.setClientversion(this._versions.client) + rsp.setSchemaversion(this._versions.schema) + rsp.setHyperdriveversion(this._versions.hyperdrive) + rsp.setNoisekey(this.noiseKeyPair.publicKey) + + const swarm = this.networking && this.networking.swarm + if (swarm) { + const remoteAddress = swarm.remoteAddress() + rsp.setHolepunchable(swarm.holepunchable()) + rsp.setRemoteaddress(remoteAddress ? remoteAddress.host + ':' + remoteAddress.port : '') + } - if (this._versions.fuseNative) rsp.setFusenativeversion(this._versions.fuseNative) - if (this._versions.hyperdriveFuse) rsp.setHyperdrivefuseversion(this._versions.hyperdriveFuse) + if (this._versions.fuseNative) rsp.setFusenativeversion(this._versions.fuseNative) + if (this._versions.hyperdriveFuse) rsp.setHyperdrivefuseversion(this._versions.hyperdriveFuse) - if (hyperfuse) { - rsp.setFuseavailable(true) - rsp.setFuseconfigured(this.fuse.fuseConfigured) - } else { - rsp.setFuseavailable(false) - rsp.setFuseconfigured(false) - } - } - return rsp - }, - refreshFuse: async call => { - await this.fuse.ready() - if (this.fuse && this.fuse.fuseConfigured) { - hyperfuse = require('hyperdrive-fuse') - this._versions.fuseNative = require('fuse-native/package.json').version - this._versions.hyperdriveFuse = require('hyperdrive-fuse/package.json').version - } - return new rpc.main.messages.FuseRefreshResponse() + if (hyperfuse) { + rsp.setFuseavailable(true) + rsp.setFuseconfigured(this.fuse.fuseConfigured) + } else { + rsp.setFuseavailable(false) + rsp.setFuseconfigured(false) } } + return rsp } + async _rpcRefreshFuse (call) { + await this.fuse.ready() + if (this.fuse && this.fuse.fuseConfigured) { + hyperfuse = require('hyperdrive-fuse') + this._versions.fuseNative = require('fuse-native/package.json').version + this._versions.hyperdriveFuse = require('hyperdrive-fuse/package.json').version + } + return new rpc.main.messages.FuseRefreshResponse() + } + + // Public Methods + get uptime () { if (!this._startTime) return 0 return Date.now() - this._startTime @@ -370,24 +373,24 @@ class HyperdriveDaemon extends EventEmitter { this.server = new grpc.Server() this.server.addService(rpc.fuse.services.FuseService, { - ...wrap(this.metadata, this.fuse.getHandlers(), { authenticate: true }) + ...wrap(this.metadata, getHandlers(this.fuse), { authenticate: true }) }) this.server.addService(rpc.drive.services.DriveService, { - ...wrap(this.metadata, this.drives.getHandlers(), { authenticate: true }) + ...wrap(this.metadata, getHandlers(this.drives), { authenticate: true }) }) this.server.addService(rpc.peersockets.services.PeersocketsService, { - ...wrap(this.metadata, this.peersockets.getHandlers(), { authenticate: true }) + ...wrap(this.metadata, getHandlers(this.peersockets), { authenticate: true }) }) this.server.addService(rpc.peers.services.PeersService, { - ...wrap(this.metadata, this.peers.getHandlers(), { authenticate: true }) + ...wrap(this.metadata, getHandlers(this.peers), { authenticate: true }) }) if (this.debug) { this.server.addService(rpc.debug.services.DebugService, { - ...wrap(this.metadata, this.debug.getHandlers(), { authenticate: true }) + ...wrap(this.metadata, getHandlers(this.debug), { authenticate: true }) }) } this.server.addService(rpc.main.services.HyperdriveService, { - ...wrap(this.metadata, this.createMainHandlers(), { authenticate: true }) + ...wrap(this.metadata, getHandlers(this), { authenticate: true }) }) await new Promise((resolve, reject) => { diff --git a/lib/common.js b/lib/common.js index e69de29..b7323ec 100644 --- a/lib/common.js +++ b/lib/common.js @@ -0,0 +1,36 @@ +const collectStream = require('stream-collector') + +function getHandlers (manager) { + const handlers = {} + const rpcMethods = Object.getOwnPropertyNames(manager.__proto__).filter(methodName => methodName.startsWith('_rpc')) + for (let methodName of rpcMethods) { + let rpcMethodName = methodName.slice(4) + rpcMethodName = rpcMethodName[0].toLowerCase() + rpcMethodName.slice(1) + handlers[rpcMethodName] = manager[methodName].bind(manager) + } + return handlers +} + +function dbCollect (index, opts) { + return new Promise((resolve, reject) => { + collectStream(index.createReadStream(opts), (err, list) => { + if (err) return reject(err) + return resolve(list) + }) + }) +} + +async function dbGet (db, idx) { + try { + return await db.get(idx) + } catch (err) { + if (err && !err.notFound) throw err + return null + } +} + +module.exports = { + getHandlers, + dbCollect, + dbGet +} diff --git a/lib/drives/index.js b/lib/drives/index.js index 1370254..ae4af96 100644 --- a/lib/drives/index.js +++ b/lib/drives/index.js @@ -1,12 +1,11 @@ -const crypto = require('crypto') -const { EventEmitter } = require('events') - const hyperdrive = require('hyperdrive') -const collectStream = require('stream-collector') -const sub = require('subleveldown') -const bjson = require('buffer-json-encoding') +const hypercoreCrypto = require('hypercore-crypto') const datEncoding = require('dat-encoding') const pump = require('pump') +const sub = require('subleveldown') +const bjson = require('buffer-json-encoding') +const collectStream = require('stream-collector') +const { NanoresourcePromise: Nanoresource } = require('nanoresource-promise/emitter') const { Transform } = require('streamx') const { @@ -14,25 +13,27 @@ const { fromStat, fromMount, fromMetadata, + fromDriveConfiguration, fromNetworkConfiguration, + toNetworkConfiguration, toHyperdriveOptions, toStat, toMount, toMountInfo, toDriveStats, toDiffEntry, - toNetworkConfiguration, setFileStats, toChunks } = require('hyperdrive-daemon-client/lib/common') const { rpc } = require('hyperdrive-daemon-client') -const ArrayIndex = require('./array-index.js') +const ArrayIndex = require('./array-index') +const { dbCollect, dbGet } = require('../common') const log = require('../log').child({ component: 'drive-manager' }) const TRIE_UPDATER_SYMBOL = Symbol('hyperdrive-daemon-trie-updater') -class DriveManager extends EventEmitter { +class DriveManager extends Nanoresource { constructor (corestore, networking, db, opts = {}) { super() @@ -41,44 +42,55 @@ class DriveManager extends EventEmitter { this.db = db this.opts = opts this.watchLimit = opts.watchLimit - this.noAnnounce = !!opts.noAnnounce this.memoryOnly = !!opts.memoryOnly - this._driveIndex = sub(this.db, 'drives', { valueEncoding: bjson }) - this._seedIndex = sub(this.db, 'seeding', { valueEncoding: 'json' }) - this._namespaceIndex = sub(this.db, 'namespaces', { valueEncoding: 'utf8' }) + const dbs = DriveManager.generateSubDbs(db) + this._driveIndex = dbs.drives + this._mirrorIndex = dbs.mirrors + this._seedIndex = dbs.seeding + + this._transientSeedIndex = new Map() this._drives = new Map() this._checkouts = new Map() this._watchers = new Map() this._sessionsByKey = new Map() - this._transientSeedIndex = new Map() this._configuredMounts = new Set() this._sessions = new ArrayIndex() this._downloads = new ArrayIndex() + this._mirrors = new Map() this._watchCount = 0 + } - this._readyPromise = null + ready () { + return this.open() + } - this.ready = () => { - if (this._readyPromise) return this._readyPromise - this._readyPromise = Promise.all([ - this._rejoin() - ]) - return this._readyPromise - } + async _open () { + return Promise.all([ + this._rejoin(), + this._remirror() + ]) } async _rejoin () { if (this.noAnnounce) return - const driveList = await collect(this._seedIndex) - for (const { key: discoveryKey, value: networkOpts } of driveList) { + const seedList = await dbCollect(this._seedIndex) + for (const { key: discoveryKey, value: networkOpts } of seedList) { const opts = networkOpts && networkOpts.opts if (!opts || !opts.announce) continue this.networking.join(discoveryKey, { ...networkOpts.opts }) } } + async _remirror () { + const mirrorList = await dbCollect(this._mirrorIndex) + for (const { key } of mirrorList) { + const drive = await this.get(key) + await this._startMirroring(drive) + } + } + _generateKeyString (key, opts) { var keyString = (key instanceof Buffer) ? key.toString('hex') : key if (opts && opts.version) keyString = keyString + '+' + opts.version @@ -86,27 +98,28 @@ class DriveManager extends EventEmitter { return keyString } - async _getNamespace (keyString) { - if (!keyString) return null - try { - const namespace = await this._namespaceIndex.get('by-drive/' + keyString) - return namespace - } catch (err) { - if (!err.notFound) throw err - return null - } + async _startMirroring (drive) { + // A mirrored drive should never be closed. + const { session: mirrorSession } = await this.createSession(drive) + const unmirror = drive.mirror() + const driveKey = drive.key.toString('hex') + this._mirrors.set(driveKey, { + session: mirrorSession, + unmirror + }) + // Only the key is relevant, but gets for valid keys shouldn't return null. + await this._mirrorIndex.put(driveKey, 'mirroring') + log.info({ discoveryKey: drive.discoveryKey.toString('hex') }, 'mirroring drive') } - async _createNamespace (keyString) { - const namespace = crypto.randomBytes(32).toString('hex') - try { - var existing = await this._namespaceIndex.get('by-namespace/' + namespace) - } catch (err) { - if (!err.notFound) throw err - existing = null - } - if (existing) return this._createNamespace(keyString) - return namespace + async _stopMirroring (drive) { + const driveKey = drive.key.toString('hex') + const mirrorInfo = this._mirrors.get(driveKey) + if (!mirrorInfo) return null + this._mirrors.delete(driveKey) + mirrorInfo.unmirror() + this.closeSession(mirrorInfo.session) + return this._mirrorIndex.del(driveKey) } driveForSession (sessionId) { @@ -180,6 +193,72 @@ class DriveManager extends EventEmitter { }) } + async configureNetwork (feed, opts = {}) { + const self = this + const encodedKey = datEncoding.encode(feed.discoveryKey) + const networkOpts = { + lookup: !!opts.lookup, + announce: !!opts.announce, + remember: !!opts.remember + } + const seeding = opts.lookup || opts.announce + var networkingPromise + + const sameConfig = sameNetworkConfig(feed.discoveryKey, opts) + // If all the networking options are the same, exit early. + if (sameConfig) return + + const networkConfig = { key: datEncoding.encode(feed.key), opts: networkOpts } + if (opts.remember) { + if (seeding) await this._seedIndex.put(encodedKey, networkConfig) + else await this._seedIndex.del(encodedKey) + } else { + this._transientSeedIndex.set(encodedKey, networkConfig) + } + + // Failsafe + if (networkOpts.announce && this.noAnnounce) networkOpts.announce = false + + try { + if (seeding) { + networkingPromise = this.networking.join(feed.discoveryKey, networkOpts) + } else { + networkingPromise = this.networking.leave(feed.discoveryKey) + } + networkingPromise.then(configurationSuccess) + networkingPromise.catch(configurationError) + } catch (err) { + configurationError(err) + } + + function sameNetworkConfig (discoveryKey, opts = {}) { + const swarmStatus = self.networking.status(discoveryKey) + if (!swarmStatus) return opts.lookup === false && opts.announce === false + return swarmStatus.announce === opts.announce && swarmStatus.lookup === opts.lookup + } + + function configurationError (err) { + log.error({ err, discoveryKey: encodedKey }, 'network configuration error') + } + + function configurationSuccess () { + log.debug({ discoveryKey: encodedKey }, 'network configuration succeeded') + } + } + + async getNetworkConfiguration (discoveryKey) { + const encodedKey = datEncoding.encode(discoveryKey) + const networkOpts = this._transientSeedIndex.get(encodedKey) || await dbGet(this._seedIndex, encodedKey) + if (networkOpts) return networkOpts.opts + return null + } + + async getAllNetworkConfigurations () { + const storedConfigurations = (await dbCollect(this._seedIndex)).map(({ key, value }) => [key, value]) + const transientConfigurations = [...this._transientSeedIndex] + return new Map([...storedConfigurations, ...transientConfigurations]) + } + async getAllStats (opts) { const allStats = [] for (const [, drive] of this._drives) { @@ -236,13 +315,7 @@ class DriveManager extends EventEmitter { } listDrives () { - return collect(this._driveIndex) - } - - async getAllNetworkConfigurations () { - const storedConfigurations = (await collect(this._seedIndex)).map(({ key, value }) => [key, value]) - const transientConfigurations = [...this._transientSeedIndex] - return new Map([...storedConfigurations, ...transientConfigurations]) + return dbCollect(this._driveIndex) } async get (key, opts = {}) { @@ -268,11 +341,10 @@ class DriveManager extends EventEmitter { var unlisteners = [] if (!drive) { - var namespace = await this._getNamespace(keyString) - if (!namespace) namespace = await this._createNamespace(keyString) + const randomNamespace = hypercoreCrypto.randomBytes(32).toString('hex') drive = hyperdrive(this.corestore, key, { - ...driveOpts, - namespace + namespace: randomNamespace, + ...driveOpts }) const errorListener = err => log.error(err) @@ -313,21 +385,14 @@ class DriveManager extends EventEmitter { key = datEncoding.encode(drive.key) keyString = this._generateKeyString(key, opts) - if (namespace) { - await this._namespaceIndex.batch([ - { type: 'put', key: 'by-namespace/' + namespace, value: keyString }, - { type: 'put', key: 'by-drive/' + keyString, value: namespace } - ]) - } - var initialConfig // TODO: Need to fully work through all the default networking behaviors. if (opts.fuseNetwork) { - // TODO: The Network drive does not announce or remember any settings for now. - initialConfig = { lookup: true, announce: false, remember: false } + // TODO: The Network drive does not announce any settings for now. + initialConfig = { lookup: true, announce: false } await this.configureNetwork(drive.metadata, initialConfig) } else if (!drive.writable || opts.seed) { - initialConfig = { lookup: true, announce: false, remember: true } + initialConfig = { lookup: true, announce: false } await this.configureNetwork(drive.metadata, initialConfig) } @@ -338,27 +403,19 @@ class DriveManager extends EventEmitter { const mountKey = feed.key.toString('hex') log.info({ discoveryKey: feed.discoveryKey.toString('hex') }, 'registering mountpoint in drive index') - const parentConfig = (await this.getNetworkConfiguration(drive)) || initialConfig || {} - const existingMountConfig = (await this.getNetworkConfiguration(feed)) || {} + const parentConfig = (await this.getNetworkConfiguration(drive.discoveryKey)) || initialConfig || {} + const existingMountConfig = (await this.getNetworkConfiguration(feed.discoveryKey)) || {} const mountConfig = { lookup: (existingMountConfig.lookup !== false) && (parentConfig.lookup !== false), - announce: !!(existingMountConfig.announce || parentConfig.announce), - remember: true + announce: !!(existingMountConfig.announce || parentConfig.announce) } if (mountConfig) await this.configureNetwork(feed, mountConfig) this.emit('configured-mount', feed.key) this._configuredMounts.add(mountKey) - try { - await this._driveIndex.get(mountKey) - } catch (err) { - if (err && !err.notFound) log.error({ error: err }, 'error registering mountpoint in drive index') - try { - await this._driveIndex.put(mountKey, mountInfo) - } catch (err) { - log.error({ error: err }, 'could not register mountpoint in drive index') - } - } + + const existingConfig = await dbGet(this._driveIndex, mountKey) + if (!existingConfig) await this._driveIndex.put(mountKey, mountInfo) } drive.on('mount', mountListener) unlisteners.push(() => drive.removeAllListeners('mount')) @@ -388,734 +445,686 @@ class DriveManager extends EventEmitter { return checkout || drive } - async configureNetwork (feed, opts = {}) { - const self = this - const encodedKey = datEncoding.encode(feed.discoveryKey) - const networkOpts = { - lookup: !!opts.lookup, - announce: !!opts.announce, - remember: !!opts.remember - } - const seeding = opts.lookup || opts.announce - var networkingPromise + download (drive, path) { + const dl = drive.download(path) + return this._downloads.insert(dl) + } - const sameConfig = sameNetworkConfig(feed.discoveryKey, opts) - // If all the networking options are the same, exit early. - if (sameConfig) return + // RPC Methods + async _rpcVersion (call) { + const id = call.request.getId() - const networkConfig = { key: datEncoding.encode(feed.key), opts: networkOpts } - if (opts.remember) { - if (seeding) await this._seedIndex.put(encodedKey, networkConfig) - else await this._seedIndex.del(encodedKey) - } else { - this._transientSeedIndex.set(encodedKey, networkConfig) - } + if (!id) throw new Error('A version request must specify a session ID.') + const drive = this.driveForSession(id) - // Failsafe - if (networkOpts.announce && this.noAnnounce) networkOpts.announce = false + const rsp = new rpc.drive.messages.DriveVersionResponse() + rsp.setVersion(drive.version) - try { - if (seeding) { - networkingPromise = this.networking.join(feed.discoveryKey, networkOpts) - } else { - networkingPromise = this.networking.leave(feed.discoveryKey) - } - networkingPromise.then(configurationSuccess) - networkingPromise.catch(configurationError) - } catch (err) { - configurationError(err) - } + return rsp + } - function sameNetworkConfig (discoveryKey, opts = {}) { - const swarmStatus = self.networking.status(discoveryKey) - if (!swarmStatus) return opts.lookup === false && opts.announce === false - return swarmStatus.announce === opts.announce && swarmStatus.lookup === opts.lookup - } + async _rpcGet (call) { + var driveOpts = fromHyperdriveOptions(call.request.getOpts()) - function configurationError (err) { - log.error({ err, discoveryKey: encodedKey }, 'network configuration error') - } + const { drive, session } = await this.createSession(null, driveOpts.key, driveOpts) + driveOpts.key = drive.key + driveOpts.discoveryKey = drive.discoveryKey + driveOpts.version = drive.version + driveOpts.writable = drive.writable - function configurationSuccess () { - log.debug({ discoveryKey: encodedKey }, 'network configuration succeeded') - } - } + const rsp = new rpc.drive.messages.GetDriveResponse() + rsp.setId(session) + rsp.setOpts(toHyperdriveOptions(driveOpts)) - async getNetworkConfiguration (drive) { - const encodedKey = datEncoding.encode(drive.discoveryKey) - const networkOpts = this._transientSeedIndex.get(encodedKey) - if (networkOpts) return networkOpts.opts - try { - const persistentOpts = await this._seedIndex.get(encodedKey) - return persistentOpts.opts - } catch (err) { - return null - } + return rsp } - download (drive, path) { - const dl = drive.download(path) - return this._downloads.insert(dl) + async _rpcAllStats (call) { + const networkingOnly = call.request.getNetworkingonly() + var stats = await this.getAllStats({ networkingOnly }) + stats = stats.map(driveStats => toDriveStats(driveStats)) + + const rsp = new rpc.drive.messages.StatsResponse() + rsp.setStatsList(stats) + + return rsp } - getHandlers () { - return { - version: async (call) => { - const id = call.request.getId() + async _rpcAllNetworkConfigurations (call) { + const networkConfigurations = await this.getAllNetworkConfigurations() - if (!id) throw new Error('A version request must specify a session ID.') - const drive = this.driveForSession(id) + const rsp = new rpc.drive.messages.NetworkConfigurationsResponse() + rsp.setConfigurationsList([...networkConfigurations].map(([, value]) => toNetworkConfiguration({ + ...value.opts, + key: Buffer.from(value.key, 'hex') + }))) - const rsp = new rpc.drive.messages.DriveVersionResponse() - rsp.setVersion(drive.version) - - return rsp - }, - - get: async (call) => { - var driveOpts = fromHyperdriveOptions(call.request.getOpts()) - - const { drive, session } = await this.createSession(null, driveOpts.key, driveOpts) - driveOpts.key = drive.key - driveOpts.discoveryKey = drive.discoveryKey - driveOpts.version = drive.version - driveOpts.writable = drive.writable - - const rsp = new rpc.drive.messages.GetDriveResponse() - rsp.setId(session) - rsp.setOpts(toHyperdriveOptions(driveOpts)) - - return rsp - }, - - allStats: async (call) => { - const networkingOnly = call.request.getNetworkingonly() - var stats = await this.getAllStats({ networkingOnly }) - stats = stats.map(driveStats => toDriveStats(driveStats)) - - const rsp = new rpc.drive.messages.StatsResponse() - rsp.setStatsList(stats) - - return rsp - }, - - allNetworkConfigurations: async (call) => { - const networkConfigurations = await this.getAllNetworkConfigurations() - - const rsp = new rpc.drive.messages.NetworkConfigurationsResponse() - rsp.setConfigurationsList([...networkConfigurations].map(([, value]) => toNetworkConfiguration({ - ...value.opts, - key: Buffer.from(value.key, 'hex') - }))) - - return rsp - }, - - peerCounts: async (call) => { - const rsp = new rpc.drive.messages.PeerCountsResponse() - const keys = call.request.getKeysList() - if (!keys) return rsp - - const counts = [] - for (let key of keys) { - key = Buffer.from(key) - if (this.corestore.isLoaded(key)) { - const core = this.corestore.get(key) - const openPeers = core.peers.filter(p => p.remoteOpened) - counts.push(openPeers.length) - } else { - counts.push(0) - } - } + return rsp + } - rsp.setPeercountsList(counts) - return rsp - }, + async _rpcPeerCounts (call) { + const rsp = new rpc.drive.messages.PeerCountsResponse() + const keys = call.request.getKeysList() + if (!keys) return rsp + + const counts = [] + for (let key of keys) { + key = Buffer.from(key) + if (this.corestore.isLoaded(key)) { + const core = this.corestore.get(key) + const openPeers = core.peers.filter(p => p.remoteOpened) + counts.push(openPeers.length) + } else { + counts.push(0) + } + } - configureNetwork: async (call) => { - const id = call.request.getId() + rsp.setPeercountsList(counts) + return rsp + } - if (!id) throw new Error('A network configuration request must specify a session ID.') - const drive = this.driveForSession(id) - const opts = fromNetworkConfiguration(call.request.getNetwork()) + async _rpcConfigureNetwork (call) { + const id = call.request.getId() - await this.configureNetwork(drive.metadata, { ...opts }) + if (!id) throw new Error('A network configuration request must specify a session ID.') + const drive = this.driveForSession(id) + const opts = fromNetworkConfiguration(call.request.getNetwork()) - const rsp = new rpc.drive.messages.ConfigureNetworkResponse() - return rsp - }, + await this.configureNetwork(drive.metadata, { ...opts }) - stats: async (call) => { - const id = call.request.getId() + const rsp = new rpc.drive.messages.ConfigureNetworkResponse() + return rsp + } - if (!id) throw new Error('A stats request must specify a session ID.') - const drive = this.driveForSession(id) + async _rpcStats (call) { + const id = call.request.getId() - const recursive = call.request.getRecursive() - const networkingOnly = call.request.getNetworkingonly() - const driveStats = await this.getDriveStats(drive, { recursive, networkingOnly }) - const networkConfig = await this.getNetworkConfiguration(drive) + if (!id) throw new Error('A stats request must specify a session ID.') + const drive = this.driveForSession(id) - const rsp = new rpc.drive.messages.DriveStatsResponse() - rsp.setStats(toDriveStats(driveStats)) - if (networkConfig) rsp.setNetwork(toNetworkConfiguration(networkConfig)) - return rsp - }, + const recursive = call.request.getRecursive() + const networkingOnly = call.request.getNetworkingonly() + const driveStats = await this.getDriveStats(drive, { recursive, networkingOnly }) + const networkConfig = await this.getNetworkConfiguration(drive.discoveryKey) - download: async (call) => { - const id = call.request.getId() - const path = call.request.getPath() + const rsp = new rpc.drive.messages.DriveStatsResponse() + rsp.setStats(toDriveStats(driveStats)) + if (networkConfig) rsp.setNetwork(toNetworkConfiguration(networkConfig)) + return rsp + } - if (!id) throw new Error('A download request must specify a session ID.') - const drive = this.driveForSession(id) - const downloadId = this.download(drive, path) + async _rpcMirror (call) { + const id = call.request.getId() - const rsp = new rpc.drive.messages.DownloadResponse() - rsp.setDownloadid(downloadId) - return rsp - }, + if (!id) throw new Error('A mirror request must specify a session ID.') + const drive = this.driveForSession(id) + await this._startMirroring(drive) - undownload: async (call) => { - const id = call.request.getId() - const downloadId = call.request.getDownloadid() + const rsp = new rpc.drive.messages.MirrorResponse() + return rsp + } - if (!id) throw new Error('An undownload request must specify a session ID.') - if (!downloadId) throw new Error('An undownload request must specify a download ID.') + async _rpcUnmirror (call) { + const id = call.request.getId() - const dl = this._downloads.get(downloadId) - if (dl) dl.destroy() - this._downloads.delete(downloadId) + if (!id) throw new Error('An unmirror request must specify a session ID.') + const drive = this.driveForSession(id) + await this._stopMirroring(drive) - return new rpc.drive.messages.UndownloadResponse() - }, + const rsp = new rpc.drive.messages.UnmirrorResponse() + return rsp + } - createDiffStream: async (call) => { - const id = call.request.getId() - const prefix = call.request.getPrefix() - const otherVersion = call.request.getOther() + async _rpcDownload (call) { + const id = call.request.getId() + const path = call.request.getPath() - if (!id) throw new Error('A diff stream request must specify a session ID.') - const drive = this.driveForSession(id) + if (!id) throw new Error('A download request must specify a session ID.') + const drive = this.driveForSession(id) + const downloadId = this.download(drive, path) - const stream = drive.createDiffStream(otherVersion, prefix) + const rsp = new rpc.drive.messages.DownloadResponse() + rsp.setDownloadid(downloadId) + return rsp + } - const rspMapper = new Transform({ - transform (chunk, cb) { - const rsp = new rpc.drive.messages.DiffStreamResponse() - if (!chunk) return rsp + async _rpcUndownload (call) { + const id = call.request.getId() + const downloadId = call.request.getDownloadid() - const { name, type, value } = chunk - rsp.setType(type) - rsp.setName(name) - if (type === 'put') { - rsp.setValue(toDiffEntry({ stat: value })) - } else { - rsp.setValue(toDiffEntry({ mount: value })) - } + if (!id) throw new Error('An undownload request must specify a session ID.') + if (!downloadId) throw new Error('An undownload request must specify a download ID.') - return cb(null, rsp) - } - }) + const dl = this._downloads.get(downloadId) + if (dl) dl.destroy() + this._downloads.delete(downloadId) - pump(stream, rspMapper, call, err => { - if (err) { - log.error({ id, err }, 'createDiffStream error') - call.destroy(err) - } - }) - }, + return new rpc.drive.messages.UndownloadResponse() + } - createReadStream: async (call) => { - const id = call.request.getId() - const path = call.request.getPath() - const start = call.request.getStart() - var end = call.request.getEnd() - const length = call.request.getLength() + async _rpcCreateDiffStream (call) { + const id = call.request.getId() + const prefix = call.request.getPrefix() + const otherVersion = call.request.getOther() - if (!id) throw new Error('A createReadStream request must specify a session ID.') - if (!path) throw new Error('A createReadStream request must specify a path.') - const drive = this.driveForSession(id) + if (!id) throw new Error('A diff stream request must specify a session ID.') + const drive = this.driveForSession(id) - const streamOpts = {} - if (end > 0 || (end === 0 && start === 0)) streamOpts.end = end - if (length !== 0) streamOpts.length = length - streamOpts.start = start + const stream = drive.createDiffStream(otherVersion, prefix) - const stream = drive.createReadStream(path, streamOpts) + const rspMapper = new Transform({ + transform (chunk, cb) { + const rsp = new rpc.drive.messages.DiffStreamResponse() + if (!chunk) return rsp - const rspMapper = new Transform({ - transform (chunk, cb) { - const rsp = new rpc.drive.messages.ReadStreamResponse() - rsp.setChunk(chunk) - return cb(null, rsp) - } - }) + const { name, type, value } = chunk + rsp.setType(type) + rsp.setName(name) + if (type === 'put') { + rsp.setValue(toDiffEntry({ stat: value })) + } else { + rsp.setValue(toDiffEntry({ mount: value })) + } - pump(stream, rspMapper, call, err => { - if (err) { - log.error({ id, err }, 'createReadStream error') - call.destroy(err) - } - }) - }, + return cb(null, rsp) + } + }) - readFile: async (call) => { - const id = call.request.getId() - const path = call.request.getPath() + pump(stream, rspMapper, call, err => { + if (err) { + log.error({ id, err }, 'createDiffStream error') + call.destroy(err) + } + }) + } - if (!id) throw new Error('A readFile request must specify a session ID.') - if (!path) throw new Error('A readFile request must specify a path.') - const drive = this.driveForSession(id) + async _rpcCreateReadStream (call) { + const id = call.request.getId() + const path = call.request.getPath() + const start = call.request.getStart() + var end = call.request.getEnd() + const length = call.request.getLength() + + if (!id) throw new Error('A createReadStream request must specify a session ID.') + if (!path) throw new Error('A createReadStream request must specify a path.') + const drive = this.driveForSession(id) + + const streamOpts = {} + if (end !== 0) streamOpts.end = end + if (length !== 0) streamOpts.length = length + streamOpts.start = start + const stream = drive.createReadStream(path, streamOpts) + + const rspMapper = new Transform({ + transform (chunk, cb) { + const rsp = new rpc.drive.messages.ReadStreamResponse() + rsp.setChunk(chunk) + return cb(null, rsp) + } + }) - const content = await new Promise((resolve, reject) => { - drive.readFile(path, (err, content) => { - if (err) return reject(err) - return resolve(content) - }) - }) + pump(stream, rspMapper, call, err => { + if (err) { + log.error({ id, err }, 'createReadStream error') + call.destroy(err) + } + }) + } - const chunks = toChunks(content) - for (const chunk of chunks) { - const rsp = new rpc.drive.messages.ReadFileResponse() - rsp.setChunk(chunk) - call.write(rsp) - } - call.end() - }, - - createWriteStream: async (call) => { - const unpack = new Transform({ - transform (msg, cb) { - const chunk = msg.getChunk() - return cb(null, Buffer.from(chunk.buffer, chunk.byteOffset, chunk.byteLength)) - } - }) + async _rpcReadFile (call) { + const id = call.request.getId() + const path = call.request.getPath() - return new Promise((resolve, reject) => { - call.once('data', req => { - const id = req.getId() - const path = req.getPath() - const opts = fromStat(req.getOpts()) + if (!id) throw new Error('A readFile request must specify a session ID.') + if (!path) throw new Error('A readFile request must specify a path.') + const drive = this.driveForSession(id) - if (!id) throw new Error('A readFile request must specify a session ID.') - if (!path) throw new Error('A readFile request must specify a path.') - const drive = this.driveForSession(id) + const content = await new Promise((resolve, reject) => { + drive.readFile(path, (err, content) => { + if (err) return reject(err) + return resolve(content) + }) + }) - const stream = drive.createWriteStream(path, { mode: opts.mode, uid: opts.uid, gid: opts.gid, metadata: opts.metadata }) + const chunks = toChunks(content) + for (const chunk of chunks) { + const rsp = new rpc.drive.messages.ReadFileResponse() + rsp.setChunk(chunk) + call.write(rsp) + } + call.end() + } - return onstream(resolve, reject, stream) - }) - }) + async _rpcCreateWriteStream (call) { + const unpack = new Transform({ + transform (msg, cb) { + const chunk = msg.getChunk() + return cb(null, Buffer.from(chunk.buffer, chunk.byteOffset, chunk.byteLength)) + } + }) - function onstream (resolve, reject, stream) { - pump(call, unpack, stream, err => { - if (err) return reject(err) - const rsp = new rpc.drive.messages.WriteStreamResponse() - return resolve(rsp) - }) - } - }, + return new Promise((resolve, reject) => { + call.once('data', req => { + const id = req.getId() + const path = req.getPath() + const opts = fromStat(req.getOpts()) - writeFile: async (call) => { - return new Promise((resolve, reject) => { - call.once('data', req => { - const id = req.getId() - const path = req.getPath() - const opts = fromStat(req.getOpts()) + if (!id) throw new Error('A readFile request must specify a session ID.') + if (!path) throw new Error('A readFile request must specify a path.') + const drive = this.driveForSession(id) - if (!id) throw new Error('A writeFile request must specify a session ID.') - if (!path) throw new Error('A writeFile request must specify a path.') - const drive = this.driveForSession(id) + const stream = drive.createWriteStream(path, { mode: opts.mode, uid: opts.uid, gid: opts.gid, metadata: opts.metadata }) - return loadContent(resolve, reject, path, drive, opts) - }) - }) + return onstream(resolve, reject, stream) + }) + }) - function loadContent (resolve, reject, path, drive, opts) { - return collectStream(call, (err, reqs) => { - if (err) return reject(err) - const chunks = reqs.map(req => { - const chunk = req.getChunk() - return Buffer.from(chunk.buffer, chunk.byteOffset, chunk.byteLength) - }) - return drive.writeFile(path, Buffer.concat(chunks), opts, err => { - if (err) return reject(err) - const rsp = new rpc.drive.messages.WriteFileResponse() - return resolve(rsp) - }) - }) - } - }, + function onstream (resolve, reject, stream) { + pump(call, unpack, stream, err => { + if (err) return reject(err) + const rsp = new rpc.drive.messages.WriteStreamResponse() + return resolve(rsp) + }) + } + } - updateMetadata: async (call) => { - const id = call.request.getId() - const path = call.request.getPath() - const metadata = fromMetadata(call.request.getMetadataMap()) + async _rpcWriteFile (call) { + return new Promise((resolve, reject) => { + call.once('data', req => { + const id = req.getId() + const path = req.getPath() + const opts = fromStat(req.getOpts()) - if (!id) throw new Error('A metadata update request must specify a session ID.') - if (!path) throw new Error('A metadata update request must specify a path.') - if (!metadata) throw new Error('A metadata update request must specify metadata.') + if (!id) throw new Error('A writeFile request must specify a session ID.') + if (!path) throw new Error('A writeFile request must specify a path.') const drive = this.driveForSession(id) - return new Promise((resolve, reject) => { - drive._update(path, { metadata }, err => { - if (err) return reject(err) - return resolve(new rpc.drive.messages.UpdateMetadataResponse()) - }) + return loadContent(resolve, reject, path, drive, opts) + }) + }) + + function loadContent (resolve, reject, path, drive, opts) { + return collectStream(call, (err, reqs) => { + if (err) return reject(err) + const chunks = reqs.map(req => { + const chunk = req.getChunk() + return Buffer.from(chunk.buffer, chunk.byteOffset, chunk.byteLength) }) - }, + return drive.writeFile(path, Buffer.concat(chunks), opts, err => { + if (err) return reject(err) + const rsp = new rpc.drive.messages.WriteFileResponse() + return resolve(rsp) + }) + }) + } + } - deleteMetadata: async (call) => { - const id = call.request.getId() - const path = call.request.getPath() - const keys = call.request.getKeysList() + async _rpcUpdateMetadata (call) { + const id = call.request.getId() + const path = call.request.getPath() + const metadata = fromMetadata(call.request.getMetadataMap()) - if (!id) throw new Error('A metadata update request must specify a session ID.') - if (!path) throw new Error('A metadata update request must specify a path.') - if (!keys) throw new Error('A metadata update request must specify metadata keys.') - const drive = this.driveForSession(id) + if (!id) throw new Error('A metadata update request must specify a session ID.') + if (!path) throw new Error('A metadata update request must specify a path.') + if (!metadata) throw new Error('A metadata update request must specify metadata.') + const drive = this.driveForSession(id) - const metadata = {} - for (const key of keys) { - metadata[key] = null - } + return new Promise((resolve, reject) => { + drive._update(path, { metadata }, err => { + if (err) return reject(err) + return resolve(new rpc.drive.messages.UpdateMetadataResponse()) + }) + }) + } - return new Promise((resolve, reject) => { - drive._update(path, { metadata }, err => { - if (err) return reject(err) - return resolve(new rpc.drive.messages.DeleteMetadataResponse()) - }) - }) - }, + async _rpcDeleteMetadata (call) { + const id = call.request.getId() + const path = call.request.getPath() + const keys = call.request.getKeysList() - stat: async (call) => { - const id = call.request.getId() - const path = call.request.getPath() - const lstat = call.request.getLstat() + if (!id) throw new Error('A metadata update request must specify a session ID.') + if (!path) throw new Error('A metadata update request must specify a path.') + if (!keys) throw new Error('A metadata update request must specify metadata keys.') + const drive = this.driveForSession(id) - if (!id) throw new Error('A stat request must specify a session ID.') - if (!path) throw new Error('A stat request must specify a path.') - const drive = this.driveForSession(id) + const metadata = {} + for (const key of keys) { + metadata[key] = null + } - const method = lstat ? drive.lstat.bind(drive) : drive.stat.bind(drive) + return new Promise((resolve, reject) => { + drive._update(path, { metadata }, err => { + if (err) return reject(err) + return resolve(new rpc.drive.messages.DeleteMetadataResponse()) + }) + }) + } - return new Promise((resolve, reject) => { - method(path, (err, stat) => { - if (err) return reject(err) + async _rpcStat (call) { + const id = call.request.getId() + const path = call.request.getPath() + const lstat = call.request.getLstat() - const rsp = new rpc.drive.messages.StatResponse() - rsp.setStat(toStat(stat)) + if (!id) throw new Error('A stat request must specify a session ID.') + if (!path) throw new Error('A stat request must specify a path.') + const drive = this.driveForSession(id) - return resolve(rsp) - }) - }) - }, + const method = lstat ? drive.lstat.bind(drive) : drive.stat.bind(drive) - unlink: async (call) => { - const id = call.request.getId() - const path = call.request.getPath() + return new Promise((resolve, reject) => { + method(path, (err, stat) => { + if (err) return reject(err) - if (!id) throw new Error('An unlink request must specify a session ID.') - if (!path) throw new Error('An unlink request must specify a path. ') - const drive = this.driveForSession(id) + const rsp = new rpc.drive.messages.StatResponse() + rsp.setStat(toStat(stat)) - return new Promise((resolve, reject) => { - drive.unlink(path, err => { - if (err) return reject(err) - const rsp = new rpc.drive.messages.UnlinkResponse() - return resolve(rsp) - }) - }) - }, + return resolve(rsp) + }) + }) + } - readdir: async (call) => { - const id = call.request.getId() - const path = call.request.getPath() - const recursive = call.request.getRecursive() - const noMounts = call.request.getNomounts() - const includeStats = call.request.getIncludestats() + async _rpcUnlink (call) { + const id = call.request.getId() + const path = call.request.getPath() - if (!id) throw new Error('A readdir request must specify a session ID.') - if (!path) throw new Error('A readdir request must specify a path.') - const drive = this.driveForSession(id) + if (!id) throw new Error('An unlink request must specify a session ID.') + if (!path) throw new Error('An unlink request must specify a path. ') + const drive = this.driveForSession(id) - return new Promise((resolve, reject) => { - drive.readdir(path, { recursive, noMounts, includeStats }, (err, files) => { - if (err) return reject(err) - - const rsp = new rpc.drive.messages.ReadDirectoryResponse() - if (!includeStats) { - rsp.setFilesList(files) - } else { - const names = [] - const stats = [] - const mounts = [] - const innerPaths = [] - for (const { name, stat, mount, innerPath } of files) { - names.push(name) - stats.push(toStat(stat)) - mounts.push(toMount(mount)) - innerPaths.push(innerPath) - } - rsp.setFilesList(names) - rsp.setStatsList(stats) - rsp.setMountsList(mounts) - rsp.setInnerpathsList(innerPaths) - } - return resolve(rsp) - }) - }) - }, + return new Promise((resolve, reject) => { + drive.unlink(path, err => { + if (err) return reject(err) + const rsp = new rpc.drive.messages.UnlinkResponse() + return resolve(rsp) + }) + }) + } - mkdir: async (call) => { - const id = call.request.getId() - const path = call.request.getPath() - const opts = fromStat(call.request.getOpts()) + async _rpcReaddir (call) { + const id = call.request.getId() + const path = call.request.getPath() + const recursive = call.request.getRecursive() + const noMounts = call.request.getNomounts() + const includeStats = call.request.getIncludestats() - if (!id) throw new Error('A mkdir request must specify a session ID.') - if (!path) throw new Error('A mkdir request must specify a directory path.') - const drive = this.driveForSession(id) + if (!id) throw new Error('A readdir request must specify a session ID.') + if (!path) throw new Error('A readdir request must specify a path.') + const drive = this.driveForSession(id) - const mkdirOpts = {} - if (opts.uid) mkdirOpts.uid = opts.uid - if (opts.gid) mkdirOpts.gid = opts.gid - if (opts.mode) mkdirOpts.mode = opts.mode + return new Promise((resolve, reject) => { + drive.readdir(path, { recursive, noMounts, includeStats }, (err, files) => { + if (err) return reject(err) - return new Promise((resolve, reject) => { - drive.mkdir(path, mkdirOpts, err => { - if (err) return reject(err) + const rsp = new rpc.drive.messages.ReadDirectoryResponse() + if (!includeStats) { + rsp.setFilesList(files) + } else { + const names = [] + const stats = [] + const mounts = [] + const innerPaths = [] + for (const { name, stat, mount, innerPath } of files) { + names.push(name) + stats.push(toStat(stat)) + mounts.push(toMount(mount)) + innerPaths.push(innerPath) + } + rsp.setFilesList(names) + rsp.setStatsList(stats) + rsp.setMountsList(mounts) + rsp.setInnerpathsList(innerPaths) + } + return resolve(rsp) + }) + }) + } - const rsp = new rpc.drive.messages.MkdirResponse() - return resolve(rsp) - }) - }) - }, + async _rpcMkdir (call) { + const id = call.request.getId() + const path = call.request.getPath() + const opts = fromStat(call.request.getOpts()) - rmdir: async (call) => { - const id = call.request.getId() - const path = call.request.getPath() + if (!id) throw new Error('A mkdir request must specify a session ID.') + if (!path) throw new Error('A mkdir request must specify a directory path.') + const drive = this.driveForSession(id) - if (!id) throw new Error('A rmdir request must specify a session ID.') - if (!path) throw new Error('A rmdir request must specify a directory path.') - const drive = this.driveForSession(id) + const mkdirOpts = {} + if (opts.uid) mkdirOpts.uid = opts.uid + if (opts.gid) mkdirOpts.gid = opts.gid + if (opts.mode) mkdirOpts.mode = opts.mode - return new Promise((resolve, reject) => { - drive.rmdir(path, err => { - if (err) return reject(err) + return new Promise((resolve, reject) => { + drive.mkdir(path, mkdirOpts, err => { + if (err) return reject(err) - const rsp = new rpc.drive.messages.RmdirResponse() - return resolve(rsp) - }) - }) - }, + const rsp = new rpc.drive.messages.MkdirResponse() + return resolve(rsp) + }) + }) + } - mount: async (call) => { - const id = call.request.getId() - const mountInfo = call.request.getInfo() + async _rpcRmdir (call) { + const id = call.request.getId() + const path = call.request.getPath() - const path = mountInfo.getPath() - const opts = fromMount(mountInfo.getOpts()) + if (!id) throw new Error('A rmdir request must specify a session ID.') + if (!path) throw new Error('A rmdir request must specify a directory path.') + const drive = this.driveForSession(id) - if (!id) throw new Error('A mount request must specify a session ID.') - if (!path) throw new Error('A mount request must specify a path.') - if (!opts) throw new Error('A mount request must specify mount options.') - const drive = this.driveForSession(id) + return new Promise((resolve, reject) => { + drive.rmdir(path, err => { + if (err) return reject(err) - return new Promise((resolve, reject) => { - let error = null - const mountListener = key => { - if (!opts.key || key.equals(opts.key)) { - this.removeListener('configured-mount', mountListener) - if (error) return - const rsp = new rpc.drive.messages.MountDriveResponse() - return resolve(rsp) - } - } - this.on('configured-mount', mountListener) - drive.mount(path, opts.key, opts, err => { - if (err) { - error = err - return reject(err) - } - if (opts.key && this._configuredMounts.has(opts.key.toString('hex'))) { - return mountListener(opts.key) - } - }) - }) - }, + const rsp = new rpc.drive.messages.RmdirResponse() + return resolve(rsp) + }) + }) + } - unmount: async (call) => { - const id = call.request.getId() - const path = call.request.getPath() + async _rpcMount (call) { + const id = call.request.getId() + const mountInfo = call.request.getInfo() - if (!id) throw new Error('An unmount request must specify a session ID.') - if (!path) throw new Error('An unmount request must specify a path.') - const drive = this.driveForSession(id) + const path = mountInfo.getPath() + const opts = fromMount(mountInfo.getOpts()) - return new Promise((resolve, reject) => { - drive.unmount(path, err => { - if (err) return reject(err) - const rsp = new rpc.drive.messages.UnmountDriveResponse() - return resolve(rsp) - }) - }) - }, - - watch: async (call) => { - const self = this - var watcher = null - var closed = false - var driveWatchers = null - var keyString = null - - call.once('data', req => { - const id = req.getId() - var path = req.getPath() - - if (!id) throw new Error('A watch request must specify a session ID.') - if (!path) path = '/' - const drive = this.driveForSession(id) - keyString = drive.key.toString('hex') - - driveWatchers = this._watchers.get(keyString) - if (!driveWatchers) { - driveWatchers = [] - this._watchers.set(keyString, driveWatchers) - } + if (!id) throw new Error('A mount request must specify a session ID.') + if (!path) throw new Error('A mount request must specify a path.') + if (!opts) throw new Error('A mount request must specify mount options.') + const drive = this.driveForSession(id) - watcher = drive.watch(path, () => { - const rsp = new rpc.drive.messages.WatchResponse() - call.write(rsp) - }) + return new Promise((resolve, reject) => { + let error = null + const mountListener = key => { + if (!opts.key || key.equals(opts.key)) { + this.removeListener('configured-mount', mountListener) + if (error) return + const rsp = new rpc.drive.messages.MountDriveResponse() + return resolve(rsp) + } + } + this.on('configured-mount', mountListener) + drive.mount(path, opts.key, opts, err => { + if (err) { + error = err + return reject(err) + } + if (opts.key && this._configuredMounts.has(opts.key.toString('hex'))) { + return mountListener(opts.key) + } + }) + }) + } - const close = onclose.bind(null, id, path, driveWatchers) + async _rpcUnmount (call) { + const id = call.request.getId() + const path = call.request.getPath() - watcher.once('ready', subWatchers => { - // Add one in order to include the root watcher. - this._watchCount += subWatchers.length + 1 - if (this._watchCount > this.watchLimit) { - return close('Watch limit reached. Please close watch connections then try again.') - } - driveWatchers.push(watcher) - - // Any subsequent messages are considered cancellations. - call.on('data', close) - call.on('close', close) - call.on('finish', close) - call.on('error', close) - call.on('end', close) - }) - }) + if (!id) throw new Error('An unmount request must specify a session ID.') + if (!path) throw new Error('An unmount request must specify a path.') + const drive = this.driveForSession(id) - function onclose (id, path, driveWatchers, err) { - if (closed) return - closed = true - log.debug({ id, path }, 'unregistering watcher') - if (watcher) { - watcher.destroy() - if (watcher.watchers) self._watchCount -= (watcher.watchers.length + 1) - driveWatchers.splice(driveWatchers.indexOf(watcher), 1) - if (!driveWatchers.length) self._watchers.delete(keyString) - } - call.end() + return new Promise((resolve, reject) => { + drive.unmount(path, err => { + if (err) return reject(err) + const rsp = new rpc.drive.messages.UnmountDriveResponse() + return resolve(rsp) + }) + }) + } + + async _rpcWatch (call) { + const self = this + var watcher = null + var closed = false + var driveWatchers = null + var keyString = null + + call.once('data', req => { + const id = req.getId() + var path = req.getPath() + + if (!id) throw new Error('A watch request must specify a session ID.') + if (!path) path = '/' + const drive = this.driveForSession(id) + keyString = drive.key.toString('hex') + + driveWatchers = this._watchers.get(keyString) + if (!driveWatchers) { + driveWatchers = [] + this._watchers.set(keyString, driveWatchers) + } + + watcher = drive.watch(path, () => { + const rsp = new rpc.drive.messages.WatchResponse() + call.write(rsp) + }) + + const close = onclose.bind(null, id, path, driveWatchers) + + watcher.once('ready', subWatchers => { + // Add one in order to include the root watcher. + this._watchCount += subWatchers.length + 1 + if (this._watchCount > this.watchLimit) { + return close('Watch limit reached. Please close watch connections then try again.') } - }, + driveWatchers.push(watcher) + + // Any subsequent messages are considered cancellations. + call.on('data', close) + call.on('close', close) + call.on('finish', close) + call.on('error', close) + call.on('end', close) + }) + }) - symlink: async (call) => { - const id = call.request.getId() - const target = call.request.getTarget() - const linkname = call.request.getLinkname() + function onclose (id, path, driveWatchers, err) { + if (closed) return + closed = true + log.debug({ id, path }, 'unregistering watcher') + if (watcher) { + watcher.destroy() + if (watcher.watchers) self._watchCount -= (watcher.watchers.length + 1) + driveWatchers.splice(driveWatchers.indexOf(watcher), 1) + if (!driveWatchers.length) self._watchers.delete(keyString) + } + call.end() + } + } - if (!id) throw new Error('A symlink request must specify a session ID.') - if (!target) throw new Error('A symlink request must specify a target.') - if (!linkname) throw new Error('A symlink request must specify a linkname.') - const drive = this.driveForSession(id) + async _rpcSymlink (call) { + const id = call.request.getId() + const target = call.request.getTarget() + const linkname = call.request.getLinkname() - return new Promise((resolve, reject) => { - drive.symlink(target, linkname, err => { - if (err) return reject(err) + if (!id) throw new Error('A symlink request must specify a session ID.') + if (!target) throw new Error('A symlink request must specify a target.') + if (!linkname) throw new Error('A symlink request must specify a linkname.') + const drive = this.driveForSession(id) - const rsp = new rpc.drive.messages.SymlinkResponse() - return resolve(rsp) - }) - }) - }, + return new Promise((resolve, reject) => { + drive.symlink(target, linkname, err => { + if (err) return reject(err) + + const rsp = new rpc.drive.messages.SymlinkResponse() + return resolve(rsp) + }) + }) + } - close: async (call) => { - const id = call.request.getId() + async _rpcClose (call) { + const id = call.request.getId() - this.driveForSession(id) - await this.closeSession(id) - const rsp = new rpc.drive.messages.CloseSessionResponse() + this.driveForSession(id) + await this.closeSession(id) + const rsp = new rpc.drive.messages.CloseSessionResponse() - return rsp - }, + return rsp + } - fileStats: async (call) => { - const id = call.request.getId() - const path = call.request.getPath() + async _rpcFileStats (call) { + const id = call.request.getId() + const path = call.request.getPath() - if (!id) throw new Error('A fileStats request must specify a session ID.') - if (!path) throw new Error('A fileStats request must specify a path.') - const drive = this.driveForSession(id) + if (!id) throw new Error('A fileStats request must specify a session ID.') + if (!path) throw new Error('A fileStats request must specify a path.') + const drive = this.driveForSession(id) - return new Promise((resolve, reject) => { - drive.stats(path, (err, stats) => { - if (err) return reject(err) + return new Promise((resolve, reject) => { + drive.stats(path, (err, stats) => { + if (err) return reject(err) - if (!(stats instanceof Map)) { - const fileStats = stats - stats = new Map() - stats.set(path, fileStats) - } - const rsp = new rpc.drive.messages.FileStatsResponse() - setFileStats(rsp.getStatsMap(), stats) + if (!(stats instanceof Map)) { + const fileStats = stats + stats = new Map() + stats.set(path, fileStats) + } + const rsp = new rpc.drive.messages.FileStatsResponse() + setFileStats(rsp.getStatsMap(), stats) - return resolve(rsp) - }) - }) - }, + return resolve(rsp) + }) + }) + } - mounts: async (call) => { - const id = call.request.getId() - const memory = call.request.getMemory() - const recursive = call.request.getRecursive() + async _rpcMounts (call) { + const id = call.request.getId() + const memory = call.request.getMemory() + const recursive = call.request.getRecursive() - if (!id) throw new Error('A mounts request must specify a session ID.') - const drive = this.driveForSession(id) + if (!id) throw new Error('A mounts request must specify a session ID.') + const drive = this.driveForSession(id) - return new Promise((resolve, reject) => { - drive.getAllMounts({ memory, recursive }, (err, mounts) => { - if (err) return reject(err) - const rsp = new rpc.drive.messages.DriveMountsResponse() - if (!mounts) return resolve(rsp) - - const mountsList = [] - for (const [path, { metadata }] of mounts) { - mountsList.push(toMountInfo({ - path, - opts: { - key: metadata.key, - version: metadata.version - } - })) + return new Promise((resolve, reject) => { + drive.getAllMounts({ memory, recursive }, (err, mounts) => { + if (err) return reject(err) + const rsp = new rpc.drive.messages.DriveMountsResponse() + if (!mounts) return resolve(rsp) + + const mountsList = [] + for (const [path, { metadata }] of mounts) { + mountsList.push(toMountInfo({ + path, + opts: { + key: metadata.key, + version: metadata.version } - rsp.setMountsList(mountsList) - return resolve(rsp) - }) - }) - } - } + })) + } + rsp.setMountsList(mountsList) + return resolve(rsp) + }) + }) } } -function collect (index, opts) { - return new Promise((resolve, reject) => { - collectStream(index.createReadStream(opts), (err, list) => { - if (err) return reject(err) - return resolve(list) - }) - }) +DriveManager.generateSubDbs = function (db) { + return { + drives: sub(db, 'drives', { valueEncoding: 'bjson' }), + mirrors: sub(db, 'mirrors', { valueEncoding: 'utf8' }), + seeding: sub(db, 'seeding', { valueEncoding: 'json' }) + } } module.exports = DriveManager diff --git a/lib/fuse/index.js b/lib/fuse/index.js index 8538a6a..1be4087 100644 --- a/lib/fuse/index.js +++ b/lib/fuse/index.js @@ -613,67 +613,65 @@ class FuseManager extends EventEmitter { return new Map([...this._drives]) } - getHandlers () { - return { - mount: async (call) => { - var mountOpts = call.request.getOpts() - const mnt = call.request.getPath() - if (mountOpts) mountOpts = fromHyperdriveOptions(mountOpts) + // RPC Methods - if (!mnt) throw new Error('A mount request must specify a mountpoint.') - const mountInfo = await this.mount(mnt, mountOpts) + async _rpcMount (call) { + var mountOpts = call.request.getOpts() + const mnt = call.request.getPath() + if (mountOpts) mountOpts = fromHyperdriveOptions(mountOpts) - const rsp = new rpc.fuse.messages.MountResponse() - rsp.setMountinfo(toHyperdriveOptions(mountInfo)) - rsp.setPath(mnt) + if (!mnt) throw new Error('A mount request must specify a mountpoint.') + const mountInfo = await this.mount(mnt, mountOpts) - return rsp - }, + const rsp = new rpc.fuse.messages.MountResponse() + rsp.setMountinfo(toHyperdriveOptions(mountInfo)) + rsp.setPath(mnt) - unmount: async (call) => { - const mnt = call.request.getPath() + return rsp + } - await this.unmount(mnt) + async _rpcUnmount (call) { + const mnt = call.request.getPath() - return new rpc.fuse.messages.UnmountResponse() - }, + await this.unmount(mnt) - status: (call) => { - const rsp = new rpc.fuse.messages.FuseStatusResponse() - rsp.setAvailable(true) - return new Promise((resolve, reject) => { - hyperfuse.isConfigured((err, configured) => { - if (err) return reject(err) - rsp.setConfigured(configured) - return resolve(rsp) - }) - }) - }, + return new rpc.fuse.messages.UnmountResponse() + } + + async _rpcStatus (call) { + const rsp = new rpc.fuse.messages.FuseStatusResponse() + rsp.setAvailable(true) + return new Promise((resolve, reject) => { + hyperfuse.isConfigured((err, configured) => { + if (err) return reject(err) + rsp.setConfigured(configured) + return resolve(rsp) + }) + }) + } - info: async (call) => { - const rsp = new rpc.fuse.messages.InfoResponse() - const mnt = call.request.getPath() + async _rpcInfo (call) { + const rsp = new rpc.fuse.messages.InfoResponse() + const mnt = call.request.getPath() - const { key, mountPath, writable, relativePath } = await this.info(mnt) - rsp.setKey(key) - rsp.setPath(relativePath) - rsp.setMountpath(mountPath) - rsp.setWritable(writable) + const { key, mountPath, writable, relativePath } = await this.info(mnt) + rsp.setKey(key) + rsp.setPath(relativePath) + rsp.setMountpath(mountPath) + rsp.setWritable(writable) - return rsp - }, + return rsp + } - download: async (call) => { - const rsp = new rpc.fuse.messages.DownloadResponse() - const path = call.request.getPath() + async _rpcDownload (call) { + const rsp = new rpc.fuse.messages.DownloadResponse() + const path = call.request.getPath() - const { downloadId, sessionId } = await this.download(path) - rsp.setDownloadid(downloadId) - rsp.setSessionid(sessionId) + const { downloadId, sessionId } = await this.download(path) + rsp.setDownloadid(downloadId) + rsp.setSessionid(sessionId) - return rsp - } - } + return rsp } } diff --git a/lib/peers/index.js b/lib/peers.js similarity index 91% rename from lib/peers/index.js rename to lib/peers.js index b1e064a..32045d5 100644 --- a/lib/peers/index.js +++ b/lib/peers.js @@ -4,7 +4,8 @@ const eos = require('end-of-stream') const { rpc } = require('hyperdrive-daemon-client') const messages = rpc.peers.messages const WatchPeersTypes = messages.WatchPeersResponse.Type -const log = require('../log').child({ component: 'peers' }) + +const log = require('./log').child({ component: 'peers' }) const ALIAS = Symbol('hyperdrive-peer-alias') @@ -99,13 +100,4 @@ module.exports = class PeersManager extends EventEmitter { this._keysByAlias.set(alias, remoteKey) return alias } - - getHandlers () { - return { - listPeers: this._rpcListPeers.bind(this), - watchPeers: this._rpcWatchPeers.bind(this), - getAlias: this._rpcGetAlias.bind(this), - getKey: this._rpcGetKey.bind(this) - } - } } diff --git a/lib/peersockets/index.js b/lib/peersockets.js similarity index 95% rename from lib/peersockets/index.js rename to lib/peersockets.js index 5e5ac73..6ef7156 100644 --- a/lib/peersockets/index.js +++ b/lib/peersockets.js @@ -2,7 +2,7 @@ const { EventEmitter } = require('events') const { rpc } = require('hyperdrive-daemon-client') const messages = rpc.peersockets.messages -const log = require('../log').child({ component: 'peersockets' }) +const log = require('./log').child({ component: 'peersockets' }) const PeerMessageTypes = messages.PeerMessage.Type @@ -24,12 +24,6 @@ module.exports = class PeersocketsManager extends EventEmitter { const topicHandler = new TopicHandler(this, this.peersockets, this.peers, call) this.handles.push(topicHandler) } - - getHandlers () { - return { - join: this._rpcJoin.bind(this) - } - } } class TopicHandler { diff --git a/package.json b/package.json index 0328db2..5d5e4b2 100644 --- a/package.json +++ b/package.json @@ -1,13 +1,13 @@ { "name": "hyperdrive-daemon", - "version": "1.13.17", + "version": "1.14.2", "description": "A FUSE-mountable distributed filesystem, built on Hyperdrive", "main": "index.js", "bin": { "hyperdrive": "./bin/run/run" }, "scripts": { - "test": "standard && NODE_ENV=test tape test/*.js" + "test": "NODE_ENV=test tape test/*.js" }, "files": [ "index.js", @@ -39,6 +39,7 @@ "@oclif/plugin-autocomplete": "^0.1.5", "@oclif/plugin-help": "^2.2.3", "buffer-json-encoding": "^1.0.2", + "call-me-maybe": "^1.0.1", "corestore": "^5.0.0", "corestore-swarm-networking": "^5.0.0", "dat-encoding": "^5.0.1", @@ -57,6 +58,7 @@ "level-mem": "^5.0.1", "minimist": "^1.2.5", "mkdirp": "^0.5.1", + "nanoresource-promise": "^1.2.2", "ora": "^4.0.3", "peersockets": "^0.3.0", "pino": "^5.12.6", @@ -67,7 +69,8 @@ "random-access-memory": "^3.1.1", "stream-collector": "^1.0.1", "streamx": "^2.6.0", - "subleveldown": "^4.0.0" + "subleveldown": "^4.0.0", + "varint": "^5.0.0" }, "optionalDependencies": { "fuse-native": "^2.2.1", diff --git a/test/hyperdrive.js b/test/hyperdrive.js index f888ac9..dae118c 100644 --- a/test/hyperdrive.js +++ b/test/hyperdrive.js @@ -1,3 +1,7 @@ +const { promises: { mkdtemp, writeFile, readFile } } = require('fs') +const { tmpdir } = require('os') +const { once } = require('events') +const { join } = require('path') const test = require('tape') const collectStream = require('stream-collector') @@ -182,6 +186,70 @@ test('can write/read a file from a remote hyperdrive using stream methods', asyn t.end() }) +test('assorted read parameters to createReadStream', async t => { + const { client, cleanup } = await createOne() + + try { + const drive = await client.drive.get() + t.true(drive.key) + t.same(drive.id, 1) + + let blocks = ['hello', 'hello', 'world', 'world'] + let complete = blocks.join('') + let tests = [ + { + params: {}, + value: complete + }, + { + params: { end: 10 }, + value: complete.slice(0, 10 + 1) + }, + { + params: { start: 4, end: 10 }, + value: complete.slice(4, 10 + 1) + }, + { + params: { end: complete.length - 1 }, + value: complete + }, + { + params: { start: 5, length: 5 }, + value: complete.slice(5, 10) + } + ] + + const writeStream = drive.createWriteStream('hello', { uid: 999, gid: 999 }) + for (let block of blocks) { + writeStream.write(block) + } + writeStream.end() + + await new Promise((resolve, reject) => { + writeStream.on('error', reject) + writeStream.on('finish', resolve) + }) + + for (let { params, value } of tests) { + const readStream = await drive.createReadStream('hello', params) + const content = await new Promise((resolve, reject) => { + collectStream(readStream, (err, bufs) => { + if (err) return reject(err) + return resolve(Buffer.concat(bufs)) + }) + }) + t.same(content.toString('utf8'), value) + } + + await drive.close() + } catch (err) { + t.fail(err) + } + + await cleanup() + t.end() +}) + test('reading an invalid file propogates error', async t => { const { client, cleanup } = await createOne() @@ -875,6 +943,66 @@ test('can get all network configurations', async t => { t.end() }) +test('import', async t => { + const { client, cleanup } = await createOne() + + // create a tmp dir + const tmpDir = await mkdtemp(join(tmpdir(), 'import-test')) + await writeFile(join(tmpDir, 'test.txt'), 'hello world') + + try { + // now import the dir + const drive = await client.drive.get() + const importProgress = client.drive.import(tmpDir, drive) + + const [, dst] = await once(importProgress, 'put-end') + t.same(dst.name, '/test.txt') + const contents = await drive.readFile('test.txt', { encoding: 'utf8' }) + t.same(contents, 'hello world') + + await drive.close() + + importProgress.destroy() + } catch (err) { + t.fail(err) + } + + await cleanup() + t.end() +}) + +test('export', async t => { + const { client, cleanup } = await createOne() + + // create a tmp dir + const tmpDir = await mkdtemp(join(tmpdir(), 'export-test')) + + try { + // create a test drive + const drive = await client.drive.get() + await drive.writeFile('export.txt', 'hello world') + + const dw = client.drive.export(drive, tmpDir) + await dw.start() + + let total, downloaded + dw.on('stats', async stats => { + total = stats.total + downloaded = stats.downloaded + if (total === downloaded) { + t.pass('stats OK') + const contents = await readFile(join(tmpDir, 'export.txt'), { encoding: 'utf8' }) + t.same(contents, 'hello world') + await drive.close() + await cleanup() + t.end() + } + }) + } catch (err) { + t.fail(err) + } +}) + // TODO: Figure out why the grpc server is not terminating. test.onFinish(() => { setTimeout(() => { diff --git a/test/replication.js b/test/replication.js index 795be8e..56eee97 100644 --- a/test/replication.js +++ b/test/replication.js @@ -175,6 +175,150 @@ test('can cancel an active download', async t => { } }) +test('can mirror a single drive', async t => { + const { clients, cleanup } = await create(2) + const firstClient = clients[0] + const secondClient = clients[1] + + try { + const drive1 = await firstClient.drive.get() + await drive1.configureNetwork({ lookup: true, announce: true }) + + const drive2 = await secondClient.drive.get({ key: drive1.key }) + + await drive1.writeFile('/a/1', 'hello') + await drive1.writeFile('/a/2', 'world') + await drive1.writeFile('/a/3', 'three') + await drive1.writeFile('/a/4', 'four') + await drive1.writeFile('/a/5', 'five') + + // 100 ms delay for replication. + await delay(100) + + const d2Stats1 = await drive2.stats() + var stats = d2Stats1.stats + + // Since there has not been a content read yet, the stats will not report the latest content length. + t.same(stats[0].content.totalBlocks, 0) + + await drive2.mirror() + + // 200 ms delay for download to complete. + await delay(200) + + const d2Stats2 = await drive2.stats() + stats = d2Stats2.stats + + const fileStats = await drive2.fileStats('a') + t.same(stats[0].content.totalBlocks, 5) + t.same(stats[0].content.downloadedBlocks, 5) + t.same(fileStats.get('/a/1').downloadedBlocks, 1) + t.same(fileStats.get('/a/2').downloadedBlocks, 1) + t.same(fileStats.get('/a/3').downloadedBlocks, 1) + t.same(fileStats.get('/a/4').downloadedBlocks, 1) + t.same(fileStats.get('/a/5').downloadedBlocks, 1) + } catch (err) { + t.fail(err) + } + + await cleanup() + t.end() +}) + +test('can mirror a drive with mounts', async t => { + const { clients, cleanup } = await create(2) + const firstClient = clients[0] + const secondClient = clients[1] + + try { + const drive1 = await firstClient.drive.get() + const mount = await firstClient.drive.get() + await drive1.configureNetwork({ lookup: true, announce: true }) + + const drive2 = await secondClient.drive.get({ key: drive1.key }) + + await drive1.mount('/a', { key: mount.key }) + await mount.writeFile('2', 'world') + await mount.writeFile('3', 'three') + await mount.writeFile('4', 'four') + await mount.writeFile('5', 'five') + + // 100 ms delay for replication. + await delay(100) + + const d2Stats1 = await drive2.stats() + var stats = d2Stats1.stats + + await drive2.mirror() + + // 200 ms delay for download to complete. + await delay(200) + + const d2Stats2 = await drive2.stats() + stats = d2Stats2.stats + + const fileStats = await drive2.fileStats('a') + t.same(stats[1].content.totalBlocks, 4) + t.same(stats[1].content.downloadedBlocks, 4) + t.same(fileStats.get('/a/2').downloadedBlocks, 1) + t.same(fileStats.get('/a/3').downloadedBlocks, 1) + t.same(fileStats.get('/a/4').downloadedBlocks, 1) + t.same(fileStats.get('/a/5').downloadedBlocks, 1) + } catch (err) { + t.fail(err) + } + + await cleanup() + t.end() +}) + +test('can cancel an active mirror', async t => { + const { clients, cleanup } = await create(2) + const firstClient = clients[0] + const secondClient = clients[1] + + try { + const drive1 = await firstClient.drive.get() + await drive1.configureNetwork({ lookup: true, announce: true }) + + const drive2 = await secondClient.drive.get({ key: drive1.key }) + + await writeFile(drive1, '/a/1', 50) + await writeFile(drive1, '/a/2', 50) + + const unmirror = await drive2.mirror() + await delay(100) + await unmirror() + + // Wait to make sure that the download is not continuing. + await delay(100) + + const { stats: totals } = await drive2.stats() + const fileStats = await drive2.fileStats('a') + const contentTotals = totals[0].content + t.true(contentTotals.downloadedBlocks < 100 && contentTotals.downloadedBlocks > 0) + t.true(fileStats.get('/a/1').downloadedBlocks < 50 && fileStats.get('/a/1').downloadedBlocks > 0) + t.true(fileStats.get('/a/2').downloadedBlocks < 50 && fileStats.get('/a/2').downloadedBlocks > 0) + } catch (err) { + t.fail(err) + } + + await cleanup() + t.end() + + async function writeFile (drive, name, numBlocks) { + const writeStream = drive.createWriteStream(name) + return new Promise((resolve, reject) => { + writeStream.on('finish', resolve) + writeStream.on('error', reject) + for (let i = 0; i < numBlocks; i++) { + writeStream.write(Buffer.alloc(1024 * 1024).fill('abcdefg')) + } + writeStream.end() + }) + } +}) + test('can replicate many mounted drives between daemons', async t => { const { clients, cleanup } = await create(2) console.time('many-mounts')