MongoDB
 sql >> डेटाबेस >  >> NoSQL >> MongoDB

PySpark संरचित स्ट्रीमिंग का उपयोग करके काफ्का स्ट्रीम को MongoDB में सिंक करें

मुझे एक समाधान मिला। चूंकि मुझे संरचित स्ट्रीमिंग के लिए सही मोंगो ड्राइवर नहीं मिला, इसलिए मैंने दूसरे समाधान पर काम किया। अब, मैं mongoDb से सीधे कनेक्शन का उपयोग करता हूं, और foreachbatch(.) के बजाय "foreach(...)" का उपयोग करता हूं। ..) मेरा कोड testSpark.py फ़ाइल में इस तरह दिखता है:

....
import pymongo
from pymongo import MongoClient

local_url = "mongodb://localhost:27017"


def write_machine_df_mongo(target_df):

    cluster = MongoClient(local_url)
    db = cluster["test_db"]
    collection = db.test1

    post = {
            "machine_id": target_df.machine_id,
            "proc_type": target_df.proc_type,
            "sensor1_id": target_df.sensor1_id,
            "sensor2_id": target_df.sensor2_id,
            "time": target_df.time,
            "sensor1_val": target_df.sensor1_val,
            "sensor2_val": target_df.sensor2_val,
            }

    collection.insert_one(post)

machine_df.writeStream\
    .outputMode("append")\
    .foreach(write_machine_df_mongo)\
    .start()



  1. Redis
  2.   
  3. MongoDB
  4.   
  5. Memcached
  6.   
  7. HBase
  8.   
  9. CouchDB
  1. नेवले से मोंगोडब संस्करण कैसे प्राप्त करें

  2. (नोड:3341) पदावनत चेतावनी:नेवला:mpromise

  3. डुप्लिकेट सरणियों को फ़िल्टर करें और मोंगोडब एकत्रीकरण में अद्वितीय सरणी लौटाएं

  4. mongodb num_rows समकक्ष php

  5. मोंगोडब-गो-ड्राइवर में, एक संरचना में मार्शल/अनमर्शल बीएसओएन कैसे करें