`
longgangbai
  • 浏览: 7249814 次
  • 性别: Icon_minigender_1
  • 来自: 上海
社区版块
存档分类
最新评论

ActiveMQ Spring 整合持久化到数据库的实现

阅读更多

本文主要目的实现activemq和spring将消息写入数据库的方法:

activemq.xml的内容如下:

<?xml version="1.0" encoding="UTF-8"?>
<beans
  xmlns="http://www.springframework.org/schema/beans"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
  http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">

  <!-- Allows us to use system properties as variables in this configuration file -->
  <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"/>


  <broker useJmx="false" brokerName="jdbcBroker" xmlns="http://activemq.apache.org/schema/core">
    <persistenceAdapter>
       <jdbcPersistenceAdapter dataDirectory="activemq-data" dataSource="#derby-ds"/>
    </persistenceAdapter>

    <transportConnectors>
       <transportConnector name="default" uri="tcp://localhost:61619"/>
    </transportConnectors>
  </broker>
  <bean id="derbyds" class="org.apache.activemq.store.jdbc.adapter.DB2JDBCAdapter"/>
  <!-- Embedded Derby DataSource Sample Setup -->
  <bean id="derby-ds" class="org.apache.derby.jdbc.EmbeddedDataSource">
    <property name="databaseName" value="derbydb"/>
    <property name="createDatabase" value="create"/>
  </bean>

</beans>

 

activemq-jdbc.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans
  xmlns="http://www.springframework.org/schema/beans"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
  http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">

   <bean id="broker" class="org.apache.activemq.xbean.BrokerFactoryBean">
      <property name="config"  value="classpath:activemq.xml"/>
      <property name="start"  value="true"/>
   </bean>
   
   <bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory" depends-on="broker">
      <property name="brokerURL" value="tcp://localhost:61619"/>
   </bean>

</beans>

 

消息生产者:

package easyway.activemq.app.demo3;

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;

import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.StreamMessage;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.springframework.context.support.ClassPathXmlApplicationContext;
/**
 * 消息的创建者
 * @author longgangbai
 *
 */
public class StreamMsgProducer {
	
	public static void main(String[] args) {
		ClassPathXmlApplicationContext ctx=new ClassPathXmlApplicationContext("activemq-jdbc.xml");
		ActiveMQConnectionFactory activeMqfactory=(ActiveMQConnectionFactory)ctx.getBean("connectionFactory");
		Connection conn = null;
		try {
			conn = activeMqfactory.createConnection();
			conn.start();
			Session session = conn.createSession(false,
					Session.AUTO_ACKNOWLEDGE);
			Destination queue = session.createQueue("streamMsg");
			MessageProducer producer = session.createProducer(queue);
             File file=new File("C:\\send.txt");
			InputStream in = new FileInputStream(file);
			byte[] buffer = new byte[2048];
			int c = -1;
			while ((c = in.read(buffer)) > 0) {
				StreamMessage smsg = session.createStreamMessage();
				smsg.writeBytes(buffer, 0, c);
				producer.send(smsg);
				System.out.println("send: " + c);
			}
			in.close();
		} catch (JMSException e) {
			e.printStackTrace();
		} catch (IOException e) {
			e.printStackTrace();
		} finally {
			if (conn != null) {
				try {
					conn.close();
				} catch (JMSException e) {
					e.printStackTrace();
				}
			}
		}
	}
	


}

 

消息消费者:

package easyway.activemq.app.demo3;

import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;

import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.StreamMessage;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.xbean.BrokerFactoryBean;
import org.springframework.context.support.ClassPathXmlApplicationContext;
/**
 * 消息的消费者
 * @author longgangbai
 *
 */
public class StreamMsgConsumer {
	public void receive() {
		
		ClassPathXmlApplicationContext ctx=new ClassPathXmlApplicationContext("activemq-jdbc.xml");
		
		
		ActiveMQConnectionFactory activeMqfactory=(ActiveMQConnectionFactory)ctx.getBean("connectionFactory");
		
		Connection conn = null;
		try {
			conn = activeMqfactory.createConnection();
			conn.start();
			Session session = conn.createSession(false,
					Session.AUTO_ACKNOWLEDGE);
			Destination queue = session.createQueue("streamMsg");
			MessageConsumer consumer = session.createConsumer(queue);

			OutputStream out = new FileOutputStream("c:\\receive.txt");
			byte[] buffer = new byte[2048];
			while (true) {
				Message msg = consumer.receive(5000);
				if (msg == null) {
					break;
				}

				if (msg instanceof StreamMessage) {
					StreamMessage smsg = (StreamMessage) msg;
					int c = smsg.readBytes(buffer);
					out.write(buffer, 0, c);
					System.out.println("Receive: " + c);
				}
			}
			out.close();
		} catch (JMSException e) {
			e.printStackTrace();
		} catch (IOException e) {
			e.printStackTrace();
		} finally {
			if (conn != null) {
				try {
					conn.close();
				} catch (JMSException e) {
					e.printStackTrace();
				}
			}
		}
	}

	public static void main(String[] args) {
		new StreamMsgConsumer().receive();
	}
}

 

分享到:
评论
1 楼 phane 2012-03-06  

相关推荐

Global site tag (gtag.js) - Google Analytics