ActiveMQ使用手册

更新时间:2024-06-12 15:21:01 阅读量: 综合文库 文档下载

说明:文章内容仅供预览,部分内容可能不全。下载后的文档,内容与下面显示的完全一致。下载之前请确认下面内容是否您想要的,是否完整无缺。

ActimeMQ使用手册

目 录

第一章前言 ....................................................................................................................................................... 0 1.1编写目的 ........................................................................................................................................................ 0 1.2术语 ................................................................................................................................................................ 0 1.3参考文档 ........................................................................................................................................................ 0 第二章 ActiveMQ介绍 ..................................................................................................................................... 1 2.1 ActiveMQ的原理 ........................................................................................................................................... 1 2.1.1 JMS的基本构件 .................................................................................................................................... 1 2.1.1.1连接工厂 ....................................................................................................................................... 1 2.1.1.2会话 ............................................................................................................................................... 1 2.1.1.3目的地 ........................................................................................................................................... 1 2.1.1.4生产者 ........................................................................................................................................... 2 2.1.1.5消息消费者.................................................................................................................................... 2 2.1.1.6消息 ............................................................................................................................................... 2 2.1.2 JMS的可靠性机制................................................................................................................................. 3 2.1.2.1消息确认 ....................................................................................................................................... 3 2.1.2.2持久性 ........................................................................................................................................... 4 2.1.2.3优先级 ........................................................................................................................................... 4 2.1.2.4消息过期 ....................................................................................................................................... 4 2.1.2.5临时目的地.................................................................................................................................... 4 2.1.2.6持久订阅 ....................................................................................................................................... 4 2.1.2.7本地事务 ....................................................................................................................................... 5 2.2 ActiveMQ基本配置 ....................................................................................................................................... 5 2.2.1配置ActiveMQ服务IP和端口 ............................................................................................................. 5 2.2.2监控ActiveMQ ....................................................................................................................................... 6 2.3 ActiveMQ的Broker........................................................................................................................................ 7 2.3.1 Running Broker: ...................................................................................................................................... 7 2.3.2 Embedded Broker: .................................................................................................................................. 8 2.4 ActiveMQ Broker的Transport ....................................................................................................................... 9 2.4.1 TCP Transport ....................................................................................................................................... 10 2.4.2 Failover Transport ................................................................................................................................. 10 2.4.3 Discover Transport ................................................................................................................................ 12 2.5 ActiveMQ Broker的持久方式 ...................................................................................................................... 14 2.5.1 AMQ Message Store .......................................................................................................................... 14 2.5.2 Kaha Message Store ............................................................................................................................. 16 2.5.3 JDBC Message Store ............................................................................................................................. 16 2.6 ActimveMQ的其他特性 .............................................................................................................................. 17 2.6.1异步发送消息 ...................................................................................................................................... 17 2.6.2消费者异步分派 .................................................................................................................................. 18

第i页

2.6.3消费者优先级 ...................................................................................................................................... 18 2.6.4独占消费者 .......................................................................................................................................... 18 2.6.5消息重发策略 ...................................................................................................................................... 18 2.6.6目标相关的属性 .................................................................................................................................. 19 2.6.7消息预取处理 ...................................................................................................................................... 21 2.6.8配置连接URL ...................................................................................................................................... 22 2.6.9消息重发与死信管理 .......................................................................................................................... 24 第三章 ActiveMQ集群方式 ........................................................................................................................... 28 3.1 AMQ集群特点 ............................................................................................................................................. 28 3.2 AMQ集群方式-- Queue consumer clusters ................................................................................................. 28 3.3 AMQ集群方式-- Broker clusters .................................................................................................................. 29 3.3.1管道订阅conduitSubscriptions ........................................................................................................... 31 3.3.2双向网络连接(duplex networkConnector) .......................................................................................... 31 3.3.3指定和限制Destination ...................................................................................................................... 32 3.3.4被卡住的消息 ...................................................................................................................................... 33 3.3.5其他说明 .............................................................................................................................................. 34 3.4 AMQ集群方式-- Master Slave ..................................................................................................................... 34 3.4.1 Pure Master Slave ................................................................................................................................. 35 3.4.2 Shared File System Master Slave .......................................................................................................... 35 3.4.3 JDBC Master Slave ................................................................................................................................ 35 第四章 ActiveMQ+Mongodb集群实践 .......................................................................................................... 37 4.1 AMQ+Mongodb的群集 ............................................................................................................................... 37 第五章 ActiveMQ的JMX监控 ....................................................................................................................... 38 5.1 JMX的配置 .................................................................................................................................................. 43 5.2 JMX的JAVA端获取信息 ............................................................................................................................. 43 第六章 ActiveMQ的使用注意........................................................................................................................ 45 6.1发现数据中存在queue://ActiveMQ.DLQ ................................................................................................... 45 6.2对应的客户端URL配置 .............................................................................................................................. 45

第ii页

第一章 前言

1.1 编写目的

某平台的消息队列采用ActiveMQ来实现,搭建分布式消息平台,提供可靠的传递消息及数据的消息服务,实现平台间的消息传递。本文档第二部分介绍了ActiveMQ的原理和重要配置介绍,第三部门介绍群集搭建方式,分析各种集群方式的优缺点,第四部分实战搭建Master-Slave+Broker-Custer的群集,第五部分是对ActiveMQ的监控,可以通过JMX接口直接获取当前的消息情况,第六部分是对ActiveMQ使用要注意的情况。

对于清楚JMS和ActiveMQ熟悉的,需要对ActiveMQ做集群处理,可以跳过第二、三章直接看第四章实践。

1.2 术语 1.3 参考文档

第0页

第二章 ActiveMQ介绍

2.1 ActiveMQ的原理

ActiveMQ是基于JMS的息中间件。下面先介绍一下JMS的基本构件.

2.1.1 JMS的基本构件

2.1.1.1 连接工厂

连接工厂是客户用来创建连接的对象,ActiveMQ提供的连接工厂类为:ActiveMQConnectionFactory。

2.1.1.2 会话

JMS Session是生产和消费消息的一个单线程上下文。会话用于创建消息生产者(producer)、消息消费者(consumer)和消息(message)等。会话提供了一个事务性的上下文,在这个上下文中,一组发送和接收被组合到了一个原子操作中。

客户端用Session 创建MessageProducer 和MessageConsumer对象。如果在Session 关闭时,有一些消息已经被收到,但还没有被签收

(acknowledged),那么,当消费者下次连接到相同的队列时,这些消息还会被再次接收。

2.1.1.3 目的地

目的地是客户用来指定它生产的消息的目标和它消费的消息的来源的对象。JMS1.0.2 规范中定义了两种消息传递域:点对点(PTP)消息传递域和发布/订阅消息传递域。在点对点消息传递域中,目的地被成为队列(queue);在发布/订阅消息传递域中,目的地被成为主题(topic)。 点对点消息传递域的特点如下:

每个消息只能有一个消费者。

? 消息的生产者和消费者之间没有时间上的相关性。无论消费者在生产者发送消息的时候是否处于运行状态,它都可以提取消息。

?

发布/订阅消息传递域的特点如下:

第1页

每个消息可以有多个消费者。

? 生产者和消费者之间有时间上的相关性。订阅一个主题的消费者只能消费自它订阅之后发布的消息。JMS规范允许客户创建持久订阅,这在一定程度上放松了时间上的相关性要求。持久订阅允许消费者消费它在未处于激活状态时发送的消息。

?

2.1.1.4 生产者

消息生产者是由会话创建的一个对象,用于把消息发送到一个目的地。

2.1.1.5 消息消费者

消息消费者是由会话创建的一个对象,它用于接收发送到目的地的消息。消息的消费可以采用以下两种方法之一:

同步消费。通过调用消费者的receive方法从目的地中显式提取消息。receive方法可以一直阻塞到消息到达。 ? 异步消费。客户可以为消费者注册一个消息监听器,以定义在消息到达时所采取的动作。

?

2.1.1.6 消息

JMS消息由以下三部分组成:

消息头。每个消息头字段都有相应的getter和setter方法。

? 消息属性。如果需要除消息头字段以外的值,那么可以使用消息属性。

?

除了消息头中定义好的标准属性外,JMS 提供一种机制增加新属性到消息头中,这种新属性包含以下几种: 1. 应用需要用到的属性;

2. 消息头中原有的一些可选属性; 3. JMS Provider 需要用到的属性。 标准的JMS 消息头包含以下属性: 消息头 JMSDestination 消息发送的目的地 传送模式,有两种模式: PERSISTENT 和NON_PERSISTENT, PERSISTENT 表示该消息一定要被送到目的地,否则会导致应用错误。 NON_PERSISTENT 表示偶然丢失该消息是被允许的, 这两种模式使开发者可以在消息传送的可靠性和吞吐量之间找到平衡点。 描述 JMSDeliveryMode 第2页

JMSExpiration 消息过期时间,等于Destination 的send 方法中的 timeToLive 值加上发送时刻的GMT 时间值。如果timeToLive 值等于零,则JMSExpiration 被设为零,表示该消息永不过期。 如果发送后,在消息过期时间之后消息还没有被发送到目 的地,则该消息被清除。 消息优先级,从0-9 十个级别,0-4 是普通消息,5-9 是加急消息。JMS 不要求JMS Provider 严格按照这十个 优先级发送消息,但必须保证加急消息要先于普通消息到达。 唯一识别每个消息的标识,由JMS Provider 产生。 一个消息被提交给JMS Provider 到消息被发出的时间。 用来连接到另外一个消息,典型的应用是在回复消息中连接到原消息。 提供本消息回复消息的目的地址 消息类型的识别符。 如果一个客户端收到一个设置了JMSRedelivered 属性的消息, 则表示可能客户端曾经在早些时候收到过该消息,但并没有签收(acknowledged)。 JMSPriority JMSMessageID JMSTimestamp JMSCorrelationID JMSReplyTo JMSType JMSRedelivered

?

消息体。JMS定义的消息类型有TextMessage、MapMessage、BytesMessage、StreamMessage和ObjectMessage。

2.1.2 JMS的可靠性机制

当所有的消息必须被接收,则用持久订阅模式。当丢失消息能够被容忍,则用非持久订阅模式。

2.1.2.1 消息确认

JMS消息只有在被确认之后,才认为已经被成功地消费了。消息的成功消费通常包含三个阶段:客户接收消息、客户处理消息和消息被确认。 在事务性会话中,当一个事务被提交的时候,确认自动发生。在非事务性会话中,消息何时被确认取决于创建会话时的应答模式(acknowledgement mode)。该参数有以下三个可选值: Session.AUTO_ACKNOWLEDGE。当客户成功的从receive方法返回的时候,或者从MessageListener.onMessage方法成功返回的时候,会话自动确认客户收到的消息。

? Session.CLIENT_ACKNOWLEDGE。客户通过消息的acknowledge方法确认消息。需要注意的是,在这种模式中,确认 是在会话层上进行:确认一个被消费的消息将自动确认所有已被会话消费的消息。例如,如果一个消息

?

第3页

消费者消费了10个消息,然后确认第5个消息,那么所有 10个消息都被确认。

? Session.DUPS_ACKNOWLEDGE。该选择只是会话迟钝第确认消息的提交。如果JMS provider失败,那么可能会导致一些重复的消息。如果是重复的消息,那么JMS provider必须把消息头的JMSRedelivered字段设置为true。

2.1.2.2 持久性

JMS 支持以下两种消息提交模式:

PERSISTENT。指示JMS provider持久保存消息,以保证消息不会因为JMS provider的失败而丢失。

? NON_PERSISTENT。不要求JMS provider持久保存消息。

?

2.1.2.3 优先级

可以使用消息优先级来指示JMS provider首先提交紧急的消息。优先级分10个级别,从0(最低)到9(最高)。如果不指定优先级,默认级别是4。需要注意的是,JMS provider并不一定保证按照优先级的顺序提交消息。

2.1.2.4 消息过期

可以设置消息在一定时间后过期,默认是永不过期。

2.1.2.5 临时目的地

可以通过会话上的createTemporaryQueue方法和createTemporaryTopic方法来创建临时目的地。它们的存在时间只限于创建它们的连接所保持的时间。只有创建该临时目的地的连接上的消息消费者才能够从临时目的地中提取消息。

2.1.2.6 持久订阅

首先消息生产者必须使用PERSISTENT提交消息。客户可以通过会话上的createDurableSubscriber方法来创建一个持久订阅,该方法的第一个参数必须是一个topic。第二个参数是订阅的名称。

JMS provider会存储发布到持久订阅对应的topic上的消息。如果最初创建持久订阅的客户或者任何其它客户使用相同的连接工厂和连接的客户ID、相同的主题和相同的订阅名再次调用会话上的

第4页

createDurableSubscriber方法,那么该持久订阅就会被激活。JMS provider会象客户发送客户处于非激活状态时所发布的消息。

持久订阅在某个时刻只能有一个激活的订阅者。持久订阅在创建之后会一直保留,直到应用程序调用会话上的unsubscribe方法。

2.1.2.7 本地事务

在一个JMS客户端,可以使用本地事务来组合消息的发送和接收。JMS Session接口提供了commit和rollback方法。事务提交意味着生产的所有消息被发送,消费的所有消息被确认;事务回滚意味着生产的所有消息被销毁,消费的所有消息被恢复并重新提交,除非它们已经过期。

事务性的会话总是牵涉到事务处理中,commit或rollback方法一旦被调用,一个事务就结束了,而另一个事务被开始。关闭事务性会话将回滚其中的事务。需要注意的是,如果使用请求/回复机制,即发送一个消息,同时希望在同一个事务中等待接收该消息的回复,那么程序将被挂起,因为知道事务提交,发送操作才会真正执行。需要注意的还有一个,消息的生产和消费不能包含在同一个事务中。

2.2 ActiveMQ基本配置

ActiveMQ配置文件:$AcrtiveMQ/conf/activemq.xml

2.2.1 配置ActiveMQ服务IP和端口

第5页

在 transportConnectors标识中配置ActiveMQ服务IP和端口,其中name属性指定协议的名称,uri属性指定协议所对应的协议 名,IP地址和端口号。上述IP地址和端口可以根据实际需要指定。Java客户端默认使用openwire协议,所以ActiveMQ服务地址为 tcp://localhost:61616

2.2.2 监控ActiveMQ

本节将使用JXM和JXM控制台(JDK1.5控制台)监控ActiveMQ。

?

?

配置JXM步骤如下:

1. 设置broker标识的useJmx属性为true;

2. 取消对managementContext标识的注释(系统默认注释managementContext标识),监控的默认端口为1099。

在Windows平台监控

进入%JAVA_HOME%/bin,双击jconsole.exe,在对话框中输入ActiveMQ服务主机的地址,JXM的端口和主机登陆帐号。

第6页

2.3 ActiveMQ的Broker

ActiveMQ中,Broker是消息的代理者,消费者和生产者实际是通过Broker来进行消息的传递,生产者和消费者之间是无法直接通信的。

Broker的实现可以分为如下几种方式:

2.3.1 Running Broker:

ActiveMQ5.0 的二进制发布包中bin目录中包含一个名为activemq的脚本,直接运行这个脚本就可以启动一个broker。

此外也可以通过Broker Configuration URI或Broker XBean URI对broker进行配置,以下是一些命令行参数的例子: Example Description Runs a broker using the default 'xbean:activemq.xml' as the broker configuration file. Runs a broker using the file myconfig.xml as the broker configuration file that is located in the classpath. Runs a broker using the file broker1.xml as the broker configuration file that is located in the relative file path ./conf/broker1.xml activemq activemq xbean:myconfig.xml activemq xbean:file:./conf/broker1.xml 第7页

activemq xbean:file:C:/ActiveMQ/conf/broker2.xml Runs a broker using the file broker2.xml as the broker configuration file that is located in the absolute file path C:/ActiveMQ/conf/broker2.xml Runs a broker with two transport connectors and JMX enabled. activemq broker:(tcp://localhost:61616, tcp://localhost:5000)?useJmx=true Runs a broker with 1 activemq broker:(tcp://localhost:61616, transport connector and 1 network:tcp://localhost:5000)?persistent=fnetwork connector with alse persistence disabled. 2.3.2 Embedded Broker:

可以通过在应用程序中以编码的方式启动broker,例如: Java代码

BrokerService broker = new BrokerService(); broker.addConnector(\broker.start();

如果需要启动多个broker,那么需要为broker设置一个名字。例如: Java代码

BrokerService broker = new BrokerService(); broker.setName(\

broker.addConnector(\broker.start();

如果希望在同一个JVM内访问这个broker,那么可以使用VM Transport,URI是:vm://brokerName。关于更多的broker属性,可以参考Apache的官方文档。

此外,也可以通过BrokerFactory来创建broker,例如:

第8页

Java代码

BrokerService broker = BrokerFactory.createBroker(new URI(someURI)); someURI的可选值如下: URI Example scheme xbean: xbean:activemq.xml Description 查找运行环境(classpath)相对路径下的配置文件(如果在根目录下配置为---xbean:activemq.xml) 查找实例目录的broker配置文件(foo/bar/activemq.xml) file: file:foo/bar/activemq.xml broker: broker:tcp://localhost:61616 通过URL配置

当使用XBean的配置方式的时候,需要指定一个xml配置文件,例如: Java代码

BrokerService broker = BrokerFactory.createBroker(new URI(\/activemq.xml\

使用Spring的配置方式如下: Xml代码

2.4 ActiveMQ Broker的Transport

Transport 是Broker 数据传输使用的协议,ActiveMQ目前支持的transport有:VM Transport、TCP Transport、SSL Transport、Peer Transport、UDP Transport、Multicast Transport、HTTP and HTTPS Transport、Failover Transport、Fanout Transport、Discovery Transport、ZeroConf Transport等。

第9页

下面介绍几种常用的Transport

2.4.1 TCP Transport

TCP transport 允许客户端通过TCP socket连接到远程的broker。以下是配置语法:

tcp://hostname:port?transportOptions Transport Options的可选值如下: Option Name Default Description Value The minimum version wireformat that is allowed Causes all commands that are sent over the transport to be logged When true, it causes the local machines name to resolve to \sets the socket timeout in milliseconds A non-zero value specifies the connection timeout in milliseconds. A zero value means wait forever for the connection to be established. Negative values are ignored. All the properties with this prefix are used to configure the wireFormat. See Configuring Wire Formats for more information minmumWireFormatVersion 0 trace useLocalHost socketBufferSize soTimeout false true 64 * 1024 Sets the socket buffer size in bytes 0 connectionTimeout 30000 wireFormat default The name of the WireFormat to use wireFormat.* 2.4.2 Failover Transport

Failover Transport是一种重新连接的机制,它工作于其它transport的上层,

用于建立可靠的传输。它的配置语法允许制定任意多个复合的URI。 Failover transport会自动选择其中的一个URI来尝试建立连接。如果没有成功,那么会选择一个其它的URI来建立一个新的连接。以下是配置语法: failover:(uri1,...,uriN)?transportOptions failover:uri1,...,uriN

Transport Options的可选值如下:

第10页

Option Name initialReconnectDelay Default Value 10 Description How long to wait before the first reconnect attempt (in ms) maxReconnectDelay The maximum amount of time we ever 30000 wait between reconnect attempts (in ms) true 2.0 Should an exponential backoff be used btween reconnect attempts The exponent used in the exponential backoff attempts useExponentialBackOff reconnectDelayExponent maxReconnectAttempts From version 5.6 onwards: -1 is default and means retry forever, 0 means don't retry (only try connection once but no retry). Prior to version 5.6: 0 is default -1 | 0 and means retry forever. All versions: If set to >0, then this is the maximum number of reconnect attempts before an error is sent back to the client. If not 0, then this is the maximum number of reconnect attempts before an error is sent back to the client on the first attempt by the client to start a connection, once connected the maxReconnectAttempts option takes precedence. use a random algorithm to choose the the URI to use for reconnect from the list provided startupMaxReconnectAttempts 0 randomize true backup initialize and hold a second false transport connection - to enable fast failover Enables timeout on send operations (in miliseconds) without interruption of reconnection process keep a cache of in-flight messages that will flushed to a broker on timeout -1 trackMessages false 第11页

reconnect maxCacheSize 131072 size in bytes for the cache, if trackMessages is enabled Determines whether the client should accept updates to its list of known URIs from the connected broker. Added in ActiveMQ 5.4 A URL (or path to a local file) to a text file containing a comma separated list of URIs to use for reconnect in the case of failure. Added in ActiveMQ 5.4 Extra options to add to the nested URLs. Added in ActiveMQ 5.9 After every N reconnect attempts log a warning to indicate there is no connection but that we are still trying, set to <= 0 to disable. Added in ActiveMQ 5.10 Determines whether the client should respond to broker ConnectionControl events with a reconnect (see: rebalanceClusterClients) updateURIsSupported true updateURIsURL null nested.* null warnAfterReconnectAttempts.* 10 reconnectSupported true

例如:

failover:(tcp://localhost:61616,tcp://remotehost:61616)?initialReconnectDelay=100

2.4.3 Discover Transport

Discovery transport是可靠的tranport。它使用Discovery transport来定位用来连接的URI列表。以下是配置语法:

discovery:(discoveryAgentURI)?transportOptions discovery:discoveryAgentURI

Transport Options的可选值如下: Option Name Default Description Value 第12页

initialReconnectDelay 10 maxReconnectDelay 30000 How long to wait before the first reconnect attempt The maximum amount of time we ever wait between reconnect attempts Should an exponential backoff be used btween reconnect attempts The exponent used in the exponential backoff attempts If not 0, then this is the maximum number of reconnect attempts before an error is sent back to the client useExponentialBackOff true backOffMultiplier 2 maxReconnectAttempts 0 例如:

discovery:(multicast://default)?initialReconnectDelay=100 为了使用Discovery来发现broker,需要为broker启用discovery agent。 以下是XML配置文件中的一个例子: Xml代码

...

在使用Failover Transport或Discovery transport等能够自动重连的transport的时候,需要注意的是:设想有两个broker,它们都启用AMQ Message Store作为持久化存储,有一个producer和一个consumer连接到某个queue。当因其中一个broker失效时而切换到另一个 broker的时候,如果失效的broker的queue中还有未被consumer消费的消息,那么这个queue里的消息仍然滞留在失效broker 的中,直到失效的broker被修复并重新切换回这个被修复的broker后,之前被保留的消息才会被consumer消费掉。如果被处理的消息有时序限制,那么应用程序就需要处理这个问题。另外也可以通过ActiveMQ集群来解决这个问题。

在transport重连的时候,可以在connection上注册TransportListener来获得回调,例如:

Java代码

(ActiveMQConnection)connection).addTransportListener(new TransportListener() {

public void onCommand(Object cmd) { }

public void onException(IOException exp) {

第13页

}

public void transportInterupted() {

// The transport has suffered an interruption from which it hopes to recover. }

public void transportResumed() {

// The transport has resumed after an interruption. } });

2.5 ActiveMQ Broker的持久方式

当ActiveMQ的队列比较多时,可能内存将会不够用,或突然关机,可能导致内存数据丢失,所以我们需要通过其他方式来保证数据的完整性。目前Broker常用的持久方式有:AMQMessage Store,Kaha Persistence,JDBC Persistence

2.5.1 AMQMessage Store

AMQ Message Store是ActiveMQ5.0 缺省的持久化存储。Message commands被保存到transactional journal(由rolling data logs组成)。Messages被保存到data logs中,同时被reference store进行索引以提高存取速度。Date logs由一些单独的data log文件组成,缺省的文件大小是32M,如果某个消息的大小超过了data log文件的大小,那么可以修改配置以增加data log文件的大小。如果某个data log文件中所有的消息都被成功消费了,那么这个data log文件将会被标记,以便在下一轮的清理中被删除或者归档。以下是其配置的一个例子: Xml代码

第14页

Property name directory Default value Comments the path to the directory to activemq-data use to store the message store data and log files true false 32mb use NIO to write messages to the data logs sync every write to disk a hint to set the maximum size of the message data logs use a persistent index for the message logs. If this is false, an in-memory structure is maintained the maximum number of messages to keep in a transaction before automatically committing time (ms) before checking for a discarding/moving message data logs that are no longer used default number of bins used by the index. The bigger the bin size - the better the relative performance of the index the size of the index key - the key is the message id the size of the index page - the bigger the page - the better the write performance of the index the path to the directory to use to store discarded data logs if true data logs are moved to the archive directory instead of being deleted useNIO syncOnWrite maxFileLength persistentIndex true maxCheckpointMessageAddSize 4kb cleanupInterval 30000 indexBinSize 1024 indexKeySize 96 indexPageSize 16kb directoryArchive archive archiveDataLogs false 第15页

2.5.2 Kaha Message Store

Kaha Persistence 是一个专门针对消息持久化的解决方案。它对典型的消息使用模式进行了优化。在Kaha中,数据被追加到data logs中。当不再需要log文件中的数据的时候,log文件会被丢弃。以下是其配置的一个例子: Xml代码

2.5.3 JDBC Message Store

目前支持的数据库有Apache Derby, Axion, DB2, HSQL, Informix, MaxDB, MySQL, Oracle, Postgresql, SQLServer, Sybase。

如果你使用的数据库不被支持,那么可以调整StatementProvider 来保证使用正确的SQL方言(flavour of SQL)。通常绝大多数数据库支持以下adaptor:

org.activemq.store.jdbc.adapter.BlobJDBCAdapter ? org.activemq.store.jdbc.adapter.BytesJDBCAdapter ? org.activemq.store.jdbc.adapter.DefaultJDBCAdapter ? org.activemq.store.jdbc.adapter.ImageJDBCAdapter

?

也可以在配置文件中直接指定JDBC adaptor,例如: Xml代码

1.

第16页

Xml代码

2.6 ActimveMQ的其他特性

2.6.1 异步发送消息

ActiveMQ支持生产者以同步或异步模式发送消息。使用不同的模式对send方法的反应时间有巨大的影响,反映时间是衡量ActiveMQ吞吐量的重要因素,使用异步发送可以提高系统的性能。

在默认大多数情况 下,AcitveMQ是以异步模式发送消息。例外的情况:在没有使用事务的情况下,生产者以PERSISTENT传送模式发送消息。在这种情况 下,send方法都是同步的,并且一直阻塞直到ActiveMQ发回确认消息:消息已经存储在持久性数据存储中。这种确认机制保证消息不会丢失,但会造成 生产者阻塞从而影响反应时间。

高性能的程序一般都能容忍在故障情况下丢失少量数据。如果编写这样的程序,可以通过使用异步发送来提高吞吐量(甚至在使用PERSISTENT传送模式的情况下)。

使用Connection URI配置异步发送:

cf = new ActiveMQConnectionFactory(\在ConnectionFactory层面配置异步发送:

((ActiveMQConnectionFactory)connectionFactory).setUseAsyncSend(true);

在Connection层面配置异步发送,此层面的设置将覆盖ConnectionFactory层面的设置:

第17页

((ActiveMQConnection)connection).setUseAsyncSend(true);

2.6.2 消费者异步分派

在ActiveMQ中,支持ActiveMQ以同步或异步模式向消费者分派消息。这样的意义:可以以异步模式向处理消息慢的消费者分配消息;以同步模式向处理消息快的消费者分配消息。

ActiveMQ默认以同步模式分派消息,这样的设置可以提高性能。但是对于处理消息慢的消费者,需要以异步模式分派。

在ConnectionFactory层面配置同步分派:

((ActiveMQConnectionFactory)connectionFactory).setDispatchAsync(false);

在Connection层面配置同步分派,此层面的设置将覆盖ConnectionFactory层面的设置: ((ActiveMQConnection)connection).setDispatchAsync(false);

在消费者层面以Destination URI配置同步分派,此层面的设置将覆盖ConnectionFactory和Connection层面的设置:

queue = new ActiveMQQueue(\

consumer = session.createConsumer(queue);

2.6.3 消费者优先级

在ActveMQ分布式环境中,在有消费者存在的情况下,如果更希望ActveMQ发送消息给消费者而不是其他的ActveMQ到ActveMQ的传送,可以如下设置: Java客户端:

queue = new ActiveMQQueue(\consumer = session.createConsumer(queue);

2.6.4 独占消费者

ActiveMQ维护队列消息的顺序并顺序把消息分派给消费者。但是如果建立了多个Session和MessageConsumer,那么同一时刻多个线程同时从一个队列中接收消息时就并不能保证处理时有序。

有时候有序处理消息是非常重要的。ActiveMQ支持独占的消费。ActiveMQ挑选一个MessageConsumer,并把一个队列中所有消息按顺序分派给它。如果消费者发生故障,那么ActiveMQ将自动故障转移并选择另一个消费者。可以如下设置: Java客户端:

queue = new ActiveMQQueue(\consumer = session.createConsumer(queue);

2.6.5 消息重发策略

在事务控制里抛出异常,txManager会进行rollback处理.(在activeMQ里,消息默认会redelivery到客户端6次,如果继续异常,消息会放到deadletter queue里(ActiveMQ.DLQ))。

在以下三种情况中,消息会被再次传送给消费者: 1.在使用事务的Session中,调用rollback()方法;

2.在使用事务的Session中,调用commit()方法之前就关闭了Session;

第18页

3.在Session中使用CLIENT_ACKNOWLEDGE签收模式,并且调用了recover()方法。

可以通过设置ActiveMQConnectionFactory和ActiveMQConnection来定制想要的再次传送策略。你可以在ActiveMQConnectionFactory 或ActiveMQConnection类中配置RedeliveryPolicy属性,用来定义重传策略的具体细节。

您可以使用Java代码,Spring配置或配置URI字符串来定义重传策略 可用的属性:

属性 collisionAvoidanceFactor 默认值 0.15 maximumRedeliveries 6 maximumRedeliveryDelay -1 initialRedeliveryDelay redeliveryDelay 1000L 1000L useCollisionAvoidance false useExponentialBackOff false backOffMultiplier 5 说明 设置防止冲突范围的正负百分比,只有启用useCollisionAvoidance参数时才生效。 最大重传次数,达到最大重连次数后抛出异常。为-1时不限制次数,为0时表示不进行重传。 最大传送延迟,只在useExponentialBackOff为true时有效(V5.5),假设首次重连间隔为10ms,倍数为2,那么第二次重连时间间隔为 20ms,第三次重连时间间隔为40ms,当重连时间间隔大的最大重连时间间隔时,以后每次重连时间间隔都为最大重连时间间隔。 初始重发延迟时间 重发延迟时间,当initialRedeliveryDelay=0时生效(v5.4) 启用防止冲突功能,因为消息接收时是可以使用多线程并发处理的,应该是为了重发的安全性,避开所有并发线程都在同一个时间点进行消息接收处理。所有线程在同一个时间点处理时会发生什么问题呢?应该没有问题,只是为了平衡broker处理性能,不会有时很忙,有时很空闲。 启用指数倍数递增的方式增加延迟时间。 重连时间间隔递增倍数,只有值大于1和启用useExponentialBackOff参数时才生效

2.6.6 目标相关的属性

ActiveMQ支持混合目标技术。它允许在一个JMS目标中使用一组JMS目标。

第19页

例如可以利用混合目标在同一操作中用向12个队列发送同一条消息或者在同一操作中向一个主题和一个队列发送同一条消息。

在混合目标中,通过“,”来分隔不同的目标。 Java客户端: 例如:

// send to 3 queues as one logical operation

Queue queue = new ActiveMQQueue(\producer.send(queue, someMessage);

如果在一个目标中混合不同类别的目标,可以通过使用“queue://”和“topic://”前缀来识别不同的目标。 例如:

// send to queues and topic one logical operation

Queue queue = new ActiveMQQueue(\

producer.send(queue, someMessage);

目标选项有:

属性 consumer.prefetchSize 默认值 variable 描述 The number of message the consumer will prefetch. Use to control if messages xare dropped if a slow consumer situation exists. Same as the noLocal flag on a Topic consumer. Exposed here so that it can be used with a queue. Should the broker dispatch messages asynchronously to the consumer. Is this a Retroactive Consumer. JMS Selector used with the consumer. consumer.maximumPendingMessageLimit 0 consumer.noLocal FALSE consumer.dispatchAsync FALSE consumer.retroactive FALSE consumer.selector null 第20页

consumer.exclusive FALSE consumer.priority Is this an Exclusive Consumer. Allows you to 0 configure a Consumer Priority. Java客户端的使用: 例如: queue = new

ActiveMQQueue(\er.prefetchSize=10\

consumer = session.createConsumer(queue);

2.6.7 消息预取处理

ActiveMQ的目标之一就是高性能的数据传送,所以ActiveMQ使用“预取限制”来控制有多少消息能及时的传送给任何地方的消费者。 一旦预取数量达到限制,那么就不会有消息被分派给这个消费者直到它发回签收消息(用来标识所有的消息已经被处理)。

可以为每个消费者指定消息预取。如果有大量的消息并且希望更高的性能,那么可以为这个消费者增大预取值。如果有少量的消息并且每条消息的处理都要花费很长的时间,那么可以设置预取值为1,这样同一时间,ActiveMQ只会为这个消费者分派一条消息。 Java客户端:

在ConnectionFactory层面为所有消费者配置预取值: tcp://localhost:61616?jms.prefetchPolicy.all=50 在ConnectionFactory层面为队列消费者配置预取值:

第21页

tcp://localhost:61616?jms.prefetchPolicy.queuePrefetch=1 使用“目标选项”为一个消费者配置预取值: queue = new

ActiveMQQueue(\

consumer = session.createConsumer(queue);

2.6.8 配置连接URL

ActiveMQ支持通过Configuration URI明确的配置连接属性。 例如:当要设置异步发送时,可以通过在Configuration URI中使用jms.$PROPERTY来设置。

tcp://localhost:61616?jms.useAsyncSend=true

以下的选项在URI必须以“jms.”为前缀。

属性 默认值 描述 If this flag is set then a seperate thread is not used for dispatching messages for each Session in the Connection. However, a separate thread is always used if there is more than one session, or the session isn't in auto acknowledge or dups ok mode Sets the JMS clientID to use null for the connection Sets the timeout before a close is considered complete. Normally a close() on a connection waits for confirmation from the broker; this allows that operation to timeout to save the client hanging if there is no broker. alwaysSessionAsync TRUE clientID closeTimeout 15000 (milliseconds) 第22页

copyMessageOnSend TRUE Should a JMS message be copied to a new JMS Message object as part of the send() method in JMS. This is enabled by default to be compliant with the JMS specification. You can disable it if you do not mutate JMS messages after they are sent for a performance boost. Sets whether or not timestamps on messages should be disabled or not. If you disable them it adds a small performance boost. disableTimeStampsByDefault FALSE dispatchAsync nestedMapAndListEnabled objectMessageSerializationDefered Should the broker dispatch FALSE messages asynchronously to the consumer. Enables/disables whether or not Structured Message Properties and MapMessages are supported so that Message TRUE properties and MapMessage entries can contain nested Map and List objects. Available since version 4.1 onwards When an object is set on an ObjectMessage, the JMS spec requires the object to be serialized by that set method. Enabling this flag FALSE causes the object to not get serialized. The object may subsequently get serialized if the message needs to be sent over a socket or stored to disk. 第23页

optimizeAcknowledge FALSE optimizedMessageDispatch TRUE useAsyncSend FALSE useCompression FALSE useRetroactiveConsumer FALSE Enables an optimised acknowledgement mode where messages are acknowledged in batches rather than individually. Alternatively, you could use Session.DUPS_OK_ACKNOWLEDGE acknowledgement mode for the consumers which can often be faster. WARNING enabling this issue could cause some issues with auto-acknowledgement on reconnection If this flag is set then an larger prefetch limit is used - only applicable for durable topic subscribers Forces the use of Async Sends which adds a massive performance boost; but means that the send() method will return immediately whether the message has been sent or not which could lead to message loss. Enables the use of compression of the message bodies Sets whether or not retroactive consumers are enabled. Retroactive consumers allow non-durable topic subscribers to receive old messages that were published before the non-durable subscriber started. 2.6.9 消息重发与死信管理

DLQ-死信队列(Dead Letter Queue)用来保存处理失败或者过期的消息。 出现以下情况时,消息会被redelivered

A transacted session is used and rollback() is called.

第24页

A transacted session is closed before commit is called.

A session is using CLIENT_ACKNOWLEDGE and Session.recover() is called. 当一个消息被redelivered超过maximumRedeliveries(缺省为6次,具体设置请参考后面的链接)次数时,会给broker发送一个\,这个消息被认为是a poison pill,这时broker会将这个消息发送到DLQ,以便后续处理。

缺省的死信队列是ActiveMQ.DLQ,如果没有特别指定,死信都会被发送到这个队列。

缺省持久消息过期,会被送到DLQ,非持久消息不会送到DLQ 可以通过配置文件(activemq.xml)来调整死信发送策略。 1.不使用缺省的死信队列

缺省所有队列的死信消息都被发送到同一个缺省死信队列,不便于管理。可以通过individualDeadLetterStrategy或sharedDeadLetterStrategy策略来进行修改。如下:

' ,否则用队列名称 -->

第25页

queuePrefix=\

...

2.非持久消息保存到死信队列

3.过期消息不保存到死信队列

2.6.10 生产者和消费者的密码使用

打开conf/activemq.xml文件,在标签里的标签前加入:

password=\

第26页

注意必须在标签前,否则启动ActiveMQ会报错。

第27页

第三章 ActiveMQ集群方式

3.1 AMQ集群特点

? 异构系统通讯、异步调用、应用解耦。

? 高可用性。每组Broker采用主从模式部署,支持Failover。 ? 客户端支持多组Broker服务器负载均衡和Failover。 ? 统一的异常消息重试服务,偏于容错。 ? 同步发送、消息持久化,防止数据丢失。 ? 消息异步归档到云存储,偏于问题跟踪。

? 高扩展性。动态扩容,支持水平扩展,按消息垂直划分。

? 易维护。统一集群管理控制平台,完善的监控报警,消息运营状况统计。 ? 原理图:

3.2 AMQ集群方式--Queue consumer clusters

ActiveMQ支持订阅同一个queue的consumers上的集群。如果一个consumer失效,那么所有未被确认(unacknowledged)的消息都会被发送到这个queue上其它的consumers。如果某个consumer的处理速度比其它 consumers更快,那么这个consumer就会消费更多的消息。

第28页

3.3 AMQ集群方式--Broker clusters

一个常见的场景是有多个JMS broker,有一个客户连接到其中一个broker。如果这个broker失效,那么客户会自动重新连接到其它的broker。在ActiveMQ中使用failover:// 协议来实现这个功能。ActiveMQ3.x版本的reliable://协议已经变更为failover://。

如果某个网络上有多个brokers而且客户使用静态发现(使用Static Transport或Failover Transport)或动态发现(使用Discovery Transport),那么客户可以容易地在某个broker失效的情况下切换到其它的brokers。然而,stand alone brokers并不了解其它brokers上的consumers,也就是说如果某个broker上没有consumers,那么这个broker上的消息可能会因得不到处理而积压起来。目前的解决方案是使用Network of brokers,以便在broker之间存储转发消息。

从ActiveMQ1.1版本起,ActiveMQ支持networks of brokers。它支持分布式的queues和topics。一个broker会相同对待所有的订阅(subscription):不管他们是来自本地的客户连接,还是来自远程broker,它都会递送有关的消息拷贝到每个订阅。远程broker得到这个消息拷贝后,会依次把它递送到其内部的本地连接上。有两种方式配置Network of brokers, 一种是使用static transport,如下:

第29页

另外一种是使用multicast discovery,如下:

...

Network Connector有以下属性:

属性 name dynamicOnly 默认描述 值 bridg名称 e 如果为true, 持久订阅被激活时false 才创建对应的网路持久订阅。默认是启动时激活。 如果为true,网络的消费者优先false 级降低为-5。如果为false,则默认跟本地消费者一样为0. 消息和订阅在网络上通过的1 broker数量 多个网络消费者是否被当做一个true 消费者来对待。 第30页

decreaseNetworkConsumerPriority networkTTL conduitSubscriptions

excludedDestinations empty 不通过网络转发的destination 通过网络转发的destinations,dynamicallyIncludedDestinations empty 注意空列表代表所有的都转发。 匹配的都将通过网络转发-即使没staticallyIncludedDestinations empty 有对应的消费者 如果为true,则既可消费又可生duplex false 产消息到网络broker 设置网络消费者的prefetch sizeprefetchSize 1000 参数。必须大于0,因为网络消费者不能自己轮询消息。 suppressDuplicateQueueSubscriptio(从5.3版本开始) 如果为true, false ns 重复的订阅关系一产生即被阻止。 是否广播advisory messages来创bridgeTempDestinations true 建临时destination。 (从 5.6版本开始) 如果为true,非持久化消息也将使用alwaysSyncSend false request/reply方式代替oneway方式发送到远程broker。 (从5.6版本开始) 如果为true,只有staticBridge false staticallyIncludedDestinations中配置的destination可以被处理。 3.3.1 管道订阅conduitSubscriptions

对于conduitSubscriptions属性,用于负载均衡时使用。设想有两个brokers,分别是brokerA和brokerB,它们之间用 forwarding bridge连接。有一个consumer连接到brokerA并订阅queue:Q.TEST。有两个consumers连接到brokerB,也是订阅queue:Q.TEST。这三个consumers有相同的优先级。然后启动一个producer,它发送了30条消息到brokerA。如果 conduitSubscriptions=true,那么brokerA上的consumer会得到15条消息,另外15条消息会发送给brokerB。此时负载并不均衡,因为此时brokerA将brokerB上的两个consumers视为一个;如果 conduitSubscriptions=false,那么每个consumer上都会收到10条消息。

3.3.2 双向网络连接(duplex networkConnector)

默认NetworkConnector在需要转发消息时是单向连接的。当duplex=true时,就变成了双向连接,这时配置在broker2端的指向broker1的duplex networkConnector,相当于即配置了

第31页

broker2到broker1的网络连接,也配置了broker1到broker2的网络连接。(就是说不管broker1同意与否,都被绑架了。)当然,仅仅在broker1上配置也有同样的效果。

注意:可以在两个broker间建立两个以上的双向网络连接来增加吞吐量或对主题\\队列分区,只需要指定他们使用不同的name即可。

3.3.3 指定和限制Destination

通过NetworkConnector共享的destination太多,传输的Advisory Message就会变的非常多,系统的拓扑结构将变得非常复杂,所有才有多种方式来限制这些destination配置:

dynamicallyIncludedDestinations

这里匹配到的destination,在需要时将被转发。 excludedDestinations

这里匹配到的destination,将不会被转发 staticallyIncludedDestinations

如果指定了staticBridge为true,则只有这里匹配的destination可以被转发。此时本地broker完全被其他broker代理。 并且本broker不会订阅其他broker上的AdvisoryMessage,也不会获取任何远程consumer信息。 这几个配置可以使用通配符,比如“>”,详见wildcard。 示例代码:

conduitSubscriptions=\

decreaseNetworkConsumerPriority=\>

第32页

此外,从5.6版本起,可以在networkConnector上设置destinationFilter来指定感兴趣的Advisory Message将被传播。

destinationFilter=\Topic.include.test.bar\>

3.3.4 被卡住的消息

一个很有意思的场景是,broker1和broker2通过networkConnector连接。一些个consumers连接到 broker1,消费broker2上的消息。消息先被broker1从broker2上消费掉,然后转发给这些consumers。不幸的是转发部分消 息的时候broker1重启了,这些consumers发现broker1连接失败,通过failover连接到broker2上去了,但是有一部分他们 还没有消费的消息被broker2已经分发到了broker1上去了。这些消息,就好像是消失了,除非有消费者重新连接到broker1上来消费。怎么办 呢?

办法就是从5.6版本destinationPolicy上新增的选项replayWhenNoConsumers。这个选项使得broker1上 有需要转发的消息但是没有消费者时,把消息回流到它原始的broker。同时把enableAudit设置为false,为了防止消息回流后被当做重复消 息而不被分发。

第33页

3.3.5 其他说明

.NetworkConnector基于AdvisoryMessage机制,如果broker的advisorySupport选型被禁用,则NetworkConnector将不起作用。 用作转发的broker中入列出列这些统计信息只记录其转发的数据。 用作转发的broker中无法看到远程broker的相同队列中的数据(browse消息列表为空,queuesize为0)。

3.4 AMQ集群方式--Master Slave

在一个网络内运行多个brokers或者stand alone brokers时存在一个问题,这就是消息在物理上只被一个broker持有,因此当某个broker失效,那么你只能等待直到它重启后,这个 broker上的消息才能够被继续发送(如果没有设置持久化,那么在这种情况下,消息将会丢失)。Master Slave 背后的想法是,消息被复制到slave broker,因此即使master broker遇到了像硬件故障之类的错误,你也可以立即切换到slave broker而不丢失任何消息。

Master Slave是目前ActiveMQ推荐的高可靠性和容错的解决方案。以下是几种不同的类型: Master Requirements Slave Type Pure Master Slave Shared File System Master Slave JDBC Pros Cons None Requires manual restart No central point of to bring back a failed failure master and can only support 1 slave Run as many slaves A Shared File as required. Requires shared file system such as a Automatic recovery system SAN of old masters A Shared Run as many slaves Requires a shared 第34页

Master Slave database as required. database. Also Automatic recovery relatively slow as it of old masters cannot use the high performance journal

3.4.1 Pure Master Slave

Pure Master Slave该方式已经逐渐被淘汰,这里不再描述。

3.4.2 Shared File System Master Slave

如果你使用SAN或者共享文件系统,那么你可以使用Shared File System Master Slave。基本上,你可以运行多个broker,这些broker共享数据目录。当第一个broker得到文件上的排他锁之后,其它的broker便会在循环中等待获得这把锁。客户端使用failover transport来连接到可用的broker。当master broker失效的时候会释放这把锁,这时候其中一个slave broker会得到这把锁从而成为master broker。以下是ActiveMQ配置的一个例子:

3.4.3 JDBC Master Slave

JDBC Master Slave的工作原理跟Shared File System Master Slave类似,只是采用了数据库作为持久化存储。以下是ActiveMQ配置的一个例子:

...

第35页

id=\

class=\

destroy-method=\

value=\

name=\

第36页

第四章 ActiveMQ+Mongodb集群实践

正常的来说,Master-Slave方式集群无法处理负载均衡,一般都是通过Broker-Clusters + Master-Slave方式。在负载均衡上,配置文件使用networkConnectors属性配置,java客户端使用failover协议来处理。

4.1 AMQ+Mongodb的群集

对于ActiveMQ 5.9.1的配置,直接将如下的压缩包拷贝到ActiveMQ根目录的lib目录,并解压缩。(一个专为5.9.0版本使用,另一个专为5.9.1版本使用)

MQ-store-mongodb-5.9.0.zipMQ-store-mongodb-5.9.1.zip

然后在activemq.xml的broker结点下加入:

uri=\

通过java来使用该AMQ集群发送处理时,

ActiveMQConnectionFactory factory =

newActiveMQConnectionFactory(“failover:(tcp://host1:61618,tcp://host1:61618)”);

Connectionconnection = factory.createConnection(); connection.start();

Sessionsession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Destination destination = session

.createQueue(ActiveMQUtils.testQueueName);

MessageProducerproducer = session.createProducer(destination);

第37页

本文来源:https://www.bwwdw.com/article/lvg6.html

Top