配置完了持久化之后,我们就可以使用代码来发送和接收ActiveMQ中的消息了,我这里配置的持久化是KahaDB。
需要导入的jar包:
一段发送消息的代码:
执行了上面的发送方法之后,在ActiveMQ的监视控制可以看到有一个test队列,并且有一条消息,如图:
点击队列名test,然后点击消息ID即可查看消息内容,如图:
如果DeliveryMode没有设置或者设置为NON_PERSISTENT,那么重启MQ之后消息就会丢失。
一段接收消息的代码:
执行了上面的接收方法之后,在ActiveMQ的监视控制可以看到test队列的消息已经被消费了,如图:
这里的代码只是测试用,在正式开发中一般与Spring结合使用jmsTemplate来发送消息,现实JMS的MessageListener来监听消息。
在ActiveMQ中Session和Connection是一种重要的资源,在数据库中,针对重要的资源如Connection我们采用数据库连接池,在ActiveMQ中有一个可选的组件为activmq-pool组件,用于处理关于Session和Connection,管理的方式采用池的原理即对象池。
在activemq-pool中资源管理器类为ActiveMQResourceManager其中PooledConnectionFactory中定义Pool的管理的基本方法。其中在此类中定义默认的一些信息如下:
private ConnectionFactory connectionFactory; private Map<ConnectionKey, LinkedList<ConnectionPool>> cache = new HashMap<ConnectionKey, LinkedList<ConnectionPool>>(); private ObjectPoolFactory poolFactory; private int maximumActive = 500; private int maxConnections = 1; private int idleTimeout = 30 * 1000; private AtomicBoolean stopped = new AtomicBoolean(false); private long expiryTimeout = 0l; /* Sets the maximum number of active sessions per connection */ public void setMaximumActive(int maximumActive) { this.maximumActive = maximumActive; } /** * @return the maxConnections */ public int getMaxConnections() { return maxConnections; } /** * @param maxConnections the maxConnections to set */ public void setMaxConnections(int maxConnections) { this.maxConnections = maxConnections; } protected ObjectPoolFactory createPoolFactory() { return new GenericObjectPoolFactory(null, maximumActive); } public int getIdleTimeout() { return idleTimeout; } public void setIdleTimeout(int idleTimeout) { this.idleTimeout = idleTimeout; } /** * allow connections to expire, irrespective of load or idle time. This is useful with failover * to force a reconnect from the pool, to reestablish load balancing or use of the master post recovery * * @param expiryTimeout non zero in milliseconds */ public void setExpiryTimeout(long expiryTimeout) { this.expiryTimeout = expiryTimeout; } public long getExpiryTimeout() { return expiryTimeout; }
其中默认的Connecton连接数为1:
默认的每一个Connecton的创建的Session数量为500个。
默认是实现的PooledConnectionFactory类如下:
AmqJNDIPooledConnectionFactory
JcaPooledConnectionFactory
XaPooledConnectionFactory
PooledConnectionFactory
其中关于Session的管理的资源池使用如下类:
public class SessionPool implements PoolableObjectFactory
获取Session是从ConnectonPool中获取代码如下:
public Session createSession(boolean transacted, int ackMode) throws JMSException { SessionKey key = new SessionKey(transacted, ackMode); SessionPool pool = cache.get(key); if (pool == null) { pool = createSessionPool(key); cache.put(key, pool); } PooledSession session = pool.borrowSession(); return session; }
使用实例如下:
package easyway.app.activemq.demo.acknow; import java.io.IOException; import java.util.concurrent.CountDownLatch; import javax.jms.Connection; import javax.jms.JMSException; import javax.jms.MessageProducer; import javax.jms.Session; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.pool.PooledConnection; import org.apache.activemq.pool.PooledConnectionFactory; import org.apache.activemq.transport.TransportListener; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * ActiveMQ 中连接池的使用 * * @author longgangbai * */ public class ActiveMQConnectionPool { private static final Logger LOG = LoggerFactory.getLogger(ActiveMQConnectionPool.class); private BrokerService broker; private ActiveMQConnectionFactory factory; private PooledConnectionFactory pooledFactory; public void testEviction() throws Exception { broker = new BrokerService(); broker.setPersistent(false); broker.addConnector("tcp://localhost:61619"); broker.start(); factory = new ActiveMQConnectionFactory("tcp://localhost:61619?closeAsync=false"); pooledFactory = new PooledConnectionFactory(factory); PooledConnection connection = (PooledConnection) pooledFactory.createConnection(); ActiveMQConnection amqC = connection.getConnection(); final CountDownLatch gotExceptionEvent = new CountDownLatch(1); amqC.addTransportListener(new TransportListener() { public void onCommand(Object command) { } public void onException(IOException error) { // we know connection is dead... // listeners are fired async gotExceptionEvent.countDown(); } public void transportInterupted() { } public void transportResumed() { } }); sendMessage(connection); Connection connection2 = pooledFactory.createConnection(); sendMessage(connection2); } /** * 发送消息的方法 * @param connection * @throws JMSException */ private void sendMessage(Connection connection) throws JMSException { //获取会话信息 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); MessageProducer producer = session.createProducer(new ActiveMQQueue("FOO")); producer.send(session.createTextMessage("Test")); session.close(); } public static void main(String[] args) throws Exception { ActiveMQConnectionPool test=new ActiveMQConnectionPool(); test.testEviction(); } }
在ActiveMQResourceManage的配置如下:
/** * This class allows wiring the ActiveMQ broker and the Geronimo transaction manager * in a way that will allow the transaction manager to correctly recover XA transactions. * * For example, it can be used the following way: * <pre> * <bean id="activemqConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"> * <property name="brokerURL" value="tcp://localhost:61616" /> * </bean> * * <bean id="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactoryFactoryBean"> * <property name="maxConnections" value="8" /> * <property name="transactionManager" ref="transactionManager" /> * <property name="connectionFactory" ref="activemqConnectionFactory" /> * <property name="resourceName" value="activemq.broker" /> * </bean> * * <bean id="resourceManager" class="org.apache.activemq.pool.ActiveMQResourceManager" init-method="recoverResource"> * <property name="transactionManager" ref="transactionManager" /> * <property name="connectionFactory" ref="activemqConnectionFactory" /> * <property name="resourceName" value="activemq.broker" /> * </bean> * </pre> */
相关推荐
ActiveMQ相关jar包--使用Connection连接池,jar的版本很重要,请注意。
提供了activemq和zookeeper的安装包资源以及安装手册,如果操作过程中有任何问题,可以随时联系我
linux下activemq安装包和配置文档
自己实现的ActiveMQ连接池和新版本ActiveMQ自带的连接池,封装好的工具类,可直接使用
ActiveMQ的安装与使用。单机版。ActiveMQ是业界比较流行的消息中间件。
activemq rcp测试的依赖资源,不包含rcp的例子
Linux下activeMQ的启动和停止
JMS教程+activemq以及activemq和tomcat的整合+整合实例代码+持久化消息配置以及工程+tomcat服务器的配置
1.一个是admin,用来显示和管理所有的queue、topic、connection等等。 2.一个是demo,有一些使用jms和activemq的简单例子。 3.还有一个fileserver,用来支持通过activemq发送文件时的中转服务器。blob message时配置...
ActiveMQ和HornetQ性能对比
构建高可用的ActiveMQ系统在生产环境中是非常重要的,单点的ActiveMQ作为企业应用无法满足高可用和集群的需求,所以ActiveMQ提供 了master-slave、broker cluster等多种部署方式,但通过分析多种部署方式之后我认为...
activemq activeMq笔记.docx
请将本maven项目引入你自己的maven项目中(在你自己的pom.xml文件中配置这个项目的gourp和id以及版本号),通过模块化导入,注意把spring-activeMQ.xml加载到容器当中,运行tomcat启动项目,即可看到效果。
ActiveMQ(中文)参考手册 大名鼎鼎的 JMS 实现 Apache ActiveMQ 介绍文档
百度spring整合activemq 发现几乎都只是在xml文件配置固定的消息队列而且太麻烦。并没有根据需求进行动态生成主题和队列。本文档参考了纯粹的... activemq下载文件中有一个demo 包含所有jar包。此处就不上传了。
主要讲解activemq的安装,使用,集群的搭建,以及拓展
ActiveMQ 的安装与使用,ActiveMQ与spring整合,生产都、消费者、测试类等。
activeMQ推送服务端和客户端完整案例
ActiveMQ中文手册,可以帮助你的开发。
activeMQ的测试工具,用于发送和接收activeMQ消息,jar包形式的,安装完jdk之后用java -jar xxx.jar命令运行