त्रुटि के अनुसार, आपके पास पहले से ही एक स्ट्रिंग है, (आप पहले ही df.selectExpr("CAST(value AS STRING)")
कर चुके हैं ), इसलिए आपको Row ईवेंट को String
. के रूप में प्राप्त करने का प्रयास करना चाहिए , न कि Array[Byte]
. बदलकर प्रारंभ करें
val valueStr = new String(record.getAs[Array[Byte]]("value"))
करने के लिए
val valueStr = record.getAs[String]("value")
मैं समझता हूं कि स्पार्क कोड चलाने के लिए आपके पास पहले से ही एक क्लस्टर हो सकता है, लेकिन मेरा सुझाव है कि अभी भी देखें। काफ्का कनेक्ट मोंगो सिंक कनेक्टर ताकि आपको अपने स्वयं के Mongo लेखक को Spark कोड में लिखने और बनाए रखने की आवश्यकता न पड़े।
या, आप स्पार्क डेटासेट को सीधे mongo में भी लिख सकते हैं