`
longgangbai
  • 浏览: 7259178 次
  • 性别: Icon_minigender_1
  • 来自: 上海
社区版块
存档分类
最新评论

ActiveMQ的拦截器插件

阅读更多

ActiveMQ拦截器使用和原理

     ActiveMQ中使用拦截器和过滤器的使用多采用插件的形式实现,继承BrokerFilter实现BrokerPlugin接口类。BrokerFilter实质一个实现Broker接口的类。

publicinterface BrokerPlugin {

    /**

     *Installsthepluginintotheinterceptorchainofthebroker,returningthenew

     *interceptedbrokertouse.

     */

    Broker installPlugin(Broker broker) throws Exception;

}

 

1.       日志拦截器

      日志拦截器是Broker的一个拦截器,默认的日志级别为INFO。你如你想改变日志的级别。这个日志拦截器支持Commons-logLog4j两种日志。

Attribute

Description

Default Value

logAll

Log all Events

false

logMessageEvents

Events related with producing, consuming and dispatching messages

false

logConnectionEvents

Events related to connections and sessions

true

logTransactionEvents

Events related to transaction handling

false

logConsumerEvents

Events related to consuming messages

false

logProducerEvents

Events related to producing messages

false

logInternalEvents

Events normally not of Interest for users like failover, querying internal objects etc

false

默认连接时间日志是开启,其他均未开启。通过activemq开启相关的日志。

    <plugins>

      <!-- lets enable detailed logging in the broker but ignore ConnectionEvents -->

      <loggingBrokerPlugin logAll="true" logConnectionEvents="false"/>

    </plugins>

2.       统计拦截器

     ActiveMQ5.3开始,StatisticsPlugin插件被用作检测Broker中统计的插件。注意消息必须包含replyTo的消息头,如果是在JMS那么需要采用jmsReplyTo消息头,否则消息将被统计忽略。ReplyTo消息头包含了你想检查统计的消息。统计消息是一个MapMessage.

      检查Broker的信息,仅仅需要一个名称为ActiveMQ.Statistics.Broker并且有一个replyTo的消息头的Destination。为了检测所有destination,你需要一个名称为ActiveMQ.Statistics.Destination.<destination-name>或者ActiveMQ.Statistics.Destination.<wildcard-expression>并且有一个replyTo的消息头。如果许多Destination匹配相关的模糊匹配表达式,那么统计的消息将被发送到replyToDestination.

    <plugins>

      <!-- lets enable detailed logging in the broker but ignore ConnectionEvents -->

      <statisticsBrokerPlugin/>

</plugins>

Statistics插件发送消息到特殊的目标。

下面是一个统计Broker的消息

           Queue replyTo = session.createTemporaryQueue();

           MessageConsumer consumer = session.createConsumer(replyTo);

 

           String queueName = "ActiveMQ.Statistics.Broker";

           Queue testQueue = session.createQueue(queueName);

           MessageProducer producer = session.createProducer(testQueue);

           Message msg = session.createMessage();

           msg.setJMSReplyTo(replyTo);

           producer.send(msg);

 

           MapMessage reply = (MapMessage) consumer.receive();

           assertNotNull(reply);

 

           for (Enumeration e = reply.getMapNames();e.hasMoreElements();) {

             String name = e.nextElement().toString();

             System.out.println(name + "=" + reply.getObject(name));

        }

查询Destination的统计信息

           Queue replyTo = session.createTemporaryQueue();

           MessageConsumer consumer = session.createConsumer(replyTo);

 

           Queue testQueue = session.createQueue("TEST.FOO");

           MessageProducer producer = session.createProducer(null);

 

           String queueName = "ActiveMQ.Statistics.Destination." + testQueue.getQueueName()

           Queue query = session.createQueue(queueName);

 

           Message msg = session.createMessage();

 

           producer.send(testQueue, msg)

           msg.setJMSReplyTo(replyTo);

           producer.send(query, msg);

           MapMessage reply = (MapMessage) consumer.receive();

           assertNotNull(reply);

           assertTrue(reply.getMapNames().hasMoreElements());

                  

           for (Enumeration e = reply.getMapNames();e.hasMoreElements();) {

               String name = e.nextElement().toString();

               System.err.println(name + "=" + reply.getObject(name));

        }

 

ActiveMQ性能优化

1、目标策略

在节点destinationPolicy配置策略,可以对单个或者所有的主题和队列进行设置,使用流量监控,当消息达到memoryLimit的时候,ActiveMQ会减慢消息的产生甚至阻塞,destinationPolicy的配置如下:

 

 

producerFlowControl表示是否监控流量,默认为true,如果设置为false,消息就会存在磁盘中以防止内存溢出;memoryLimit表示在producerFlowControl=”true”的情况下,消息存储在内存中最大量,当消息达到这个值时,ActiveMQ会减慢消息的产生甚至阻塞。policyEntry的属性参考:http://activemq.apache.org/per-destination-policies.html

当producer发送的持久化消息到达broker之后,broker首先会把它保存在持久存储中。接下来,如果发现当前有活跃的consumer,如果这个consumer消费消息的速度能跟上producer生产消息的速度,那么ActiveMQ会直接把消息传递给broker内部跟这个consumer关联的dispatch queue;如果当前没有活跃的consumer或者consumer消费消息的速度跟不上producer生产消息的速度,那么ActiveMQ会使用Pending Message Cursors保存对消息的引用。在需要的时候,Pending Message Cursors把消息引用传递给broker内部跟这个consumer关联的dispatch queue。以下是两种Pending Message Cursors:

VM Cursor:在内存中保存消息的引用。

File Cursor:首先在内存中保存消息的引用,如果内存使用量达到上限,那么会把消息引用保存到临时文件中。

在缺省情况下,ActiveMQ 会根据使用的Message Store来决定使用何种类型的Message Cursors,但是你可以根据destination来配置Message Cursors。

对于topic,可以使用的pendingSubscriberPolicy 有vmCursor和fileCursor。可以使用的PendingDurableSubscriberMessageStoragePolicy有

vmDurableCursor 和 fileDurableSubscriberCursor;对于queue,可以使用的pendingQueuePolicy有vmQueueCursor 和 fileQueueCursor。

Message Cursors的使用参考:http://activemq.apache.org/message-cursors.html

2、存储设置

设置消息在内存、磁盘中存储的大小,配置如下:

 

 

memoryUsage表示ActiveMQ使用的内存,这个值要大于等于destinationPolicy中设置的所有队列的内存之和。

storeUsage表示持久化存储文件的大小。

tempUsage表示非持久化消息存储的临时内存大小。

分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics