`
longgangbai
  • 浏览: 7252860 次
  • 性别: Icon_minigender_1
  • 来自: 上海
社区版块
存档分类
最新评论

ActiveMQ简单实例发送消息

阅读更多

        在半年多的时间里,天天闲着,没有写blog也没有学习,现在要学习一些工作要使用的东西咯,公司使用的TongGTP和TongLinkQ最近一直不稳定,老大准备开发一个文件传输中间件,需要借鉴一些开源的mq框架学习和架构。

因为ActiveMQ是一个Apache的一个组件,比较活跃,并且有以下优点:

 

 

目前优点:

1.支持jms1.1和jms1.2等jms API。

2.容易跟当前的Spring框架整合。

3.容易JMX等整合,便于管理。

4.提供了高可靠性,可以采用主从服务的方式,Master broker只有在消息成功被复制到slave broker之后才会响应客户,保证消息的完整性

5.高容错性提供了failover机制。

6.支持多语言种类,java,c#等。

7.支持消息的持久化为文件或者数据库信息。

8安全可靠,可以和jaas整合。

9.在消息发送方面效率高于jbossmq.

 

 

如下为一个简单的发送消息的方法:

需要的jar为:

<classpathentry kind="lib" path="src/activemq-all-5.5.0.jar"/>
 <classpathentry kind="lib" path="src/slf4j-api-1.5.2-sources.jar"/>
 <classpathentry kind="lib" path="src/slf4j-api-1.5.2.jar"/>
 <classpathentry kind="lib" path="src/slf4j-simple-1.5.2.jar"/>
 <classpathentry kind="lib" path="src/log4j-1.2.8.jar"/>
 <classpathentry kind="lib" path="src/commons-dbcp-1.4.jar"/>
 <classpathentry kind="lib" path="src/commons-pool-1.5.4.jar"/>
 <classpathentry kind="lib" path="src/commons-collections-3.2.1.jar"/>

 

1.首先使用activemq.bat启动ActiveMQ的broker进程,监听默认的61616端口可以使用如下命令查看端口:

netstat -a

检查显示信息有无61616端口信息。

 

2.开发消息生产者代码如下:

package easyway.app.activemq.demo2;

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;

import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.StreamMessage;

import org.apache.activemq.ActiveMQConnectionFactory;
/**
 * 消息的创建者
 * @author longgangbai
 *
 */
public class StreamMsgProducer {
	public void send(File file) {
		ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
		Connection conn = null;
		try {
			conn = factory.createConnection();
			conn.start();
			Session session = conn.createSession(false,
					Session.AUTO_ACKNOWLEDGE);
			Destination queue = session.createQueue("streamMsg");
			MessageProducer producer = session.createProducer(queue);

			InputStream in = new FileInputStream(file);
			byte[] buffer = new byte[2048];
			int c = -1;
			while ((c = in.read(buffer)) > 0) {
				StreamMessage smsg = session.createStreamMessage();
				smsg.writeBytes(buffer, 0, c);
				producer.send(smsg);
				System.out.println("send: " + c);
			}
			in.close();
		} catch (JMSException e) {
			e.printStackTrace();
		} catch (IOException e) {
			e.printStackTrace();
		} finally {
			if (conn != null) {
				try {
					conn.close();
				} catch (JMSException e) {
					e.printStackTrace();
				}
			}
		}
	}

	public static void main(String[] args) {
		File file = new File("c:\\send.txt");
		new StreamMsgProducer().send(file);
	}
}

 

3.开发消息消费者

package easyway.app.activemq.demo2;

import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;

import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.StreamMessage;

import org.apache.activemq.ActiveMQConnectionFactory;
/**
 * 消息的消费者
 * @author longgangbai
 *
 */
public class StreamMsgConsumer {
	public void receive() {
		ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
		Connection conn = null;
		try {
			conn = factory.createConnection();
			conn.start();
			Session session = conn.createSession(false,
					Session.AUTO_ACKNOWLEDGE);
			Destination queue = session.createQueue("streamMsg");
			MessageConsumer consumer = session.createConsumer(queue);

			OutputStream out = new FileOutputStream("c:\\receive.txt");
			byte[] buffer = new byte[2048];
			while (true) {
				Message msg = consumer.receive(5000);
				if (msg == null) {
					break;
				}

				if (msg instanceof StreamMessage) {
					StreamMessage smsg = (StreamMessage) msg;
					int c = smsg.readBytes(buffer);
					out.write(buffer, 0, c);
					System.out.println("Receive: " + c);
				}
			}
			out.close();
		} catch (JMSException e) {
			e.printStackTrace();
		} catch (IOException e) {
			e.printStackTrace();
		} finally {
			if (conn != null) {
				try {
					conn.close();
				} catch (JMSException e) {
					e.printStackTrace();
				}
			}
		}
	}

	public static void main(String[] args) {
		new StreamMsgConsumer().receive();
	}
}

 

4.分别启动消息消费者和生产者即可。

 

 

分享到:
评论
1 楼 yuandaf 2011-11-07  
有个成熟的文件传输中间件产品,可以参考 hulft(中文版叫 海度),不知有没有听说过?

相关推荐

Global site tag (gtag.js) - Google Analytics