स्ट्रीमिंग में आपका स्वागत है। आप वास्तव में जो चाहते हैं वह एक "ईवेंट स्ट्रीम" है जो आपके इनपुट "एक समय में एक हिस्सा" को संसाधित करता है, और निश्चित रूप से आदर्श रूप से एक सामान्य सीमांकक जैसे कि "न्यूलाइन" वर्ण जो आप वर्तमान में उपयोग कर रहे हैं।
वास्तव में कुशल सामग्री के लिए, आप MongoDB का उपयोग जोड़ सकते हैं "बल्क एपीआई" मशीन की सारी मेमोरी या सीपीयू साइकिल को खाये बिना आपकी लोडिंग को यथासंभव तेज करने के लिए सम्मिलित करता है।
वकालत नहीं कर रहा है क्योंकि विभिन्न समाधान उपलब्ध हैं, लेकिन यहां एक सूची है जो लाइन का उपयोग करती है- इनपुट-स्ट्रीम पैकेज "लाइन टर्मिनेटर" भाग को सरल बनाने के लिए।
केवल "उदाहरण" द्वारा स्कीमा परिभाषाएं:
var LineInputStream = require("line-input-stream"),
fs = require("fs"),
async = require("async"),
mongoose = require("mongoose"),
Schema = mongoose.Schema;
var entrySchema = new Schema({},{ strict: false })
var Entry = mongoose.model( "Schema", entrySchema );
var stream = LineInputStream(fs.createReadStream("data.txt",{ flags: "r" }));
stream.setDelimiter("\n");
mongoose.connection.on("open",function(err,conn) {
// lower level method, needs connection
var bulk = Entry.collection.initializeOrderedBulkOp();
var counter = 0;
stream.on("error",function(err) {
console.log(err); // or otherwise deal with it
});
stream.on("line",function(line) {
async.series(
[
function(callback) {
var row = line.split(","); // split the lines on delimiter
var obj = {};
// other manipulation
bulk.insert(obj); // Bulk is okay if you don't need schema
// defaults. Or can just set them.
counter++;
if ( counter % 1000 == 0 ) {
stream.pause();
bulk.execute(function(err,result) {
if (err) callback(err);
// possibly do something with result
bulk = Entry.collection.initializeOrderedBulkOp();
stream.resume();
callback();
});
} else {
callback();
}
}
],
function (err) {
// each iteration is done
}
);
});
stream.on("end",function() {
if ( counter % 1000 != 0 )
bulk.execute(function(err,result) {
if (err) throw err; // or something
// maybe look at result
});
});
});
तो आम तौर पर "स्ट्रीम" इंटरफ़ेस "एक समय में एक पंक्ति" को संसाधित करने के लिए "इनपुट को तोड़ देता है"। यह आपको सब कुछ एक साथ लोड करने से रोकता है।
मुख्य भाग हैं "बल्क ऑपरेशंस API" मोंगोडीबी से। यह आपको वास्तव में सर्वर पर भेजने से पहले एक समय में कई कार्यों को "कतार में" करने की अनुमति देता है। तो इस मामले में "मॉड्यूलो" के उपयोग के साथ, केवल संसाधित प्रति 1000 प्रविष्टियों को ही भेजा जाता है। आप वास्तव में 16MB BSON सीमा तक कुछ भी कर सकते हैं, लेकिन इसे प्रबंधनीय रखें।
थोक में संसाधित किए जा रहे कार्यों के अलावा, async पुस्तकालय। यह वास्तव में आवश्यक नहीं है, लेकिन यह सुनिश्चित करता है कि किसी भी समय दस्तावेजों की "मॉड्यूलो सीमा" से अधिक अनिवार्य रूप से प्रक्रिया में नहीं है। सामान्य बैच "आवेषण" स्मृति के अलावा आईओ लागत पर नहीं आते हैं, लेकिन "निष्पादित" कॉल का मतलब है कि आईओ प्रसंस्करण कर रहा है। इसलिए हम और चीज़ों को कतार में लगाने के बजाय इंतज़ार करते हैं।
"स्ट्रीम प्रोसेसिंग" सीएसवी प्रकार डेटा के लिए निश्चित रूप से बेहतर समाधान मिल सकते हैं जो ऐसा प्रतीत होता है। लेकिन सामान्य तौर पर यह आपको यह अवधारणा देता है कि सीपीयू चक्रों को खाए बिना इसे मेमोरी कुशल तरीके से कैसे किया जाए।