ActiveMQ的消息模式——主题/订阅模式(Topic)
2020-02-16 07:26:03 来源:魔思科技 作者:思乐 字数:100 阅读数:122 喜欢:0

代码在运行的时候需要先运行一下消费者,相当于你订阅了这个topic,不然接收不到消息。

生产者:
package activemq.test;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.springframework.boot.autoconfigure.SpringBootApplication;

import javax.jms.*;

@SpringBootApplication
public class Producer{
    //连接账号
    private static String userName = "admin";
    //连接密码
    private static String password = "password";
    //连接地址
    private static String brokerURL = "tcp://localhost:61616";


    public static void main(String []args) throws JMSException {
        //初始化连接工厂
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(userName,password,brokerURL);
        //获得连接
        Connection conn = connectionFactory.createConnection();
        //创建Session,此方法第一个参数表示会话是否在事务中执行,第二个参数设定会话的应答模式
        Session session = conn.createSession(true, Session.AUTO_ACKNOWLEDGE);
        //创建队列
        Destination dest = session.createTopic("first_topic");
        //通过session可以创建消息的生产者
        MessageProducer producer = session.createProducer(dest);
        producer.setDeliveryMode(DeliveryMode.PERSISTENT);//持久化
        //启动连接 ,连接的启动位置和queue略有不同,需要把配置配完,在启动连接
        conn.start();
        for (int i=0;i<3;i++) {
            //初始化一个mq消息
            TextMessage message = session.createTextMessage("发送消息id:" + i);
            System.out.println("发送消息id:" + i);
            //发送消息
            producer.send(message);
        }
        session.commit();
        session.close();
        //关闭mq连接
        conn.close();
    }
}
消费者:
package activemq.test;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.springframework.boot.autoconfigure.SpringBootApplication;

import javax.jms.*;

@SpringBootApplication
public class Consumer{
    //连接账号
    private static String userName = "admin";
    //连接密码
    private static String password = "password";
    //连接地址
    private static String brokerURL = "tcp://localhost:61616";
    public static void main(String []args) throws JMSException {
        //初始化ConnectionFactory
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(userName,password,brokerURL);
        //创建mq连接
        Connection conn = connectionFactory.createConnection();
        //参数名称随便起
        conn.setClientID("client_id_1");
        //创建会话
        Session session = conn.createSession(true, Session.AUTO_ACKNOWLEDGE);
        //通过会话创建目标
        Topic topic = session.createTopic("first_topic");
        //第二个参数随便起
        TopicSubscriber subscriber = session.createDurableSubscriber(topic, "name_1");//持久化消息接收
        //启动连接
        conn.start();
        //创建mq消息的消费者
        Message msg = subscriber.receive();
        while(msg!=null){
            onMessage(msg);
            msg = subscriber.receive(1000L);
        }
        session.commit();
        session.close();
        subscriber.close();
        conn.close();
    }
    public static void onMessage(Message message) throws JMSException {
        TextMessage txtMessage = (TextMessage)message;
        System.out.println("接受消息id:" + txtMessage.getText());
    }
}

可以在后台管理页面查看:

因为我们创建的是topic,所以点击topics,这个对应于生产者的一些信息

点击Subscribers查看消费者的信息:

16602723175
1297700744
yanqiweixixi
武汉市洪山区光谷