ActiveMQ

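Usage

// Imports used by the snippets below (javax.jms applies to ActiveMQ 5.x; newer releases use jakarta.jms)
import javax.jms.*;
import org.apache.activemq.ActiveMQConnectionFactory;

// Create and start the connection
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
Connection connection = connectionFactory.createConnection();
connection.start();
// Create a non-transacted session that acknowledges messages automatically
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// Create the destination
Destination destination = session.createQueue(queueName);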

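Queue mode

Producing messages

MessageProducer producer = session.createProducer(destination);
for (int i = 0; i < 100; i++) {
    TextMessage textMessage = session.createTextMessage();
    textMessage.setText("text" + i);
    producer.send(textMessage);
}

Consuming messages

// Option 1: asynchronous consumption through a listener
MessageConsumer consumer = session.createConsumer(destination);
consumer.setMessageListener(new MessageListener() {
    public void onMessage(Message message) {
        System.out.println(message);
    }
});

// Option 2: synchronous consumption; receive() blocks until a message arrives.
// Use it on a consumer that has no listener set, not together with Option 1.
ActiveMQTextMessage msg = (ActiveMQTextMessage) consumer.receive();
System.out.println(msg);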

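Queue mode

destination = session.createQueue(sourceQueue);

In queue mode, each message is delivered to only one consumer, and the messages a producer sends are shared roughly evenly among the consumers.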
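
The even sharing described above can be pictured with a small in-memory model. This is only a sketch that uses plain JDK collections, not ActiveMQ code: the class QueueDispatchSketch and its dispatch method are hypothetical names, and simple rotation is assumed as the way messages are spread across consumers.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified in-memory model of queue (point-to-point) delivery.
// Hypothetical names; this does not call ActiveMQ or JMS.
final class QueueDispatchSketch {

    // Hands each message to exactly one consumer, rotating through the consumers in order.
    static List<List<String>> dispatch(List<String> messages, int consumerCount) {
        List<List<String>> received = new ArrayList<>();
        for (int c = 0; c < consumerCount; c++) {
            received.add(new ArrayList<>());
        }
        for (int i = 0; i < messages.size(); i++) {
            received.get(i % consumerCount).add(messages.get(i));
        }
        return received;
    }

    public static void main(String[] args) {
        List<String> messages = new ArrayList<>();
        for (int i = 0; i < 6; i++) {
            messages.add("text" + i);
        }
        List<List<String>> received = dispatch(messages, 3);
        for (int c = 0; c < received.size(); c++) {
            System.out.println("consumer " + c + " got " + received.get(c));
        }
    }
}
```

With six messages and three consumers, every consumer in this model ends up with two messages and no message appears twice.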

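Topic mode

Destination destination = session.createTopic(topicName);

Topic mode is also known as subscribe/notify (publish/subscribe) mode. A consumer receives messages only after it has subscribed, and every message the producer sends is pushed to all subscribed consumers.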
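
The two properties above (subscribe first, then every subscriber gets a copy) can be sketched with a small in-memory model. It is an illustration built on JDK collections only, not ActiveMQ code: TopicFanOutSketch and its methods are hypothetical names, and it models plain (non-durable) subscribers.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Simplified in-memory model of topic (publish/subscribe) delivery.
// Hypothetical names; this does not call ActiveMQ or JMS.
final class TopicFanOutSketch {
    // One inbox per subscriber, keyed by subscriber name.
    private final Map<String, List<String>> inboxes = new LinkedHashMap<>();

    // Registers a subscriber; it only sees messages published from now on.
    void subscribe(String name) {
        inboxes.putIfAbsent(name, new ArrayList<>());
    }

    // Copies the message into the inbox of every current subscriber.
    void publish(String message) {
        for (List<String> inbox : inboxes.values()) {
            inbox.add(message);
        }
    }

    // Returns what the subscriber has received so far (empty if it never subscribed).
    List<String> received(String name) {
        return inboxes.getOrDefault(name, new ArrayList<>());
    }

    public static void main(String[] args) {
        TopicFanOutSketch topic = new TopicFanOutSketch();
        topic.subscribe("A");
        topic.publish("m1");   // only A is subscribed at this point
        topic.subscribe("B");
        topic.publish("m2");   // both A and B receive this one
        System.out.println("A got " + topic.received("A"));
        System.out.println("B got " + topic.received("B"));
    }
}
```

In this model A receives both messages, while B, which subscribed later, receives only the second.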

Message persistence

producer.setDeliveryMode(DeliveryMode.PERSISTENT);

Reliable delivery

A JMS message is considered successfully consumed only after it has been acknowledged.
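
To make the acknowledgment rule concrete, here is a minimal in-memory sketch of the idea: a delivered message stays "in flight" until it is acknowledged, and unacknowledged messages become available for redelivery. It is a model under stated assumptions, not broker code. AckSketch and its methods are hypothetical names that loosely mirror the JMS calls Message.acknowledge() and Session.recover() used with Session.CLIENT_ACKNOWLEDGE. One simplification: this model acknowledges one message at a time, whereas a JMS acknowledge in CLIENT_ACKNOWLEDGE mode covers all messages the session has consumed so far.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Simplified in-memory model of "consumed only after acknowledged".
// Hypothetical names; this does not call ActiveMQ or JMS.
final class AckSketch {
    private final Deque<String> pending = new ArrayDeque<>();  // waiting to be delivered
    private final List<String> inFlight = new ArrayList<>();   // delivered, not yet acknowledged

    void send(String message) {
        pending.addLast(message);
    }

    // Delivers the next message and keeps it in flight; returns null if nothing is pending.
    String receive() {
        String message = pending.pollFirst();
        if (message != null) {
            inFlight.add(message);
        }
        return message;
    }

    // Marks a delivered message as consumed; returns false if it was not in flight.
    boolean acknowledge(String message) {
        return inFlight.remove(message);
    }

    // Puts every unacknowledged message back at the head of the queue, preserving order.
    void recover() {
        for (int i = inFlight.size() - 1; i >= 0; i--) {
            pending.addFirst(inFlight.get(i));
        }
        inFlight.clear();
    }

    // Messages that are not yet counted as consumed.
    int unconsumed() {
        return pending.size() + inFlight.size();
    }

    public static void main(String[] args) {
        AckSketch queue = new AckSketch();
        queue.send("a");
        queue.send("b");
        queue.acknowledge(queue.receive());   // "a" is received and acknowledged
        queue.receive();                      // "b" is received but never acknowledged
        queue.recover();                      // "b" goes back for redelivery
        System.out.println("redelivered: " + queue.receive());
    }
}
```

The point of the model is that receiving alone does not remove a message from the count of unconsumed messages; only the acknowledgment does.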

Integrating with Spring JMS

// Configuration beans
@Bean
public ConnectionFactory connectionFactory() {
    return new ActiveMQConnectionFactory("tcp://127.0.0.1:61616");
}

@Bean
public Destination destination() {
    return new ActiveMQQueue("queue");
}

@Bean
public JmsTemplate jmsTemplate(ConnectionFactory connectionFactory) {
    return new JmsTemplate(connectionFactory);
}

// Send a text message through the JmsTemplate
jmsTemplate.send(destination, new MessageCreator() {
    public Message createMessage(Session session) throws JMSException {
        TextMessage msg = session.createTextMessage();
        msg.setText(message);
        return msg;
    }
});

// Register a listener container that consumes from the destination
@Bean
public MessageListener messageListener() {
    return new ConsumerMessageListener();
}

@Bean
public DefaultMessageListenerContainer defaultMessageListenerContainer(Destination destination,
                                ConnectionFactory connectionFactory,
                                MessageListener messageListener) {
    DefaultMessageListenerContainer container = new DefaultMessageListenerContainer();
    container.setConnectionFactory(connectionFactory);
    container.setMessageListener(messageListener);
    container.setDestination(destination);
    return container;
}

Spring Boot integration

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-activemq</artifactId>
</dependency>

spring:
  activemq:
    broker-url: tcp://127.0.0.1:61616
    user: admin
    password: admin
queue: myQueue

@Value("${queue}")
private String queueName;

@Bean
public Queue queue() {
    return new ActiveMQQueue(queueName);
}

Producer

@Component
public class Producer {
    @Autowired
    private JmsMessagingTemplate template;

    @Autowired
    private Queue queue;

    // Sends a random payload every 5 seconds; requires @EnableScheduling on a configuration class
    @Scheduled(fixedDelay = 5000)
    public void send() {
        String payload = UUID.randomUUID().toString();
        System.out.println("producer send:" + payload);
        template.convertAndSend(queue, payload);
    }
}

Consumer

@Component
public class Consumer {
    @JmsListener(destination = "myQueue")
    public void receive(String msg) {
        System.out.println("consumer receive:" + msg);
    }
}

Clustering

Clustering approaches

Problems to solve in enterprise development