Skip to content

Commit c60b777

Browse files
committed
Merge pull request #34 from CfABrigadePhiladelphia/import_pipeline
import files with importPipeline
2 parents a2400ad + 30c0be0 commit c60b777

File tree

5 files changed

+64
-41
lines changed

5 files changed

+64
-41
lines changed

index.js

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
var level = require('level')
22
var hypercore = require('hypercore')
3-
var createImportStream = require('./lib/import.js')
3+
var createImportPipeline = require('./lib/import.js')
44

55
module.exports = Jawn
66

@@ -11,6 +11,6 @@ function Jawn (opts) {
1111
this.db = this.core.db
1212
}
1313

14-
Jawn.prototype.createImportStream = function (opts) {
15-
return createImportStream(this, opts)
14+
Jawn.prototype.createImportPipeline = function (opts, callback) {
15+
return createImportPipeline(this, opts, callback)
1616
}

lib/import.js

Lines changed: 16 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,32 +1,30 @@
1-
var pump = require('pump')
1+
var miss = require('mississippi')
22
var through = require('through2')
3+
var parseInputStream = require('parse-input-stream')
34

4-
module.exports = importStream
5+
module.exports = importPipeline
56

6-
function importStream (jawn, opts) {
7+
// Returns the parser at the beginning of the pipeline
8+
// When the pipeline finishes, it calls `callback(feedId, err)`.
9+
// If there were no errors, err will be undefined.
10+
function importPipeline (jawn, opts, callback) {
711
if (!opts) opts = {}
812
var writeStream = jawn.core.createWriteStream(opts)
9-
var stream = pump(parseInputStream(opts), writeStream, function done (err) {
10-
stream.id = writeStream.id
11-
console.log(err)
12-
})
13-
return stream
14-
}
13+
var parser = parseInputStream(opts)
14+
var transform = through.obj(stringifyData, end)
1515

16-
// Transform input CSV, JSON, etc to json objects that will be written as blocks in our hypercore feed
17-
// TODO: Implement this! See https://github.com/CfABrigadePhiladelphia/jawn/issues/32
18-
// This is the functionality that parse-input-stream aims to support.
19-
// If you can get that module to work, feel free to use it!
20-
function parseInputStream (opts) {
21-
var transformStream = through(write, end)
22-
return transformStream
16+
miss.pipe(parser, transform, writeStream, function done (err) {
17+
callback(err, writeStream.id)
18+
})
2319

24-
function write (buffer, encoding, next) {
25-
// transform input CSV, JSON, etc. here ...
20+
function stringifyData (data, enc, next) {
21+
this.push(JSON.stringify(data))
2622
next()
2723
}
2824

2925
function end (done) {
3026
done()
3127
}
28+
29+
return parser
3230
}

package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,9 +24,9 @@
2424
"fs": "0.0.2",
2525
"hypercore": "~1.8.0",
2626
"level": "~1.4.0",
27+
"mississippi": "^1.2.0",
2728
"parse-input-stream": "^1.0.1",
2829
"path": "~0.12.7",
29-
"pump": "^1.0.1",
3030
"tape": "~4.5.1",
3131
"through2": "^2.0.1"
3232
},

test/data/dummy.json

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
{"foo":"bar","name":"josie","age":"35"}
2+
{"foo":"baz","name":"eloise","age":"71"}
3+
{"foo":"baz","name":"francoise","age":"5"}

test/import.js

Lines changed: 41 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,39 +1,61 @@
11
var test = require('tape')
2+
var fs = require('fs')
3+
var path = require('path')
24
var Jawn = require('../')
35
var memdb = require('memdb')
46

57
test('import json to jawn', function (t) {
68
var jawn = freshJawn()
7-
var importStream = jawn.createImportStream({'format': 'json'})
8-
// Imitate the stream that would come from reading sample.csv
9-
// importStream should parse the CSV correctly, identifying the first row as headers
10-
// This is the same as doing
11-
// var data = fs.createReadStream('./test/data/sample.csv')
12-
// data.pipe(importStream)
13-
// except the writes are being performed synchronously/inline so we can call importStream.end() after writing the contents into it.
14-
importStream.write('{foo: "bar", name: "josie"}')
15-
importStream.write('{foo: "baz", name: "eloise"}')
16-
importStream.write('{foo: "baz", name: "francoise"}')
9+
importFromFile(jawn, 'dummy.json', {'format': 'json'}, verify)
10+
var expected = [
11+
'{"foo":"bar","name":"josie","age":"35"}',
12+
'{"foo":"baz","name":"eloise","age":"71"}',
13+
'{"foo":"baz","name":"francoise","age":"5"}'
14+
]
15+
function verify (err, feedId) {
16+
if (err) { console.log(err) }
17+
var rs = jawn.core.createReadStream(feedId)
18+
rs.on('data', function (block) {
19+
t.same(block.toString(), expected.shift(), 'block matches imported line')
20+
})
21+
t.same(jawn.core.get(feedId).blocks, 3, 'correct number of blocks returned')
22+
t.end()
23+
}
24+
})
1725

18-
// This should be expecting JSON objects, not strings.
19-
// temporarily expecting strings in order to hand off the code as-is
26+
test('import csv to jawn', function (t) {
27+
var jawn = freshJawn()
28+
importFromFile(jawn, 'sample.csv', {'format': 'csv'}, verify)
2029
var expected = [
21-
'{foo: "bar", name: "josie"}',
22-
'{foo: "baz", name: "eloise"}',
23-
'{foo: "baz", name: "francoise"}'
30+
'{"Type of Experience":"Writing software in any programming language","Little/No Experience":"1","Some Experience":"5","Very Familiar":"4"}',
31+
'{"Type of Experience":"Frontend Web Development","Little/No Experience":"4","Some Experience":"3","Very Familiar":"3"}',
32+
'{"Type of Experience":"Server-side (“backend”) Web Development","Little/No Experience":"4","Some Experience":"4","Very Familiar":"2"}',
33+
'{"Type of Experience":"Using Git to track changes and share code (add, commit, push, pull)","Little/No Experience":"2","Some Experience":"5","Very Familiar":"3"}'
2434
]
2535

26-
importStream.end(function () {
27-
var feedId = importStream.id.toString('hex')
36+
function verify (err, feedId) {
37+
if (err) { console.log(err) }
2838
var rs = jawn.core.createReadStream(feedId)
2939
rs.on('data', function (block) {
3040
t.same(block.toString(), expected.shift(), 'block matches imported line')
3141
})
32-
t.same(jawn.core.get(feedId).blocks, 3, 'correct number of blocks returned')
42+
t.same(jawn.core.get(feedId).blocks, 4, 'correct number of blocks returned')
3343
t.end()
34-
})
44+
}
3545
})
3646

47+
// helpers
48+
49+
function fixture (name) {
50+
return path.join(__dirname, 'data', name)
51+
}
52+
3753
function freshJawn () {
3854
return new Jawn({db: memdb()})
3955
}
56+
57+
function importFromFile (jawn, file, opts, callback) {
58+
var importPipeline = jawn.createImportPipeline(opts, callback)
59+
var data = fs.createReadStream(fixture(file))
60+
data.pipe(importPipeline)
61+
}

0 commit comments

Comments
 (0)