1+
2+ CREATE type Message_type as object (subject VARCHAR2 (30 ), text VARCHAR2 (80 ));
3+ /
4+ -- Creating an Object type queue
5+ BEGIN
6+ DBMS_AQADM .CREATE_TRANSACTIONAL_EVENT_QUEUE (
7+ queue_name => ' objType_TEQ' ,
8+ storage_clause => null ,
9+ multiple_consumers => true,
10+ max_retries => 10 ,
11+ comment => ' ObjectType for TEQ' ,
12+ queue_payload_type => ' Message_type' ,
13+ queue_properties => null ,
14+ replication_mode => null );
15+ DBMS_AQADM .START_QUEUE (queue_name=> ' objType_TEQ' , enqueue => TRUE, dequeue=> True);
16+ END;
17+ /
18+
19+ -- Creating a RAW type queue:
20+ BEGIN
21+ DBMS_AQADM .CREATE_TRANSACTIONAL_EVENT_QUEUE (
22+ queue_name => ' rawType_TEQ' ,
23+ storage_clause => null ,
24+ multiple_consumers => true,
25+ max_retries => 10 ,
26+ comment => ' RAW type for TEQ' ,
27+ queue_payload_type => ' RAW' ,
28+ queue_properties => null ,
29+ replication_mode => null );
30+ DBMS_AQADM .START_QUEUE (queue_name=> ' rawType_TEQ' , enqueue => TRUE, dequeue=> True);
31+ END;
32+ /
33+
34+ -- Creating JSON type queue:
35+ BEGIN
36+ DBMS_AQADM .CREATE_TRANSACTIONAL_EVENT_QUEUE (
37+ queue_name => ' jsonType_TEQ' ,
38+ storage_clause => null ,
39+ multiple_consumers => true,
40+ max_retries => 10 ,
41+ comment => ' jsonType for TEQ' ,
42+ queue_payload_type => ' JSON' ,
43+ queue_properties => null ,
44+ replication_mode => null );
45+ DBMS_AQADM .START_QUEUE (queue_name=> ' jsonType_TEQ' , enqueue => TRUE, dequeue=> True);
46+ END;
47+ /
48+ BEGIN
49+ DBMS_AQADM .CREATE_TRANSACTIONAL_EVENT_QUEUE (
50+ queue_name => ' JAVA_TEQ_PUBSUB_QUEUE' ,
51+ storage_clause => null ,
52+ multiple_consumers=> true,
53+ max_retries => 10 ,
54+ comment => ' JAVA_TEQ_PUBSUB_QUEUE' ,
55+ queue_payload_type=> ' JMS' ,
56+ queue_properties => null ,
57+ replication_mode => null );
58+ DBMS_AQADM .START_QUEUE (queue_name=> ' JAVA_TEQ_PUBSUB_QUEUE' , enqueue => TRUE, dequeue=> True);
59+ END;
60+ /
61+ DECLARE
62+ subscriber sys .aq $_agent;
63+ BEGIN
64+ dbms_aqadm .add_subscriber (queue_name => ' objType_TEQ' , subscriber => sys .aq $_agent(' teqBasicObjSubscriber' , null ,0 ), rule => ' correlation = ' ' teqBasicObjSubscriber' ' ' );
65+
66+ dbms_aqadm .add_subscriber (queue_name => ' rawType_TEQ' , subscriber => sys .aq $_agent(' teqBasicRawSubscriber' , null ,0 ), rule => ' correlation = ' ' teqBasicRawSubscriber' ' ' );
67+
68+ dbms_aqadm .add_subscriber (queue_name => ' jsonType_TEQ' , subscriber => sys .aq $_agent(' teqBasicJsonSubscriber' , null ,0 ), rule => ' correlation = ' ' teqBasicJsonSubscriber' ' ' );
69+
70+ END;
71+ /
72+ CREATE OR REPLACE FUNCTION enqueueDequeueTEQ (subscriber varchar2 , queueName varchar2 , message Message_Typ) RETURN Message_Typ
73+ IS
74+ enqueue_options DBMS_AQ .enqueue_options_t ;
75+ message_properties DBMS_AQ .message_properties_t ;
76+ message_handle RAW(16 );
77+ dequeue_options DBMS_AQ .dequeue_options_t ;
78+ messageData Message_Typ;
79+
80+ BEGIN
81+ messageData := message;
82+ message_properties .correlation := subscriber;
83+ DBMS_AQ .ENQUEUE (
84+ queue_name => queueName,
85+ enqueue_options => enqueue_options,
86+ message_properties => message_properties,
87+ payload => messageData,
88+ msgid => message_handle);
89+ COMMIT ;
90+ DBMS_OUTPUT .PUT_LINE (' ----------ENQUEUE Message: ' || ' ORDERID: ' || messageData .ORDERID || ' , OTP: ' || messageData .OTP || ' , DELIVERY_STATUS: ' || messageData .DELIVERY_STATUS );
91+
92+ dequeue_options .dequeue_mode := DBMS_AQ .REMOVE ;
93+ dequeue_options .wait := DBMS_AQ .NO_WAIT ;
94+ dequeue_options .navigation := DBMS_AQ .FIRST_MESSAGE ;
95+ dequeue_options .consumer_name := subscriber;
96+ DBMS_AQ .DEQUEUE (
97+ queue_name => queueName,
98+ dequeue_options => dequeue_options,
99+ message_properties => message_properties,
100+ payload => messageData,
101+ msgid => message_handle);
102+ COMMIT ;
103+ DBMS_OUTPUT .PUT_LINE (' ----------DEQUEUE Message: ' || ' ORDERID: ' || messageData .ORDERID || ' , OTP: ' || messageData .OTP || ' , DELIVERY_STATUS: ' || messageData .DELIVERY_STATUS );
104+ RETURN messageData;
105+ END;
106+ /
107+ EXIT;
0 commit comments