Oracle
 sql >> डेटाबेस >  >> RDS >> Oracle

जावा के साथ प्रतिबद्ध होने पर Oracle AQ तालिका पर कैसे कतारबद्ध करें और JMS क्लाइंट के साथ उपभोग करें

मैं इसे पूरा करने में सक्षम था - मुझे Oracle API के कई हिस्सों का अनुमान लगाना था, और विभिन्न ब्लॉगों से संकेत एकत्र करना था। यहां रुचि रखने वाले किसी भी व्यक्ति के लिए मैं इसे काम कर रहा हूं -1। मैंने Oracle Db2 पर Oracle ऑब्जेक्ट बनाया। इस Oracle ऑब्जेक्ट के साथ, मैंने पेलोड3 के रूप में ऑब्जेक्ट प्रकार की कतार तालिकाएँ बनाईं। अब मैं ऑब्जेक्ट डेटा 4 युक्त स्ट्रक्ट पेलोड के साथ AQMessage प्रकारों को संलग्न करने में सक्षम हूं। और मैं एक ऐसे JMS उपभोक्ता के साथ बातचीत करने में सक्षम हूं जो ADT पेलोड प्रकार को समझता है (http://blog.javaforge.net/post/30858904340/oracle-advanced-queuing-spring-custom-types )

यहां कोड के साथ चरण दिए गए हैं - Oracle बनाएं 'OBJ_SINGLE_QUEUE_TABLE', "+" QUEUE_PAYLOAD_TABLE', "+" QUEUE_PAYLOAD_AD_ + "संगत => '10.0'); "+" अंत; "); doUpdateDatabase(conn, "BEGIN" + "DBMS_AQADM.CREATE_QUEUE("+" QUEUE_NAME => 'OBJ_SINGLE_QUEUE', "+" QUEUE_TABLE => 'OBJ_SINGLE_QUEUE_TABLE'); "+"END; "); doUpdateDatabase(conn, "BEGIN" + "DBMS_AQADM.START_QUEUE('OBJ_SINGLE_QUEUE'); "+"END;");}

अब आप ऑब्जेक्ट के स्ट्रक्चर इंस्टेंस के साथ जावा में AQMessage प्रकारों को एनक्यू कर सकते हैं

सार्वजनिक शून्य enqueueMessage(OracleConnection conn, String सहसंबंधId, बाइट[] पेलोडडेटा) अपवाद फेंकता है {// पहले संदेश गुण बनाएं:AQMessageProperties aqMessageProperties =AQFactory.createAQMessageProperties(); aqMessageProperties.setCorrelation(correlationId); aqMessageProperties.setExceptionQueue(EXCEPTION_QUEUE_NAME); // प्रेषक के रूप में एक एजेंट निर्दिष्ट करें:AQAgent aqAgent =AQFactory.createAQAgent (); aqAgent.setName(SENDER_NAME); aqAgent.setAddress(QUEUE_NAME); aqMessageProperties.setSender(aqAgent); // पेलोड बनाएं नक्शा <स्ट्रिंग, वस्तु> पेलोड मैप =नया हैश मैप <स्ट्रिंग, ऑब्जेक्ट> (); payloadMap.put ("आईडी", सहसंबंध आईडी); payloadMap.put ("पेलोड", नया OracleAQBLOBUtil ()। createBlob (कॉन, पेलोडडेटा)); स्ट्रक्चर स्ट्रक्चर =नया स्ट्रक्चर (स्ट्रक्चर डिस्क्रिप्टर, कॉन, पेलोडमैप); // वास्तविक AQMessage उदाहरण बनाएँ:AQMessage aqMessage =AQFactory.createAQMessage(aqMessageProperties); aqMessage.setPayload(struct); AQEnqueueOptions ऑप्ट =नया AQEnqueueOptions (); opt.setDeliveryMode(AQEnqueueOptions.DeliveryMode.PERSISTENT); ऑप्ट.सेट विजिबिलिटी (AQEnqueueOptions.VisibilityOption.ON_COMMIT); // वास्तविक एनक्यू ऑपरेशन निष्पादित करें:conn.enqueue(QUEUE_NAME, opt, aqMessage);} 

ब्लॉब फ़ील्ड को विशेष हैंडलिंग की आवश्यकता थी

सार्वजनिक वर्ग OracleAQBLOBUtil {सार्वजनिक BLOB createBlob(OracleConnection conn, byte[] पेलोड) अपवाद फेंकता है {BLOB ब्लॉब =BLOB.createTemporary(conn, false, BLOB.DURATION_SESSION); आउटपुटस्ट्रीम आउटपुटस्ट्रीम =blob.setBinaryStream(1L); इनपुटस्ट्रीम इनपुटस्ट्रीम =नया बाइटअरेइनपुटस्ट्रीम (पेलोड); कोशिश करें {बाइट [] बफर =नया बाइट [blob.getBufferSize ()]; इंट बाइट्सरीड =0; जबकि ((बाइट्सरीड =इनपुटस्ट्रीम.रीड (बफर))! =-1) {आउटपुटस्ट्रीम.राइट (बफर, 0, बाइट्स रीड); } वापसी बूँद; } अंत में { outputStream.close (); इनपुटस्ट्रीम.क्लोज़ (); } } सार्वजनिक बाइट [] सेवऑटपुटस्ट्रीम (बीएलओबी ब्लॉब) अपवाद फेंकता है {इनपुटस्ट्रीम इनपुटस्ट्रीम =blob.getBinaryStream (); इंट काउंटर; ByteArrayOutputStream byteArrayOutputStream =नया ByteArrayOutputStream (); जबकि ((काउंटर =इनपुटस्ट्रीम.रीड ())> -1) {बाइटअरेऑटपुटस्ट्रीम.राइट (काउंटर); } बाइटअरेऑटपुटस्ट्रीम.क्लोज़ (); वापसी byteArrayOutputStream.toByteArray (); }} 

उपभोक्ता के लिए, आपको ORADataFactory का एक उदाहरण प्रदान करना होगा जो उपभोक्ता को पेलोड प्रकार (आपकी कस्टम ऑब्जेक्ट) को समझने देता है।

AQjmsSession queueSession =(AQjmsSession) सेशन; क्यू क्यू =(क्यू) ctx.lookup(queueName);MessageConsumer रिसीवर =क्यूसेशन.क्रिएट रिसीवर (कतार, नया OracleAQObjORADataFactory()); 

जहां OracleAQObjORADataFactory के लिए कोड है

आयात करें आयात ओरेकल सिस्टम.AQ_EVENT_OBJ"; सार्वजनिक स्थैतिक अंतिम int _SQL_TYPECODE =OracleTypes.STRUCT; संरक्षित MutableStruct _struct; संरक्षित स्थिर int [] _sqlType ={java.sql.Types.VARCHAR, java.sql.Types.VARBINARY}; संरक्षित स्थिर ORADataFactory[] _factory =नया ORADataFactory[2]; संरक्षित स्थिर अंतिम OracleAQObjORADataFactory _AqEventObjFactory =नया OracleAQObjORADataFactory (); सार्वजनिक स्थैतिक ORADataFactory getORADataFactory () {वापसी _AqEventObjFactory; } /* कंस्ट्रक्टर्स */ रक्षित शून्य _init_struct (बूलियन इनिट) { अगर (init) _struct =नया MutableStruct (नया ऑब्जेक्ट[2], _sqlType, _factory); } सार्वजनिक OracleAQObjORADataFactory () { _init_struct(true); } सार्वजनिक OracleAQObjORADataFactory (स्ट्रिंग आईडी, बाइट [] पेलोड) SQLException फेंकता है { _init_struct(true); सेट आईडी (आईडी); सेटपेलोड (पेलोड); } /* ORAData इंटरफ़ेस */ सार्वजनिक डेटा से डेटा (कनेक्शन c) SQLException को फेंकता है {वापसी _struct.toDatum(c, EVENT_OBJECT); } /* ORADataFactory इंटरफ़ेस */ सार्वजनिक ORAData बनाएँ (डेटाम d, int sqlType) SQLException को फेंकता है {वापसी बनाएँ (नल, d, sqlType); } संरक्षित ORAData create(OracleAQObjORADataFactory o, Datum d, int sqlType) SQLException को फेंकता है {if (d ==null) रिटर्न अशक्त; अगर (ओ ==शून्य) ओ =नया OracleAQObjORADataFactory (); o._struct =नया MutableStruct ((STRUCT) d, _sqlType, _factory); वापसी ओ; } सार्वजनिक स्ट्रिंग getId () SQLException फेंकता है {वापसी (स्ट्रिंग) _struct.getAttribute (0); } public void setId(String id) SQLException को फेंकता है { _struct.setAttribute(0, id); } सार्वजनिक बाइट [] getPayload () SQLException फेंकता है {BLOB बूँद =(BLOB) _struct.getAttribute(1); इनपुटस्ट्रीम इनपुटस्ट्रीम =blob.getBinaryStream (); रिटर्न गेटबाइट्स (इनपुटस्ट्रीम); } सार्वजनिक बाइट [] getBytes (इनपुटस्ट्रीम बॉडी) {int c; कोशिश करें {बाइटअरेऑटपुटस्ट्रीम एफ =नया बाइटअरेऑटपुटस्ट्रीम (); जबकि ((c =body.read ())> -1) {f.write(c); } एफ.क्लोज़ (); बाइट [] परिणाम =f.toByteArray (); वापसी परिणाम; } पकड़ें (अपवाद ई) { System.err.println ("अपवाद:" + e.getMessage ()); ई.प्रिंटस्टैकट्रेस (); वापसी शून्य; } } सार्वजनिक शून्य सेटपेलोड (बाइट [] पेलोड) SQLException फेंकता है { _struct.setAttribute(1, पेलोड); }}

आप शायद अपनी परियोजना में ऊंट या वसंत का उपयोग कर रहे हैं, इस मामले में -1। यदि आप Camel 2.10.2 या उससे ऊपर के संस्करण पर हैं, तो आप एक कस्टम संदेश लिस्टर कंटेनर (CAMEL-5676)2 के साथ एक JMS उपभोक्ता बना सकते हैं। यदि आप पिछले संस्करण पर हैं तो आप एंडपॉइंट तरीके का उपयोग करने में सक्षम नहीं हो सकते हैं (मैं इसे समझ नहीं पाया), लेकिन आप जेएमएस अनुरोध श्रोता का उपयोग कर सकते हैं

  <बीन id="connectionFactoryOracleAQQueue" class="oracle.jms.AQjmsFactory" factory-method="getQueueConnectionFactory">  jdbc:oracle:thin:@blrub442:1522:UB23          <संपत्ति का नाम ="उपयोगकर्ता नाम"> <मान> प्रणाली   <संपत्ति का नाम ="पासवर्ड"> <मूल्य> ओरेकल             <संपत्ति का नाम ="गंतव्य" रेफरी ="aqEventQueue" /> <संपत्ति का नाम ="messageListener" ref ="aqMessageListener" /> /बीन्स> 

कस्टम संदेश श्रोता कंटेनर

पब्लिक क्लास AQMessageListenerContainer DefaultMessageListenerContainer को बढ़ाता है {@Override संरक्षित MessageConsumer createConsumer (सत्र सत्र, गंतव्य गंतव्य) JMSException फेंकता है {वापसी ((AQjmsSession) सत्र)। createConsumer (गंतव्य, getMessageSelector (), OracleAQObjORADataFactory (), OracleAQObjORADataFactory , isPubSubNoLocal ()); }} 

और अनुरोध श्रोता onMessage विधि

सार्वजनिक शून्य onMessage(Message msg) { try { AQjmsAdtMessage aQjmsAdtMessage =(AQjmsAdtMessage) msg; OracleAQObjORADataFactory obj =(OracleAQObjORADataFactory) aQjmsAdtMessage.getAdtPayload (); System.out.println ("डेटाटाइम:" + obj.getId ()); System.out.println ("पेलोड:" + नया स्ट्रिंग (obj.getPayload (), Charset.forName ("UTF-8"))); } पकड़ें (अपवाद jmsException) { अगर (logger.isErrorEnabled ()) { logger.error (jmsException.getLocalizedMessage ()); } }} 



  1. Database
  2.   
  3. Mysql
  4.   
  5. Oracle
  6.   
  7. Sqlserver
  8.   
  9. PostgreSQL
  10.   
  11. Access
  12.   
  13. SQLite
  14.   
  15. MariaDB
  1. क्या ओरेकल डीएमएल स्टेटमेंट या सिर्फ रो करते हुए पूरी टेबल को लॉक कर देगा

  2. जावा:Io अपवाद:नेटवर्क एडेप्टर कनेक्शन स्थापित नहीं कर सका

  3. डेटाबेस में एकाधिक तालिकाओं का लेखा परीक्षा इतिहास

  4. Oracle में एक तिथि से वर्ष प्राप्त करने के लिए 2 कार्य

  5. डेटा को एक टेबल से दूसरी टेबल में कॉपी करने के लिए संग्रहित प्रक्रिया