मैं इसे पूरा करने में सक्षम था - मुझे Oracle API के कई हिस्सों का अनुमान लगाना था, और विभिन्न ब्लॉगों से संकेत एकत्र करना था। यहां रुचि रखने वाले किसी भी व्यक्ति के लिए मैं इसे काम कर रहा हूं -1। मैंने Oracle Db2 पर Oracle ऑब्जेक्ट बनाया। इस Oracle ऑब्जेक्ट के साथ, मैंने पेलोड3 के रूप में ऑब्जेक्ट प्रकार की कतार तालिकाएँ बनाईं। अब मैं ऑब्जेक्ट डेटा 4 युक्त स्ट्रक्ट पेलोड के साथ AQMessage प्रकारों को संलग्न करने में सक्षम हूं। और मैं एक ऐसे JMS उपभोक्ता के साथ बातचीत करने में सक्षम हूं जो ADT पेलोड प्रकार को समझता है (http://blog.javaforge.net/post/30858904340/oracle-advanced-queuing-spring-custom-types )
यहां कोड के साथ चरण दिए गए हैं - Oracle बनाएं 'OBJ_SINGLE_QUEUE_TABLE', "+" QUEUE_PAYLOAD_TABLE', "+" QUEUE_PAYLOAD_AD_ + "संगत => '10.0'); "+" अंत; "); doUpdateDatabase(conn, "BEGIN" + "DBMS_AQADM.CREATE_QUEUE("+" QUEUE_NAME => 'OBJ_SINGLE_QUEUE', "+" QUEUE_TABLE => 'OBJ_SINGLE_QUEUE_TABLE'); "+"END; "); doUpdateDatabase(conn, "BEGIN" + "DBMS_AQADM.START_QUEUE('OBJ_SINGLE_QUEUE'); "+"END;");}
अब आप ऑब्जेक्ट के स्ट्रक्चर इंस्टेंस के साथ जावा में AQMessage प्रकारों को एनक्यू कर सकते हैं
ब्लॉब फ़ील्ड को विशेष हैंडलिंग की आवश्यकता थी
उपभोक्ता के लिए, आपको ORADataFactory का एक उदाहरण प्रदान करना होगा जो उपभोक्ता को पेलोड प्रकार (आपकी कस्टम ऑब्जेक्ट) को समझने देता है।
जहां OracleAQObjORADataFactory के लिए कोड है
आप शायद अपनी परियोजना में ऊंट या वसंत का उपयोग कर रहे हैं, इस मामले में -1। यदि आप Camel 2.10.2 या उससे ऊपर के संस्करण पर हैं, तो आप एक कस्टम संदेश लिस्टर कंटेनर (CAMEL-5676)2 के साथ एक JMS उपभोक्ता बना सकते हैं। यदि आप पिछले संस्करण पर हैं तो आप एंडपॉइंट तरीके का उपयोग करने में सक्षम नहीं हो सकते हैं (मैं इसे समझ नहीं पाया), लेकिन आप जेएमएस अनुरोध श्रोता का उपयोग कर सकते हैं
कस्टम संदेश श्रोता कंटेनर
और अनुरोध श्रोता onMessage विधिसार्वजनिक शून्य enqueueMessage(OracleConnection conn, String सहसंबंधId, बाइट[] पेलोडडेटा) अपवाद फेंकता है {// पहले संदेश गुण बनाएं:AQMessageProperties aqMessageProperties =AQFactory.createAQMessageProperties(); aqMessageProperties.setCorrelation(correlationId); aqMessageProperties.setExceptionQueue(EXCEPTION_QUEUE_NAME); // प्रेषक के रूप में एक एजेंट निर्दिष्ट करें:AQAgent aqAgent =AQFactory.createAQAgent (); aqAgent.setName(SENDER_NAME); aqAgent.setAddress(QUEUE_NAME); aqMessageProperties.setSender(aqAgent); // पेलोड बनाएं नक्शा <स्ट्रिंग, वस्तु> पेलोड मैप =नया हैश मैप <स्ट्रिंग, ऑब्जेक्ट> (); payloadMap.put ("आईडी", सहसंबंध आईडी); payloadMap.put ("पेलोड", नया OracleAQBLOBUtil ()। createBlob (कॉन, पेलोडडेटा)); स्ट्रक्चर स्ट्रक्चर =नया स्ट्रक्चर (स्ट्रक्चर डिस्क्रिप्टर, कॉन, पेलोडमैप); // वास्तविक AQMessage उदाहरण बनाएँ:AQMessage aqMessage =AQFactory.createAQMessage(aqMessageProperties); aqMessage.setPayload(struct); AQEnqueueOptions ऑप्ट =नया AQEnqueueOptions (); opt.setDeliveryMode(AQEnqueueOptions.DeliveryMode.PERSISTENT); ऑप्ट.सेट विजिबिलिटी (AQEnqueueOptions.VisibilityOption.ON_COMMIT); // वास्तविक एनक्यू ऑपरेशन निष्पादित करें:conn.enqueue(QUEUE_NAME, opt, aqMessage);}
सार्वजनिक वर्ग OracleAQBLOBUtil {सार्वजनिक BLOB createBlob(OracleConnection conn, byte[] पेलोड) अपवाद फेंकता है {BLOB ब्लॉब =BLOB.createTemporary(conn, false, BLOB.DURATION_SESSION); आउटपुटस्ट्रीम आउटपुटस्ट्रीम =blob.setBinaryStream(1L); इनपुटस्ट्रीम इनपुटस्ट्रीम =नया बाइटअरेइनपुटस्ट्रीम (पेलोड); कोशिश करें {बाइट [] बफर =नया बाइट [blob.getBufferSize ()]; इंट बाइट्सरीड =0; जबकि ((बाइट्सरीड =इनपुटस्ट्रीम.रीड (बफर))! =-1) {आउटपुटस्ट्रीम.राइट (बफर, 0, बाइट्स रीड); } वापसी बूँद; } अंत में { outputStream.close (); इनपुटस्ट्रीम.क्लोज़ (); } } सार्वजनिक बाइट [] सेवऑटपुटस्ट्रीम (बीएलओबी ब्लॉब) अपवाद फेंकता है {इनपुटस्ट्रीम इनपुटस्ट्रीम =blob.getBinaryStream (); इंट काउंटर; ByteArrayOutputStream byteArrayOutputStream =नया ByteArrayOutputStream (); जबकि ((काउंटर =इनपुटस्ट्रीम.रीड ())> -1) {बाइटअरेऑटपुटस्ट्रीम.राइट (काउंटर); } बाइटअरेऑटपुटस्ट्रीम.क्लोज़ (); वापसी byteArrayOutputStream.toByteArray (); }}
AQjmsSession queueSession =(AQjmsSession) सेशन; क्यू क्यू =(क्यू) ctx.lookup(queueName);MessageConsumer रिसीवर =क्यूसेशन.क्रिएट रिसीवर (कतार, नया OracleAQObjORADataFactory());
पब्लिक क्लास AQMessageListenerContainer DefaultMessageListenerContainer को बढ़ाता है {@Override संरक्षित MessageConsumer createConsumer (सत्र सत्र, गंतव्य गंतव्य) JMSException फेंकता है {वापसी ((AQjmsSession) सत्र)। createConsumer (गंतव्य, getMessageSelector (), OracleAQObjORADataFactory (), OracleAQObjORADataFactory , isPubSubNoLocal ()); }}
सार्वजनिक शून्य onMessage(Message msg) { try { AQjmsAdtMessage aQjmsAdtMessage =(AQjmsAdtMessage) msg; OracleAQObjORADataFactory obj =(OracleAQObjORADataFactory) aQjmsAdtMessage.getAdtPayload (); System.out.println ("डेटाटाइम:" + obj.getId ()); System.out.println ("पेलोड:" + नया स्ट्रिंग (obj.getPayload (), Charset.forName ("UTF-8"))); } पकड़ें (अपवाद jmsException) { अगर (logger.isErrorEnabled ()) { logger.error (jmsException.getLocalizedMessage ()); } }}