longgangbai
Managing Session and Connection Resources in ActiveMQ


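Once persistence is configured, we can send and receive ActiveMQ messages from code. The persistence store configured here is KahaDB.
Jars to import:
the jars required by ActiveMQ
Code for sending a message: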

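After running the send method above, the ActiveMQ web console shows a queue named test that contains one message, as shown in the figure:

Click the queue name test, then click the message ID to view the message content, as shown in the figure: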

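If DeliveryMode is set to NON_PERSISTENT, the message is lost when the broker restarts. Note that a plain JMS MessageProducer defaults to PERSISTENT, so an unset DeliveryMode loses messages only when the sending code or framework applies a non-persistent default.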
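Code for receiving a message:

After running the receive method above, the ActiveMQ web console shows that the message in the test queue has been consumed, as shown in the figure:

The code here is for testing only. In production it is usually combined with Spring: JmsTemplate sends the messages, and an implementation of the JMS MessageListener interface listens for them.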

 

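In ActiveMQ, Session and Connection are important resources. Just as a database application uses a connection pool for an expensive resource such as Connection, ActiveMQ offers an optional component, activemq-pool, that manages Sessions and Connections using the object pool pattern.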
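To make the object pool idea concrete, here is a minimal sketch in plain Java. It is not the activemq-pool implementation: `SimplePool`, `borrow`, `giveBack` and `createdCount` are hypothetical names chosen for illustration. It shows only the core idea, which is that a returned instance is kept and handed out again instead of being recreated, with a cap on how many can be borrowed at once.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.Supplier;

/** Hypothetical minimal object pool; not part of activemq-pool. */
class SimplePool<T> {
    private final Deque<T> idle = new ArrayDeque<>(); // instances waiting to be reused
    private final Supplier<T> factory;                // creates a new, "expensive" instance
    private final int maxActive;                      // cap on instances borrowed at once
    private int active;                               // instances currently borrowed
    private int created;                              // total instances ever created

    SimplePool(Supplier<T> factory, int maxActive) {
        this.factory = factory;
        this.maxActive = maxActive;
    }

    /** Reuses an idle instance if one exists, otherwise creates one. */
    synchronized T borrow() {
        if (active >= maxActive) {
            throw new IllegalStateException("pool exhausted");
        }
        T obj;
        if (idle.isEmpty()) {
            obj = factory.get();
            created++;
        } else {
            obj = idle.pop();
        }
        active++;
        return obj;
    }

    /** Returns an instance to the pool instead of destroying it. */
    synchronized void giveBack(T obj) {
        active--;
        idle.push(obj);
    }

    synchronized int createdCount() {
        return created;
    }
}

class SimplePoolDemo {
    public static void main(String[] args) {
        SimplePool<StringBuilder> pool = new SimplePool<>(StringBuilder::new, 2);
        StringBuilder first = pool.borrow();
        pool.giveBack(first);
        StringBuilder second = pool.borrow();
        System.out.println(first == second);     // prints "true": the instance was reused
        System.out.println(pool.createdCount()); // prints "1"
    }
}
```

A real pool also has to validate, expire and destroy instances, which this sketch deliberately leaves out.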

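In activemq-pool, the resource manager class is ActiveMQResourceManager, while PooledConnectionFactory defines the basic pool-management methods. PooledConnectionFactory declares the following fields and defaults: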

    private ConnectionFactory connectionFactory;
    private Map<ConnectionKey, LinkedList<ConnectionPool>> cache = new HashMap<ConnectionKey, LinkedList<ConnectionPool>>();
    private ObjectPoolFactory poolFactory;
    private int maximumActive = 500;
    private int maxConnections = 1;
    private int idleTimeout = 30 * 1000;
    private AtomicBoolean stopped = new AtomicBoolean(false);
    private long expiryTimeout = 0L;

    /**
     * Sets the maximum number of active sessions per connection
     */
    public void setMaximumActive(int maximumActive) {
        this.maximumActive = maximumActive;
    }

    /**
     * @return the maxConnections
     */
    public int getMaxConnections() {
        return maxConnections;
    }

    /**
     * @param maxConnections the maxConnections to set
     */
    public void setMaxConnections(int maxConnections) {
        this.maxConnections = maxConnections;
    }

    protected ObjectPoolFactory createPoolFactory() {
        return new GenericObjectPoolFactory(null, maximumActive);
    }

    public int getIdleTimeout() {
        return idleTimeout;
    }

    public void setIdleTimeout(int idleTimeout) {
        this.idleTimeout = idleTimeout;
    }

    /**
     * allow connections to expire, irrespective of load or idle time. This is useful with failover
     * to force a reconnect from the pool, to reestablish load balancing or use of the master post recovery
     * 
     * @param expiryTimeout non zero in milliseconds
     */
    public void setExpiryTimeout(long expiryTimeout) {
        this.expiryTimeout = expiryTimeout;   
    }
    
    public long getExpiryTimeout() {
        return expiryTimeout;
    }

 

By default, the number of connections (maxConnections) is 1.

By default, each connection can have up to 500 active sessions (maximumActive).
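The two timeouts in the listing above are easier to reason about with a small model. The sketch below is not the library's code: `ExpirySketch` and `isExpired` are hypothetical names. It assumes that idle time is measured from the last use, that expiry is measured from the first use, and that a value of 0 disables the corresponding check. Those semantics should be confirmed against the activemq-pool source for the version in use.

```java
/** Simplified, hypothetical model of when a pooled connection counts as expired. */
class ExpirySketch {

    /**
     * All arguments are in milliseconds.
     * Assumption: a timeout of 0 (or less) turns the corresponding check off.
     */
    static boolean isExpired(long now, long firstUsed, long lastUsed,
                             long idleTimeout, long expiryTimeout) {
        // Unused for longer than idleTimeout
        boolean idleTooLong = idleTimeout > 0 && now > lastUsed + idleTimeout;
        // Older than expiryTimeout, regardless of load or idle time
        boolean tooOld = expiryTimeout > 0 && now > firstUsed + expiryTimeout;
        return idleTooLong || tooOld;
    }

    public static void main(String[] args) {
        long idleTimeout = 30 * 1000; // default from the listing
        long expiryTimeout = 0L;      // default from the listing: expiry disabled

        // Last used 20 s ago: still inside the 30 s idle window
        System.out.println(isExpired(40_000, 0, 20_000, idleTimeout, expiryTimeout)); // prints "false"
        // Last used 40 s ago: idle for too long
        System.out.println(isExpired(60_000, 0, 20_000, idleTimeout, expiryTimeout)); // prints "true"
        // Busy connection, but a 10 s expiryTimeout forces it out anyway
        System.out.println(isExpired(15_000, 0, 14_000, idleTimeout, 10_000));        // prints "true"
    }
}
```

The third case is the failover scenario described in the setExpiryTimeout comment: a connection that is in constant use never goes idle, so only an expiry timeout can force a reconnect.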

The available PooledConnectionFactory implementations are:

AmqJNDIPooledConnectionFactory

JcaPooledConnectionFactory

XaPooledConnectionFactory

PooledConnectionFactory

Sessions are pooled by the following class:

public class SessionPool implements PoolableObjectFactory

 

A Session is obtained from ConnectionPool with the following code:

    public Session createSession(boolean transacted, int ackMode) throws JMSException {
        SessionKey key = new SessionKey(transacted, ackMode);
        SessionPool pool = cache.get(key);
        if (pool == null) {
            pool = createSessionPool(key);
            cache.put(key, pool);
        }
        PooledSession session = pool.borrowSession();
        return session;
    }
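The lookup in createSession works only if two keys built from the same (transacted, ackMode) pair compare equal, because a new key object is created on every call. The sketch below illustrates that pattern in plain Java. `KeySketch`, `PoolStub` and `KeyedCacheSketch` are hypothetical stand-ins, not the library's SessionKey or SessionPool, and `computeIfAbsent` is used as a compact equivalent of the get / null check / put sequence above.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

/** Hypothetical stand-in for a session key: value equality on both fields. */
final class KeySketch {
    private final boolean transacted;
    private final int ackMode;

    KeySketch(boolean transacted, int ackMode) {
        this.transacted = transacted;
        this.ackMode = ackMode;
    }

    @Override
    public boolean equals(Object other) {
        if (!(other instanceof KeySketch)) {
            return false;
        }
        KeySketch that = (KeySketch) other;
        return transacted == that.transacted && ackMode == that.ackMode;
    }

    @Override
    public int hashCode() {
        return Objects.hash(transacted, ackMode);
    }
}

/** Hypothetical stand-in for a per-key session pool. */
final class PoolStub {
}

class KeyedCacheSketch {
    private final Map<KeySketch, PoolStub> cache = new HashMap<>();

    /** Returns the pool for this pair, creating it on first use. */
    PoolStub poolFor(boolean transacted, int ackMode) {
        KeySketch key = new KeySketch(transacted, ackMode); // fresh key object on every call
        return cache.computeIfAbsent(key, k -> new PoolStub());
    }

    int size() {
        return cache.size();
    }

    public static void main(String[] args) {
        KeyedCacheSketch cache = new KeyedCacheSketch();
        PoolStub a = cache.poolFor(false, 1);
        PoolStub b = cache.poolFor(false, 1);
        PoolStub c = cache.poolFor(true, 0);
        System.out.println(a == b);       // prints "true": same key, same pool
        System.out.println(a == c);       // prints "false": different key, different pool
        System.out.println(cache.size()); // prints "2"
    }
}
```

Without the equals and hashCode overrides, every call would miss the cache and create a new pool, which would defeat the pooling.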

A usage example follows:

package easyway.app.activemq.demo.acknow;

import java.io.IOException;
import java.util.concurrent.CountDownLatch;

import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.pool.PooledConnection;
import org.apache.activemq.pool.PooledConnectionFactory;
import org.apache.activemq.transport.TransportListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Using the connection pool in ActiveMQ
 * 
 * @author longgangbai
 *
 */
public class ActiveMQConnectionPool  {
    private static final Logger LOG = LoggerFactory.getLogger(ActiveMQConnectionPool.class);
    private BrokerService broker;
    private ActiveMQConnectionFactory factory;
    private PooledConnectionFactory pooledFactory;

    public void testEviction() throws Exception {
        broker = new BrokerService();
        broker.setPersistent(false);
        broker.addConnector("tcp://localhost:61619");
        broker.start();
        factory = new ActiveMQConnectionFactory("tcp://localhost:61619?closeAsync=false");
        pooledFactory = new PooledConnectionFactory(factory);
        PooledConnection connection = (PooledConnection) pooledFactory.createConnection();
        ActiveMQConnection amqC = connection.getConnection();
        final CountDownLatch gotExceptionEvent = new CountDownLatch(1);
        amqC.addTransportListener(new TransportListener() {
            public void onCommand(Object command) {
            }
            public void onException(IOException error) {
                // we know connection is dead...
                // listeners are fired async
                gotExceptionEvent.countDown();
            }
            public void transportInterupted() {
            }
            public void transportResumed() {
            }
        });
        
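        sendMessage(connection);
        Connection connection2 = pooledFactory.createConnection();
        sendMessage(connection2);

        // Release resources so the JVM can exit: return both connections to the pool,
        // then shut down the pool and the embedded broker.
        connection.close();
        connection2.close();
        pooledFactory.stop();
        broker.stop();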
    }


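    /**
     * Sends a test message.
     * @param connection the connection used to create the session
     * @throws JMSException if the session or producer cannot be created, or the send fails
     */
    private void sendMessage(Connection connection) throws JMSException {
        // Obtain a session from the connection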
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        MessageProducer producer = session.createProducer(new ActiveMQQueue("FOO"));
        producer.send(session.createTextMessage("Test"));
        session.close();
    }
    
    
    public static void main(String[] args) throws Exception {
        ActiveMQConnectionPool test = new ActiveMQConnectionPool();
        test.testEviction();
    }


}

 

 

ActiveMQResourceManager is configured as follows:

/**
 * This class allows wiring the ActiveMQ broker and the Geronimo transaction manager
 * in a way that will allow the transaction manager to correctly recover XA transactions.
 *
 * For example, it can be used the following way:
 * <pre>
 *   <bean id="activemqConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
 *      <property name="brokerURL" value="tcp://localhost:61616" />
 *   </bean>
 *
 *   <bean id="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactoryFactoryBean">
 *       <property name="maxConnections" value="8" />
 *       <property name="transactionManager" ref="transactionManager" />
 *       <property name="connectionFactory" ref="activemqConnectionFactory" />
 *       <property name="resourceName" value="activemq.broker" />
 *   </bean>
 *
 *   <bean id="resourceManager" class="org.apache.activemq.pool.ActiveMQResourceManager" init-method="recoverResource">
 *         <property name="transactionManager" ref="transactionManager" />
 *         <property name="connectionFactory" ref="activemqConnectionFactory" />
 *         <property name="resourceName" value="activemq.broker" />
 *   </bean>
 * </pre>
 */

 
