在Ubuntu上安装ActiveMQ
系统初始化
1 2 3 4
| $ sudo apt update $ sudo apt dist-upgrade $ sudo apt autoremove $ sudo apt clean
|
搭建activemq服务
1 2 3 4 5 6 7 8 9 10 11
| $ mkdir /home/active-mq $ cd /home/active-mq $ wget http://www.apache.org/dist/activemq/5.15.9/apache-activemq-5.15.9-bin.tar.gz
$ tar -zxvf apache-activemq-5.15.9-bin.tar.gz
$ ./activemq start INFO: Loading '/home/active-mq/apache-activemq-5.15.9//bin/env' INFO: Using java '/usr/bin/java' INFO: Starting - inspect logfiles specified in logging.properties and log4j.properties to get details INFO: pidfile created : '/home/active-mq/apache-activemq-5.15.9//data/activemq.pid' (pid '6356')
|
监控
浏览器打开http://localhost:8161/admin/,输入admin,admin

至此,ActiveMQ搭建完成。
理解JMS( Java Message Service)
Java消息服务指的是两个应用程序之间进行异步通信的API,它为标准消息协议和消息服务提供了一组通用接口,包括创建、发送、读取消息等,用于支持JAVA应用程序开发。
JMS模型
点对点(P2P)或队列模型
- 只有一个消费者将获得消息
- 生产者不需要在接收者消费该消息期间处于运行状态,接收者也同样不需要在消息发送时处于运行状态。
- 每一个成功处理的消息都由接收者签收
发布/订阅模型
- 多个消费者可以获得消息
- 在发布者和订阅者之间存在时间依赖性。发布者需要创建一个订阅(subscription),以便客户能够购订阅。订阅者必须保持持续的活动状态以接收消息,除非订阅者创建了持久的订阅。在那种情况下,在订阅者未连接时发布的消息将在订阅者重新连接时重新发布。
传统API
传统API提供的主要接口如下:
ConnectionFactory:客户端用来创建连接的受管对象。简化API也会使用此接口。
Connection:客户端到JMS提供者之间的活动连接。
Session:发送和接收消息的一个单线程上下文。
MessageProducer:由Session创建的对象,用于发送消息到Queue或Topic
MessageConsumer:由Session创建的对象,用于接收Queue或Topic中的消息

简化API
简化API与传统API提供的消息功能是一样的,但是它需要的接口更少、使用更方便。 简化API提供的主要接口如下:
- ConnectionFactory:客户端用来创建连接的受管对象。传统API也会使用此接口。
- JMSContext:客户端到JMS提供者之间的活动连接,以及发送和接收消息的一个单线程上下文。
- JMSProducer:由JMSContext创建的对象,用于发送消息到Queue或Topic
- JMSConsumer:由JMSContext创建的对象,用于接收Queue或Topic中的消息

在简化API中,一个JMSContext对象封装了传统API中Connection和Session两个对象的行为。
开发一个JMS客户端
一个使用传统API的JMS客户端典型的使用步骤如下:
- 使用JNDI查找一个ConnectionFactory对象
- 使用JNDI查找一个或多个Destination对象
- 使用ConnectionFactory创建一个JMS Connection对象
- 使用Connection创建一个或多个JMS Session对象
- 使用Session和Destination对象创建需要的MessageProducer和MessageConsumer对象
- 通知Connection对象开始投递消息
Active MQ是完全实现JMS规范的JMS客户端
Hello World
创建Hello World项目
创建gradle项目,并编辑build.gradle
1 2
| compile group: 'org.apache.activemq', name: 'activemq-all', version: '5.15.9' compile group: 'com.fasterxml.jackson.core', name: 'jackson-databind', version: '2.9.8'
|
创建生产者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
| public class HelloWorldProducer implements Runnable { @Override public void run() { try { ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://localhost"); Connection connection = connectionFactory.createConnection(); connection.start(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Destination destination = session.createQueue("TEST.FOO"); MessageProducer producer = session.createProducer(destination); producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); String text = "Hello world! From: " + Thread.currentThread().getName() + " : " + this.hashCode(); TextMessage message = session.createTextMessage(text); System.out.println("Sent message: " + message.hashCode() + " : " + Thread.currentThread().getName()); producer.send(message);
session.close(); connection.close(); } catch (JMSException e) { System.out.println("Caught: " + e); e.printStackTrace(); } } }
|
创建消费者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40
| public class HelloWorldConsumer implements Runnable, ExceptionListener { @Override public void run() { try { ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://localhost"); Connection connection = connectionFactory.createConnection(); connection.start(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Destination destination = session.createQueue("TEST.FOO"); MessageConsumer consumer = session.createConsumer(destination); Message message = consumer.receive(1000); if (message instanceof TextMessage) { TextMessage textMessage = (TextMessage) message; String text = textMessage.getText(); System.out.println("Received: " + text); } else { System.out.println("Received: " + message); }
consumer.close(); session.close(); connection.close(); } catch (JMSException e) { System.out.println("Caught: " + e); e.printStackTrace(); } }
@Override public synchronized void onException(JMSException exception) { System.out.println("JMS Exception occured. Shutting down client."); } }
|
测试类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37
| public class App { public static void main(String[] args) throws InterruptedException { thread(new HelloWorldProducer(), false); thread(new HelloWorldProducer(), false); thread(new HelloWorldConsumer(), false); Thread.sleep(1000); thread(new HelloWorldConsumer(), false); thread(new HelloWorldProducer(), false); thread(new HelloWorldConsumer(), false); thread(new HelloWorldProducer(), false); Thread.sleep(1000); thread(new HelloWorldConsumer(), false); thread(new HelloWorldProducer(), false); thread(new HelloWorldConsumer(), false); thread(new HelloWorldConsumer(), false); thread(new HelloWorldProducer(), false); thread(new HelloWorldProducer(), false); Thread.sleep(1000); thread(new HelloWorldProducer(), false); thread(new HelloWorldConsumer(), false); thread(new HelloWorldConsumer(), false); thread(new HelloWorldProducer(), false); thread(new HelloWorldConsumer(), false); thread(new HelloWorldProducer(), false); thread(new HelloWorldConsumer(), false); thread(new HelloWorldProducer(), false); thread(new HelloWorldConsumer(), false); thread(new HelloWorldConsumer(), false); thread(new HelloWorldProducer(), false); }
public static void thread(Runnable runnable, boolean daemon) { Thread brokerThread = new Thread(runnable); brokerThread.setDaemon(daemon); brokerThread.start(); } }
|
运行我们的测试程序,控制台将会打印:
1 2 3 4 5 6 7 8 9 10 11 12 13
| Sent message: 507732978 : Thread-6 Sent message: 2056557229 : Thread-0 Sent message: 39234146 : Thread-8 Sent message: 1100925878 : Thread-13 Sent message: 1566392082 : Thread-17 Sent message: 1329793151 : Thread-1 Sent message: 988436874 : Thread-16 Received: Hello world! From: Thread-6 : 1442537083 Received: Hello world! From: Thread-1 : 1531760310 Received: Hello world! From: Thread-0 : 1817576164 Received: Hello world! From: Thread-8 : 262381200 Received: Hello world! From: Thread-17 : 1647178742 Received: Hello world! From: Thread-13 : 1610404140
|