आप async लाइब्रेरी का उपयोग करके इस समस्या को हल कर सकते हैं। आप किसी भी स्ट्रीम के लिए नीचे दिए गए पैटर्न का उपयोग कर सकते हैं।
var AsyncLib = require('async');
var worker = function (payload, cb) {
//do something with payload and call callback
return cb();
};
var concurrency = 5;
var streamQueue = AsyncLib.queue(worker, concurrency);
var stream = //some readable stream;
stream.on('data', function(data) {
//no need to pause and resume
var payload = '//some payload';
streamQueue.push(payload);
})
.on('end', function() {
//register drain event on end and callback
streamQueue.drain = function () {
callback();
};
});