Skip to content

Commit 5eeb18c

Browse files
committed
Better constants handling + daemon refactoring + streaming methods
1 parent dc1d30b commit 5eeb18c

File tree

9 files changed

+550
-416
lines changed

9 files changed

+550
-416
lines changed

bin/start.js

Lines changed: 9 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -6,32 +6,30 @@ const forever = require('forever')
66

77
const { createMetadata } = require('../lib/metadata')
88
const { HyperdriveClient } = require('hyperdrive-daemon-client')
9-
10-
const HYPERDRIVE_DIR = p.join(os.homedir(), '.hyperdrive')
11-
const OUTPUT_FILE = p.join(HYPERDRIVE_DIR, 'output.log')
9+
const constants = require('hyperdrive-daemon-client/lib/constants')
1210

1311
exports.command = 'start'
1412
exports.desc = 'Start the Hyperdrive daemon.'
1513
exports.builder = {
1614
port: {
1715
description: 'The gRPC port that the daemon will bind to.',
1816
type: 'number',
19-
default: 3101
17+
default: constants.port
2018
},
2119
storage: {
2220
description: 'The storage directory for hyperdrives and associated metadata.',
2321
type: 'string',
24-
default: p.join(HYPERDRIVE_DIR, 'storage')
22+
default: constants.storage,
2523
},
2624
'log-level': {
2725
description: 'The log level',
2826
type: 'string',
29-
default: 'debug'
27+
default: constants.logLevel
3028
},
3129
bootstrap: {
3230
description: 'Comma-separated bootstrap servers to use.',
3331
type: 'array',
34-
default: []
32+
default: constants.bootstrap
3533
}
3634
}
3735

@@ -57,11 +55,11 @@ async function start (argv) {
5755
let endpoint = `localhost:${argv.port}`
5856
await createMetadata(endpoint)
5957
forever.startDaemon(p.join(__dirname, '..', 'index.js'), {
60-
uid: 'hyperdrive',
58+
uid: constants.uid,
6159
max: 1,
62-
logFile: OUTPUT_FILE,
63-
outFile: OUTPUT_FILE,
64-
errFile: OUTPUT_FILE,
60+
logFile: constants.unstructuredLog,
61+
outFile: constants.unstructuredLog,
62+
errFile: constants.unstructuredLog,
6563
args: ['--port', argv.port, '--storage', argv.storage, '--log-level', argv['log-level'], '--bootstrap', argv.bootstrap.join(',')]
6664
})
6765
console.log(chalk.green(`Daemon started at ${endpoint}`))

index.js

Lines changed: 100 additions & 95 deletions
Original file line numberDiff line numberDiff line change
@@ -1,42 +1,47 @@
11
const p = require('path')
2+
const os = require('os')
23
const { EventEmitter } = require('events')
34

45
const mkdirp = require('mkdirp')
56
const raf = require('random-access-file')
67
const level = require('level')
78
const sub = require('subleveldown')
89
const grpc = require('@grpc/grpc-js')
9-
10-
const { rpc, loadMetadata } = require('hyperdrive-daemon-client')
1110
const corestore = require('corestore')
1211
const SwarmNetworker = require('corestore-swarm-networking')
1312

14-
const { DriveManager, createDriveHandlers } = require('./lib/drives')
13+
const { rpc, loadMetadata } = require('hyperdrive-daemon-client')
14+
const constants = require('hyperdrive-daemon-client/lib/constants')
15+
16+
const DriveManager = require('./lib/drives')
1517
const { catchErrors, serverError, requestError } = require('./lib/errors')
1618

1719
try {
1820
var hyperfuse = require('hyperdrive-fuse')
19-
var { FuseManager, createFuseHandlers } = require('./lib/fuse')
21+
var FuseManager = require('./lib/fuse')
2022
} catch (err) {
2123
console.warn('FUSE bindings are not available on this platform.')
2224
}
2325
const log = require('./lib/log').child({ component: 'server' })
24-
const argv = extractArguments()
26+
27+
const STOP_EVENTS = ['SIGINT', 'SIGTERM', 'unhandledRejection', 'uncaughtException']
2528

2629
class HyperdriveDaemon extends EventEmitter {
27-
constructor (storage, opts = {}) {
30+
constructor (opts = {}) {
2831
super()
2932

30-
this.db = level(`${storage}/db`, { valueEncoding: 'json' })
3133
this.opts = opts
34+
this.storage = opts.storage || constants.storage
35+
this.port = opts.port || constants.port
3236

37+
this.db = level(`${this.storage}/db`, { valueEncoding: 'json' })
3338
const dbs = {
3439
fuse: sub(this.db, 'fuse', { valueEncoding: 'json' }),
3540
drives: sub(this.db, 'drives', { valueEncoding: 'json' })
3641
}
3742

3843
const corestoreOpts = {
39-
storage: path => raf(`${storage}/cores/${path}`),
44+
storage: path => raf(`${this.storage}/cores/${path}`),
4045
sparse: true,
4146
// Collect networking statistics.
4247
stats: true
@@ -45,9 +50,21 @@ class HyperdriveDaemon extends EventEmitter {
4550
// The root corestore should be bootstrapped with an empty default feed.
4651
this.corestore.default()
4752

48-
this.networking = new SwarmNetworker(this.corestore, opts.network)
53+
const bootstrapOpts = opts.bootstrap || constants.bootstrap
54+
if (bootstrapOpts && bootstrapOpts.length && bootstrapOpts[0] !== '') {
55+
if (bootstrapOpts === false && bootstrapOpts[0] === 'false') {
56+
var networkOpts = { bootstrap: false }
57+
} else {
58+
networkOpts = { bootstrap: bootstrapOpts }
59+
}
60+
}
61+
this.networking = new SwarmNetworker(this.corestore, networkOpts)
62+
4963
this.drives = new DriveManager(this.corestore, this.networking, dbs.drives, this.opts)
5064
this.fuse = hyperfuse ? new FuseManager(this.megastore, this.drives, dbs.fuse, this.opts) : null
65+
// Set in start.
66+
this.server = null
67+
this._cleanup = null
5168

5269
this.drives.on('error', err => this.emit('error', err))
5370
this.fuse.on('error', err => this.emit('error', err))
@@ -61,7 +78,15 @@ class HyperdriveDaemon extends EventEmitter {
6178
}
6279
}
6380

64-
_ready () {
81+
async _ready () {
82+
await this._loadMetadata()
83+
await this._ensureStorage()
84+
85+
this._cleanup = this.stop.bind(this)
86+
for (const event of STOP_EVENTS) {
87+
process.once(event, this._cleanup)
88+
}
89+
6590
return Promise.all([
6691
this.db.open(),
6792
this.networking.listen(),
@@ -72,89 +97,85 @@ class HyperdriveDaemon extends EventEmitter {
7297
})
7398
}
7499

75-
async close () {
76-
if (this._isClosed) return Promise.resolve()
77-
if (this.fuse && this.fuse.fuseConfigured) await this.fuse.unmount()
78-
if (this.networking) await this.networking.close()
79-
await this.db.close()
80-
this._isClosed = true
100+
async _loadMetadata () {
101+
this.metadata = this.opts.metadata || await new Promise((resolve, reject) => {
102+
loadMetadata((err, metadata) => {
103+
if (err) return reject(err)
104+
return resolve(metadata)
105+
})
106+
})
81107
}
82-
}
83108

84-
async function start (opts = {}) {
85-
const metadata = opts.metadata || await new Promise((resolve, reject) => {
86-
loadMetadata((err, metadata) => {
87-
if (err) return reject(err)
88-
return resolve(metadata)
109+
_ensureStorage () {
110+
return new Promise((resolve, reject) => {
111+
mkdirp(this.storage, err => {
112+
if (err) return reject(err)
113+
return resolve()
114+
})
89115
})
90-
})
91-
const storageRoot = opts.storage || argv.storage
92-
await ensureStorage()
93-
94-
const daemonOpts = {}
95-
const bootstrapOpts = opts.bootstrap || argv.bootstrap
96-
if (bootstrapOpts.length && bootstrapOpts[0] !== '') {
97-
if (bootstrapOpts === false && bootstrapOpts[0] === 'false') {
98-
daemonOpts.network = { bootstrap: false }
99-
} else {
100-
daemonOpts.network = { bootstrap: bootstrapOpts }
101-
}
102116
}
103117

104-
const daemon = new HyperdriveDaemon(storageRoot, daemonOpts)
105-
await daemon.ready()
118+
createMainHandlers () {
119+
return {
120+
stop: async (call) => {
121+
await this.close()
122+
setTimeout(() => {
123+
console.error('Daemon is exiting.')
124+
this.server.forceShutdown()
125+
}, 250)
126+
return new rpc.main.messages.StopResponse()
127+
},
106128

107-
const server = new grpc.Server();
108-
if (hyperfuse) {
109-
server.addService(rpc.fuse.services.FuseService, {
110-
...wrap(metadata, createFuseHandlers(daemon.fuse), { authenticate: true })
111-
})
129+
status: async (call) => {
130+
return new rpc.main.messages.StatusResponse()
131+
}
132+
}
112133
}
113-
server.addService(rpc.drive.services.DriveService, {
114-
...wrap(metadata, createDriveHandlers(daemon.drives), { authenticate: true })
115-
})
116-
server.addService(rpc.main.services.HyperdriveService, {
117-
...wrap(metadata, createMainHandlers(server, daemon), { authenticate: true })
118-
})
119-
120-
121-
const port = opts.port || argv.port
122-
await new Promise((resolve, reject) => {
123-
server.bindAsync(`0.0.0.0:${port}`, grpc.ServerCredentials.createInsecure(), (err, port) => {
124-
if (err) return reject(err)
125-
log.info({ port: port }, 'server listening')
126-
server.start()
127-
return resolve()
128-
})
129-
})
130134

131-
const cleanupEvents = ['SIGINT', 'SIGTERM', 'unhandledRejection', 'uncaughtException']
132-
for (const event of cleanupEvents) {
133-
process.once(event, cleanup)
135+
async stop () {
136+
if (this._isClosed) return Promise.resolve()
137+
138+
if (this.fuse && this.fuse.fuseConfigured) await this.fuse.unmount()
139+
if (this.networking) await this.networking.close()
140+
if (this.server) this.server.forceShutdown()
141+
await this.db.close()
142+
143+
for (const event of STOP_EVENTS) {
144+
process.removeListener(event, this._cleanup)
145+
}
146+
147+
this._isClosed = true
134148
}
135149

136-
return cleanup
150+
async start () {
151+
await this.ready()
152+
this.server = new grpc.Server()
137153

138-
async function cleanup () {
139-
await daemon.close()
140-
await new Promise((resolve, reject) => {
141-
server.tryShutdown(err => {
142-
if (err) return reject(err)
143-
return resolve()
154+
if (hyperfuse) {
155+
this.server.addService(rpc.fuse.services.FuseService, {
156+
...wrap(this.metadata, this.fuse.getHandlers(), { authenticate: true })
144157
})
145-
})
146-
for (const event of cleanupEvents) {
147-
process.removeListener(event, cleanup)
148158
}
149-
}
159+
this.server.addService(rpc.drive.services.DriveService, {
160+
...wrap(this.metadata, this.drives.getHandlers(), { authenticate: true })
161+
})
162+
this.server.addService(rpc.main.services.HyperdriveService, {
163+
...wrap(this.metadata, this.createMainHandlers(), { authenticate: true })
164+
})
150165

151-
function ensureStorage () {
152-
return new Promise((resolve, reject) => {
153-
mkdirp(storageRoot, err => {
166+
await new Promise((resolve, reject) => {
167+
this.server.bindAsync(`0.0.0.0:${this.port}`, grpc.ServerCredentials.createInsecure(), (err, port) => {
154168
if (err) return reject(err)
169+
log.info({ port: port }, 'server listening')
170+
this.server.start()
155171
return resolve()
156172
})
157173
})
174+
175+
176+
async function close () {
177+
await this.close()
178+
}
158179
}
159180
}
160181

@@ -213,26 +234,10 @@ function wrap (metadata, methods, opts) {
213234
return wrapped
214235
}
215236

216-
function createMainHandlers (server, daemon) {
217-
return {
218-
stop: async (call) => {
219-
await daemon.close()
220-
setTimeout(() => {
221-
console.error('Daemon is exiting.')
222-
server.forceShutdown()
223-
process.exit(0)
224-
}, 250)
225-
return new rpc.main.messages.StopResponse()
226-
},
227-
228-
status: async (call) => {
229-
return new rpc.main.messages.StatusResponse()
230-
}
231-
}
232-
}
233-
234237
if (require.main === module) {
235-
start()
238+
const opts = extractArguments()
239+
const daemon = new HyperdriveDaemon(opts)
240+
daemon.start()
236241
} else {
237-
module.exports = start
242+
module.exports = HyperdriveDaemon
238243
}

0 commit comments

Comments
 (0)