मैं क्रोनजॉब का अनुकरण करने के लिए चेंज स्ट्रीम और टीटीएल का उपयोग करने में सक्षम था। मैंने एक पोस्ट प्रकाशित किया है जिसमें बताया गया है कि मैंने विवरण में क्या किया और क्रेडिट दिया:https://www। patreon.com/posts/17697287
लेकिन, मूल रूप से, किसी भी समय मुझे दस्तावेज़ के लिए "ईवेंट" शेड्यूल करने की आवश्यकता होती है, जब मैं दस्तावेज़ बना रहा हूं तो मैं समानांतर में एक ईवेंट दस्तावेज़ भी बना देता हूं। इस ईवेंट दस्तावेज़ की _id पहले दस्तावेज़ की समान आईडी होगी।
साथ ही, इस ईवेंट दस्तावेज़ के लिए मैं एक TTL सेट करूँगा।
जब टीटीएल की समय सीमा समाप्त हो जाएगी तो मैं चेंज स्ट्रीम के साथ इसके "डिलीट" बदलाव को कैप्चर कर लूंगा। और फिर मैं पहले संग्रह में लक्ष्य दस्तावेज़ खोजने के लिए परिवर्तन की दस्तावेज़ कुंजी का उपयोग करूंगा (क्योंकि यह वही आईडी है जिस दस्तावेज़ को मैं ट्रिगर करना चाहता हूं), और दस्तावेज़ के साथ मैं जो कुछ भी चाहता हूं वह करता हूं।
मैं MongoDB तक पहुँचने के लिए एक्सप्रेस और नेवला के साथ Node.js का उपयोग कर रहा हूँ। यहाँ App.js में जोड़ा जाने वाला प्रासंगिक हिस्सा है:
const { ReplSet } = require('mongodb-topology-manager');
run().catch(error => console.error(error));
async function run() {
console.log(new Date(), 'start');
const bind_ip = 'localhost';
// Starts a 3-node replica set on ports 31000, 31001, 31002, replica set
// name is "rs0".
const replSet = new ReplSet('mongod', [
{ options: { port: 31000, dbpath: `${__dirname}/data/db/31000`, bind_ip } },
{ options: { port: 31001, dbpath: `${__dirname}/data/db/31001`, bind_ip } },
{ options: { port: 31002, dbpath: `${__dirname}/data/db/31002`, bind_ip } }
], { replSet: 'rs0' });
// Initialize the replica set
await replSet.purge();
await replSet.start();
console.log(new Date(), 'Replica set started...');
// Connect to the replica set
const uri = 'mongodb://localhost:31000,localhost:31001,localhost:31002/' + 'test?replicaSet=rs0';
await mongoose.connect(uri);
var db = mongoose.connection;
db.on('error', console.error.bind(console, 'connection error:'));
db.once('open', function () {
console.log("Connected correctly to server");
});
// To work around "MongoError: cannot open $changeStream for non-existent database: test" for this example
await mongoose.connection.createCollection('test');
// *** we will add our scheduler here *** //
var Item = require('./models/item');
var ItemExpiredEvent = require('./models/scheduledWithin');
let deleteOps = {
$match: {
operationType: "delete"
}
};
ItemExpiredEvent.watch([deleteOps]).
on('change', data => {
// *** treat the event here *** //
console.log(new Date(), data.documentKey);
Item.findById(data.documentKey, function(err, item) {
console.log(item);
});
});
// The TTL set in ItemExpiredEvent will trigger the change stream handler above
console.log(new Date(), 'Inserting item');
Item.create({foo:"foo", bar: "bar"}, function(err, cupom) {
ItemExpiredEvent.create({_id : item._id}, function(err, event) {
if (err) console.log("error: " + err);
console.log('event inserted');
});
});
}
और यहाँ मॉडल/अनुसूचित के लिए कोड है:
var mongoose = require('mongoose');
var Schema = mongoose.Schema;
var ScheduledWithin = new Schema({
_id: mongoose.Schema.Types.ObjectId,
}, {timestamps: true});
// timestamps: true will automatically create a "createdAt" Date field
ScheduledWithin.index({createdAt: 1}, {expireAfterSeconds: 90});
module.exports = mongoose.model('ScheduledWithin', ScheduledWithin);