Skip to content

Commit b598601

Browse files
committed
Additional methods and bug fixes
Methods added/updated: * version * createDiffStream * download (untested) * stat returns Stat objects. Also updated the session management, and stopped writing the .key file by default
1 parent 727edec commit b598601

File tree

5 files changed

+259
-12
lines changed

5 files changed

+259
-12
lines changed

index.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -223,7 +223,7 @@ function wrap (metadata, methods, opts) {
223223
method(call)
224224
.then(rsp => {
225225
log.debug(tag, 'request was successful')
226-
if (cb) return cb(null, rsp)
226+
if (cb) process.nextTick(cb, null, rsp)
227227
})
228228
.catch(err => {
229229
log.error({ ...tag, error: err.toString(), stack: err.stack }, 'request failed')

lib/drives/array-index.js

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
class ArrayIndex {
2+
constructor () {
3+
this._arr = []
4+
}
5+
6+
_getFreeIndex () {
7+
var idx = this._arr.indexOf(null)
8+
if (idx === -1) idx = this._arr.length
9+
if (!idx) idx = 1
10+
return idx
11+
}
12+
13+
get (idx) {
14+
return this._arr[idx]
15+
}
16+
17+
insert (value) {
18+
const idx = this._getFreeIndex()
19+
this._arr[idx] = value
20+
return idx
21+
}
22+
23+
delete (idx) {
24+
this._arr[idx] = null
25+
}
26+
}
27+
28+
module.exports = ArrayIndex

lib/drives/index.js

Lines changed: 159 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,12 @@ const {
1515
toStat,
1616
fromMount,
1717
toDriveStats,
18+
toDownloadProgress,
19+
toDiffEntry,
1820
toChunks
1921
} = require('hyperdrive-daemon-client/lib/common')
2022
const { rpc } = require('hyperdrive-daemon-client')
23+
const ArrayIndex = require('./array-index.js')
2124

2225
const log = require('../log').child({ component: 'drive-manager' })
2326

@@ -40,9 +43,9 @@ class DriveManager extends EventEmitter {
4043

4144
// TODO: Replace with an LRU cache.
4245
this._drives = new Map()
43-
this._sessions = new Map()
4446
this._watchers = new Map()
45-
this._sessionCounter = 0
47+
this._sessions = new ArrayIndex()
48+
this._downloads = new ArrayIndex()
4649

4750
this._readyPromise = null
4851

@@ -123,12 +126,12 @@ class DriveManager extends EventEmitter {
123126

124127
async createSession (key, opts) {
125128
const drive = await this.get(key, opts)
126-
this._sessions.set(++this._sessionCounter, drive)
127-
return { drive, session: this._sessionCounter }
129+
const sessionId = this._sessions.insert(drive)
130+
return { drive, session: sessionId }
128131
}
129132

130-
async closeSession (id) {
131-
this._sessions.delete(id)
133+
closeSession (id) {
134+
return this._sessions.delete(id)
132135
}
133136

134137
async getAllStats () {
@@ -198,7 +201,8 @@ class DriveManager extends EventEmitter {
198201
keyString = this._generateKeyString(key, opts)
199202

200203
if (drive.writable) {
201-
await this._configureDrive(drive, opts && opts.configure)
204+
// TODO: Replace everything in _configureDrive with virtual files in the FUSE component.
205+
// await this._configureDrive(drive, opts && opts.configure)
202206
} else {
203207
// All read-only drives are currently published by default.
204208
await this.publish(drive)
@@ -230,6 +234,18 @@ class DriveManager extends EventEmitter {
230234

231235
getHandlers () {
232236
return {
237+
version: async (call) => {
238+
const id = call.request.getId()
239+
240+
if (!id) throw new Error('A version request must specify a session ID.')
241+
const drive = this.driveForSession(id)
242+
243+
const rsp = new rpc.drive.messages.DriveVersionResponse()
244+
rsp.setVersion(drive.version)
245+
246+
return rsp
247+
},
248+
233249
get: async (call) => {
234250
var driveOpts = call.request.getOpts()
235251
if (driveOpts) driveOpts = fromHyperdriveOptions(driveOpts)
@@ -293,6 +309,139 @@ class DriveManager extends EventEmitter {
293309
return rsp
294310
},
295311

312+
download: async (call) => {
313+
const id = call.request.getId()
314+
const path = call.request.getPath()
315+
const detailed = call.request.getDetailed()
316+
317+
if (!id) throw new Error('A download request must specify a session ID.')
318+
const drive = this.driveForSession(id)
319+
var downloadId = null
320+
var ended = false
321+
322+
const dl = drive.download(path, { detailed }, () => {
323+
if (ended) return null
324+
return finish(true)
325+
})
326+
downloadId = this._downloads.insert(dl)
327+
328+
call.on('end', cleanup)
329+
call.on('close', cleanup)
330+
call.on('finish', cleanup)
331+
call.on('error', cleanup)
332+
333+
dl.on('start', start)
334+
dl.on('cancelled', () => finish(false))
335+
dl.on('progress', progress)
336+
337+
function progress (path, fileStats, totals) {
338+
const rsp = new rpc.drive.messages.DownloadResponse()
339+
rsp.setDownloadid(downloadId)
340+
rsp.setType(rpc.drive.messages.DownloadResponse.Type.PROGRESS)
341+
342+
if (detailed) {
343+
const fileRsps = fileStats.map(getFileStatusRsp)
344+
rsp.setFilesList(fileRsps)
345+
}
346+
347+
const progressRsp = toDownloadProgress(totals)
348+
rsp.setProgress(progressRsp)
349+
350+
call.write(rsp)
351+
}
352+
353+
function start (files, totals) {
354+
const rsp = new rpc.drive.messages.DownloadResponse()
355+
rsp.setDownloadid(downloadId)
356+
rsp.setType(rpc.drive.messages.DownloadResponse.Type.START)
357+
358+
if (detailed) {
359+
const fileRsps = files.map(getFileStatusRsp)
360+
rsp.setFilesList(fileRsps)
361+
}
362+
363+
const progressRsp = toDownload(totals)
364+
rsp.setProgress(progressRsp)
365+
366+
call.write(rsp)
367+
}
368+
369+
function finish (completed) {
370+
ended = true
371+
const rsp = new rpc.drive.messages.DownloadResponse()
372+
rsp.setDownloadid(downloadId)
373+
rsp.setType(rpc.drive.messages.DownloadResponse.Type.FINISH)
374+
375+
const progress = new rpc.drive.messages.DownloadProgress()
376+
progress.setCompleted(!!completed)
377+
progress.setCancelled(!completed)
378+
rsp.setProgress(progress)
379+
380+
cleanup()
381+
call.end(rsp)
382+
}
383+
384+
function cleanup () {
385+
this._downloads.delete(downloadId)
386+
dl.removeAllListeners('progress')
387+
dl.removeAllListeners('cancelled')
388+
dl.removeAllListeners('start')
389+
}
390+
391+
function getFileStatusRsp (path, stats) {
392+
const fileStatus = new rpc.drive.messages.FileDownloadStatus()
393+
fileStatus.setPath(path)
394+
if (stats) fileStatus.setProgress(toDownloadProgress(stats))
395+
return fileStatus
396+
}
397+
},
398+
399+
undownload: async (call) => {
400+
const id = call.request.getId()
401+
const downloadId = call.request.getDownloadid()
402+
403+
if (!id) throw new Error('An undownload request must specify a session ID.')
404+
if (!downloadId) throw new Error('An undownload request must specify a download ID.')
405+
const drive = this.driveForSession(id)
406+
407+
const dl = this._downloads.get(downloadId)
408+
if (dl) dl.cancel()
409+
this._downloads.delete(downloadId)
410+
411+
return new rpc.drive.messages.UndownloadResponse()
412+
},
413+
414+
createDiffStream: async (call) => {
415+
const id = call.request.getId()
416+
const prefix = call.request.getPrefix()
417+
const otherVersion = call.request.getOther()
418+
419+
if (!id) throw new Error('A diff stream request must specify a session ID.')
420+
const drive = this.driveForSession(id)
421+
422+
const stream = drive.createDiffStream(otherVersion, prefix)
423+
424+
const rspMapper = map.obj(chunk => {
425+
const rsp = new rpc.drive.messages.DiffStreamResponse()
426+
if (!chunk) return rsp
427+
428+
const { name, type, value } = chunk
429+
rsp.setType(type)
430+
rsp.setName(name)
431+
if (type === 'put') {
432+
rsp.setValue(toDiffEntry({ stat: value }))
433+
} else {
434+
rsp.setValue(toDiffEntry({ mount: value }))
435+
}
436+
437+
return rsp
438+
})
439+
440+
pump(stream, rspMapper, call, err => {
441+
if (err) log.error({ id, err }, 'createDiffStream error')
442+
})
443+
},
444+
296445
createReadStream: async (call) => {
297446
const id = call.request.getId()
298447
const path = call.request.getPath()
@@ -408,8 +557,10 @@ class DriveManager extends EventEmitter {
408557
if (!path) throw new Error('A stat request must specify a path. ')
409558
const drive = this.driveForSession(id)
410559

560+
const method = lstat ? drive.lstat.bind(drive) : drive.stat.bind(drive)
561+
411562
return new Promise((resolve, reject) => {
412-
drive.stat(path, { followLink: lstat }, (err, stat) => {
563+
method(path, (err, stat) => {
413564
if (err) return reject(err)
414565

415566
const rsp = new rpc.drive.messages.StatResponse()

package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@
3636
"forever-monitor": "^1.7.1",
3737
"fs-extra": "^7.0.1",
3838
"google-protobuf": "^3.8.0",
39-
"hyperdrive": "^10.0.0-rc10",
39+
"hyperdrive": "^10.2.0",
4040
"hyperdrive-daemon-client": "^0.10.1",
4141
"level": "^4.0.0",
4242
"mkdirp": "^0.5.1",

test/hyperdrive.js

Lines changed: 70 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@ test('can write/read a file from a remote hyperdrive', async t => {
1111
t.true(drive.key)
1212
t.same(drive.id, 1)
1313

14+
const version = await drive.version()
15+
1416
await drive.writeFile('hello', 'world')
1517

1618
const contents = await drive.readFile('hello')
@@ -122,11 +124,10 @@ test('can list a directory from a remote hyperdrive', async t => {
122124
await drive.writeFile('adios', 'amigo')
123125

124126
const files = await drive.readdir('')
125-
t.same(files.length, 4)
127+
t.same(files.length, 3)
126128
t.notEqual(files.indexOf('hello'), -1)
127129
t.notEqual(files.indexOf('goodbye'), -1)
128130
t.notEqual(files.indexOf('adios'), -1)
129-
t.notEqual(files.indexOf('.key'), -1)
130131

131132
await drive.close()
132133
} catch (err) {
@@ -137,6 +138,70 @@ test('can list a directory from a remote hyperdrive', async t => {
137138
t.end()
138139
})
139140

141+
test('can create a diff stream on a remote hyperdrive', async t => {
142+
const { client, cleanup } = await createOne()
143+
144+
try {
145+
const drive1 = await client.drive.get()
146+
const drive2 = await client.drive.get()
147+
148+
await drive1.writeFile('hello', 'world')
149+
const v1 = await drive1.version()
150+
await drive1.writeFile('goodbye', 'dog')
151+
const v2 = await drive1.version()
152+
await drive1.mount('d2', { key: drive2.key })
153+
const v3 = await drive1.version()
154+
await drive1.unmount('d2')
155+
156+
const diff1 = await drive1.createDiffStream()
157+
const checkout = await drive1.checkout(v2)
158+
const diff2 = await checkout.createDiffStream(v1)
159+
const diff3 = await drive1.createDiffStream(v3)
160+
161+
await validate(diff1, [
162+
{ type: 'put', name: 'goodbye' },
163+
{ type: 'put', name: 'hello' }
164+
])
165+
await validate(diff2, [
166+
{ type: 'put', name: 'goodbye'}
167+
])
168+
await validate(diff3, [
169+
// TODO: The first is a false positive.
170+
{ type: 'put', name: 'goodbye' },
171+
{ type: 'unmount', name: 'd2' }
172+
])
173+
174+
await drive1.close()
175+
await drive2.close()
176+
} catch (err) {
177+
t.fail(err)
178+
}
179+
180+
await cleanup()
181+
t.end()
182+
183+
async function validate (stream, expected) {
184+
return new Promise((resolve, reject) => {
185+
var seen = 0
186+
stream.on('end', () => {
187+
t.same(seen, expected.length)
188+
return resolve()
189+
})
190+
stream.on('error', t.fail.bind(t))
191+
stream.on('data', ({ name, left, right }) => {
192+
t.same(name, expected[seen].name)
193+
if (left) {
194+
t.same(left.type, expected[seen].left.type)
195+
}
196+
if (right) {
197+
t.same(right.type, expected[seen].right.type)
198+
}
199+
seen++
200+
})
201+
})
202+
}
203+
})
204+
140205
test('can read/write multiple remote hyperdrives on one server', async t => {
141206
const { client, cleanup } = await createOne()
142207
var startingId = 1
@@ -311,6 +376,9 @@ test('can create a symlink to directories', async t => {
311376
t.same(files.length, 1)
312377
t.same(files[0], 'world')
313378

379+
const stat = await drive.lstat('other_world')
380+
t.true(stat.isSymbolicLink())
381+
314382
await drive.close()
315383
} catch (err) {
316384
t.fail(err)

0 commit comments

Comments
 (0)