MongoDB
 sql >> डेटाबेस >  >> NoSQL >> MongoDB

नेवला स्कीमा का उपयोग करके CSV आयात करें

आप headers . प्राप्त करके इसे फास्ट-सीएसवी के साथ कर सकते हैं स्कीमा परिभाषा से जो पार्स की गई रेखाओं को "ऑब्जेक्ट्स" के रूप में वापस कर देगा। आपके पास वास्तव में कुछ बेमेल हैं, इसलिए मैंने उन्हें सुधार के साथ चिह्नित किया है:

const fs = require('mz/fs');
const csv = require('fast-csv');

const { Schema } = mongoose = require('mongoose');

const uri = 'mongodb://localhost/test';

mongoose.Promise = global.Promise;
mongoose.set('debug', true);

const rankSchema = new Schema({
  serverid: Number,
  resetid: Number,
  rank: Number,
  name: String,
  land: String,         // <-- You have this as Number but it's a string
  networth: Number,
  tag: String,
  stuff: String,        // the empty field in the csv
  gov: String,
  gdi: Number,
  protection: Number,
  vacation: Number,
  alive: Number,
  deleted: Number
});

const Rank = mongoose.model('Rank', rankSchema);

const log = data => console.log(JSON.stringify(data, undefined, 2));

(async function() {

  try {
    const conn = await mongoose.connect(uri);

    await Promise.all(Object.entries(conn.models).map(([k,m]) => m.remove()));

    let headers = Object.keys(Rank.schema.paths)
      .filter(k => ['_id','__v'].indexOf(k) === -1);

    console.log(headers);

    await new Promise((resolve,reject) => {

      let buffer = [],
          counter = 0;

      let stream = fs.createReadStream('input.csv')
        .pipe(csv({ headers }))
        .on("error", reject)
        .on("data", async doc => {
          stream.pause();
          buffer.push(doc);
          counter++;
          log(doc);
          try {
            if ( counter > 10000 ) {
              await Rank.insertMany(buffer);
              buffer = [];
              counter = 0;
            }
          } catch(e) {
            stream.destroy(e);
          }

          stream.resume();

        })
        .on("end", async () => {
          try {
            if ( counter > 0 ) {
              await Rank.insertMany(buffer);
              buffer = [];
              counter = 0;
              resolve();
            }
          } catch(e) {
            stream.destroy(e);
          }
        });

    });


  } catch(e) {
    console.error(e)
  } finally {
    process.exit()
  }


})()

जब तक स्कीमा वास्तव में प्रदान किए गए सीएसवी तक जाती है तो यह ठीक है। ये वे सुधार हैं जिन्हें मैं देख सकता हूं लेकिन यदि आपको वास्तविक फ़ील्ड नामों को अलग-अलग संरेखित करने की आवश्यकता है तो आपको समायोजित करने की आवश्यकता है। लेकिन मूल रूप से एक Number था उस स्थिति में जहां String . है और अनिवार्य रूप से एक अतिरिक्त क्षेत्र, जिसे मैं मान रहा हूं वह CSV में रिक्त है।

सामान्य चीजें स्कीमा से फ़ील्ड नामों की सरणी प्राप्त कर रही हैं और सीएसवी पार्सर इंस्टेंस बनाते समय विकल्पों में पास कर रही हैं:

let headers = Object.keys(Rank.schema.paths)
  .filter(k => ['_id','__v'].indexOf(k) === -1);

let stream = fs.createReadStream('input.csv')
  .pipe(csv({ headers }))

एक बार जब आप वास्तव में ऐसा कर लेते हैं तो आपको एक सरणी के बजाय एक "ऑब्जेक्ट" वापस मिल जाता है:

{
  "serverid": "9",
  "resetid": "1557",
  "rank": "358",
  "name": "286",
  "land": "Mutantville",
  "networth": "4368",
  "tag": "2358026",
  "stuff": "",
  "gov": "M",
  "gdi": "0",
  "protection": "0",
  "vacation": "0",
  "alive": "1",
  "deleted": "0"
}

"प्रकार" के बारे में चिंता न करें क्योंकि नेवला स्कीमा के अनुसार मान डालेगा।

बाकी data के लिए हैंडलर के अंदर होता है प्रतिस्पर्धा। अधिकतम दक्षता के लिए हम insertMany() . का उपयोग कर रहे हैं प्रत्येक 10,000 पंक्तियों में केवल एक बार डेटाबेस को लिखने के लिए। यह वास्तव में सर्वर पर कैसे जाता है और प्रक्रियाएं MongoDB संस्करण पर निर्भर करती हैं, लेकिन मेमोरी उपयोग और लेखन के लिए "ट्रेड-ऑफ" के संदर्भ में आपके द्वारा एकल संग्रह के लिए आयात किए जाने वाले फ़ील्ड की औसत संख्या के आधार पर 10,000 काफी उचित होना चाहिए। उचित नेटवर्क अनुरोध। यदि आवश्यक हो तो संख्या को छोटा करें।

इन कॉलों को async . के रूप में चिह्नित करने के लिए महत्वपूर्ण भाग हैं फ़ंक्शन और await insertMany() . का परिणाम जारी रखने से पहले। साथ ही हमें pause() . की आवश्यकता है स्ट्रीम और resume() प्रत्येक आइटम पर अन्यथा हम buffer . को अधिलेखित करने का जोखिम उठाते हैं वास्तव में भेजे जाने से पहले डालने के लिए दस्तावेजों की संख्या। pause() और resume() पाइप पर "बैक-प्रेशर" डालने के लिए आवश्यक हैं, अन्यथा आइटम केवल "बाहर" आते रहते हैं और data को सक्रिय करते रहते हैं। घटना।

स्वाभाविक रूप से 10,000 प्रविष्टियों के लिए नियंत्रण की आवश्यकता है कि हम बफर को खाली करने और सर्वर पर किसी भी शेष दस्तावेज़ को भेजने के लिए प्रत्येक पुनरावृत्ति और स्ट्रीम पूर्ण होने पर जांच करें।

वास्तव में आप यही करना चाहते हैं, क्योंकि आप निश्चित रूप से data के माध्यम से "प्रत्येक" पुनरावृत्ति पर सर्वर के लिए एक एसिंक अनुरोध को बंद नहीं करना चाहते हैं। घटना या अनिवार्य रूप से प्रत्येक अनुरोध के पूरा होने की प्रतीक्षा किए बिना। आप "बहुत छोटी फ़ाइलों" की जाँच न करने से दूर हो जाएंगे, लेकिन किसी भी वास्तविक दुनिया के लोड के लिए आप निश्चित रूप से "इन फ़्लाइट" async कॉल के कारण कॉल स्टैक को पार कर लेंगे जो अभी तक पूरा नहीं हुआ है।

FYI करें - एक package.json उपयोग किया गया। mz वैकल्पिक है क्योंकि यह सिर्फ एक आधुनिकीकृत Promise है मानक नोड "अंतर्निहित" पुस्तकालयों की सक्षम लाइब्रेरी जिसका मैं उपयोग करने के लिए उपयोग किया जाता हूं। कोड निश्चित रूप से fs . के साथ पूरी तरह से विनिमेय है मॉड्यूल।

{
  "description": "",
  "main": "index.js",
  "dependencies": {
    "fast-csv": "^2.4.1",
    "mongoose": "^5.1.1",
    "mz": "^2.7.0"
  },
  "keywords": [],
  "author": "",
  "license": "ISC"
}

वास्तव में नोड v8.9.x और इसके बाद के संस्करण के साथ हम AsyncIterator के कार्यान्वयन के साथ इसे और भी सरल बना सकते हैं stream-to-iterator . के माध्यम से मापांक। यह अभी भी Iterator<Promise<T>> . में है मोड, लेकिन इसे तब तक करना चाहिए जब तक कि Node v10.x स्थिर LTS न बन जाए:

const fs = require('mz/fs');
const csv = require('fast-csv');
const streamToIterator = require('stream-to-iterator');

const { Schema } = mongoose = require('mongoose');

const uri = 'mongodb://localhost/test';

mongoose.Promise = global.Promise;
mongoose.set('debug', true);

const rankSchema = new Schema({
  serverid: Number,
  resetid: Number,
  rank: Number,
  name: String,
  land: String,
  networth: Number,
  tag: String,
  stuff: String,        // the empty field
  gov: String,
  gdi: Number,
  protection: Number,
  vacation: Number,
  alive: Number,
  deleted: Number
});

const Rank = mongoose.model('Rank', rankSchema);

const log = data => console.log(JSON.stringify(data, undefined, 2));

(async function() {

  try {
    const conn = await mongoose.connect(uri);

    await Promise.all(Object.entries(conn.models).map(([k,m]) => m.remove()));

    let headers = Object.keys(Rank.schema.paths)
      .filter(k => ['_id','__v'].indexOf(k) === -1);

    //console.log(headers);

    let stream = fs.createReadStream('input.csv')
      .pipe(csv({ headers }));

    const iterator = await streamToIterator(stream).init();

    let buffer = [],
        counter = 0;

    for ( let docPromise of iterator ) {
      let doc = await docPromise;
      buffer.push(doc);
      counter++;

      if ( counter > 10000 ) {
        await Rank.insertMany(buffer);
        buffer = [];
        counter = 0;
      }
    }

    if ( counter > 0 ) {
      await Rank.insertMany(buffer);
      buffer = [];
      counter = 0;
    }

  } catch(e) {
    console.error(e)
  } finally {
    process.exit()
  }

})()

मूल रूप से, सभी स्ट्रीम "ईवेंट" को संभालना और रोकना और फिर से शुरू करना एक सरल for द्वारा प्रतिस्थापित हो जाता है लूप:

const iterator = await streamToIterator(stream).init();

for ( let docPromise of iterator ) {
  let doc = await docPromise;
  // ... The things in the loop
}

आसान! यह बाद के नोड कार्यान्वयन में for..await..of . के साथ साफ हो जाता है जब यह अधिक स्थिर हो जाता है। लेकिन उपरोक्त निर्दिष्ट संस्करण और ऊपर से ठीक चलता है।



  1. Redis
  2.   
  3. MongoDB
  4.   
  5. Memcached
  6.   
  7. HBase
  8.   
  9. CouchDB
  1. Mongoose.js एक कनेक्ट () कॉल से MongoDB से कई कनेक्शन बनाता है

  2. MongoDB कनेक्शन के लिए .NET सर्वोत्तम अभ्यास?

  3. मोंगोडब स्थानीय सर्वर शुरू करने में असमर्थ

  4. संग्रह से डेटा प्राप्त करें b संग्रह में नहीं एक MongoDB शेल क्वेरी में

  5. आप आधिकारिक सी # ड्राइवर का उपयोग कर MongoDB में Update.Set का उपयोग करके एकाधिक फ़ील्ड कैसे अपडेट करते हैं?