ActiveMQ拦截器使用和原理
在ActiveMQ中使用拦截器和过滤器的使用多采用插件的形式实现,继承BrokerFilter实现BrokerPlugin接口类。BrokerFilter实质一个实现Broker接口的类。
publicinterface BrokerPlugin {
/**
*Installsthepluginintotheinterceptorchainofthebroker,returningthenew
*interceptedbrokertouse.
*/
Broker installPlugin(Broker broker) throws Exception;
}
1. 日志拦截器
日志拦截器是Broker的一个拦截器,默认的日志级别为INFO。你如你想改变日志的级别。这个日志拦截器支持Commons-log和Log4j两种日志。
Attribute |
Description |
Default Value |
logAll |
Log all Events |
false |
logMessageEvents |
Events related with producing, consuming and dispatching messages |
false |
logConnectionEvents |
Events related to connections and sessions |
true |
logTransactionEvents |
Events related to transaction handling |
false |
logConsumerEvents |
Events related to consuming messages |
false |
logProducerEvents |
Events related to producing messages |
false |
logInternalEvents |
Events normally not of Interest for users like failover, querying internal objects etc |
false |
默认连接时间日志是开启,其他均未开启。通过activemq开启相关的日志。
<plugins>
<!-- lets enable detailed logging in the broker but ignore ConnectionEvents -->
<loggingBrokerPlugin logAll="true" logConnectionEvents="false"/>
</plugins>
2. 统计拦截器
从ActiveMQ5.3开始,StatisticsPlugin插件被用作检测Broker中统计的插件。注意消息必须包含replyTo的消息头,如果是在JMS那么需要采用jmsReplyTo消息头,否则消息将被统计忽略。ReplyTo消息头包含了你想检查统计的消息。统计消息是一个MapMessage.
检查Broker的信息,仅仅需要一个名称为ActiveMQ.Statistics.Broker并且有一个replyTo的消息头的Destination。为了检测所有destination,你需要一个名称为ActiveMQ.Statistics.Destination.<destination-name>或者ActiveMQ.Statistics.Destination.<wildcard-expression>并且有一个replyTo的消息头。如果许多Destination匹配相关的模糊匹配表达式,那么统计的消息将被发送到replyTo的Destination.
<plugins>
<!-- lets enable detailed logging in the broker but ignore ConnectionEvents -->
<statisticsBrokerPlugin/>
</plugins>
Statistics插件发送消息到特殊的目标。
下面是一个统计Broker的消息
Queue replyTo = session.createTemporaryQueue();
MessageConsumer consumer = session.createConsumer(replyTo);
String queueName = "ActiveMQ.Statistics.Broker";
Queue testQueue = session.createQueue(queueName);
MessageProducer producer = session.createProducer(testQueue);
Message msg = session.createMessage();
msg.setJMSReplyTo(replyTo);
producer.send(msg);
MapMessage reply = (MapMessage) consumer.receive();
assertNotNull(reply);
for (Enumeration e = reply.getMapNames();e.hasMoreElements();) {
String name = e.nextElement().toString();
System.out.println(name + "=" + reply.getObject(name));
}
查询Destination的统计信息
Queue replyTo = session.createTemporaryQueue();
MessageConsumer consumer = session.createConsumer(replyTo);
Queue testQueue = session.createQueue("TEST.FOO");
MessageProducer producer = session.createProducer(null);
String queueName = "ActiveMQ.Statistics.Destination." + testQueue.getQueueName()
Queue query = session.createQueue(queueName);
Message msg = session.createMessage();
producer.send(testQueue, msg)
msg.setJMSReplyTo(replyTo);
producer.send(query, msg);
MapMessage reply = (MapMessage) consumer.receive();
assertNotNull(reply);
assertTrue(reply.getMapNames().hasMoreElements());
for (Enumeration e = reply.getMapNames();e.hasMoreElements();) {
String name = e.nextElement().toString();
System.err.println(name + "=" + reply.getObject(name));
}
ActiveMQ性能优化
1、目标策略
在节点destinationPolicy配置策略,可以对单个或者所有的主题和队列进行设置,使用流量监控,当消息达到memoryLimit的时候,ActiveMQ会减慢消息的产生甚至阻塞,destinationPolicy的配置如下:
producerFlowControl表示是否监控流量,默认为true,如果设置为false,消息就会存在磁盘中以防止内存溢出;memoryLimit表示在producerFlowControl=”true”的情况下,消息存储在内存中最大量,当消息达到这个值时,ActiveMQ会减慢消息的产生甚至阻塞。policyEntry的属性参考:http://activemq.apache.org/per-destination-policies.html
当producer发送的持久化消息到达broker之后,broker首先会把它保存在持久存储中。接下来,如果发现当前有活跃的consumer,如果这个consumer消费消息的速度能跟上producer生产消息的速度,那么ActiveMQ会直接把消息传递给broker内部跟这个consumer关联的dispatch queue;如果当前没有活跃的consumer或者consumer消费消息的速度跟不上producer生产消息的速度,那么ActiveMQ会使用Pending Message Cursors保存对消息的引用。在需要的时候,Pending Message Cursors把消息引用传递给broker内部跟这个consumer关联的dispatch queue。以下是两种Pending Message Cursors:
VM Cursor:在内存中保存消息的引用。
File Cursor:首先在内存中保存消息的引用,如果内存使用量达到上限,那么会把消息引用保存到临时文件中。
在缺省情况下,ActiveMQ 会根据使用的Message Store来决定使用何种类型的Message Cursors,但是你可以根据destination来配置Message Cursors。
对于topic,可以使用的pendingSubscriberPolicy 有vmCursor和fileCursor。可以使用的PendingDurableSubscriberMessageStoragePolicy有
vmDurableCursor 和 fileDurableSubscriberCursor;对于queue,可以使用的pendingQueuePolicy有vmQueueCursor 和 fileQueueCursor。
Message Cursors的使用参考:http://activemq.apache.org/message-cursors.html
2、存储设置
设置消息在内存、磁盘中存储的大小,配置如下:
memoryUsage表示ActiveMQ使用的内存,这个值要大于等于destinationPolicy中设置的所有队列的内存之和。
storeUsage表示持久化存储文件的大小。
tempUsage表示非持久化消息存储的临时内存大小。
相关推荐
ActiveMQ插件
基于 IP 验证和授权插件的源码及jar,jar包可直接拷贝至activemq 目录下的 lib 目录下,打开activemq\conf\activemq.xml,在 broker 节点中加入: <plugins> ...
activemq通过IP或特征码授权插件
赠送jar包:activemq-protobuf-1.1.jar; 赠送原API文档:activemq-protobuf-1.1-javadoc.jar; 赠送源代码:activemq-protobuf-1.1-sources.jar; 包含翻译后的API文档:activemq-protobuf-1.1-javadoc-API文档-...
activemq activeMq笔记.docx
activemq-gc-插件 ActiveMQ GC 插件
最新activemq-cpp开发手册!
ActiveMQ插件,用于连接验证 在外部activeMQ服务器中加入验证插件:插件加入地址为activeMQ/lib activeMQ.xml植入的配置文件 < value>http://127.0.0.1:8082/activeMQ/checkSecret</
apache-activemq Linux版本
activemq书籍及工具 activemq书籍及工具 activemq书籍及工具 activemq书籍及工具 activemq书籍及工具
activeMQ的测试工具,用于发送和接收activeMQ消息,jar包形式的,安装完jdk之后用java -jar xxx.jar命令运行
activemq, Apache ActiveMQ镜像 欢迎来到 Apache ActiveMQis是一个高性能的Apache 2.0许可以消息代理和 JMS 1.1实现。正在启动要帮助你入门,请尝试以下链接:入门http://activemq.apache.org/version-
ActiveMQ高并发处理方案ActiveMQ高并发处理方案 超级字数补丁超级字数补丁
activeMQ学习activeMQ学习activeMQ学习activeMQ学习
包括1、ActiveMQ java实例 2、ActiveMQ Spring结合实例 3、代码亲测,无问题。 4、资源分5分绝对值 注意:请先安装ActiveMQ 服务。
用于ACtiveMq 配置插件配置使用,配置介绍等,适合初学者
activemq 配置说明与activemq入门讲解
Munin ActiveMQ插件 在Munin中显示队列状态图。 要求 Perl模块 WWW::Mechanize XML::Simple 插件安装 德比安 下载脚本 以root身份将queue_脚本下载到/usr/share/munin/plugins/queue_ wget -P /usr/share/munin/...
activemq实战