मैं मोंगोडब का विशेषज्ञ नहीं हूं, लेकिन मैंने जो उदाहरण देखे हैं, उनके आधार पर यह एक ऐसा पैटर्न है जिसे मैं आजमाऊंगा।
मैंने डेटा के अलावा अन्य घटनाओं को छोड़ दिया है, क्योंकि थ्रॉटलिंग एक मुख्य चिंता है।
var cursor = db.collection('mycollection').find({});
const cursorNext = new Rx.BehaviourSubject('next'); // signal first batch then wait
const nextBatch = () => {
if(cursor.hasNext()) {
cursorNext.next('next');
}
});
cursorNext
.switchMap(() => // wait for cursorNext to signal
Rx.Observable.fromPromise(cursor.next()) // get a single doc
.repeat() // get another
.takeWhile(() => cursor.hasNext() ) // stop taking if out of data
.take(batchSize) // until full batch
.toArray() // combine into a single emit
)
.map(docsBatch => {
// do something with the batch
// return docsBatch or modified doscBatch
})
... // other operators?
.subscribe(x => {
...
nextBatch();
});
मैं बिना मोंगोडब के इस आरएक्स प्रवाह का एक साथ परीक्षण करने की कोशिश कर रहा हूं, इस बीच यह आपको कुछ विचार दे सकता है।