मुझे एक समाधान मिला। चूंकि मुझे संरचित स्ट्रीमिंग के लिए सही मोंगो ड्राइवर नहीं मिला, इसलिए मैंने दूसरे समाधान पर काम किया। अब, मैं mongoDb से सीधे कनेक्शन का उपयोग करता हूं, और foreachbatch(.) के बजाय "foreach(...)" का उपयोग करता हूं। ..) मेरा कोड testSpark.py फ़ाइल में इस तरह दिखता है:
....
import pymongo
from pymongo import MongoClient
local_url = "mongodb://localhost:27017"
def write_machine_df_mongo(target_df):
cluster = MongoClient(local_url)
db = cluster["test_db"]
collection = db.test1
post = {
"machine_id": target_df.machine_id,
"proc_type": target_df.proc_type,
"sensor1_id": target_df.sensor1_id,
"sensor2_id": target_df.sensor2_id,
"time": target_df.time,
"sensor1_val": target_df.sensor1_val,
"sensor2_val": target_df.sensor2_val,
}
collection.insert_one(post)
machine_df.writeStream\
.outputMode("append")\
.foreach(write_machine_df_mongo)\
.start()