आप headers
. प्राप्त करके इसे फास्ट-सीएसवी के साथ कर सकते हैं स्कीमा परिभाषा से जो पार्स की गई रेखाओं को "ऑब्जेक्ट्स" के रूप में वापस कर देगा। आपके पास वास्तव में कुछ बेमेल हैं, इसलिए मैंने उन्हें सुधार के साथ चिह्नित किया है:
const fs = require('mz/fs');
const csv = require('fast-csv');
const { Schema } = mongoose = require('mongoose');
const uri = 'mongodb://localhost/test';
mongoose.Promise = global.Promise;
mongoose.set('debug', true);
const rankSchema = new Schema({
serverid: Number,
resetid: Number,
rank: Number,
name: String,
land: String, // <-- You have this as Number but it's a string
networth: Number,
tag: String,
stuff: String, // the empty field in the csv
gov: String,
gdi: Number,
protection: Number,
vacation: Number,
alive: Number,
deleted: Number
});
const Rank = mongoose.model('Rank', rankSchema);
const log = data => console.log(JSON.stringify(data, undefined, 2));
(async function() {
try {
const conn = await mongoose.connect(uri);
await Promise.all(Object.entries(conn.models).map(([k,m]) => m.remove()));
let headers = Object.keys(Rank.schema.paths)
.filter(k => ['_id','__v'].indexOf(k) === -1);
console.log(headers);
await new Promise((resolve,reject) => {
let buffer = [],
counter = 0;
let stream = fs.createReadStream('input.csv')
.pipe(csv({ headers }))
.on("error", reject)
.on("data", async doc => {
stream.pause();
buffer.push(doc);
counter++;
log(doc);
try {
if ( counter > 10000 ) {
await Rank.insertMany(buffer);
buffer = [];
counter = 0;
}
} catch(e) {
stream.destroy(e);
}
stream.resume();
})
.on("end", async () => {
try {
if ( counter > 0 ) {
await Rank.insertMany(buffer);
buffer = [];
counter = 0;
resolve();
}
} catch(e) {
stream.destroy(e);
}
});
});
} catch(e) {
console.error(e)
} finally {
process.exit()
}
})()
जब तक स्कीमा वास्तव में प्रदान किए गए सीएसवी तक जाती है तो यह ठीक है। ये वे सुधार हैं जिन्हें मैं देख सकता हूं लेकिन यदि आपको वास्तविक फ़ील्ड नामों को अलग-अलग संरेखित करने की आवश्यकता है तो आपको समायोजित करने की आवश्यकता है। लेकिन मूल रूप से एक Number
था उस स्थिति में जहां String
. है और अनिवार्य रूप से एक अतिरिक्त क्षेत्र, जिसे मैं मान रहा हूं वह CSV में रिक्त है।
सामान्य चीजें स्कीमा से फ़ील्ड नामों की सरणी प्राप्त कर रही हैं और सीएसवी पार्सर इंस्टेंस बनाते समय विकल्पों में पास कर रही हैं:
let headers = Object.keys(Rank.schema.paths)
.filter(k => ['_id','__v'].indexOf(k) === -1);
let stream = fs.createReadStream('input.csv')
.pipe(csv({ headers }))
एक बार जब आप वास्तव में ऐसा कर लेते हैं तो आपको एक सरणी के बजाय एक "ऑब्जेक्ट" वापस मिल जाता है:
{
"serverid": "9",
"resetid": "1557",
"rank": "358",
"name": "286",
"land": "Mutantville",
"networth": "4368",
"tag": "2358026",
"stuff": "",
"gov": "M",
"gdi": "0",
"protection": "0",
"vacation": "0",
"alive": "1",
"deleted": "0"
}
"प्रकार" के बारे में चिंता न करें क्योंकि नेवला स्कीमा के अनुसार मान डालेगा।
बाकी data
के लिए हैंडलर के अंदर होता है प्रतिस्पर्धा। अधिकतम दक्षता के लिए हम insertMany()
. का उपयोग कर रहे हैं प्रत्येक 10,000 पंक्तियों में केवल एक बार डेटाबेस को लिखने के लिए। यह वास्तव में सर्वर पर कैसे जाता है और प्रक्रियाएं MongoDB संस्करण पर निर्भर करती हैं, लेकिन मेमोरी उपयोग और लेखन के लिए "ट्रेड-ऑफ" के संदर्भ में आपके द्वारा एकल संग्रह के लिए आयात किए जाने वाले फ़ील्ड की औसत संख्या के आधार पर 10,000 काफी उचित होना चाहिए। उचित नेटवर्क अनुरोध। यदि आवश्यक हो तो संख्या को छोटा करें।
इन कॉलों को async
. के रूप में चिह्नित करने के लिए महत्वपूर्ण भाग हैं फ़ंक्शन और await
insertMany()
. का परिणाम जारी रखने से पहले। साथ ही हमें pause()
. की आवश्यकता है स्ट्रीम और resume()
प्रत्येक आइटम पर अन्यथा हम buffer
. को अधिलेखित करने का जोखिम उठाते हैं वास्तव में भेजे जाने से पहले डालने के लिए दस्तावेजों की संख्या। pause()
और resume()
पाइप पर "बैक-प्रेशर" डालने के लिए आवश्यक हैं, अन्यथा आइटम केवल "बाहर" आते रहते हैं और data
को सक्रिय करते रहते हैं। घटना।
स्वाभाविक रूप से 10,000 प्रविष्टियों के लिए नियंत्रण की आवश्यकता है कि हम बफर को खाली करने और सर्वर पर किसी भी शेष दस्तावेज़ को भेजने के लिए प्रत्येक पुनरावृत्ति और स्ट्रीम पूर्ण होने पर जांच करें।
वास्तव में आप यही करना चाहते हैं, क्योंकि आप निश्चित रूप से data
के माध्यम से "प्रत्येक" पुनरावृत्ति पर सर्वर के लिए एक एसिंक अनुरोध को बंद नहीं करना चाहते हैं। घटना या अनिवार्य रूप से प्रत्येक अनुरोध के पूरा होने की प्रतीक्षा किए बिना। आप "बहुत छोटी फ़ाइलों" की जाँच न करने से दूर हो जाएंगे, लेकिन किसी भी वास्तविक दुनिया के लोड के लिए आप निश्चित रूप से "इन फ़्लाइट" async कॉल के कारण कॉल स्टैक को पार कर लेंगे जो अभी तक पूरा नहीं हुआ है।
FYI करें - एक package.json
उपयोग किया गया। mz
वैकल्पिक है क्योंकि यह सिर्फ एक आधुनिकीकृत Promise
है मानक नोड "अंतर्निहित" पुस्तकालयों की सक्षम लाइब्रेरी जिसका मैं उपयोग करने के लिए उपयोग किया जाता हूं। कोड निश्चित रूप से fs
. के साथ पूरी तरह से विनिमेय है मॉड्यूल।
{
"description": "",
"main": "index.js",
"dependencies": {
"fast-csv": "^2.4.1",
"mongoose": "^5.1.1",
"mz": "^2.7.0"
},
"keywords": [],
"author": "",
"license": "ISC"
}
वास्तव में नोड v8.9.x और इसके बाद के संस्करण के साथ हम AsyncIterator
के कार्यान्वयन के साथ इसे और भी सरल बना सकते हैं stream-to-iterator
. के माध्यम से मापांक। यह अभी भी Iterator<Promise<T>>
. में है मोड, लेकिन इसे तब तक करना चाहिए जब तक कि Node v10.x स्थिर LTS न बन जाए:
const fs = require('mz/fs');
const csv = require('fast-csv');
const streamToIterator = require('stream-to-iterator');
const { Schema } = mongoose = require('mongoose');
const uri = 'mongodb://localhost/test';
mongoose.Promise = global.Promise;
mongoose.set('debug', true);
const rankSchema = new Schema({
serverid: Number,
resetid: Number,
rank: Number,
name: String,
land: String,
networth: Number,
tag: String,
stuff: String, // the empty field
gov: String,
gdi: Number,
protection: Number,
vacation: Number,
alive: Number,
deleted: Number
});
const Rank = mongoose.model('Rank', rankSchema);
const log = data => console.log(JSON.stringify(data, undefined, 2));
(async function() {
try {
const conn = await mongoose.connect(uri);
await Promise.all(Object.entries(conn.models).map(([k,m]) => m.remove()));
let headers = Object.keys(Rank.schema.paths)
.filter(k => ['_id','__v'].indexOf(k) === -1);
//console.log(headers);
let stream = fs.createReadStream('input.csv')
.pipe(csv({ headers }));
const iterator = await streamToIterator(stream).init();
let buffer = [],
counter = 0;
for ( let docPromise of iterator ) {
let doc = await docPromise;
buffer.push(doc);
counter++;
if ( counter > 10000 ) {
await Rank.insertMany(buffer);
buffer = [];
counter = 0;
}
}
if ( counter > 0 ) {
await Rank.insertMany(buffer);
buffer = [];
counter = 0;
}
} catch(e) {
console.error(e)
} finally {
process.exit()
}
})()
मूल रूप से, सभी स्ट्रीम "ईवेंट" को संभालना और रोकना और फिर से शुरू करना एक सरल for
द्वारा प्रतिस्थापित हो जाता है लूप:
const iterator = await streamToIterator(stream).init();
for ( let docPromise of iterator ) {
let doc = await docPromise;
// ... The things in the loop
}
आसान! यह बाद के नोड कार्यान्वयन में for..await..of
. के साथ साफ हो जाता है जब यह अधिक स्थिर हो जाता है। लेकिन उपरोक्त निर्दिष्ट संस्करण और ऊपर से ठीक चलता है।