activeMq的安装和基础

Author Avatar
hlmk 5月 22, 2018
  • 在其它设备中阅读本文章

申明:请尽量与我本博文所有的软件版本保持一致,避免不必要的错误。

所用软件版本列表:
activeMq:apache-activemq-5.15.3-bin.tar.gz
JDK8

所用软件官方网站列表:
activeMq官网:http://activemq.apache.org/
activeMq5.15.3:http://activemq.apache.org/activemq-5153-release.html

activeMq的基本介绍

ActiveMQ是什么

ActiveMQ是Apache推出的,一款开源的,完全支持JMS1.1和J2EE1.4规范的JMS Provider实现的消息中间件(Message Oriented Middleware, MOM)

ActiveMQ能干什么

最主要的功能就是:实现JMS Provider 用来帮助实现高可用、高性能、可伸缩、易用和安全的企业级面向消息服务的系统

ActiveMQ特点

完全支持JMS1.1和J2EE1.4规范(持久化,XA消息,事务)
完全多种传送协议:in-VM,TCP,SSL,NIO,UDP,JGroups,JXTA
可插拔的体系结构,可以灵活定制,如:消息存储方式、安全管理等
很容易和Application Server集成使用
多种语言和协议编写客户端。语言:Java,C,C++,C#,Ruby,Perl,Python,PHP
从设计上保证了高性能的集群,客户端-服务器,点对点
可以很容易和spring结合使用
支持通过JDBC和journal提供高速的消息持久化
支持与Axis的整合

消息中间件

MOM基本功能:将消息以消息的形式,从一个应用程序传送到另一个或多个应用程序。
MOM主要特点:

  1. 消息一部接收,类似手机短信的行为,消息发送者不需要等待消息接受者的响应,减少软件多系统集成的耦合度;
  2. 消息可靠接收,确保消息在中间件可靠保存,只有接收方收到后才会删除消息,多个消息也可以组成原子事务

    消息中间件的主要应用场景:
    在多个系统间进行整合和通讯的时候,通常会要求:

    1. 可靠传输,数据不能丢失,有的时候,也会要求不能重复传输;
    2. 异步传输,否则各个系统同步发送接收数据,互相等待,造成系统瓶颈

目前比较知名的消息中间件:
IBM MQSeries
BEA WebLogic JMS Server
Oracle AQ
Tibco
SwiftMQ
ActiveMQ:是免费的Java实现的消息中间件

ActiveMq的安装和基本使用

下载并安装ActiveMQ服务器端

  1. 下载ActiveMQ
    在Linux系统中输入一下下载命令:
    wget http://mirrors.hust.edu.cn/apache//activemq/5.15.3/apache-activemq-5.15.3-bin.tar.gz

  2. 直接解压,然后拷贝到你要安装的位置就好了
    tar -zxvf apache-activemq-5.15.3-bin.tar.gz

  3. 启动运行

    1. 普通启动:到ActiveMQ/bin下面,./activemq start
    2. 启动并指定日志文件 ./activemq start > /tmp/activemqlog
  4. 检查是否已经启动
    ActiveMQ默认采用61616端口提供JMS服务,使用8161端口提供管理控制台服务,执行以下命令便以检验是否已成功启动ActiveMQ服务:

    1. 比如查看61616端口是否打开:netstat -an | grep 61616
    2. 也可以直接查看控制台输出或者日志文件
    3. 还可以直接访问ActiveMQ的管理控制台页面: http://192.168.159.170:8161/admin/ 默认的用户名和密码都是admin/admin

    关闭ActiveMQ,可以用 ./activemq stop
    暴力点可以直接用ps -ef | grep activemq 来得到进程号,然后kill掉

    不要忘记开放linux端口号
    firewall-cmd --zone=public --add-port=8161/tcp --permanent
    firewall-cmd --zone=public --add-port=61616/tcp --permanent
    重启防火墙:firewall-cmd --reload
    查看开放的端口号:firewall-cmd --list-ports

ActiveMQ 后台管理页面:
enter description here

基本的Queue消息发送

配置Maven所需的依赖,示例如下:

1.创建maven项目:
2.引入项目activemq依赖
enter description here
依赖代码如下:

    <dependency>
        <groupId>org.apache.activemq</groupId>
        <artifactId>activemq-all</artifactId>
        <version>5.9.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.xbean</groupId>
        <artifactId>xbean-spring</artifactId>
        <version>3.16</version>
    </dependency>

基本的生产者和消费者代码示例

生产者 QueueSender代码

public class QueueSender {
    public static void main(String[] args) throws Exception{
        //创建连接工厂
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.159.170:61616");

        //从连接工厂获取连接
        Connection connection = connectionFactory.createConnection();
        //启动连接
        connection.start();

        //从连接中获取该次session会话,并设置事务
        Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);

        //创建会话队列
        Destination destination = session.createQueue("my-queue");

        //创建生产者
        MessageProducer producer = session.createProducer(destination);
        for (int i = 0; i < 3; i++) {
            //创建消息对象
            TextMessage message = session.createTextMessage("message---" + i);
            //生产者发送消息
            producer.send(message);
        }
        //提交事务
        session.commit();
        //关闭session会话
        session.close();
        //关闭连接
        connection.close();
    }
}

运行生产者之后,后台会出现生产者的信息,如下图:
enter description here

消费者代码

public class QueueReceiver {
    public static void main(String[] args) throws Exception{
        //创建连接工厂
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.159.170:61616");

        // 从连接工厂获取连接
        Connection connection = connectionFactory.createConnection();
        // 启动连接
        connection.start();

        // 从连接中获取该次session会话,并设置事务
        Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);

        // 创建会话队列
        Destination destination = session.createQueue("my-queue");

        // 创建消费者
        MessageConsumer consumer = session.createConsumer(destination);

        int i=0;
        while(i < 3) {
            i++;
            //取出消息
            TextMessage message = (TextMessage) consumer.receive();
            session.commit();
            System.out.println("收到消息:" + message.getText());
        }

        // 关闭session会话
        session.close();
        // 关闭连接
        connection.close();
    }
}

控制台结果如下
enter description here
enter description here

JMS基本概念和模型

JMS基本概念

JSM是什么
JMS Java Message Service,Java消息服务,是Java EE中的一个技术

JMS规范
JMS定义了Java中访问消息中间件的接口,并没有给予实现,实现JMS接口的消息中间件称为JSM Provider,例如ActiveMQ

JMS Provider
实现了JMS接口和规范的消息中间件
JMS Message
JMS的消息,JMS消息由以下三部分组成:

  1. 消息头:每个消息头字段都有相应的getter和setter方法
  2. 消息属性:如果需要除消息头字段以为的值,那么可以使用消息属性
  3. 消息体:封装具体的消息数据

JMS Producer
消息消费者,创建和发送JMS消息的客户端应用
JMS Consumer
消息消费者,接收和处理JMS消息的客户端应用

消息的消费可以采用以下两种方法之一:

  1. 同步消费:通过调用消费者的receive方法从目的地中显示提取消息,receive方法可以一直阻塞到消息到达。
  2. 异步消费:客户可以为消费者注册一个消息监听器,以定义在消息到达时所采取的动作。

JMS Domains
消息传递域,JMS规范中定义了两种欧冠呢消息传递域:点对点(point-to-point,简写成PTP)消息传递域和发布/订阅消息传递域(publish/subscribe,简写成pub/sub)

  1. 点对点消息传递域的特点如下:
    (1) 每个消息只能有一个消费者
    (2)消息的生产者和消费者之间没有时间上的关联性。无论消费者在生产者发送消息的时候是否处于运行状态,他都可以提取消息。
    enter description here

    1. 发布/订阅消息传递域的特点如下:
      (1)每个消息可以有多个消费者
      (2)生产者和消费者之间有时间上的相关性。订阅一个主题的消费者只能消费自它订阅之后发布的消息。JMS规范允许客户创建持久订阅,这在一定程度上放松了时间上的相关性要求。持久订阅允许消费者消费它在未处于激活状态时发送的消息。
  2. 在点对点消息传递域中,目的地被称为队列(queue);在发布/订阅消息传递域中,目的地被称为主题(topic)
    enter description here

Connection factory
链接工厂,用来创建连接对象,以连接到JSM的provider
JMS Connection
封装了客户与JMS 提供者之间的一个虚拟的连接
JMS Session
是生产者和消费者的一个单线程上下文
会话用于创建消息生产者(producer)、消息消费者(consumer)和消息(message)等。会话提供了一个事务性的上下文,在这个上下文中,一组发送和接收被组合到了一个原子操作中。
Destination
消息发送到的目的地
Acknowledge
签收
Transaction
事务
JMS client
用来收发消息的Java应用
Non-JMS client
使用JMS Provider本地API写的应用,用来替换JMS API实现收发消息的功能,通常会提供其他的一些特性,比如:CORBA、RMI等。
Administered objects
预定义的JMS对象,通常在provider规范中有定义,提供给JMS客户端来访问,比如:ConnectionFactory和Destination

JMS的消息结构

JMS 消息由以下几个部分组成
消息头、属性和消息体
消息头包含消息的识别信息和路由信息,消息头包含一些标准的属性如下:

  1. JMSDestination:由send方法设置
  2. JMSDeliveryMode:由send方法设置
  3. JMSExpiration:由send方法设置
  4. JMSPriority:由send方法设置
  5. JMSMessageID:由send方法设置
  6. JMSTimestamp:由客户端设置
  7. JMSCorrelationID:由客户端设置
  8. JMSReplyTo:由客户端设置
  9. JMSType:由客户端设置
  10. JMSRedelivered:由JMS Provider设置

标准的JMS消息头包含以下属性:

  1. JMSDestination:消息发送的目的地:主要是指Queue和Topic,自动分配
  2. JMSDeliveryMode:传送模式。有两种:持久模式和非持久模式。一条持久性的消息应该被传送“一次仅仅一次”,这就意味着如果JMS提供者出现故障,该消息并不会丢失,它会在服务器回复之后再次传递。一条非持久的消息最多会传递一次,这意味着服务器出现故障,该消息将永远丢失。自动分配
  3. JMSExpiration:消息过期时间,等于Destination的send方法中的timeToLive值加上发送时刻的GMT时间值。如果timeToLive值等于零,则JMSExpiration被设为零,表示该消息永不过期。如果发送后,在消息过期时间之后消息还没有被发送到目的地,则该消息被清除。自动分配
  4. JMSPriority:消息优先级,从0-9十个级别,0-4是普通消息级别,5-9是加急消息。JMS不要求JMS Provider严格按照这十个优先级发送消息,但必须保证加急消息要优先于普通消息到达。默认是4级。自动分配
  5. JMSMessageID:唯一识别每个消息的表示,由JMS Provider产生。自动分配
  6. JMSTimestamp:一个JMS Provider在调用send()方法时自动设置的。它是消息被发送和消费者实际接收的时间差。自动分配
  7. JMSCorrelationID:用来连接到另一个消息,典型的应用是在回复消息中连接到原消息。在大多数情况下,JMSCorrelationID用于将一条消息标记为对JMSMessageID标示的上一条消息的应答,不过,JMSCorrelationID可以是任何值,不仅仅是JMSMessageID。由开发者设置
  8. JMSReplyTo:提供本消息回复消息的目的地地址。由开发者设置
  9. JMSType:消息类型的标识符。由开发者设置
  10. JMSRedelivered:如果一个客户端收到一个设置了JMSRedelivered属性的消息,则表示可能客户端曾经在早些的时候接收到过该消息,但并没有签收(acknowledged)。如果该消息被重新传送,JMSRedelivered=true反之,JMSRedelivered=false。自动设置

消息体
JMS API定义了5种消息体格式,也叫消息类型,可以使用不同形式发送接收数据,并可以兼容现有的消息格式。包括:TextMessage、MapMessage、ByteMessage、StreamMessage和ObjectMessage
消息属性,包含以下三种类型的属性

  1. 应用程序设置和添加的属性,比如:
    Message.setStringProperty(“username”,username);
  2. JMS定义的属性
    使用“JMSX”作为属性名的前缀,connection.getMetaData().getJMSXPropertyNames(),方法返回所有连接支持的JMSX属性的名字。
  3. JMS供应商特定的属性
    JMS定义的属性如下:
  4. JMSXUserID:发送消息的用户标识,发送时提供商设置
  5. JMSXAppID:发送消息的应用标识,发送时提供商设置
  6. JMSXDeliveryCount:转发消息重试次数,第一次是1,第二次是2,…,发送时提供商设置
  7. JMSXGroupID:消息所在消息组的标识,由客户端设置
  8. JMSXGroupSeq:组成消息的序列号第一个消息是1,第二个是2,…,有客户端设置
  9. JMSXProducerTXID:产生消息的事务的事务标识,发送时提供商设置
  10. JMSXConsumerTXID:消费消息的事务的事务标识,接收时提供商设置
  11. JMSXRcvTimestamp:JMS转发消息到消费者的时间,接收时提供商设置
  12. JMSXState:假定存在一个消息库,它存储了每个消息的单独拷贝,且这些消息从原始消息被发送时开始。每个拷贝的状态有:1(等待),2(准备),3(到期)或4(保留)。由于状态与生产者和消费者无关,所以它不是由我们来提供,它是和在仓库中查找消息相关,因此JMS没有提供这种API。由供应商设置

JMS的可靠性机制

消息接收确认

JMS消息只有在被确认之后,才认为已经被成功消费了。消息的成功消费通常包含三个阶段:客户接收消息、客户处理消息和消息被确认。
在事务性会话中,当一个事务被提交的时候,确认自动发生。在非事务性会话中,消息何时被确认取决于创建会话时的应答模式(acknowledgement mode)。该参数有一下三个可选值:

  1. Session.AUTO_ACKNOWLEDGE:当客户成功的从receive方法返回的时候,或者从MessageListener.onMessage方法成功返回的时候,会话自动确认客户收到的消息。
  2. Session.CLIENT_ACKNOWLEDGE:客户通过调用消息的acknowledge方法确认消息。需要注意的是,在这种模式中,确认是在会话层上进行,确认一个被消费的消息将自动确认所有已被会话消费的消息。例如:如果一个消息消费者消费了10个消息,然后确认第5个消息,那么所有10个消息都被确认。
  3. Session.DUPS_ACKNOWLEDGE:该选择只是会话迟钝的确认消息的提交。如果JMS provider失败,那么可能会导致一些重复的消息。如果是重复的消息,那么JMS provider必须把消息头的JMSRedelivered字段设置为true

消息持久性

PERSISTENT:指示JMS provider持久保存消息,以保证消息不会因为JMS provider的失败而丢失
NON_PERSISTENT:不要求JMS provider持久保存消息

消息优先级

可以使用消息优先级来指示JMS provider首先提交紧急的消息。优先级分10个级别,从0(最低)到9(最高)。如果不指定优先级,默认级别是4.需要注意的是,JMS provider并不一定保证安好优先级的顺序提交消息

消息过期

可以设置消息在一定时间后过期,默认是永不过期

消息的临时目的地

可以通过会话上的createTemporaryQueue方法和createTemporaryTopic方法来创建临时目的地。它们的存在时间只限于创建它们的连接所保持的时间。只有创建该临时目的地的连接上的消息消费者才能够从临时目的地中提取消息

持久订阅

首先消息生产者必须使用PERSISTENT提交消息。客户可以通过会话上的createDurableSubscriber方法来创建一个持久订阅,该方法的第一个参数必须是topic。第二个参数是订阅的名称。
JMS provider会存储发布到持久订阅对应的topic上的消息。如果最初创建持久订阅的客户或者任何其它客户,使用相同的连接工厂和连接的客户ID,相同的主题和相同的订阅名,再次调用会话上的createDurableSubscriber方法,那么该持久订阅就会被激活。JMS provider会向客户发送客户处于非激活状态时所发布的消息。
持久订阅在某个时刻只能有一个激活的订阅者。持久订阅在创建之后会一直保留,直到应用程序调用会话上的unsubscribe方法。

本地事务

在一个JMS客户端,可以使用本地事务来组合消息的发送射接收。JMS Session接口提供了commit和rollback方法。事务提交意味着生产的所有消息被发送,消费的需所有消息被确认;事务回滚意味着生产的所有消息被销毁,消费的所有消息被回复并重新提交,除非它们已经过期。
事务性的会话总是牵涉到事务处理中,commit或rollback方法一旦被调用,一个事物就结束了,而另一个事务被开始。关闭事务性会话将回滚其中的事务。
需要注意的是,如果使用请求/回复机制,即发送一个消息,同时希望在同一个事物中等待接收该消息的回复,那么程序将被挂起,因为直到事务提交,发送操作才会真正执行。
需要注意的还有一个,消息的生产和消费不能包含在同一个事务中。

JMS的PTP模型

JMS PTP模型定义了客户端如何向队列发送消息,从队列接收消息,以及浏览队列中的消息。
PTP模型是基于队列的,生产者发消息到队列,消费者从队列接收消息,队列的存在使得消息的异步传输成为可能。和右键系统中的邮箱一样,队列可以包含各种消息,JMS Provider提供工具管理队列的创建、删除。
PTP的一些特点

  1. 如果在session关闭时,有一些消息已经被接收到,但还没有被签收(acknowledged),那么,当消费者下次连接到相同的队列时,这些消息还会被再次接收
  2. 如果用户在receive方法 中设定了消息选择条件,那么不符合条件的消息会留在队列中,不会被接收到
  3. 队列可以长久地保存消息直到消费者接收消息。消费者不需要因为担心消息会丢失而时刻和队列保持激活的连接状态,充分体现了异步传输模式的优势

JMS的Pub/Sub模型

JMS Pub/Sub模型定义了如何向一内容节点发布和订阅消息,这些节点被称作topic
主题可以被认为是消息的传输中介,发布者(publisher)发布消息到主题,订阅者(subscribe)从主题订阅消息。主题使得消息订阅者和消息发布者保持相互的独立,不需要接触即可保持消息的传递。
Pub/Sub的一些特点

  1. 消息订阅分为非持久订阅和持久订阅
    非持久订阅只有当客户端处于激活状态,也就是JMS provider保持连接状态才能收到发送到某个主题的消息,而当客户端处于离线的状态,这个时间段发到主题的消息将会丢失,永远不会收到。
    持久订阅时,客户端向JMS注册一个识别自己身份的ID,当这个客户端处于离线时,JMS provider会为这个ID保存所有发送到主题的消息,当客户端处于离线时,JMS Provider会为这个ID保持所有发送到主题的消息,当客户再次连接到JMS Provider时,会根据自己的ID得到所有当自己处于离线时发送到主题的消息。
  2. 如果用户在receive方法中设定了消息选择条件,那么不符合条件的消息不会被接收
  3. 非持久订阅状态下,不能恢复或重新派送一个未签收的消息。只有持久订阅才能恢复或重新派送一个未签收的消息。
  4. 当所有的消息必须被接收,则用持久订阅。当丢失消息能够被容忍,则用非持久订阅