activemq js接收消息怎么判断接收成功

1.下载ActiveMQ
去官方网站下载:
我下载的时候是 ActiveMQ 5.8.0 Release版
2.运行ActiveMQ
解压缩apache-activemq-5.8.0-bin.zip,然后双击apache-activemq-5.5.1\bin\activemq.bat运行ActiveMQ程序。
启动ActiveMQ以后,登陆:http://localhost:8161/admin/,创建一个Queue,命名为FirstQueue。
3.创建Eclipse项目并运行
创建java project:ActiveMQ-5.8,新建lib文件夹
打开apache-activemq-5.8.0\lib目录
拷贝
activemq-broker-5.8.0.jar
activemq-client-5.8.0.jar
geronimo-j2ee-management_1.1_spec-1.0.1.jar
geronimo-jms_1.1_spec-1.1.1.jar
slf4j-api-1.6.6.jar
这5个jar文件到lib文件夹中,并Build Path-&Add to Build Path
Sender.java
package com.lm.
* @Header: Sender.java
* 类描述:
* @author: lm
上午10:52:42
* @company 欢
* @addr 北京市朝阳区劲松
import javax.jms.C
import javax.jms.ConnectionF
import javax.jms.DeliveryM
import javax.jms.D
import javax.jms.MessageP
import javax.jms.S
import javax.jms.TextM
import org.apache.activemq.ActiveMQC
import org.apache.activemq.ActiveMQConnectionF
public class Sender {
private static final int SEND_NUMBER = 5;
public static void main(String[] args) {
// ConnectionFactory :连接工厂,JMS 用它创建连接
ConnectionFactory connectionF // Connection :JMS 客户端到JMS
// Provider 的连接
Connection connection = // Session: 一个发送或接收消息的线程
S // Destination :消息的目的地;消息发送给谁.
Des // MessageProducer:消息发送者
MessageP // TextM
// 构造ConnectionFactory实例对象,此处采用ActiveMq的实现jar
connectionFactory = new ActiveMQConnectionFactory(
ActiveMQConnection.DEFAULT_USER,
ActiveMQConnection.DEFAULT_PASSWORD, "tcp://localhost:61616");
try { // 构造从工厂得到连接对象
connection = connectionFactory.createConnection();
connection.start();
// 获取操作连接
session = connection.createSession(Boolean.TRUE,
Session.AUTO_ACKNOWLEDGE);
// 获取session注意参数值xingbo.xu-queue是一个服务器的queue,须在在ActiveMq的console配置
destination = session.createQueue("FirstQueue");
// 得到消息生成者【发送者】
producer = session.createProducer(destination);
// 设置不持久化,此处学习,实际根据项目决定
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
// 构造消息,此处写死,项目就是参数,或者方法获取
sendMessage(session, producer);
} catch (Exception e) {
e.printStackTrace();
} finally {
if (null != connection)
connection.close();
} catch (Throwable ignore) {
public static void sendMessage(Session session, MessageProducer producer)
throws Exception {
for (int i = 1; i &= SEND_NUMBER; i++) {
TextMessage message = session.createTextMessage("ActiveMq 发送的消息"
// 发送消息到目的地方
System.out.println("发送消息:" + "ActiveMq 发送的消息" + i);
producer.send(message);
Receiver.java
package com.lm.
* @Header: Receiver.java
* 类描述:
* @author: lm
上午10:52:58
* @company 欢
* @addr 北京市朝阳区劲松
import javax.jms.C
import javax.jms.ConnectionF
import javax.jms.D
import javax.jms.MessageC
import javax.jms.S
import javax.jms.TextM
import org.apache.activemq.ActiveMQC
import org.apache.activemq.ActiveMQConnectionF
public class Receiver {
public static void main(String[] args) {
// ConnectionFactory :连接工厂,JMS 用它创建连接
ConnectionFactory connectionF
// Connection :JMS 客户端到JMS Provider 的连接
Connection connection =
// Session: 一个发送或接收消息的线程
// Destination :消息的目的地;消息发送给谁.
// 消费者,消息接收者
connectionFactory = new ActiveMQConnectionFactory(
ActiveMQConnection.DEFAULT_USER,
ActiveMQConnection.DEFAULT_PASSWORD, "tcp://localhost:61616");
// 构造从工厂得到连接对象
connection = connectionFactory.createConnection();
connection.start();
// 获取操作连接
session = connection.createSession(Boolean.FALSE,
Session.AUTO_ACKNOWLEDGE);
// 获取session注意参数值xingbo.xu-queue是一个服务器的queue,须在在ActiveMq的console配置
destination = session.createQueue("FirstQueue");
consumer = session.createConsumer(destination);
while (true) {
// 设置接收者接收消息的时间,为了便于测试,这里谁定为100s
TextMessage message = (TextMessage) consumer.receive(100000);
if (null != message) {
System.out.println("收到消息" + message.getText());
} catch (Exception e) {
e.printStackTrace();
} finally {
if (null != connection)
connection.close();
} catch (Throwable ignore) {
5.测试过程
先运行:Receiver.java
再运行:Sender.java
可以看到结果
Sender运行后:
发送消息:ActiveMq 发送的消息1
发送消息:ActiveMq 发送的消息2
发送消息:ActiveMq 发送的消息3
发送消息:ActiveMq 发送的消息4
发送消息:ActiveMq 发送的消息5
Receiver运行后:
收到消息ActiveMq 发送的消息1
收到消息ActiveMq 发送的消息2
收到消息ActiveMq 发送的消息3
收到消息ActiveMq 发送的消息4
收到消息ActiveMq 发送的消息5
要想看到不同的输出内容,通过点击如下图的按钮切换console
在Receiver.java中,可以设置一个时间,比如receive(500000),如下代码所示:
TextMessage message = (TextMessage) consumer.receive(500000);
这个时候运行Receiver.java的话,会使得这个Receiver.java一直运行500秒,在eclipse中可以发现:
点击那个红色方块可以手动停止运行程序
黑色头发:/
浏览 92858
登录:http://localhost:8161/admin/ 时需要用户名和密码?什么填写?http://blog.csdn.net/stevenprime/article/details/7091224看看这个就知道用户名和密码是多少了
不错,可以运行,最近正在研究mq,不知楼主有.net的例子么,我下载了MQ提供的.net开发包,但是总是提示少一个dll木有哦
heisetoufa
浏览: 8966987 次
来自: 北京
解决了一个文件加载问题,感谢
// 设置不持久化,此处学习,实际根据项目决定
tianhei_3 写道登录:http://localhost ...
18:09:42,101 DEBUG [ ...activemq中发送者如何知道消费者是否已经收到消息 - ITeye问答
利用activemq进行消息收发,如果在线的消费者因某种特殊情况没有收到消息(例如消费者突然断网),发送方有什么办法能够立即知道消费者没有收到该消息,activemq有没有提供相应的手段?
问题补充:这个跟持久化有关系吗?如果发送消息采用的是NON_PERSISTENCE呢
问题补充:首先谢谢 boy00fly 的关注和热心解答
&br /&确实可以利用心跳机制并订阅消费者的advisory消息来知道一个消费者的start 和 stop状态,心跳默认的是30毫秒,就是说如果一个消费者掉线了,过了三十毫秒后,activemq broker才会产生该消费者掉线的advisory消息。把心跳时间改成1-5秒,确实可以做到当消费者掉线时,立即收到该advisory消息。不过这好像对系统是有影响的。InactivityMonitor这个类不知道你研究过没,它里面有个setReadCheckTime,好像是设置consumer从broker上读取消息的时间,默认也是30毫秒的,我想将这个时间设置到短点,但是,如何获取自己程序的InactivityMonitor实例呢,
&br /&
&br /&
&br /&&div class="quote_title"&boy00fly 写道&/div&&div class="quote_div"&不好意思,一开始理解你的问题有误。
&br /&消费者注册到activemq服务器时,在activemq服务器上会记录消费者的信息。并且消费者与服务器之间会保持一种类似于心跳线的机制,只要任意一方出现的异常导致程序退出,另外一方都会知道(可能是内部启动一个线程每隔几秒请求一次确定连接是否异常)。
&br /&&a href="/blog/1103586" target="_blank"&/blog/1103586&/a&这是我写得一些基础的内容。但是并没有你想知道的问题的具体分析,有时间我再把你的这个问题补充上去。&img src="/images/smiles/icon_biggrin.gif"/& &/div&
&br /&
在ActiveMQ中,消息生产者是无法知道消息消费者是否已经接受到消息的,因为消息发送和接收是两个相对独立的过程,生产者将消息发送给MOM,当MOM对接受消息进行了应答时,生产者就默认了你MOM一定会不惜任何代价将消息发送到消费者(这里假设消息有效期是设为0,为了健壮性,我们一般使用持久化消息)。如果有这种需求,在队列模型下,你可以使用两个队列来达到这种效果,生产者通过队列Q1向消费者发送消息,消费者消费完消息后再向队列Q2发送一条应答消息,而生产者负责异步监听这个Q2队列,如果想在生产者端做到同步效果,可以在生产者端的Q1和Q2之间使用信号量机制。
对于你提的第二个问题,我目前也没有深入分析过,以下是我的猜测!
activemq服务器本身应该是有个默认的心跳时间的,并且可以设置的(正如你而言)。
但是对于consumer而言,是不是可以单独配置针对此consumer的心跳时间?
如果不行的话,就只能改activemq服务器默认的时间了(要有把握,不然还是默认较好)。
推荐看一本书《ActiveMq in action》,官方也推荐这本书,或许能够找到你的答案。建议你看英文原版的,比较原汁原味,有需要我可以传给你,不过网上有的下载的,你可以搜索一下。 ,有空再交流!!
不好意思,一开始理解你的问题有误。
消费者注册到activemq服务器时,在activemq服务器上会记录消费者的信息。并且消费者与服务器之间会保持一种类似于心跳线的机制,只要任意一方出现的异常导致程序退出,另外一方都会知道(可能是内部启动一个线程每隔几秒请求一次确定连接是否异常)。
这是我写得一些基础的内容。但是并没有你想知道的问题的具体分析,有时间我再把你的这个问题补充上去。
activemq会持久化你的消息,等到此消息有消费者后再发送给消费者。
activemq5默认是会持久化你的消息的,当然你也可以关掉,不过一般不会这么做的.
持久化有两种方式disk、database
详情请参照官方文档
已解决问题
未解决问题博客分类:
本篇主要讲解在未使用其他框架(Spring)整合情况下,独立基于ActiveMQ,使用JMS规范进行消息通信。
一.JMS回顾
因为ActiveMQ是一个JMS Provider的实现,因此在开始实作前,有必要复习下JMS的基础知识
Java Message Service (JMS)是sun提出来的为J2EE提供企业消息处理的一套规范,JMS目前有2套规范还在使用JMS 1.0.2b和1.1. 1.1已经成为主流的JMS Provider事实上的标准了.
*1.1主要在session上面有一些重要改变,比如支持建立同一session上的transaction,让他支持同时发送P2P(Queue)消息和接受Topic消息。
在JMS中间主要定义了2种消息模式Point-to-Point (点对点),Publich/Subscribe Model (发布/订阅者),
其中在Publich/Subscribe 模式下又有Nondurable subscription和durable subscription (持久化订阅)2种消息处理方式。
下面是JMS规范基本的接口和实现
JMS Common Interfacse PTP-Specific Interface
Pub/Sub-specific interfaces
ConnectionFactory
QueueConnectionFactory
TopicConnectionFactory
Connection
QueueConnection
TopicConnection
Destination
QueueSession
TopiSession
MessageProducer
QueueSender
TopicPublisher
MessageConsumer
QueueReceiver/QueueBrwer TopicSubscriber
二.使用Queue
下面以ActiveMQ example的代码为主进行说明
1. 使用ActiveMQ的Connection,ConnectionFactory 建立连接,注意这里没有用到pool
import org.apache.activemq.ActiveMQConnection
import org.apache.activemq.ActiveMQConnectionFactory
//建立Connection
protected Connection createConnection() throws JMSException, Exception {
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user, pwd, url);
Connection connection = connectionFactory.createConnection();
if (durable && clientID!=null) {
connection.setClientID(clientID);
connection.start();
//建立Session
protected Session createSession(Connection connection) throws Exception {
Session session = connection.createSession(transacted, ackMode);
2。发送消息的代码 //建立QueueSession
protected MessageProducer createProducer(Session session) throws JMSException {
Destincation destination = session.createQueue("queue.hello");
MessageProducer producer = session.createProducer(destination);
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
if( timeToLive!=0 )
producer.setTimeToLive(timeToLive);
//使用Producer发送消息到Queue
producer.send(message);
3。接受消息,在JMS规范里面,你可以使用
QueueReceiver/QueueBrowser直接接受消息,但是更多的情况下我们采用消息通知方式,即实现MessageListener接口
public void onMessage(Message message) {
Destincation destination = session.createQueue("queue.hello");
consumer = session.createConsumer(destination);
consumer.setMessageListener(this);
以上就是使用jms queue发送接受消息的基本方式
1. 建立连接
protected Connection createConnection() throws JMSException, Exception {
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user, pwd, url);
Connection connection = connectionFactory.createConnection();
if (durable && clientID!=null) {
connection.setClientID(clientID);
connection.start();
2. 建立Session
protected Session createSession(Connection connection) throws Exception {
Session session = connection.createSession(transacted, ackMode);
3.创建Producer 发送消息到Topic
topic = session.createTopic("topic.hello");
producer = session.createProducer(topic);
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
producer.send(message);
4.创建Consumer接受消息(基本上和Queue相同)
Destincation destination
= session.createTopic("topic.hello");
MessageConsumer consumer = session.createConsumer(destination);
consumer.setMessageListener(this);
四:连接ActiveMQ的方式
ActiveMQConnectionFactory 提供了多种连接到Broker的方式
常见的有 vm://host:port
tcp://host:port
//tcp ssl://host:port
//SSL stomp://host:port
//stomp协议可以跨语言,目前有很多种stomp client 库(java,c#,c/c++,ruby,python...);
浏览 17167
浏览: 1363192 次
来自: 上海
有demo吗?
推荐给大家一些有用的爬虫源码:https://github.c ...
除了.classpath文件以外,.project文件也应该了 ...
造成eclipse自动关闭的原因有很多,这里有很多介绍:ecl ...
网上都是这些,这种文章。。。Activemq判断队列存活脚本
时间: 22:22:31
&&&& 阅读:288
&&&& 评论:
&&&& 收藏:0
标签:&&&&&&&&&&&&Activemq是每一位运维工作人员必须掌握的一款软件,他是公司运行的命脉之一,既然如此重要,那么它的看门狗也是比较含金量的。Activemq的看门狗不是像那种# ps -ef|grep mq看进程或者#netstat -ntpl|grep 61613这样直白,因为Activemq还有一种假死情况,那就是“进程和端口都在,但是队列已经死掉了”,遇到这种情况就必须重启当前的activemq。那么要写出这样的一个脚本,需要满足以下条件:1)因为mq是集群,有主备之分,首先我们先在主mq上判断当前机器61613端口是否监听;2)如果当前机器61613端口正常,往mq里添加信息,如果信息可以成功添加到对应的队列里,那么可以证明mq是正常的;3)如果信息无法添加,那么监听备用mq的61613端口是否监听;4)如果备用mq的61613端口不在,则发送邮件报警并同时重启主备mq进程;5)如果备用mq的61613端口正常,则测试备用mq的信息队列是否正常;6)如果备用mq的信息队列不正常,一样发送邮件报警并同时重启主备mq进程;以上就是我们mq看门狗脚本的逻辑思路,这里我们用python语言来写,为什么要用python,因为python有一个stomp模块,这个模块是镇服activemq失灵的不二法宝!首先这个stomp是需要我们手动安装的,他的下载地址是clone&or download",然后把整个压缩文件传递到linux服务器里,解压缩之后,在stomp文件夹里使用#&python setup.py install,然后就看见如下文字可以证明stomp已经安装成功了:然后我们来写一个简单的测试队列的脚本,脚本内容如下:[&~]#&cat&mqtest.py&
import&time
import&sys
import&stomp
class&MyListener(object):
&&&&def&on_error(self,&headers,&message):
&&&&&&&&print(‘received&an&error&%s‘&%&message)
&&&&def&on_message(self,&headers,&message):
&&&&&&&&print(‘received&a&message&%s‘&%&message)
conn&=&stomp.Connection([(‘mq的内网ip地址‘,61613)])
conn.set_listener(‘‘,&MyListener())
conn.start()
conn.connect(‘mq的账号‘,‘mq的密码‘)
conn.subscribe(destination=‘/queue/chenshuo‘,&id=1,&ack=‘auto‘)
conn.send(body=‘hello,this&is&my&test&message!‘,&destination=‘/queue/chenshuo‘)
time.sleep(2)
conn.disconnect()在这个脚本里,我们把mq的内网地址作为连接地址,因为这样做很安全,其次我们建立一个叫“chenshuo”的队列,这个队列里传输一个消息,消息的内容就是hello,this&is&my&test&message!启动一下这个脚本,我们看一下linux端的效果。再看一下web界面:可见已经生成了chenshuo这个队列,而且入列和出列的消息数是3个(因为我之前传递了两个做实验),上面这个脚本是成功的。但是这个脚本只是一个草样,因为这个消息是没有“消费者”的,没有消费者的话,那么这三个消息就会堆积,而他们的命运要不被删除掉要么就是等待消费者出现把他们消费掉。(未完待续)本文出自 “” 博客,请务必保留此出处标签:&&&&&&&&&&&&
&&国之画&&&& &&&&chrome插件
版权所有 京ICP备号-2
迷上了代码!今天项目在联调过程中,ActiveMQ突然不好使了。在此之前一月内,项目组的人都没有去修改、重启过MQ服务。
虽然现在知道是由于权限的问题导致只能收不能发(站在ActiveMQ角度是收不到,但可以发)。只是到现在还不知道原来没变过的代码,怎么以前可以用,现在却不行了???
通过查询示例代码,发现有connectionFactory.setUserName("system");由此联想到,应该跟访问权限的设计有关系。
查询activemq.home/conf/果然有activemq-security.xml,有关于安全方面的配置示例。查询资料,可以通过设置anonymousAccessAllowed="false"使得MQ不需要密码访问。
去掉密码是不安全的,但之前没有设置过,真不知道怎么可以突然不行了。疑惑疑惑……
&!-- Conf Username, passwords and groups --&
&simpleAuthenticationPlugin anonymousAccessAllowed="false"&
&authenticationUser username="system" password="${activemq.password}"
groups="users,admins"/&
&authenticationUser username="user" password="${guest.password}"
groups="users"/&
&authenticationUser username="guest" password="${guest.password}" groups="guests"/&
&/simpleAuthenticationPlugin&
&/plugins&
浏览: 213400 次
来自: 厦门
另外一个方法实现eclipse tomcat 热部署:http ...
太有用了,我就是这个该死的错误,没注意啊。感谢。
写道一品哥,好巧啊,百度搜到滴,哈哈那个汗 ...

我要回帖

更多关于 activemq接收不到消息 的文章

 

随机推荐