OracleAQ消息队列的使用详解

更新时间:2023-10-09 17:43:01 阅读量: 综合文库 文档下载

说明:文章内容仅供预览,部分内容可能不全。下载后的文档,内容与下面显示的完全一致。下载之前请确认下面内容是否您想要的,是否完整无缺。

Oracle AQ 消息队列(-)

随着不同应用模块间的消息交互和通信成为一个关键的功能,并且变得越来越重要。Oracle引入了一种强大的队列机制,通过它程序间可以实现信息的交互,oracle把它称作为AQ - Advanced Queuing. 使用Oracle AQ,我们不需要安装额外的中间件,它是Oracle数据库的一个功能组件,只要你安装了Oracle 数据库就可以使用AQ了。接下来分两部分来介绍AQ的使用,使用之前我们要创建QUEUE.

我们创建一个自己的AQ的管理角色 \和管理用户\,再把Oracle AQ 管理角色 \授权给\

CREATE ROLE my_aq_adm_role;

GRANT aq_adminsistator_role TO my_aq_adm_role

创建一个用户的角色 \和 普通用户\再把Oracle AQ的用户角色\和一些基本操作需要的系统权限授权给 \

CREATE ROLE my_aq_user_role;

GRANT CREATE session, aq_user_role TO my_aq_user_role;

EXEC DBMS_AQADM.GRANT_SYSTEM_PRIVILEGE(

privilege => 'ENQUEUE_ANY',

grantee => 'my_aq_user_role',

admin_option => FALSE);

EXEC DBMS_AQADM.GRANT_SYSTEM_PRIVILEGE(

privilege => 'DEQUEUE_ANY',

grantee => 'my_aq_user_role',

admin_option = 'FALSE');

现在我们创建 AQ管理用户

CREATE USER aqadm IDENTIFIED BY aqadm

DEFAULT TABLESPACE elathen

TEMPORARY TABLESPACE temp;

GRANT my_aq_adm_role TO aqadm;

GRANT connect, resource TO aqadm; ----注意,resource角色一定要授给user,如果resource角色授权给

my_aq_adm_role, user将会失去 unlimited tablespace 权限

接着为我们下面example创建普通用户

CREATE USER aquser IDENTIFIED BY aquser

DEFAULET TABLESPACE elathen

TEMPORARY TABLESPACE temp;

GRANT my_aq_user_role TO aquser;

我们将在我们第一个queue中使用object type而不是NUMBER or VARCHAR2作为payload,这使我们更贴近实际应用。(payload是任何消息都使用的一种数据类型和结构).

上面做的工作都需要在DBA的权限下做,现在我们切换到AQ管理员

CONNECT aqadm/aqadm

CREATE TYPE queue_message_type AS OBJECT(

no NUBER,

title VARCHAR2(30),

text VARCHAR2(2000)); /

GRANT EXECUTE ON queue_message_type TO my_aq_user_role;

我们再创建一个叫\的queue以及相应的queue table \然后启动queue,这样我们就可以使用了。

1.PL/SQL中使用AQ和java使用oracle本地AQ

点对点模型(The Point-to-point Model)

在简单的系统中,我们可以认为两个系统一起使用一个或多个Queue。这种方法我们称作点对点模型。把消息输入到queue中的过程称为入列(Enqueue)相反的过程称为出列(Dequeue)。一条消息一次只能被一个使用这个queue的应用系统Dequeue,当其他应用系统可以浏览这个queue。这种模式就是点对点模式(the point-point Model)

PL/SQL中使用AQ

使用aquser连接到数据库

CONNECT aquser/aquser

现在我们Enqueue一条消息.

DECLARE

queue_options

DBMS_AQ.enqueue_options_t;

message_properties DBMS_AQ.message_properties_t;

message_id RAW(16);

my_message

aqadm.queue_message_type;

BEGIN

my_message := aqadm.queue_message_type(1,

'This is a sample message',

'This message has been posted on' || to_char(SYSDATE,'DD.MM.YYYY HH24:MI:SS'));

DBMS_AQ.enqueue(queue_name => 'aqadm.message_queue',

enqueue_options => queue_options,

message_properties => message_properties,

payload => my_message,

msgid => message_id);

COMMIT;

END; /

我们现在Dequeue刚才入列的消息,先执行

SET SERVEROUTPUT ON 然后

DECLARE

queue_options DBMS_AQ.DEQUEUE_OPTIONS_T;

message_properties DBMS_AQ.MESSAGE_PROPERTIES_T;

message_id RAW(2000);

my_message aqadm.queue_message_type;

BEGIN

DBMS_AQ.DEQUEUE(

queue_name => 'aqadm.message_queue',

dequeue_options => queue_options,

message_properties => message_properties,

payload => my_message,

msgid => message_id );

COMMIT;

DBMS_OUTPUT.PUT_LINE(

'Dequeued no: ' || my_message.no);

DBMS_OUTPUT.PUT_LINE(

'Dequeued title: ' || my_message.title);

DBMS_OUTPUT.PUT_LINE(

'Dequeued text: ' || my_message.text);

END; /

上面的PL/SQL的例子比较简单和直接,任何应用和编程环境都可以这样做。然而实际项目中,可能用编程语言来处理消息会更便利和更有实用价值。接下来我们将讨论在java中使用AQ

Java 中使用Oracle Native AQ

在前面的例子里我们为队列消息创建了一个Oracle Object type \在java语言中我们不能使用Oracle的数据类型,因此我们要创建一个和\对应的java类。我们可以使用Oracle Jpublisher,通过它,我们可以创建一个和Oracle Object type对应的java类。(这里就不具体讨论JPublisher用法)

这里我们用JPubisher创建一个和Oracle Object type \对应java class \

在使用Oracle Native AQ对java的interface之前,我们必须通过jdbc连接到数据库,代码如下

//loads the Oracle JDBC driver

Class.forName(\

;

NativeAQ aq = new NativeAQ();

//DB connection, HOST --- 数据库所在的机器domian id SID----数据的service name

aq.connection

=

DriverManager.getConnection(\

racle:thin:@HOST:1521:SID\

//

aq.connection.setAutoCommit(false);

然后我们通过传入AQ connection来获取AQ session对象

//loads the Oracle AQ driver

Class.forName(\;

aq.session = AQDriverManager.createAQSession(aq.connection);

上述工作做好后,我们可以获取我们需要的queue对象了,进行出列的操作。

AQQueue queue = aq.session.getQueue(\

AQDequeueOption dequeueOption = new AQDequeueOption();

System.out.println(\

;

AQMessage message =

((AQOracleQueue)queue).dequeue(dequeueOption,QUEUE_MESSAGE_TYPE.getFactory());

把raw payload 转换成我们消息类型

AQObjectPayload payload = message.getObjectPayload();

QUEUE_MESSAGE_TYPE messageData = (QUEUE_MESSAGE_TYPE) payload.getPayloadData();

aq.connection.commit();

System.out.println(\

System.out.println(\

System.out.pritnln(\

本文来源:https://www.bwwdw.com/article/oczf.html

Top