@@ -7,6 +7,7 @@ var defParam=require("./defParam");
77var csvline = require ( "./csvline" ) ;
88var fileline = require ( "./fileline" ) ;
99var dataToCSVLine = require ( "./dataToCSVLine" ) ;
10+ var fileLineToCSVLine = require ( "./fileLineToCSVLine" ) ;
1011var linesToJson = require ( "./linesToJson" ) ;
1112var CSVError = require ( "./CSVError" ) ;
1213var workerMgr = require ( "./workerMgr" ) ;
@@ -41,7 +42,8 @@ function Converter(params,options) {
4142 this . _csvTransf = null ;
4243 this . finalResult = [ ] ;
4344 // this.on("data", function() {});
44- this . on ( "error" , function ( ) { } ) ;
45+ this . on ( "error" , emitDone ( this ) ) ;
46+ this . on ( "end" , emitDone ( this ) ) ;
4547 this . initWorker ( ) ;
4648 process . nextTick ( function ( ) {
4749 if ( this . _needEmitFinalResult === null ) {
@@ -69,6 +71,13 @@ function Converter(params,options) {
6971 return this ;
7072}
7173util . inherits ( Converter , Transform ) ;
74+ function emitDone ( conv ) {
75+ return function ( err ) {
76+ process . nextTick ( function ( ) {
77+ conv . emit ( 'done' , err )
78+ } )
79+ }
80+ }
7281Converter . prototype . _transform = function ( data , encoding , cb ) {
7382 if ( this . param . toArrayString && this . started === false ) {
7483 this . started = true ;
@@ -94,21 +103,38 @@ Converter.prototype.setPartialData=function(d){
94103}
95104Converter . prototype . processData = function ( data , cb ) {
96105 var params = this . param ;
106+ var fileLines = fileline ( data , this . param )
107+ if ( this . preProcessLine && typeof this . preProcessLine === "function" ) {
108+ fileLines . lines = this . _preProcessLines ( fileLines . lines , this . lastIndex )
109+ }
97110 if ( ! params . _headers ) { //header is not inited. init header
98- this . processHead ( data , cb ) ;
111+ this . processHead ( fileLines , cb ) ;
99112 } else {
100113 if ( params . workerNum <= 1 ) {
101- var lines = dataToCSVLine ( data , params ) ;
114+ var lines = fileLineToCSVLine ( fileLines , params ) ;
102115 this . setPartialData ( lines . partial ) ;
103116 var jsonArr = linesToJson ( lines . lines , params , this . recordNum ) ;
104117 this . processResult ( jsonArr )
105118 this . lastIndex += jsonArr . length ;
106119 this . recordNum += jsonArr . length ;
107120 cb ( ) ;
108121 } else {
109- this . workerProcess ( data , cb ) ;
122+ this . workerProcess ( fileLines , cb ) ;
123+ }
124+ }
125+ }
126+ Converter . prototype . _preProcessLines = function ( lines , startIdx ) {
127+ var rtn = [ ]
128+ for ( var i = 0 ; i < lines . length ; i ++ ) {
129+ var result = this . preProcessLine ( lines [ i ] , startIdx + i + 1 )
130+ if ( typeof result === "string" ) {
131+ rtn . push ( result )
132+ } else {
133+ rtn . push ( lines [ i ] )
134+ this . emit ( "error" , new Error ( "preProcessLine should return a string but got: " + JSON . stringify ( result ) ) )
110135 }
111136 }
137+ return rtn
112138}
113139Converter . prototype . initWorker = function ( ) {
114140 var workerNum = this . param . workerNum - 1 ;
@@ -117,14 +143,22 @@ Converter.prototype.initWorker=function(){
117143 this . workerMgr . initWorker ( workerNum , this . param ) ;
118144 }
119145}
146+ Converter . prototype . preRawData = function ( func ) {
147+ this . preProcessRaw = func ;
148+ return this ;
149+ }
150+ Converter . prototype . preFileLine = function ( func ) {
151+ this . preProcessLine = func ;
152+ return this ;
153+ }
120154/**
121155 * workerpRocess does not support embeded multiple lines.
122156 */
123157
124- Converter . prototype . workerProcess = function ( data , cb ) {
158+ Converter . prototype . workerProcess = function ( fileLine , cb ) {
125159 var self = this ;
126- var line = fileline ( data , this . param )
127- var eol = this . getEol ( data )
160+ var line = fileLine
161+ var eol = this . getEol ( )
128162 this . setPartialData ( line . partial )
129163 this . workerMgr . sendWorker ( line . lines . join ( eol ) + eol , this . lastIndex , cb , function ( results , lastIndex ) {
130164 var cur = self . sequenceBuffer [ 0 ] ;
@@ -154,10 +188,10 @@ Converter.prototype.workerProcess=function(data,cb){
154188 } ) ;
155189 this . lastIndex += line . lines . length ;
156190}
157- Converter . prototype . processHead = function ( data , cb ) {
191+ Converter . prototype . processHead = function ( fileLine , cb ) {
158192 var params = this . param ;
159193 if ( ! params . _headers ) { //header is not inited. init header
160- var lines = dataToCSVLine ( data , params ) ;
194+ var lines = fileLineToCSVLine ( fileLine , params ) ;
161195 this . setPartialData ( lines . partial ) ;
162196 if ( params . noheader ) {
163197 if ( params . headers ) {
@@ -198,6 +232,7 @@ Converter.prototype.processResult=function(result){
198232 // this.lastIndex+=result.length;
199233 // cb();
200234}
235+
201236Converter . prototype . emitResult = function ( r ) {
202237 var index = r . index ;
203238 var row = r . row ;
@@ -320,7 +355,7 @@ Converter.prototype.getEol = function(data) {
320355 this . param . eol = eol ;
321356 }
322357
323- return this . param . eol ;
358+ return this . param . eol || eol ;
324359} ;
325360Converter . prototype . fromFile = function ( filePath , cb ) {
326361 var fs = require ( 'fs' ) ;
0 commit comments