本人实现的功能为activemq将消息持久化到数据库的方法:
1:前言
这一段给公司开发消息总线有机会研究ActiveMQ,今天撰文给大家介绍一下他的持久化消息。本文只介绍三种方式,分别是持久化为文件,MYSql,Oracle。下面逐一介绍。
A:持久化为文件
这个你装ActiveMQ时默认就是这种,只要你设置消息为持久化就可以了。涉及到的配置和代码有
<kahaDB directory="${activemq.base}/data/kahadb"/>
</persistenceAdapter>
producer.Send(request, MsgDeliveryMode.Persistent, level, TimeSpan.MinValue);
B:持久化为MySql
你首先需要把MySql的驱动放到ActiveMQ的Lib目录下,我用的文件名字是:mysql-connector-java-5.0.4-bin.jar
接下来你修改配置文件
<jdbcPersistenceAdapter dataDirectory="${activemq.base}/data" dataSource="#derby-ds"/>
</persistenceAdapter>
在配置文件中的broker节点外增加
<property name="driverClassName" value="com.mysql.jdbc.Driver"/>
<property name="url" value="jdbc:mysql://localhost/activemq?relaxAutoCommit=true"/>
<property name="username" value="activemq"/>
<property name="password" value="activemq"/>
<property name="maxActive" value="200"/>
<property name="poolPreparedStatements" value="true"/>
</bean>
从配置中可以看出数据库的名称是activemq,你需要手动在MySql中增加这个库。
然后重新启动消息队列,你会发现多了3张表
1:activemq_acks
2:activemq_lock
3:activemq_msgs
C:持久化为Oracle
和持久化为MySql一样。这里我说两点
1;在ActiveMQ安装文件夹里的Lib文件夹中增加Oracle的JDBC驱动。驱动文件位于Oracle客户端安装文件中的product\11.1.0\client_1\jdbc\lib文件夹下。
2:
<property name="driverClassName" value="oracle.jdbc.driver.OracleDriver"/>
<property name="url" value="jdbc:oracle:thin:@10.53.132.47:1521:test"/>
<property name="username" value="qdcommu"/>
<property name="password" value="qdcommu"/>
<property name="maxActive" value="200"/>
<property name="poolPreparedStatements" value="true"/>
</bean>
这里的jdbc:oracle:thin:@10.53.132.47:1521:test按照自己实际情况设置一下就可以了,特别注意的是test是SID即服务名称而不是TNS中配置的节点名。各位同学只需要替换IP,端口和这个SID就可以了。
消息消费者的事先代码:
package easyway.activemq.app; import javax.jms.Connection; import javax.jms.Destination; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.Session; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.broker.BrokerService; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; /*** * 消息持久化到数据库 * @author longgangbai */ public class MessageCustomer { private static Logger logger=LogManager.getLogger(MessageProductor.class); private String username=ActiveMQConnectionFactory.DEFAULT_USER; private String password=ActiveMQConnectionFactory.DEFAULT_PASSWORD; private String url=ActiveMQConnectionFactory.DEFAULT_BROKER_BIND_URL; private static String QUEUENAME="ActiveMQ.QUEUE"; protected static final int messagesExpected = 10; protected ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory( url+"?jms.prefetchPolicy.all=0&jms.redeliveryPolicy.maximumRedeliveries="+messagesExpected); /*** * 创建Broker服务对象 * @return * @throws Exception */ public BrokerService createBroker()throws Exception{ BrokerService broker=new BrokerService(); broker.addConnector(url); return broker; } /** * 启动BrokerService进程 * @throws Exception */ public void init() throws Exception{ BrokerService brokerService=createBroker(); brokerService.start(); } /** * 接收的信息 * @return * @throws Exception */ public int receiveMessage() throws Exception{ Connection connection=connectionFactory.createConnection(); connection.start(); Session session = connection.createSession(true, Session.SESSION_TRANSACTED); return receiveMessages(messagesExpected,session); } /** * 接受信息的方法 * @param messagesExpected * @param session * @return * @throws Exception */ protected int receiveMessages(int messagesExpected, Session session) throws Exception { int messagesReceived = 0; for (int i=0; i<messagesExpected; i++) { Destination destination = session.createQueue(QUEUENAME); MessageConsumer consumer = session.createConsumer(destination); Message message = null; try { logger.debug("Receiving message " + (messagesReceived+1) + " of " + messagesExpected); message = consumer.receive(2000); logger.info("Received : " + message); if (message != null) { session.commit(); messagesReceived++; } } catch (Exception e) { logger.debug("Caught exception " + e); session.rollback(); } finally { if (consumer != null) { consumer.close(); } } } return messagesReceived; } public String getPassword() { return password; } public void setPassword(String password) { this.password = password; } public String getUrl() { return url; } public void setUrl(String url) { this.url = url; } public String getUsername() { return username; } public void setUsername(String username) { this.username = username; } }
消息生产者的代码:
package easyway.activemq.app; import java.io.File; import java.util.Properties; import javax.jms.Connection; import javax.jms.DeliveryMode; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageProducer; import javax.jms.Session; import javax.sql.DataSource; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.store.jdbc.adapter.MySqlJDBCAdapter; import org.apache.commons.dbcp.BasicDataSourceFactory; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; import easyway.activemq.app.utils.BrokenPersistenceAdapter; /** * 消息持久化到数据库 * @author longgangbai * */ public class MessageProductor { private static Logger logger=LogManager.getLogger(MessageProductor.class); private String username=ActiveMQConnectionFactory.DEFAULT_USER; private String password=ActiveMQConnectionFactory.DEFAULT_PASSWORD; private String url=ActiveMQConnectionFactory.DEFAULT_BROKER_BIND_URL; private static String queueName="ActiveMQ.QUEUE"; private BrokerService brokerService; protected static final int messagesExpected = 10; protected ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory( "tcp://localhost:61617?jms.prefetchPolicy.all=0&jms.redeliveryPolicy.maximumRedeliveries="+messagesExpected); /*** * 创建Broker服务对象 * @return * @throws Exception */ public BrokerService createBroker()throws Exception{ BrokerService broker=new BrokerService(); BrokenPersistenceAdapter jdbc=createBrokenPersistenceAdapter(); broker.setPersistenceAdapter(jdbc); jdbc.setDataDirectory(System.getProperty("user.dir")+File.separator+"data"+File.separator); jdbc.setAdapter(new MySqlJDBCAdapter()); broker.setPersistent(true); broker.addConnector("tcp://localhost:61617"); //broker.addConnector(ActiveMQConnectionFactory.DEFAULT_BROKER_BIND_URL); return broker; } /** * 创建Broken的持久化适配器 * @return * @throws Exception */ public BrokenPersistenceAdapter createBrokenPersistenceAdapter() throws Exception{ BrokenPersistenceAdapter jdbc=new BrokenPersistenceAdapter(); DataSource datasource=createDataSource(); jdbc.setDataSource(datasource); jdbc.setUseDatabaseLock(false); //jdbc.deleteAllMessages(); return jdbc; } /** * 创建数据源 * @return * @throws Exception */ public DataSource createDataSource() throws Exception{ Properties props=new Properties(); props.put("driverClassName", "com.mysql.jdbc.Driver"); props.put("url", "jdbc:mysql://localhost:3306/activemq"); props.put("username", "root"); props.put("password", "root"); DataSource datasource=BasicDataSourceFactory.createDataSource(props); return datasource; } /** * 启动BrokerService进程 * @throws Exception */ public void init() throws Exception{ createBrokerService(); brokerService.start(); } public BrokerService createBrokerService() throws Exception{ if(brokerService==null){ brokerService=createBroker(); } return brokerService; } public void sendMessage() throws JMSException{ Connection connection=connectionFactory.createConnection(); connection.start(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Destination destination = session.createQueue(queueName); MessageProducer producer = session.createProducer(destination); producer.setDeliveryMode(DeliveryMode.PERSISTENT); for(int i=0;i<messagesExpected;i++){ logger.debug("Sending message " + (i+1) + " of " + messagesExpected); producer.send(session.createTextMessage("test message " + (i+1))); } connection.close(); } public String getPassword() { return password; } public void setPassword(String password) { this.password = password; } public String getUrl() { return url; } public void setUrl(String url) { this.url = url; } public String getUsername() { return username; } public void setUsername(String username) { this.username = username; } }
持久化适配器类
package easyway.activemq.app.utils; import java.io.IOException; import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * * @author longgangbai * */ public class BrokenPersistenceAdapter extends JDBCPersistenceAdapter { private final Logger LOG = LoggerFactory.getLogger(BrokenPersistenceAdapter.class); private boolean shouldBreak = false; @Override public void commitTransaction(ConnectionContext context) throws IOException { if ( shouldBreak ) { LOG.warn("Throwing exception on purpose"); throw new IOException("Breaking on purpose"); } LOG.debug("in commitTransaction"); super.commitTransaction(context); } public void setShouldBreak(boolean shouldBreak) { this.shouldBreak = shouldBreak; } }
测测试代码如下:
package easyway.activemq.app.test; import easyway.activemq.app.MessageProductor; public class MessageProductorTest { public static void main(String[] args) throws Exception { MessageProductor productor =new MessageProductor(); productor.init(); productor.sendMessage(); //productor.createBrokerService().stop(); } }
package easyway.activemq.app.test; import easyway.activemq.app.MessageCustomer; public class MessageCustomerTest { public static void main(String[] args) throws Exception { MessageCustomer customer=new MessageCustomer(); //customer.init(); //当两台机器在不同的服务器上启动客户端的broker进程 customer.receiveMessage(); } }
备注:运行过程为:首先执行MessageProductorTest,MessageCustomerTest。
mysql数据库activemq必须存在。关于消息持久化的表结构如下:
相关推荐
ActiveMQ的持久化(数据库)[归类].pdf
自己写的ActiveMQ简单demo,包括生产者、消费者之间发送消息、持久化到文件和持久化到数据库,期中持久化需要修改activemq.xml文件
KahaDB他是默认的持久化策略,所有消息都会顺序添加到一个日志文件中,同时另外有一个索引文件记录指向这些日志的存储地址,还有一个事务日志用于消息回复操作。是一个专门针对消息持久化的解决方案,它对典型的消息...
从 ActiveMQ 5.9 开始,...LevelDB 是 Google 开发的一套用于持久化数据的高性能类库。LevelDB 并不是一种服务,用户需要自 行实现 Server。是单进程的服务,能够处理十亿级别规模 Key-Value 型数据,占用内存小。
2.3 Redis持久化机制 2.4 Redis高级特性和集群 3. MySQL数据库篇 3.1 MySQL简介和基本操作 3.2 数据库设计范式和优化 3.3 事务和并发控制 3.4 索引和优化技巧 4. 框架篇 4.1 Spring框架概述 4.2 Spring...
图解Redis中的AOF和RDB持久化策略的原理 redis读写分离架构实践 redis哨兵架构及数据丢失问题分析 redis Cluster数据分布算法之Hash slot redis使用常见问题及性能优化思路 redis高可用及高并发实战 缓存击穿...
持久层:mybatis持久化,使用MyBatis-Plus优化,减少sql开发量;aop切换数据库实现读写分离。Transtraction注解事务。 MVC: 基于spring mvc注解,Rest风格Controller。Exception统一管理。 缓存和Session:注解redis...
【消息队列】持久化消息非常慢 162 【消息队列】消息的不均匀消费 162 【消息队列】ActiveMQ中的消息重发时间间隔和重发次数吗? 164 【Dubbo】dubbo介绍 166 Dubbo 是什么 166 Dubbo 架构流程图 167 调用流程 167 ...
xml java系统源码 高端技术 :wolf: 分享一些大型互联网架构常用的高端技术 目录 消息队列 ActiveMQ ActiveMQ 是Apache出品,最流行的,能力强劲的开源消息总线...C语言编写、支持网络、可基于内存亦可持久化的日志型、K
transactions-dubbo dubbo项目基于atomikos的分布式事务管理 框架介绍 ... 项目现在有很多不足,微服务框架只支持dubbo,数据库持久化框架只支持mybatis,只支持管理数据库事务、activemq的事务、rock
C语言编写、支持网络、可基于内存亦可持久化的日志型、Key-Value数据库,并提供多种语言的API。 MongoDB:MongoDB是一个基于分布式文件存储的NO-SQL型数据库。 ActiveMQ:ActiveMQ 是Apache出品,能力强劲的开源消息...
2.2 持久化存储 71 2.2.1 MySQL扩展 72 2.2.2 HBase 80 2.2.3 Redis 91 2.3 消息系统 95 2.3.1 ActiveMQ & JMS 96 2.4 垂直化搜索引擎 104 2.4.1 Lucene简介 105 2.4.2 Lucene的使用 108 2.4.3 ...
1.系统技术 1.jooq + spring + jsonrpc + maven + quartz + c3p0 + ... 3).dao层jooq操作数据库 使用spring的@Repository注解来标记持久层 2.task任务 1).定时任务,系统启动时,连接数据库进行任务加载以及初始化
users-handler-实现CQRS的Command部分,并使用h2数据库进行持久化。 这些服务通过Apache ActiveMQ提供的JMS总线进行通信。 更详细的设计文档可。 先决条件 安装Docker Machine并撰写: : 生成并运行 git克隆 cd...
Redis真的那么好用吗? 一、Redis是什么? Redis是一个开源的底层使用C语言...持久化数据访问较慢、 用key查询 2、消息队列 相当于消息订阅系统,比如ActiveMQ、RocketMQ。如果对数据有较高一致性要求时,还是建议使用
和Kafka类似消息中间件开源产品还包括RabbiMQ、ActiveMQ、ZeroMQ 等。 MapReduce是Google公司的核⼼计算模型,它将运⾏于⼤规模集群上的复杂并⾏计算过程⾼度抽象为两个函数:map和reduce。 MapReduce最伟⼤之处在于...
9.4.4配置消息序列化器 9.4.5处理端点异常 9.4.6提供wsdl文件 9.4.7部署服务 9.5消费spring-wsweb服务 9.5.1使用web服务模板 9.5.2使用web服务的网关支持 9.6小结 第10章spring消息 10.1jms简介 10.1.1...
9.4.4 配置消息序列化器 9.4.5 处理端点异常 9.4.6 提供WSDL文件 9.4.7 部署服务 9.5 消费Spring-WS Web服务 9.5.1 使用Web服务模板 9.5.2 使用Web服务的网关支持 9.6 小结 第10章 Spring消息 10.1 JMS...
9.4.4 配置消息序列化器 9.4.5 处理端点异常 9.4.6 提供WSDL文件 9.4.7 部署服务 9.5 消费Spring-WS Web服务 9.5.1 使用Web服务模板 9.5.2 使用Web服务的网关支持 9.6 小结 第10章 Spring消息 10.1 JMS...
消息队列 jms Queue Topic kafka 持久 复制 Stream Partition rocketMQ RabbitMQ ActiveMQ 常用开源框架 Spring Spring MVC Spring WebFlow spring tx aop ioc Struts ibatis Mybatis CAS ...