-
Notifications
You must be signed in to change notification settings - Fork 0
/
transformer.js
93 lines (79 loc) · 2.63 KB
/
transformer.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
const fs = require("fs");
const csv = require("csvtojson");
const { transformRow } = require("./transform.js");
const { Transform, pipeline } = require("stream");
var numRows = 0;
exports.doTransform = function(options) {
const lineToArray = new Transform({
transform (chunk, encoding, cb) {
// add [ to very front
// add , between rows
// remove crlf from row
this.push((this.isNotAtFirstRow ? ',' : '[') + chunk.toString('utf-8').slice(0, -1));
this.isNotAtFirstRow = true;
cb();
},
flush(cb) {
// add ] to very end or [] if no rows
const isEmpty = (!this.isNotAtFirstRow);
this.push(isEmpty ? '[]' : ']');
cb();
}
});
const transformRowStream = new Transform({
transform: function(row, encoding, callback) {
try {
var rowStringRaw = row.toString();
var firstRow = false;
var middleRow = false;
if(rowStringRaw.substring(0,1) == '[') {
rowStringRaw = rowStringRaw.substring(1);
firstRow = true;
}
else if(rowStringRaw.substring(0,1) == ',') {
middleRow = true;
rowStringRaw = rowStringRaw.substring(1);
}
else if(rowStringRaw == ']') {
callback(null, ']');
return;
}
const rowObject = JSON.parse(rowStringRaw);
const transformedRow = transformRow(rowObject, options);
var rowString = JSON.stringify(transformedRow);
if(firstRow) {
rowString = '[' + rowString;
}
else if(middleRow) {
rowString = ',' + rowString
}
else if (options.outputFormat === 'ndjson') {
rowString = rowString + '\n';
}
process.stdout.write(++numRows + " rows written.\r");
callback(null, rowString);
} catch (err) {
callback(err);
}
}
});
const inputStream = fs.createReadStream(options.inputFile);
const outputStream = fs.createWriteStream(options.outputFile); //, { 'flags': 'a' });
outputStream.on('finish',function() {console.log("Pipeline completed successfully (" + numRows + " rows.)")})
const csvParser = options.checkTypes === 'true' ? csv({checkType: true,downstreamFormat: 'line'}) : csv({downstreamFormat: 'line'});
//const csvParser = csv({checkType: true,downstreamFormat: 'line'});
if(options.outputFormat === 'json') {
inputStream
.pipe(csvParser)
.pipe(lineToArray)
.pipe(transformRowStream)
.pipe(outputStream);
}
else {
pipeline(inputStream,csvParser,transformRowStream,outputStream, err => {
if (err) {
console.log("\nPipeline encountered an error:", err);
}
});
}
}