我们经常希望维持队列中的消息,按一定次序转发给消息者。然而当有多个JMS Session和消息消费者实例的从同一个队列中获取消息的时候,就不能保证消息顺序处理。因为消息被多个不同线程并发处理着。
在ActiveMQ4.x中可以采用Exclusive Consumer或者Exclusive Queues,避免这种情况,Broker会从消息队列中,一次发送消息给一个消息消费者来保证顺序。A. 当在接收信息的时候有一个或者多个备份接收消息者和一个独占消息者的同时接收时候,无论两者创建先后,在接收的时候,均为独占消息者接收。B. 当在接收信息的时候,有多个独占消费者的时候,只有一个独占消费者可以接收到消息。C. 当有多个备份消息者和多个独占消费者的时候,当所有的独占消费者均close的时候,只有一个备份消费者接到到消息。备注:备份消费者为不带任何参数的消费者。12.1.1选择一个独占的message consumer对于应用来说,那些重要的order ,或者,你需要确保这里仅仅只有一个message consumer对于queen,activemq提供了一个客户端选项来确保只有一个active message consumer来处理messageactivemq meaasge broker也会在queen上选择一个consumer来处理消息,这样的好处就是允许broker来选择,即使consumer失败或者停止了,然后另外一个message consumer能够被选择成为active的如果你混合了标准consumer和exclusive consumer在同一个queen上 ,the activemq将会仅仅选择exclusive的其中一个consumer,,如果所有的exclusive consumer都变为inactive那么就会选择标准的consumer,然后queen的消费将会变为正常的传输模式, queue = new ActiveMQQueue("TEST.QUEUE?consumer.exclusive=true"); consumer = session.createConsumer(queue);12.1.2利用exclusive consumer来提供分布式锁的功能通常你用message从外部资源来广播数据,如果你想构建一个冗余的, 即使你有一个实例阅读和广播changedate失败了【改变数据库记录,在文件里面的内容用逗号分隔】,另一个实例都将要接管,通常你依靠锁住资源【行锁或者文件锁】来确保仅仅只有一个程序能够acess data并且广播over topic ,但是当你不想利用数据库,或者想要运行一个程序跨越一个机器(不能用分布式锁),然后你就只能用独占consumer来创建一个分布式锁为了能够使用独占consumer来创建分布式锁,我们需要我没得producer订阅独占的queen, 如果message producer接收到queen,他就便激活了, 并且能够 订阅实时的feed和把实时数据变为jms message this.connection = this.factory.createConnection(); this.connection.start(); this.session = this.connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); Destination destination = this.session.createQueue(this.queueName + "?consumer.exclusive=true"); Message message = this.session.createMessage(); MessageProducer producer = this.session.createProducer(destination); producer.send(message); MessageConsumer consumer = this.session.createConsumer(destination); consumer.setMessageListener(this);在这个代码片中,我们总是send a message到queen这一步总是被外部的管理程序执行的,注意到Session.CLIENT_ACKNOWLEDGE模式来消费这个消息,尽管我们想要被通知我们是独占的consumer, 因此我们有锁,我们不想要remove,我们不想要remove这一条消息吗, 如果我们失败了, 我们的另一个独占producer将会active正在这个列子中我们实现了MessageListener,如果我们没有active, 我们将要call一个功能性方法start producing ,如果我们是实时应用, 这个方法将要订阅一个实时的并且转换实时的data 进入jms message public void onMessage(Message message) { if (message != null && this.active==false) { this.active=true; startProducing(); } } 12.2 message groups全部的message 都将要转向单一的message consumer,message也能够分组来给予单一的consumer, 一个message producer也能指定一个group,通过指定message header JMSXGroupId,ActiveMQ将要确保全部相同的JmsxGroupID的message发送给相同的consumer如果Activemq broker制定了consumer接受消息通过JmsxGroupID,那么他就应该close掉,然后activemq broker将要选择一个不同的message consumer来dispatch给不同的message为了创建一个group,你需要设置JmsGroupID string property在消息上 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Queue queue = session.createQueue("group.queue"); MessageProducer producer = session.createProducer(queue); Message message = session.createTextMessage("<foo>test</foo>"); message.setStringProperty("JMSXGroupID", "TEST_GROUP_A"); producer.send(message);这个列子显示了message producer已经被创建了, 并且设置好textmessage 属于message group TEST_GROUP_Amessage group利用正常的message consumer,因此没有额外的工作需要group来消费message, 全部的工作都被message producer来定义一个group的消息属于什么, activemqbroker选择一个message consumer来处理全部的分组消息activemqbroker对于group里面的每一条消息都会添加一条sequeence no,[通过JMSXGroupSeq,从1开始] 但是从consumer视角来说,你不能假定你从一个新的group里收到的第一条JMSXGroupSeq设置1, 如果一个存在的group close掉或者死掉之后, 任何消息route到这个group里的都会分配给一个新的consumer为了帮助识别一个消息的consumer 从一个新的group里收到消息,或者一个新的group从来没有被看见过, 一个boolean 参数叫做JMSXGroupFirstForConsumer被设置了对于第一个message, 你也能够核对是否他是为第一条message设置的【对于新组】, 你也能够核对消息是否被 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Queue queue = session.createQueue("group.queue"); MessageConsumer consumer = session.createConsumer(queue); Message message = consumer.receive(); String groupId = message.getStringProperty("JMSXGroupId"); if (message.getBooleanProperty("JMSXGroupFirstForConsumer")) { // do processing for new group }The Activemq message代理允许 分配各种各样的消息groups跨越多个consumer,但是如果这里早已经有message等着dispatch, the message group典型的分配给第一个consumer,为了确保一个基数de的分布式负载均衡,他可能考虑message broker等着开启更多的messgae consumer , 为了这样做, 你不得不设置destination policy在active broker 配置里面,设置好consumersBeforeDispatchStarts参数用 <destinationPolicy> <policyMap> <policyEntries> <policyEntry queue=">" consumersBeforeDispatchStarts="2" timeBeforeDispatchStarts="5000"> </policyEntry> </policyEntries> </policyMap></destinationPolicy>这个配置告诉ActiveMq broker, 都应该等着两个consumer在dispatch之前, 另外我们也可以看到timeBeforeDispatchStarts参数5000ms来通知activemq broker如果两个message consumer在5s内没有砸queen上得到消息,利用messgae group添加最小化的active broker 就每个消息group存储routing 信息而言。这是明晰的关掉message group通过发送message从activemq broker 的JMSXGroupID 设置为-1 Connection connection = new ActiveMQConnectionFactory().createConnection(); connection.start(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Queue queue = session.createQueue("group.queue"); MessageProducer producer = session.createProducer(queue); Message message = session.createTextMessage("<foo>close</foo>"); message.setStringProperty("JMSXGroupID", "TEST_GROUP_A"); message.setIntProperty("JMSXGroupSeq", -1); producer.send(message);12.3activemqstreamActivemq stream是一个高级的特色,他允许使用activemq来作为Jave Io stream,activeMQ将要break 一个outputstream对于不同的data chunk并且send每一个chunk通过activemq作为jms message一个相应的activemq jms inputstream应该用在consumer边重新结合data chunk如果你用queen 作为streamd的destination,使用不止一个consumer 在queen上(或者一个独占的consumer)是很好的, 由于group的这个特色【用同样的groupid指向一个单一的consumer】,使用超过一个的producer可能会造成message排序order的问题利用jms的好处就是activemq 把breank stream 分为了管理的块【chunk】, 并且允许你在consumer端给合并, 因此这是允许你传输大文件用这个功能为了证明这个用stream //source of our large data FileInputStream in = new FileInputStream("largetextfile.txt"); String brokerURI = ActiveMQConnectionFactory.DEFAULT_BROKER_URL; ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerURI); ActiveMQConnection connection = (ActiveMQConnection) connectionFactory.createConnection(); connection.start(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Queue destination = session.createQueue(QUEUE_NAME); OutputStream out = connection.createOutputStream(destination); //now write the file on to ActiveMQ byte[] buffer = new byte[1024]; while(true){ int bytesRead = in.read(buffer); if (bytesRead==-1){ break; } out.write(buffer,0,bytesRead); } out.close(); 在下面的这个例子中我们创建了一个ActiveMQConnection并且创建了一个inputstream利用一个queen, 注意到我们利用一个独占的consumer通过apend"?consumer.exclusive=true";我们确保仅仅一个consumer 能够阅读到一个queen,我们read InputStream并且通过FileOutputStream来重组file在硬盘上你也能够使用topic, 尽管这个 //destination of our large data FileOutputStream out = new FileOutputStream("copied.txt"); String brokerURI = ActiveMQConnectionFactory.DEFAULT_BROKER_URL; ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerURI); ActiveMQConnection connection = (ActiveMQConnection) connectionFactory.createConnection(); connection.start(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //we want be be an exclusive consumer String exclusiveQueueName= QUEUE_NAME + "?consumer.exclusive=true"; Queue destination = session.createQueue(exclusiveQueueName); InputStream in = connection.createInputStream(destination); //now write the file from ActiveMQ byte[] buffer = new byte[1024]; while(true){ int bytesRead = in.read(buffer); if (bytesRead==-1){ break; } out.write(buffer,0,bytesRead); } out.close(); }12.4 Blob消息activemq引进了blob来处理large message自己处理中转如果自己处理文件的话,一个简单方式是使用共享或ftp、dfs等方式,先把文件发送到一个大家都可以拿到的地方,然后发送message,payload或properties中包含文件的路径信息。这样,consumer拿到文件路径后去指定的地方,按照给定的方式去获取文件数据即可。优势:这种方式可以用来处理大数据,并且不需要client或broker在内存中持有文件数据本身,非常的节省资源。而且文件是通过额外的方式处理,跟ActiveMQ本身无关,所以符合jms协议、处理的效率也相对比较高。劣势:需要自己处理很多文件相关的操作。BlobMessage对文件中转的封装幸运的是,ActiveMQ把上面繁复的文件处理工作进行了封装,屏蔽掉文件中转的整个处理过程,使得我们可以使用类似jms规范的API来简单操作文件传输。 String brokerURI = ActiveMQConnectionFactory.DEFAULT_BROKER_URL; ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerURI); Connection connection = connectionFactory.createConnection(); connection.start(); ActiveMQSession session = (ActiveMQSession) connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Queue destination = session.createQueue(QUEUE_NAME); MessageProducer producer = session.createProducer(destination); BlobMessage message = session.createBlobMessage(new URL("http://some.shared.site.com")); producer.send(message);consumer for blob:: FileOutputStream out = new FileOutputStream("blob.txt"); String brokerURI = ActiveMQConnectionFactory.DEFAULT_BROKER_URL; ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerURI); Connection connection = (ActiveMQConnection) connectionFactory.createConnection(); connection.start(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Queue destination = session.createQueue(QUEUE_NAME); MessageConsumer consumer = session.createConsumer(destination); BlobMessage blobMessage = (BlobMessage) consumer.receive(); InputStream in = blobMessage.getInputStream(); // now write the file from ActiveMQ byte[] buffer = new byte[1024]; while (true) { int bytesRead = in.read(buffer); if (bytesRead == -1) { break; } out.write(buffer, 0, bytesRead); } out.close(); }12.5网络存活 或者代理失败后的失效转移协议failover:(tcp://host1:61616,tcp://host2:61616,ssl://host3:61616)activemq将会随机的选择list中的其中一个用失效转移协议,如果仅仅只有一个uri那么客户端将会隔断时间查看是否broker available,你可以利用TransportListener来监听activemq的连接public class ClientTransportListener implements TransportListener { protected final Logger logger = LoggerFactory.getLogger(ClientTransportListener.class); public void onCommand(Object o) { logger.debug("onCommand检测到服务端命令:{}", o); } public void onException(IOException error) { logger.error("onException,与服务器连接发生错误......"); } public void transportInterupted() { logger.error("transportInterupted,与服务器连接发生中断......"); IConnector connector = new Connector(); connector.reConnect(); } public void transportResumed() { logger.info("transportResumed,恢复与服务器连接...."); } } 当你想要按照顺序来启动failover:(tcp://host1:61616,tcp://host2:61616,ssl://host3:61616)?random=false如果个了段时间还是连不上,the failover protocol将会增加一段总量来连接activemq broker,这个叫做指数退避算法Exponential Backoff默认的useExponentialBackoff是enable参数 默认值 含义initialReconnectDelay 10ms, 重连之前等待的时间(ms)backOffMultiplier 1.5 增大等待时间的系数maxReconnectDelay 30000 重连之前等待的最大时间(ms)failover:(tcp://host1:61616,tcp://host2:61616,ssl://host3:61616)?backOffMultiplier=2,initialReconnectDelay=1000在maxInactivityDuration时间里没有连接上话就是invalidatefailover:(tcp://host1:61616?wireformat.maxInactivityDuration=0,tcp://host2:61616,ssl://host3:61616?wireformat.maxInactivityDuration=0)默认的话activemq传输是持久化的,如果你使用非持久化的方式传输的话,为了防止丢失你就要使用trackMessages=truemaxCachesizebackup=true,backupPoolSize=2updateClusterClientsrebalanceClusterClientsupdateClusterClientOnRemoveupdateClusterFilter 12.6在future传输messageProperty name type descriptionAMQ_SCHEDULED_DELAY long The time in milliseconds that a message will wait before being scheduled to be delivered by the brokerAMQ_SCHEDULED_PERIOD long The time in milliseconds to wait after the start time to wait before scheduling the message againAMQ_SCHEDULED_REPEAT int The number of times to repeat scheduling a message for deliveryAMQ_SCHEDULED_CRON String Use a Cron entry to set the schedule 例如,有一个消息,原定在60秒-交付你需要设置amq_scheduled_delayMessageProducer producer = session.createProducer(destination);TextMessage message = session.createTextMessage("test msg");long time = 60 * 1000;message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, time);producer.send(message);你可以设置一个消息,等待一个初始延迟,并重复传送10次,等待10秒之间的每一个重新交付MessageProducer producer = session.createProducer(destination);TextMessage message = session.createTextMessage("test msg");long delay = 30 * 1000;long period = 10 * 1000;int repeat = 9;message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, delay);message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD, period);message.setIntProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT, repeat);producer.send(message);你也可以使用cron调度信息,例如,如果你想要一个消息如期交付的每一个小时,你就需要设置cron入口是0 * -例如MessageProducer producer = session.createProducer(destination);TextMessage message = session.createTextMessage("test msg");message.setStringProperty(ScheduledMessage.AMQ_SCHEDULED_CRON, "0 * * * *");producer.send(message);cron调度优先使用消息延迟,然而,如果一个重复周期设置一个cron入门,ActiveMQ调度器将安排每次cron进入火灾的消息传递。用一个例子来解释更容易。假设你想要一个消息,10次,一一秒的延迟之间的每一个消息-你希望这个发生每小时-你会这样做:MessageProducer producer = session.createProducer(destination);TextMessage message = session.createTextMessage("test msg");message.setStringProperty(ScheduledMessage.AMQ_SCHEDULED_CRON, "0 * * * *");message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, 1000);message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD, 1000);message.setIntProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT, 9);producer.send(message)