`
longgangbai
  • 浏览: 7249915 次
  • 性别: Icon_minigender_1
  • 来自: 上海
社区版块
存档分类
最新评论

ActiveMQ消息持久化到数据库

阅读更多
  本人实现的功能为activemq将消息持久化到数据库的方法:

1:前言

     这一段给公司开发消息总线有机会研究ActiveMQ,今天撰文给大家介绍一下他的持久化消息。本文只介绍三种方式,分别是持久化为文件,MYSql,Oracle。下面逐一介绍。

A:持久化为文件

     这个你装ActiveMQ时默认就是这种,只要你设置消息为持久化就可以了。涉及到的配置和代码有

<persistenceAdapter>
<kahaDB directory="${activemq.base}/data/kahadb"/>
</persistenceAdapter>

producer.Send(request, MsgDeliveryMode.Persistent, level, TimeSpan.MinValue);

B:持久化为MySql

     你首先需要把MySql的驱动放到ActiveMQ的Lib目录下,我用的文件名字是:mysql-connector-java-5.0.4-bin.jar

     接下来你修改配置文件

<persistenceAdapter>
<jdbcPersistenceAdapter dataDirectory="${activemq.base}/data" dataSource="#derby-ds"/>
</persistenceAdapter>

在配置文件中的broker节点外增加

复制代码
复制代码
<bean id="derby-ds"class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close">
<property name="driverClassName" value="com.mysql.jdbc.Driver"/>
<property name="url" value="jdbc:mysql://localhost/activemq?relaxAutoCommit=true"/>
<property name="username" value="activemq"/>
<property name="password" value="activemq"/>
<property name="maxActive" value="200"/>
<property name="poolPreparedStatements" value="true"/>
</bean>
复制代码
复制代码

从配置中可以看出数据库的名称是activemq,你需要手动在MySql中增加这个库。

然后重新启动消息队列,你会发现多了3张表

1:activemq_acks

2:activemq_lock

3:activemq_msgs

C:持久化为Oracle

    和持久化为MySql一样。这里我说两点

1;在ActiveMQ安装文件夹里的Lib文件夹中增加Oracle的JDBC驱动。驱动文件位于Oracle客户端安装文件中的product\11.1.0\client_1\jdbc\lib文件夹下。

2:

复制代码
复制代码
<bean id="derby-ds"class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close">
<property name="driverClassName" value="oracle.jdbc.driver.OracleDriver"/>
<property name="url" value="jdbc:oracle:thin:@10.53.132.47:1521:test"/>
<property name="username" value="qdcommu"/>
<property name="password" value="qdcommu"/>
<property name="maxActive" value="200"/>
<property name="poolPreparedStatements" value="true"/>
</bean>
复制代码
复制代码

这里的jdbc:oracle:thin:@10.53.132.47:1521:test按照自己实际情况设置一下就可以了,特别注意的是test是SID即服务名称而不是TNS中配置的节点名。各位同学只需要替换IP,端口和这个SID就可以了。

 
 
消息消费者的事先代码:
package easyway.activemq.app;

import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
/***
 * 消息持久化到数据库
 *  @author longgangbai
 */
public class MessageCustomer {
	private static Logger logger=LogManager.getLogger(MessageProductor.class);
	  private String username=ActiveMQConnectionFactory.DEFAULT_USER;
	  private String password=ActiveMQConnectionFactory.DEFAULT_PASSWORD;
	  private  String url=ActiveMQConnectionFactory.DEFAULT_BROKER_BIND_URL;
	  private static String QUEUENAME="ActiveMQ.QUEUE";
	  protected static final int messagesExpected = 10;
	  protected ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
			  url+"?jms.prefetchPolicy.all=0&jms.redeliveryPolicy.maximumRedeliveries="+messagesExpected);
	  
	  
	/***
	 * 创建Broker服务对象
	 * @return
	 * @throws Exception
	 */
	public BrokerService createBroker()throws Exception{
		BrokerService  broker=new BrokerService();
	    broker.addConnector(url);
		return broker;
	}

	/**
	 * 启动BrokerService进程
	 * @throws Exception
	 */
	public void init() throws Exception{
		BrokerService brokerService=createBroker();
		brokerService.start();
	}
	/**
	 * 接收的信息
	 * @return
	 * @throws Exception
	 */
	public int receiveMessage() throws Exception{
		Connection connection=connectionFactory.createConnection();
		connection.start();
		Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
		return receiveMessages(messagesExpected,session);
	}
	

	/**
	 * 接受信息的方法
	 * @param messagesExpected
	 * @param session
	 * @return
	 * @throws Exception
	 */
	protected int receiveMessages(int messagesExpected, Session session) throws Exception {
        int messagesReceived = 0;
        for (int i=0; i<messagesExpected; i++) {
            Destination destination = session.createQueue(QUEUENAME);
            MessageConsumer consumer = session.createConsumer(destination);
            Message message = null;
            try {
            	logger.debug("Receiving message " + (messagesReceived+1) + " of " + messagesExpected);
                message = consumer.receive(2000);
                logger.info("Received : " + message);
                if (message != null) {
                    session.commit();
                    messagesReceived++;
                }
            } catch (Exception e) {
            	logger.debug("Caught exception " + e);
                session.rollback();
            } finally {
                if (consumer != null) {
                    consumer.close();
                }
            }
        }
        return messagesReceived;
    }


	public String getPassword() {
		return password;
	}
	public void setPassword(String password) {
		this.password = password;
	}
	public String getUrl() {
		return url;
	}
	public void setUrl(String url) {
		this.url = url;
	}
	public String getUsername() {
		return username;
	}
	public void setUsername(String username) {
		this.username = username;
	}
}

 消息生产者的代码:

package easyway.activemq.app;

import java.io.File;
import java.util.Properties;

import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.sql.DataSource;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.store.jdbc.adapter.MySqlJDBCAdapter;
import org.apache.commons.dbcp.BasicDataSourceFactory;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

import easyway.activemq.app.utils.BrokenPersistenceAdapter;
/**
 * 消息持久化到数据库
 * @author longgangbai
 *
 */
public class MessageProductor {
	  private static Logger logger=LogManager.getLogger(MessageProductor.class);
	  private String username=ActiveMQConnectionFactory.DEFAULT_USER;
	  private String password=ActiveMQConnectionFactory.DEFAULT_PASSWORD;
	  private  String url=ActiveMQConnectionFactory.DEFAULT_BROKER_BIND_URL;
	  private static String queueName="ActiveMQ.QUEUE";
	  private BrokerService brokerService;
	  protected static final int messagesExpected = 10;
	  protected ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
	            "tcp://localhost:61617?jms.prefetchPolicy.all=0&jms.redeliveryPolicy.maximumRedeliveries="+messagesExpected);
	/***
	 * 创建Broker服务对象
	 * @return
	 * @throws Exception
	 */
	public BrokerService createBroker()throws Exception{
			BrokerService  broker=new BrokerService();
			BrokenPersistenceAdapter jdbc=createBrokenPersistenceAdapter();
			broker.setPersistenceAdapter(jdbc);
			jdbc.setDataDirectory(System.getProperty("user.dir")+File.separator+"data"+File.separator);
			jdbc.setAdapter(new MySqlJDBCAdapter());
			broker.setPersistent(true);
			broker.addConnector("tcp://localhost:61617");
			//broker.addConnector(ActiveMQConnectionFactory.DEFAULT_BROKER_BIND_URL);
		return broker;
	}
	/**
	 * 创建Broken的持久化适配器
	 * @return
	 * @throws Exception
	 */
	public BrokenPersistenceAdapter createBrokenPersistenceAdapter() throws Exception{
		BrokenPersistenceAdapter jdbc=new BrokenPersistenceAdapter();
		DataSource datasource=createDataSource();
		jdbc.setDataSource(datasource);
		jdbc.setUseDatabaseLock(false);
		//jdbc.deleteAllMessages();
		return jdbc;
	}
	/**
	 * 创建数据源
	 * @return
	 * @throws Exception
	 */
	public DataSource createDataSource() throws Exception{
		Properties props=new Properties();
		props.put("driverClassName", "com.mysql.jdbc.Driver");
		props.put("url", "jdbc:mysql://localhost:3306/activemq");
		props.put("username", "root");
		props.put("password", "root");
		DataSource datasource=BasicDataSourceFactory.createDataSource(props);
		return datasource;
	}
	/**
	 * 启动BrokerService进程
	 * @throws Exception
	 */
	public void init() throws Exception{
		createBrokerService();
		brokerService.start();
	}
	public BrokerService createBrokerService() throws Exception{
		if(brokerService==null){
			brokerService=createBroker();
		}
		return brokerService;
	}
	
	public void sendMessage() throws JMSException{
		Connection connection=connectionFactory.createConnection();
		connection.start();
		Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
	    Destination destination = session.createQueue(queueName);        
	    MessageProducer producer = session.createProducer(destination);
	    producer.setDeliveryMode(DeliveryMode.PERSISTENT);
		for(int i=0;i<messagesExpected;i++){
			 logger.debug("Sending message " + (i+1) + " of " + messagesExpected);
	         producer.send(session.createTextMessage("test message " + (i+1)));
		}
		connection.close();
	}
	
	

	public String getPassword() {
		return password;
	}
	public void setPassword(String password) {
		this.password = password;
	}
	public String getUrl() {
		return url;
	}
	public void setUrl(String url) {
		this.url = url;
	}
	public String getUsername() {
		return username;
	}
	public void setUsername(String username) {
		this.username = username;
	}
}

 

持久化适配器类

package easyway.activemq.app.utils;


import java.io.IOException;

import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * 
 * @author longgangbai
 *
 */
public class BrokenPersistenceAdapter extends JDBCPersistenceAdapter {

    private final Logger LOG = LoggerFactory.getLogger(BrokenPersistenceAdapter.class);

    private boolean shouldBreak = false;

    @Override
    public void commitTransaction(ConnectionContext context) throws IOException {
        if ( shouldBreak ) {
            LOG.warn("Throwing exception on purpose");
            throw new IOException("Breaking on purpose");
        }
        LOG.debug("in commitTransaction");
        super.commitTransaction(context);
    }

    public void setShouldBreak(boolean shouldBreak) {
        this.shouldBreak = shouldBreak;
    }
}

 

测测试代码如下:

 

 

package easyway.activemq.app.test;

import easyway.activemq.app.MessageProductor;

public class MessageProductorTest {
	
	public static void main(String[] args) throws Exception {
		MessageProductor  productor =new MessageProductor();
		productor.init();
		productor.sendMessage();
		//productor.createBrokerService().stop();
	}

}

 

package easyway.activemq.app.test;

import easyway.activemq.app.MessageCustomer;

public class MessageCustomerTest {
  public static void main(String[] args) throws Exception {
	  MessageCustomer  customer=new MessageCustomer();
	  //customer.init();  //当两台机器在不同的服务器上启动客户端的broker进程
	  customer.receiveMessage();
	  
}
}

 

 

备注:运行过程为:首先执行MessageProductorTest,MessageCustomerTest。

        mysql数据库activemq必须存在。关于消息持久化的表结构如下:

 

 

分享到:
评论
20 楼 tanliansheng 2017-08-18  
南城日落北城恋 写道
您好!请问一下当我的activemq_msgs这张表有数据时,Tomcat启动就会报错:com.mysql.jdbc.exceptions.jdbc4.MySQLSyntaxErrorException: You have an error in your SQL syntax; check the manual that corresponds to your MySQL server version for the right syntax to use near 'OPTION SQL_SELECT_LIMIT=80' at line 1,这是怎样回事

相信你已经解决了,但是没看到有回复的,这个应该是jdbc版本太低造成的
19 楼 boonya 2016-04-27  
消费者连接: protected ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url
+ "?jms.prefetchPolicy.all=0&jms.redeliveryPolicy.maximumRedeliveries=" + messagesExpected);

生产者连接:protected ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
"tcp://localhost:61617?jms.prefetchPolicy.all=0&jms.redeliveryPolicy.maximumRedeliveries="
+ messagesExpected);

如文中代码所示,不知道是activeMQ的版本问题不对还是什么,我用你文章中的代码一直收到的都是null

消费者使用的是默认的broker tcp://localhost:61616?,我这边改成 tcp://localhost:61617?与发布broker一致结果就报错了,能帮忙解答一下吗?
18 楼 boonya 2016-04-27  
老兄消费者和发布者 两个用的connectionFactory地址不一样怎么收到数据的呢,你的代码里面
protected ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url
+ "?jms.prefetchPolicy.all=0&jms.redeliveryPolicy.maximumRedeliveries=" + messagesExpected);
17 楼 生活调味品 2015-11-25  
用Topic的话持久化到数据库的MSG再被所有消费者消费掉以后仍然不清理掉,必须重启服务器才行,请问这如何解决呢?
16 楼 qq_28108539 2015-06-08  
您好,为什么我的activemq.xml文件配置完MySql持久化之后就启动报错呢?
15 楼 南城日落北城恋 2015-01-14  
您好!请问一下当我的activemq_msgs这张表有数据时,Tomcat启动就会报错:com.mysql.jdbc.exceptions.jdbc4.MySQLSyntaxErrorException: You have an error in your SQL syntax; check the manual that corresponds to your MySQL server version for the right syntax to use near 'OPTION SQL_SELECT_LIMIT=80' at line 1,这是怎样回事
14 楼 liubey 2014-06-12  
目前你们呢正式环境(生产环境)使用的是哪种持久化方案?
13 楼 hibluse 2013-01-05  
longgangbai 写道
hibluse 写道
你好,数据库持久化方式,默认有个配置 maxactive=200.但是我后台看到数据库线程只有2个在跑。请问是什么原因??谢谢了。

具体我没有看到的的配置文件的,你说的我理解可能是数据库连接池的概念类似,有一个最大连接数max,最小连接数min,在连接池中开始初始化的时候创建min个连接,在连接不够的时候,继续创建连接,如果连接达到max个,那么就不再创建连接,而是等待直到有空闲的连接就使用空闲连接。由此可以说明你说的后台连个连接线程也是正确的,没有任何问题。

现在数据库的性能很低,是不是这个影响的。我想改变msg字段类型为varchar类型。是否能提高性能。请问,怎么配置。如果是纯jdbc配置。
12 楼 longgangbai 2013-01-05  
hibluse 写道
你好,数据库持久化方式,默认有个配置 maxactive=200.但是我后台看到数据库线程只有2个在跑。请问是什么原因??谢谢了。

具体我没有看到的的配置文件的,你说的我理解可能是数据库连接池的概念类似,有一个最大连接数max,最小连接数min,在连接池中开始初始化的时候创建min个连接,在连接不够的时候,继续创建连接,如果连接达到max个,那么就不再创建连接,而是等待直到有空闲的连接就使用空闲连接。由此可以说明你说的后台连个连接线程也是正确的,没有任何问题。
11 楼 hibluse 2013-01-05  
你好,数据库持久化方式,默认有个配置 maxactive=200.但是我后台看到数据库线程只有2个在跑。请问是什么原因??谢谢了。
10 楼 liaozxbj 2012-05-22  
消息是存到数据库了,但是不能被消费啊,能把需要配置的地方再说说吗?
9 楼 龙天1213 2012-04-24  
longgangbai 写道
FQ_kevin 写道
你好,数据库里的三张表自动产生了,msgs这张表中没有数据,还有其它要配置吗?

我不在程序里面,通过设置activemq.xml的持久配置是有数据的。

当发送数据失败,消息堆积的时候msgs表中才有数据。

如果配置中指明采用持久化,并且当你发送消息时,接收放没有及时的接收消息,activemq就会将消息保存在msgs表中,当接收方成功接收消息之后,activemq就会将消息从msgs表中删除,如果指定了超时时间,到超时时间到了,而接收方仍然没有接收消息的话,activemq也会将消息从msgs表中删除掉!
8 楼 longgangbai 2012-04-08  
FQ_kevin 写道
你好,数据库里的三张表自动产生了,msgs这张表中没有数据,还有其它要配置吗?

我不在程序里面,通过设置activemq.xml的持久配置是有数据的。

当发送数据失败,消息堆积的时候msgs表中才有数据。
7 楼 FQ_kevin 2012-03-28  
你好,数据库里的三张表自动产生了,msgs这张表中没有数据,还有其它要配置吗?

我不在程序里面,通过设置activemq.xml的持久配置是有数据的。
6 楼 longgangbai 2012-02-19  
龙天1213 写道
万分的感谢楼主了,按照楼主方法,终于把消息存到了数据库中!

客气了,学习沟通,交流是开发人员的必须具备的。
5 楼 longgangbai 2012-02-19  
bluseli 写道
你好!昨天搞定了。
我想问一下,我们在做项目的时候,activemq.XML这个文件我们是自己在工程里建吗?
因为我们启动本地的activemq服务跟我们工程没有任何关系!我测试的时候就没有启动什么服务啊。只是在工程里把需要的包引入了。
还有就是,如果上述假设成立,那么,第一种方式,我们是可以像持久化一样以code方式引入persistenceAdapter,activemq.XML这个文件里的所有配置都可以通过code形式完成;第二种方式,就是在工程里读文件,通过BrokerService service = new BrokerFactory.createBroker(new URI(someURI))读someURI找到配置文件

我们真正开发的时候走的是这个途径吗?谢谢!


这个文件可以自己创建也可以不创建active的jar文件包含有,如果采用默认的可能需要东西要采用默认的
4 楼 龙天1213 2012-02-16  
万分的感谢楼主了,按照楼主方法,终于把消息存到了数据库中!
3 楼 bluseli 2011-09-07  
你好!昨天搞定了。
我想问一下,我们在做项目的时候,activemq.XML这个文件我们是自己在工程里建吗?
因为我们启动本地的activemq服务跟我们工程没有任何关系!我测试的时候就没有启动什么服务啊。只是在工程里把需要的包引入了。
还有就是,如果上述假设成立,那么,第一种方式,我们是可以像持久化一样以code方式引入persistenceAdapter,activemq.XML这个文件里的所有配置都可以通过code形式完成;第二种方式,就是在工程里读文件,通过BrokerService service = new BrokerFactory.createBroker(new URI(someURI))读someURI找到配置文件

我们真正开发的时候走的是这个途径吗?谢谢!

2 楼 longgangbai 2011-09-06  
bluseli 写道
你好!看了你的帖子,我按照你的方式引入broker并持久化。为什么broker自动关闭了
但是数据库里的三张表自动产生了。但是msgs这张表中没有数据。


你配置数据库了,activemq检查你的配置数据库如果存在信息,就会创建了,msgs消息已经消费了完毕,如果没有消费,则会放在这个表中.
1 楼 bluseli 2011-09-05  
你好!看了你的帖子,我按照你的方式引入broker并持久化。为什么broker自动关闭了
但是数据库里的三张表自动产生了。但是msgs这张表中没有数据。

相关推荐

    ActiveMQ的持久化(数据库)[归类].pdf

    ActiveMQ的持久化(数据库)[归类].pdf

    自己写的ActiveMQ的Demo例子

    自己写的ActiveMQ简单demo,包括生产者、消费者之间发送消息、持久化到文件和持久化到数据库,期中持久化需要修改activemq.xml文件

    java实现的消息中间件之AcitveMQ详解,学习学习

    KahaDB他是默认的持久化策略,所有消息都会顺序添加到一个日志文件中,同时另外有一个索引文件记录指向这些日志的存储地址,还有一个事务日志用于消息回复操作。是一个专门针对消息持久化的解决方案,它对典型的消息...

    activemq集群安装

    从 ActiveMQ 5.9 开始,...LevelDB 是 Google 开发的一套用于持久化数据的高性能类库。LevelDB 并不是一种服务,用户需要自 行实现 Server。是单进程的服务,能够处理十亿级别规模 Key-Value 型数据,占用内存小。

    Java面试八股文.zip

    2.3 Redis持久化机制 2.4 Redis高级特性和集群 3. MySQL数据库篇 3.1 MySQL简介和基本操作 3.2 数据库设计范式和优化 3.3 事务和并发控制 3.4 索引和优化技巧 4. 框架篇 4.1 Spring框架概述 4.2 Spring...

    Java思维导图xmind文件+导出图片

    图解Redis中的AOF和RDB持久化策略的原理 redis读写分离架构实践 redis哨兵架构及数据丢失问题分析 redis Cluster数据分布算法之Hash slot redis使用常见问题及性能优化思路 redis高可用及高并发实战 缓存击穿...

    springboot+dubbo分布式架构,提供分布式缓存、分布式锁、分布式Session、读写分离

    持久层:mybatis持久化,使用MyBatis-Plus优化,减少sql开发量;aop切换数据库实现读写分离。Transtraction注解事务。 MVC: 基于spring mvc注解,Rest风格Controller。Exception统一管理。 缓存和Session:注解redis...

    java面试题,180多页,绝对良心制作,欢迎点评,涵盖各种知识点,排版优美,阅读舒心

    【消息队列】持久化消息非常慢 162 【消息队列】消息的不均匀消费 162 【消息队列】ActiveMQ中的消息重发时间间隔和重发次数吗? 164 【Dubbo】dubbo介绍 166 Dubbo 是什么 166 Dubbo 架构流程图 167 调用流程 167 ...

    xmljava系统源码-high-end-technology::angry_face_with_horns:Sharesomeofthehigh-endtechnologiescomm

    xml java系统源码 高端技术 :wolf: 分享一些大型互联网架构常用的高端技术 目录 消息队列 ActiveMQ ActiveMQ 是Apache出品,最流行的,能力强劲的开源消息总线...C语言编写、支持网络、可基于内存亦可持久化的日志型、K

    transactions-dubbo:dubbo项目基于atomikos的分布式事务管理

    transactions-dubbo dubbo项目基于atomikos的分布式事务管理 框架介绍 ...​ 项目现在有很多不足,微服务框架只支持dubbo,数据库持久化框架只支持mybatis,只支持管理数据库事务、activemq的事务、rock

    word源码java-springboot-study:Springboot内部培训文档,集成所有第三方组件

    C语言编写、支持网络、可基于内存亦可持久化的日志型、Key-Value数据库,并提供多种语言的API。 MongoDB:MongoDB是一个基于分布式文件存储的NO-SQL型数据库。 ActiveMQ:ActiveMQ 是Apache出品,能力强劲的开源消息...

    大型分布式网站架构与实践

     2.2 持久化存储 71  2.2.1 MySQL扩展 72  2.2.2 HBase 80  2.2.3 Redis 91  2.3 消息系统 95  2.3.1 ActiveMQ & JMS 96  2.4 垂直化搜索引擎 104  2.4.1 Lucene简介 105  2.4.2 Lucene的使用 108  2.4.3 ...

    serverApi:serverApi 可以做微服务

    1.系统技术 1.jooq + spring + jsonrpc + maven + quartz + c3p0 + ... 3).dao层jooq操作数据库 使用spring的@Repository注解来标记持久层 2.task任务 1).定时任务,系统启动时,连接数据库进行任务加载以及初始化

    meetup:微服务探索

    users-handler-实现CQRS的Command部分,并使用h2数据库进行持久化。 这些服务通过Apache ActiveMQ提供的JMS总线进行通信。 更详细的设计文档可。 先决条件 安装Docker Machine并撰写: : 生成并运行 git克隆 cd...

    Redis:Redis代码

    Redis真的那么好用吗? 一、Redis是什么? Redis是一个开源的底层使用C语言...持久化数据访问较慢、 用key查询 2、消息队列 相当于消息订阅系统,比如ActiveMQ、RocketMQ。如果对数据有较高一致性要求时,还是建议使用

    大数据处理流程.pdf

    和Kafka类似消息中间件开源产品还包括RabbiMQ、ActiveMQ、ZeroMQ 等。 MapReduce是Google公司的核⼼计算模型,它将运⾏于⼤规模集群上的复杂并⾏计算过程⾼度抽象为两个函数:map和reduce。 MapReduce最伟⼤之处在于...

    Spring in Action(第2版)中文版

    9.4.4配置消息序列化器 9.4.5处理端点异常 9.4.6提供wsdl文件 9.4.7部署服务 9.5消费spring-wsweb服务 9.5.1使用web服务模板 9.5.2使用web服务的网关支持 9.6小结 第10章spring消息 10.1jms简介 10.1.1...

    Spring in Action(第二版 中文高清版).part2

    9.4.4 配置消息序列化器 9.4.5 处理端点异常 9.4.6 提供WSDL文件 9.4.7 部署服务 9.5 消费Spring-WS Web服务 9.5.1 使用Web服务模板 9.5.2 使用Web服务的网关支持 9.6 小结 第10章 Spring消息 10.1 JMS...

    Spring in Action(第二版 中文高清版).part1

    9.4.4 配置消息序列化器 9.4.5 处理端点异常 9.4.6 提供WSDL文件 9.4.7 部署服务 9.5 消费Spring-WS Web服务 9.5.1 使用Web服务模板 9.5.2 使用Web服务的网关支持 9.6 小结 第10章 Spring消息 10.1 JMS...

    【白雪红叶】JAVA学习技术栈梳理思维导图.xmind

    消息队列 jms Queue Topic kafka 持久 复制 Stream Partition rocketMQ RabbitMQ ActiveMQ 常用开源框架 Spring Spring MVC Spring WebFlow spring tx aop ioc Struts ibatis Mybatis CAS ...

Global site tag (gtag.js) - Google Analytics