Skip to content

Commit ac2d5d0

Browse files
committed
Add: moved drivewatcher here
1 parent 2060ba6 commit ac2d5d0

File tree

1 file changed

+101
-0
lines changed

1 file changed

+101
-0
lines changed

lib/driveWatcher.js

Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
const { EventEmitter } = require('events')
2+
const low = require('last-one-wins')
3+
const pump = require('pump')
4+
const streamx = require('streamx')
5+
6+
class DriveWatcher extends EventEmitter {
7+
constructor (client, drive, opts = {}) {
8+
super()
9+
this.client = client
10+
this.drive = drive
11+
this.recursive = !!opts.recursive
12+
this.drivesByPath = new Map([[ '/', drive ]])
13+
this.versionsByPath = new Map()
14+
this.unwatchesByPath = new Map()
15+
this.watchers = []
16+
this.timer = null
17+
this.emittingStats = false
18+
}
19+
20+
_createDiffer (path, drive) {
21+
// Last-one-wins in case the watch is triggered many times in quick succession.
22+
const self = this
23+
return low(onupdate)
24+
25+
async function onupdate (_, cb) {
26+
const lastVersion = self.versionsByPath.get(path)
27+
try {
28+
const diffStream = await drive.createDiffStream(lastVersion)
29+
const currentVersion = await drive.version()
30+
self.versionsByPath.set(path, currentVersion)
31+
return pump(diffStream, new streamx.Transform({
32+
transform: (data, cb) => {
33+
for (const watcher of self.watchers) {
34+
watcher(p.join(path, data.name))
35+
}
36+
return cb(null)
37+
}
38+
}), err => {
39+
if (err) return cb(err)
40+
return cb(null)
41+
})
42+
} catch (err) {
43+
return cb(err)
44+
}
45+
}
46+
}
47+
48+
async _emitStats () {
49+
if (this.emittingStats) return
50+
this.emittingStats = true
51+
var total = 0
52+
var downloaded = 0
53+
var peers = 0
54+
for (const [path, drive] of this.drivesByPath) {
55+
const driveStats = await drive.stats()
56+
for (const { path, metadata } of driveStats.stats) {
57+
if (path !== '/') continue
58+
downloaded += metadata.downloadedBlocks
59+
total += metadata.totalBlocks
60+
peers = metadata.peers
61+
}
62+
}
63+
this.emit('stats', { total, downloaded, peers })
64+
this.emittingStats = false
65+
}
66+
67+
async start () {
68+
// TODO: Handle dynamic (un)mounting.
69+
this.versionsByPath.set('/', await this.drive.version())
70+
this.unwatchesByPath.set('/', this.drive.watch('/', this._createDiffer('/', this.drive)))
71+
const allMounts = await this.drive.mounts({ memory: false, recursive: this.recursive })
72+
for (const { path, opts } of allMounts) {
73+
if (path === '/') continue
74+
const childDrive = await this.client.drive.get({ key: opts.key })
75+
this.drivesByPath.set(path, childDrive)
76+
this.versionsByPath.set(path, opts.version)
77+
this.unwatchesByPath.set(path, childDrive.watch('/', this._createDiffer(path, childDrive)))
78+
}
79+
this.timer = setInterval(this._emitStats.bind(this), 1000)
80+
}
81+
82+
watch (_, onwatch) {
83+
// The watch path is ignored for drives.
84+
this.watchers.push(onwatch)
85+
return () => {
86+
this.watchers.splice(this.watchers.indexOf(onwatch), 1)
87+
}
88+
}
89+
90+
async close () {
91+
for (const [path, unwatch] of this.unwatchesByPath) {
92+
await unwatch()
93+
}
94+
if (this.timer) {
95+
clearInterval(this.timer)
96+
this.timer = null
97+
}
98+
}
99+
}
100+
101+
module.exports = DriveWatcher

0 commit comments

Comments
 (0)