Skip to content

Commit f22a16a

Browse files
committed
Temporarily force exit + better constants + initialization cleanup
1 parent 5eeb18c commit f22a16a

File tree

5 files changed

+53
-46
lines changed

5 files changed

+53
-46
lines changed

index.js

Lines changed: 31 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ const grpc = require('@grpc/grpc-js')
1010
const corestore = require('corestore')
1111
const SwarmNetworker = require('corestore-swarm-networking')
1212

13+
1314
const { rpc, loadMetadata } = require('hyperdrive-daemon-client')
1415
const constants = require('hyperdrive-daemon-client/lib/constants')
1516

@@ -34,12 +35,6 @@ class HyperdriveDaemon extends EventEmitter {
3435
this.storage = opts.storage || constants.storage
3536
this.port = opts.port || constants.port
3637

37-
this.db = level(`${this.storage}/db`, { valueEncoding: 'json' })
38-
const dbs = {
39-
fuse: sub(this.db, 'fuse', { valueEncoding: 'json' }),
40-
drives: sub(this.db, 'drives', { valueEncoding: 'json' })
41-
}
42-
4338
const corestoreOpts = {
4439
storage: path => raf(`${this.storage}/cores/${path}`),
4540
sparse: true,
@@ -60,21 +55,24 @@ class HyperdriveDaemon extends EventEmitter {
6055
}
6156
this.networking = new SwarmNetworker(this.corestore, networkOpts)
6257

63-
this.drives = new DriveManager(this.corestore, this.networking, dbs.drives, this.opts)
64-
this.fuse = hyperfuse ? new FuseManager(this.megastore, this.drives, dbs.fuse, this.opts) : null
58+
// Set in ready.
59+
this.db = null
60+
this.drives = null
61+
this.fuse = null
62+
6563
// Set in start.
6664
this.server = null
65+
this._isMain = !!opts.main
6766
this._cleanup = null
6867

69-
this.drives.on('error', err => this.emit('error', err))
70-
this.fuse.on('error', err => this.emit('error', err))
71-
7268
this._isClosed = false
73-
this._isReady = false
69+
this._readyPromise = false
7470

7571
this.ready = () => {
76-
if (this._isReady) return Promise.resolve()
77-
return this._ready()
72+
if (this._isClosed) return Promise.resolve()
73+
if (this._readyPromise) return this._readyPromise
74+
this._readyPromise = this._ready()
75+
return this._readyPromise
7876
}
7977
}
8078

@@ -87,14 +85,24 @@ class HyperdriveDaemon extends EventEmitter {
8785
process.once(event, this._cleanup)
8886
}
8987

90-
return Promise.all([
91-
this.db.open(),
92-
this.networking.listen(),
88+
this.networking.listen()
89+
90+
this.db = level(`${this.storage}/db`, { valueEncoding: 'json' })
91+
const dbs = {
92+
fuse: sub(this.db, 'fuse', { valueEncoding: 'json' }),
93+
drives: sub(this.db, 'drives', { valueEncoding: 'json' })
94+
}
95+
this.drives = new DriveManager(this.corestore, this.networking, dbs.drives, this.opts)
96+
this.fuse = hyperfuse ? new FuseManager(this.drives, dbs.fuse, this.opts) : null
97+
this.drives.on('error', err => this.emit('error', err))
98+
this.fuse.on('error', err => this.emit('error', err))
99+
100+
await Promise.all([
93101
this.drives.ready(),
94102
this.fuse ? this.fuse.ready() : Promise.resolve()
95-
]).then(() => {
96-
this._ready = true
97-
})
103+
])
104+
105+
this._isReady = true
98106
}
99107

100108
async _loadMetadata () {
@@ -118,10 +126,11 @@ class HyperdriveDaemon extends EventEmitter {
118126
createMainHandlers () {
119127
return {
120128
stop: async (call) => {
121-
await this.close()
129+
await this.stop()
122130
setTimeout(() => {
123131
console.error('Daemon is exiting.')
124132
this.server.forceShutdown()
133+
if (this._isMain) process.exit(0)
125134
}, 250)
126135
return new rpc.main.messages.StopResponse()
127136
},
@@ -137,7 +146,6 @@ class HyperdriveDaemon extends EventEmitter {
137146

138147
if (this.fuse && this.fuse.fuseConfigured) await this.fuse.unmount()
139148
if (this.networking) await this.networking.close()
140-
if (this.server) this.server.forceShutdown()
141149
await this.db.close()
142150

143151
for (const event of STOP_EVENTS) {
@@ -171,11 +179,6 @@ class HyperdriveDaemon extends EventEmitter {
171179
return resolve()
172180
})
173181
})
174-
175-
176-
async function close () {
177-
await this.close()
178-
}
179182
}
180183
}
181184

@@ -236,7 +239,7 @@ function wrap (metadata, methods, opts) {
236239

237240
if (require.main === module) {
238241
const opts = extractArguments()
239-
const daemon = new HyperdriveDaemon(opts)
242+
const daemon = new HyperdriveDaemon({ ...opts, main: true })
240243
daemon.start()
241244
} else {
242245
module.exports = HyperdriveDaemon

lib/drives/index.js

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -46,11 +46,16 @@ class DriveManager extends EventEmitter {
4646
this._watchers = new Map()
4747
this._sessionCounter = 0
4848

49-
this._ready = new Promise(async resolve => {
50-
await this._reseed()
51-
return resolve()
52-
})
53-
this.ready = () => this._ready
49+
this._readyPromise = null
50+
51+
this.ready = () => {
52+
if (this._readyPromise) return this._readyPromise
53+
this._readyPromise = new Promise(async resolve => {
54+
await this._reseed()
55+
return resolve()
56+
})
57+
return this._readyPromise
58+
}
5459
}
5560

5661
async _reseed () {
@@ -306,19 +311,16 @@ class DriveManager extends EventEmitter {
306311
if (length !== 0) streamOpts.length = length
307312
streamOpts.start = start
308313

309-
console.log('STREAM OPTS:', streamOpts)
310314
const stream = drive.createReadStream(path, streamOpts)
311315

312316
const rspMapper = map.obj(chunk => {
313-
console.log('MAPPING RSP:', chunk)
314317
const rsp = new rpc.drive.messages.ReadStreamResponse()
315318
rsp.setChunk(chunk)
316319
return rsp
317320
})
318321

319322
pump(stream, rspMapper, call, err => {
320323
if (err) log.error({ id, err }, 'createReadStream error')
321-
console.log('AFTER PUMP HERE, err:', err)
322324
})
323325
},
324326

lib/fuse/index.js

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,9 @@ const constants = require('hyperdrive-daemon-client/lib/constants')
1515
const log = require('../log').child({ component: 'fuse-manager' })
1616

1717
class FuseManager extends EventEmitter {
18-
constructor (megastore, driveManager, db, opts) {
18+
constructor (driveManager, db, opts) {
1919
super()
2020

21-
this.megastore = megastore
2221
this.driveManager = driveManager
2322
this.db = db
2423
this.opts = opts
@@ -27,7 +26,6 @@ class FuseManager extends EventEmitter {
2726
this._handlers = new Map()
2827

2928
// Set in ready.
30-
3129
this.fuseConfigured = false
3230
this._rootDrive = null
3331
this._rootMnt = null
@@ -444,8 +442,7 @@ class FuseManager extends EventEmitter {
444442

445443
await this.unmount(mnt)
446444

447-
const rsp = rpc.fuse.messages.UnmountResponse()
448-
return rsp
445+
return new rpc.fuse.messages.UnmountResponse()
449446
},
450447

451448
status: (call) => {

lib/log.js

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,13 +3,11 @@ const p = require('path')
33

44
const pino = require('pino')
55
const argv = require('yargs').argv
6-
7-
// TODO: Move to a consts file.
8-
const LOGFILE = p.join(os.homedir(), '.hyperdrive', 'log.json')
6+
const constants = require('hyperdrive-daemon-client/lib/constants')
97

108
// Forever will redirect stdout to the correct log file.
119
module.exports = pino({
1210
name: 'hyperdrive',
1311
level: argv['log-level'] || 'info',
1412
enabled: true,
15-
}, (process.env['NODE_ENV'] === 'test') ? pino.destination(2) : LOGFILE)
13+
}, (process.env['NODE_ENV'] === 'test') ? pino.destination(2) : constants.structuredLog)

test/hyperdrive.js

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ test('can write/read a large file from a remote hyperdrive', async t => {
4949
t.end()
5050
})
5151

52-
test.only('can write/read a file from a remote hyperdrive using stream methods', async t => {
52+
test('can write/read a file from a remote hyperdrive using stream methods', async t => {
5353
const { client, cleanup } = await createOne()
5454

5555
try {
@@ -311,6 +311,13 @@ test.skip('watch cleans up after unexpected close', async t => {
311311
t.end()
312312
})
313313

314+
// TODO: Figure out why the grpc server is not terminating.
315+
test.onFinish(() => {
316+
setTimeout(() => {
317+
process.exit(0)
318+
}, 100)
319+
})
320+
314321
function delay (ms) {
315322
return new Promise(resolve => setTimeout(resolve, ms))
316323
}

0 commit comments

Comments
 (0)