Redis
 sql >> डेटाबेस >  >> NoSQL >> Redis

Futures.rs और Redis PubSub का उपयोग करके ब्लॉकिंग कॉल के लिए फ्यूचर्स की धारा को कैसे कार्यान्वित करें?

भारी चेतावनी मैंने पहले कभी इस पुस्तकालय का उपयोग नहीं किया है, और कुछ अवधारणाओं के बारे में मेरा निम्न-स्तरीय ज्ञान थोड़ा कम है। अधिकतर मैं ट्यूटोरियल के माध्यम से पढ़ रहा हूँ। मुझे पूरा यकीन है कि जिसने भी एसिंक का काम किया है, वह इसे पढ़ेगा और हंसेगा, लेकिन यह अन्य लोगों के लिए एक उपयोगी शुरुआती बिंदु हो सकता है। चेतावनी देने वाला!

आइए कुछ आसान से शुरू करते हैं, यह प्रदर्शित करते हुए कि कैसे एक Stream काम करता है। हम Result . के एक पुनरावर्तक को रूपांतरित कर सकते हैं s एक धारा में:

extern crate futures;

use futures::Future;
use futures::stream::{self, Stream};

fn main() {
    let payloads: Vec<Result<String, ()>> = vec![Ok("a".into()), Ok("b".into())];
    let payloads = stream::iter(payloads.into_iter());

    let foo = payloads
        .and_then(|payload| futures::finished(println!("{}", payload)))
        .for_each(|_| Ok(()));

    foo.forget();
}

यह हमें धारा का उपभोग करने का एक तरीका दिखाता है। हम and_then . का उपयोग करते हैं प्रत्येक पेलोड के लिए कुछ करने के लिए (यहाँ बस इसे प्रिंट कर रहा है) और फिर for_each Stream को कन्वर्ट करने के लिए वापस एक Future . में . फिर हम अजीब नाम वाले forget . को कॉल करके भविष्य को चला सकते हैं विधि।

अगला, केवल एक संदेश को संसाधित करते हुए, रेडिस लाइब्रेरी को मिश्रण में बाँधना है। चूंकि get_message() विधि अवरुद्ध हो रही है, हमें मिश्रण में कुछ धागे पेश करने की जरूरत है। इस प्रकार के एसिंक्रोनस सिस्टम में बड़ी मात्रा में काम करना एक अच्छा विचार नहीं है क्योंकि बाकी सब कुछ अवरुद्ध हो जाएगा। उदाहरण के लिए:

<ब्लॉकक्वॉट>

जब तक इसे अन्यथा व्यवस्थित न किया जाए, यह सुनिश्चित किया जाना चाहिए कि इस फ़ंक्शन का कार्यान्वयन बहुत जल्दी समाप्त हो जाए

एक आदर्श दुनिया में, रेडिस क्रेट को फ़्यूचर्स जैसी लाइब्रेरी के ऊपर बनाया जाएगा और यह सब मूल रूप से उजागर किया जाएगा।

extern crate redis;
extern crate futures;

use std::thread;
use futures::Future;
use futures::stream::{self, Stream};

fn main() {
    let client = redis::Client::open("redis://127.0.0.1/").expect("Unable to connect to Redis");

    let mut pubsub = client.get_pubsub().expect("Unable to create pubsub handle");
    pubsub.subscribe("rust").expect("Unable to subscribe to redis channel");

    let (tx, payloads) = stream::channel();

    let redis_thread = thread::spawn(move || {
        let msg = pubsub.get_message().expect("Unable to get message");
        let payload: Result<String, _> = msg.get_payload();
        tx.send(payload).forget();
    });

    let foo = payloads
        .and_then(|payload| futures::finished(println!("{}", payload)))
        .for_each(|_| Ok(()));

    foo.forget();
    redis_thread.join().expect("unable to join to thread");
}

मेरी समझ यहाँ अस्पष्ट हो जाती है। एक अलग थ्रेड में, हम संदेश के लिए ब्लॉक करते हैं और जब हम इसे प्राप्त करते हैं तो इसे चैनल में धकेल देते हैं। मुझे समझ में नहीं आता है कि हमें धागे के हैंडल को पकड़ने की आवश्यकता क्यों है। मुझे उम्मीद है कि foo.forget स्ट्रीम के खाली होने तक प्रतीक्षा करते हुए, स्वयं को अवरुद्ध कर रहा होगा।

Redis सर्वर के टेलनेट कनेक्शन में, इसे भेजें:

publish rust awesome

और आप देखेंगे कि यह काम करता है। प्रिंट स्टेटमेंट जोड़ने से पता चलता है कि (मेरे लिए) foo.forget थ्रेड के स्पॉन होने से पहले स्टेटमेंट चलाया जाता है।

एकाधिक संदेश ट्रिकियर है। Sender उत्पादन पक्ष को उपभोग पक्ष से बहुत आगे जाने से रोकने के लिए स्वयं का उपभोग करता है। यह send . से एक और भविष्य लौटाकर पूरा किया जाता है ! लूप के अगले पुनरावृत्ति के लिए इसे पुन:उपयोग करने के लिए हमें इसे वहां से वापस शटल करने की आवश्यकता है:

extern crate redis;
extern crate futures;

use std::thread;
use std::sync::mpsc;

use futures::Future;
use futures::stream::{self, Stream};

fn main() {
    let client = redis::Client::open("redis://127.0.0.1/").expect("Unable to connect to Redis");

    let mut pubsub = client.get_pubsub().expect("Unable to create pubsub handle");
    pubsub.subscribe("rust").expect("Unable to subscribe to redis channel");

    let (tx, payloads) = stream::channel();

    let redis_thread = thread::spawn(move || {
        let mut tx = tx;

        while let Ok(msg) = pubsub.get_message() {
            let payload: Result<String, _> = msg.get_payload();

            let (next_tx_tx, next_tx_rx) = mpsc::channel();

            tx.send(payload).and_then(move |new_tx| {
                next_tx_tx.send(new_tx).expect("Unable to send successor channel tx");
                futures::finished(())
            }).forget();

            tx = next_tx_rx.recv().expect("Unable to receive successor channel tx");
        }
    });

    let foo = payloads
        .and_then(|payload| futures::finished(println!("{}", payload)))
        .for_each(|_| Ok(()));

    foo.forget();
    redis_thread.join().expect("unable to join to thread");
}

मुझे यकीन है कि समय बीतने के साथ इस प्रकार के अंतःसंचालन के लिए और अधिक पारिस्थितिकी तंत्र होगा। उदाहरण के लिए, फ्यूचर्स-सीपीयूपूल क्रेट शायद . हो सकता है इसके समान उपयोगकेस का समर्थन करने के लिए विस्तारित किया जाए।




  1. Redis
  2.   
  3. MongoDB
  4.   
  5. Memcached
  6.   
  7. HBase
  8.   
  9. CouchDB
  1. रेल 4, प्यूमा और साइडकीक के साथ रेडिस कनेक्शन कैसे कॉन्फ़िगर करें?

  2. रेडिस पर लार्वा कतार में सभी लंबित नौकरियां कैसे प्राप्त करें?

  3. क्या रेडिस के लिए एक एम्बेड करने योग्य जावा विकल्प है?

  4. जेडिस लाइब्रेरी का उपयोग करके रेडिस सेंटिनल से कनेक्शन कैसे स्थापित करें?

  5. लॉगस्टैश वेब UI प्रारंभ नहीं होता है