博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
activemq安装与简单消息发送接收实例
阅读量:4183 次
发布时间:2019-05-26

本文共 9073 字,大约阅读时间需要 30 分钟。

安装环境: Activemq5.11.1, jdk1.7(activemq5.11.1版本需要jdk升级到1.7),虚拟机: 192.168.147.131
[root@localhost software]# pwd/export/software[root@localhost software]# tar -zxvf apache-activemq-5.11.1-bin.tar.gz[root@localhost software]# mv apache-activemq-5.11.1 /usr/local
配置Nginx代理Activemq后台管理应用默认绑定的8161端口  
upstream tomcat_tools.activemq.local {        server 127.0.0.1:8161  weight=10 max_fails=2 fail_timeout=300s;}server {        listen                   80;        server_name              tools.activemq.local.com;        root                     /usr/local/apache-activemq-5.11.1/webapps/;        access_log               /usr/local/apache-activemq-5.11.1/logs/tools.activemq.local.com_access.log main;        error_log                /usr/local/apache-activemq-5.11.1/logs/tools.activemq.local.com_error.log warn;        error_page               403 404 /40x.html;        location / {            index index.html index.htm;            proxy_next_upstream     http_500 http_502 http_503 http_504 error timeout invalid_header;            proxy_set_header        Host  $host;            proxy_set_header       X-Real-IP        $remote_addr;            proxy_set_header        X-Forwarded-For $proxy_add_x_forwarded_for;            proxy_pass              http://tomcat_tools.activemq.local;        }        #静态文件,nginx自己处理            location ~ ^/(images|javascript|js|css|flash|media|static)/ {                    #过期30天,静态文件不怎么更新,过期可以设大一点,                    #如果频繁更新,则可以设置得小一点。                    expires 30d;            }}
重启nginx
启动activemq
[root@localhost linux-x86-64]# pwd/usr/local/apache-activemq-5.11.1/bin/linux-x86-64[root@localhost linux-x86-64]# ./activemq start
 
配置host[192.168.147.131 tools.activemq.local.com]

登录activemq的后台,默认账号 admin/admin
http://tools.activemq.local.com/admin 实例展示MQ消息的发送和接收[消息类型分为queue 和 Topic]
pom引入
org.apache.activemq
activemq-all
5.11.1
Queue类型消息 1、定义消息destination和brokerUrl[61616为activemq用于消息通讯的端口]
public class Constant {    public static final String brokerURL = "tcp://192.168.147.131:61616";    public static final String queueDestination = "testQueue";}
 
2、编写消息的发送程序
package com.mq.base.queue;import javax.jms.*;import org.apache.activemq.ActiveMQConnection;import org.apache.activemq.ActiveMQConnectionFactory;/** * created on 2015/6/4 * @author dennisit@163.com * @version 1.0 */public class MqSender {    public static void main(String[] args) throws JMSException {        // 默认的账号和密码为null        String username = ActiveMQConnection.DEFAULT_USER;        String password = ActiveMQConnection.DEFAULT_PASSWORD;        // 初始化连接工厂, DEFAULT_BROKER_URL =failover://tcp://localhost:61616        ConnectionFactory factory = new ActiveMQConnectionFactory(username, password, Constant.brokerURL);        // 创建连接        Connection connection = factory.createConnection();        connection.start();        // 创建会话        Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);        // 创建消息主题Queue        Destination destination = session.createQueue(Constant.queueDestination);        // MessageProducer负责发送消息        MessageProducer producer = session.createProducer(destination);        // 消息不持久化        producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);        ObjectMessage message = session.createObjectMessage("hello world...");        producer.send(message);        // 只有commit之后,消息才会进入队列        session.commit();        System.out.println("send...");        // 测试状态,这里把关闭会话和连接注释掉了。        // session.close();        // connection.close();    }}
 

 执行消息发送,在管理后台查看


3、编写消息的消费程序

package com.mq.base.queue;import javax.jms.Connection;import javax.jms.ConnectionFactory;import javax.jms.Destination;import javax.jms.JMSException;import javax.jms.MessageConsumer;import javax.jms.ObjectMessage;import javax.jms.Session;import org.apache.activemq.ActiveMQConnection;import org.apache.activemq.ActiveMQConnectionFactory;/** * created on 2015/6/4 * @author dennisit@163.com * @version 1.0 */public class MqReceiver {    public static void main(String[] args) throws JMSException {        // 默认的账号和密码为null        String username = ActiveMQConnection.DEFAULT_USER;        String password = ActiveMQConnection.DEFAULT_PASSWORD;        // 初始化连接工厂, DEFAULT_BROKER_URL =failover://tcp://localhost:61616        ConnectionFactory factory = new ActiveMQConnectionFactory(username, password, Constant.brokerURL);        // 创建连接        Connection connection = factory.createConnection();        connection.start();        // 创建会话        Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);        Destination destination = session.createQueue(Constant.queueDestination);        // MessageConsumer负责接受消息        MessageConsumer consumer = session.createConsumer(destination);        ObjectMessage message = (ObjectMessage)consumer.receive();        if (null != message) {            String messageString = (String)message.getObject();            System.out.println("Receive : " + messageString);        }        // 测试状态,这里把关闭会话和连接注释掉了。        // session.close();        // connection.close();    }}

执行这段代码会输出接收到的消息内容:

管理后台在查看queue中心结果如下:

Topic类型消息

1、定义消息destination和brokerUrl[61616为activemq用于消息通讯的端口]

public class Constant {    public static final String brokerURL = "tcp://192.168.147.131:61616";    public static final String topicDestination = "testTopic";}

2、编写消息生产者

package com.mq.base.topic;import org.apache.activemq.ActiveMQConnection;import org.apache.activemq.ActiveMQConnectionFactory;import javax.jms.*;/** * created on 2015/6/4 * @author dennisit@163.com * @version 1.0 */public class MqSender {    public static void main(String[] args) throws JMSException {        // 默认的账号和密码为null        String username = ActiveMQConnection.DEFAULT_USER;        String password = ActiveMQConnection.DEFAULT_PASSWORD;        // 初始化连接工厂, DEFAULT_BROKER_URL =failover://tcp://localhost:61616        ConnectionFactory factory = new ActiveMQConnectionFactory(username, password, com.mq.base.queue.Constant.brokerURL);        // 创建连接        Connection connection = factory.createConnection();        connection.start();        // 创建会话        Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);        // 创建消息主题Topic,和Queue的区别就在此        Destination destination = session.createTopic(Constant.topicDestination);        // MessageProducer负责发送消息        MessageProducer producer = session.createProducer(destination);        // 消息不持久化        producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);        TextMessage message = session.createTextMessage(); // createObjectMessage("hello world...");        message.setStringProperty("msgId","topicMessage");        producer.send(message);        // 只有commit之后,消息才会进入队列        session.commit();        System.out.println("send...");        // 测试状态,这里把关闭会话和连接注释掉了。        // session.close();        // connection.close();    }}

3、编写消息消费者

package com.mq.base.topic;import org.apache.activemq.ActiveMQConnection;import org.apache.activemq.ActiveMQConnectionFactory;import javax.jms.*;/** * created on 2015/6/4 * @author dennisit@163.com * @version 1.0 */public class MqReceiver {    public static void main(String[] args) throws JMSException {        // 默认的账号和密码为null        String username = ActiveMQConnection.DEFAULT_USER;        String password = ActiveMQConnection.DEFAULT_PASSWORD;        // 初始化连接工厂, DEFAULT_BROKER_URL =failover://tcp://localhost:61616        ConnectionFactory factory = new ActiveMQConnectionFactory(username, password, com.mq.base.queue.Constant.brokerURL);        // 创建连接        Connection connection = factory.createConnection();        connection.start();        // 创建会话        Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);        Destination destination = session.createTopic(Constant.topicDestination);        // MessageConsumer负责接受消息        MessageConsumer consumer = session.createConsumer(destination);        TextMessage message = (TextMessage)consumer.receive();        if (null != message) {            String messageString = message.getStringProperty("msgId");            System.out.println("Receive : " + messageString);            session.commit();        }        // 测试状态,这里把关闭会话和连接注释掉了。        // session.close();        // connection.close();    }}

先启动消费者:

启动生产者,生产消息,此时会接收到消息如图:

观察topic后台管理

 

Queue模型消息和Topic模型消息区别

queue[点对点模型]  1、只有一个消费者  每条消息只有一个消费者,如果这条消息被消费,那么其它消费者不能接受到此消息。  2、时间无关性  消息的消费和时间无关,只要消息被发送了,在消息过期之前,如果没有其他消费者消费了这个消息,那么客户端可以在任何时候来消费这条消息。  3、消费者必须确认  消费者收到消息之后,必须向Message Provider确认,否则会被认为消息没有被消费,仍然可以被其他消费者消费。可以设置自动确认。这个特点其实也是保证一条消息只能由一个消费者来消费。  4、非持久化的消息只发一次  非持久化的消息,可能会丢失,因为消息会过期,另外Message Provider可能宕机。  5、持久化的消息严格发一次  消息可以被持久化,比如持久化在文件系统或者数据库中,这样可以避免Message Provider的异常或者其它异常导致消息丢失。 Topic[发布者/订阅者模型]  1、每条消息可以有多个订阅者  2、订阅者只能消费它们订阅topic之后的消息  3、非持久化订阅,订阅者必须保持为活动状态才能使用这些消息,如果一个订阅者A断开了10分钟,那么A就会收不到这10分钟内的消息。  4、持久化订阅,Message Provider会保存这些消息,即使订阅者因为网络原因断开了,再重新连接以后,能让消费这些消息。  5、是否使用持久化订阅,需要根据业务场景判断。

 

转载请注明出处:[]

 
热爱生活,热爱Coding,敢于挑战,用于探索 ...
你可能感兴趣的文章
秒杀系统设计思路和实现方法
查看>>
Redis常见面试题
查看>>
JDK重要包和Java学习方法论
查看>>
网络通讯中的三次握手与四次挥手原理详解
查看>>
GitHub 开源神器:图片秒变文件
查看>>
openstack ice resize 详解(三)
查看>>
事务与锁(转)
查看>>
Namenode HA原理详解(脑裂)
查看>>
Differences between VMware FT and HA(转)
查看>>
Cloud Prizefight: OpenStack vs. VMware(转)
查看>>
亚马逊Auto Scaling
查看>>
openstack-instance-high-availability-Evacuate
查看>>
evacuate-instance-automatically
查看>>
pycharm常用设置(keymap设置及eclipse常用快捷键总结)
查看>>
关于在openstack的环境变量.bashrc自定自己简化命令
查看>>
Openstack Heat Project介绍(转)
查看>>
How to Perform an Upgrade from Icehouse to Juno(ice升级到juno)
查看>>
高扩展性网站的50条原则(转)-思维导图
查看>>
解决openstack novnc一段时间后自动挂断登录不上问题,novncproxy dead but pid file exists
查看>>
构建OpenStack的云基础架构:ManageIQ(转)
查看>>