腾讯微群加入QQ群

 找回密码
 加入我们

!connect_header_login!

!connect_header_login_tip!

搜索
查看: 1108|回复: 0

RabbitMQ 入门指南(Java)

[复制链接]
发表于 2016-4-18 14:45:56 | 显示全部楼层 |阅读模式
RabbitMQ是一个受欢迎的消息代理,通常用于应用程序之间或者程序的不同组件之间通过消息来进行集成。本文简单介绍了如何使用 RabbitMQ,假定你已经配置好了rabbitmq服务器。 RabbitMQ是用Erlang,对于主要的编程语言都有驱动或者客户端。我们这里要用的是Java,所以先要获得Java客户端。。下面是Java客户端的maven依赖的配置。
1<dependency>

2        <groupId>com.rabbitmq</groupId>

3        <artifactId>amqp-client</artifactId>

4        <version>3.0.4</version>

5</dependency>



像RabbitMQ这样的消息代理可用来模拟不同的场景,例如点对点的消息分发或者订阅/推送。我们的程序足够简单,有两个基本的组件,一个生产者用于产生消息,还有一个消费者用来使用产生的消息。
在这个例子里,生产者会产生大量的消息,每个消息带有一个序列号,另一个线程中的消费者会使用这些消息。
抽象类EndPoint:
我们首先写一个类,将产生产者和消费者统一为 EndPoint类型的队列。不管是生产者还是消费者, 连接队列的代码都是一样的,这样可以通用一些。
01package co.syntx.examples.rabbitmq;

02

03import java.io.IOException;

04

05import com.rabbitmq.client.Channel;

06import com.rabbitmq.client.Connection;

07import com.rabbitmq.client.ConnectionFactory;

08

09/**

10 * Represents a connection with a queue

11 * @author syntx

12 *

13 */

14public abstract class EndPoint{

15     

16    protected Channel channel;

17    protected Connection connection;

18    protected String endPointName;

19     

20    public EndPoint(String endpointName) throws IOException{

21         this.endPointName = endpointName;

22         

23         //Create a connection factory

24         ConnectionFactory factory = new ConnectionFactory();

25         

26         //hostname of your rabbitmq server

27         factory.setHost("localhost");

28         

29         //getting a connection

30         connection = factory.newConnection();

31         

32         //creating a channel

33         channel = connection.createChannel();

34         

35         //declaring a queue for this channel. If queue does not exist,

36         //it will be created on the server.

37         channel.queueDeclare(endpointName, false, false, false, null);

38    }

39     

40     

41    /**

42     * 关闭channel和connection。并非必须,因为隐含是自动调用的。

43     * @throws IOException

44     */

45     public void close() throws IOException{

46         this.channel.close();

47         this.connection.close();

48     }

49}



生产者: 生产者类的任务是向队列里写一条消息。我们使用Apache Commons Lang把可序列化的Java对象转换成 byte 数组。commons lang的maven依赖如下:
<dependency>        <groupId>commons-lang</groupId>        <artifactId>commons-lang</artifactId>        <version>2.6</version></dependency>
01package co.syntx.examples.rabbitmq;

02

03import java.io.IOException;

04import java.io.Serializable;

05

06import org.apache.commons.lang.SerializationUtils;

07

08

09/**

10 * The producer endpoint that writes to the queue.

11 * @author syntx

12 *

13 */

14public class Producer extends EndPoint{

15     

16    public Producer(String endPointName) throws IOException{

17        super(endPointName);

18    }

19

20    public void sendMessage(Serializable object) throws IOException {

21        channel.basicPublish("",endPointName, null, SerializationUtils.serialize(object));

22    }   

23}



消费者: 消费者可以以线程方式运行,对于不同的事件有不同的回调函数,其中最主要的是处理新消息到来的事件。
01package co.syntx.examples.rabbitmq;

02

03import java.io.IOException;

04import java.util.HashMap;

05import java.util.Map;

06

07import org.apache.commons.lang.SerializationUtils;

08

09import com.rabbitmq.client.AMQP.BasicProperties;

10import com.rabbitmq.client.Consumer;

11import com.rabbitmq.client.Envelope;

12import com.rabbitmq.client.ShutdownSignalException;

13

14

15/**

16 * 读取队列的程序端,实现了Runnable接口。

17 * @author syntx

18 *

19 */

20public class QueueConsumer extends EndPoint implements Runnable, Consumer{

21     

22    public QueueConsumer(String endPointName) throws IOException{

23        super(endPointName);        

24    }

25     

26    public void run() {

27        try {

28            //start consuming messages. Auto acknowledge messages.

29            channel.basicConsume(endPointName, true,this);

30        } catch (IOException e) {

31            e.printStackTrace();

32        }

33    }

34

35    /**

36     * Called when consumer is registered.

37     */

38    public void handleConsumeOk(String consumerTag) {

39        System.out.println("Consumer "+consumerTag +" registered");     

40    }

41

42    /**

43     * Called when new message is available.

44     */

45    public void handleDelivery(String consumerTag, Envelope env,

46            BasicProperties props, byte[] body) throws IOException {

47        Map map = (HashMap)SerializationUtils.deserialize(body);

48        System.out.println("Message Number "+ map.get("message number") + " received.");

49         

50    }

51

52    public void handleCancel(String consumerTag) {}

53    public void handleCancelOk(String consumerTag) {}

54    public void handleRecoverOk(String consumerTag) {}

55    public void handleShutdownSignal(String consumerTag, ShutdownSignalException arg1) {}

56}



Putting it together: 在下面的测试类中,先运行一个消费者线程,然后开始产生大量的消息,这些消息会被消费者取走。
01package co.syntx.examples.rabbitmq;

02

03import java.io.IOException;

04import java.sql.SQLException;

05import java.util.HashMap;

06

07public class Main {

08    public Main() throws Exception{

09         

10        QueueConsumer consumer = new QueueConsumer("queue");

11        Thread consumerThread = new Thread(consumer);

12        consumerThread.start();

13         

14        Producer producer = new Producer("queue");

15         

16        for (int i = 0; i < 100000; i++) {

17            HashMap message = new HashMap();

18            message.put("message number", i);

19            producer.sendMessage(message);

20            System.out.println("Message Number "+ i +" sent.");

21        }

22    }

23     

24    /**

25     * @param args

26     * @throws SQLException

27     * @throws IOException

28     */

29    public static void main(String[] args) throws Exception{

30      new Main();

31    }

32}



本文地址:http://www.oschina.net/translate/getting-started-with-rabbitmq-in-java
原文地址:http://syntx.co/languages-frameworks/java/getting-started-with-rabbitmq-in-java/

回复

使用道具 举报

您需要登录后才可以回帖 登录 | 加入我们

本版积分规则

QQ|手机版|Archiver|小黑屋|一起疯|苦咖啡 ( 新ICP备12000197号  

GMT+8, 2018-4-26 12:09 , Processed in 0.111256 second(s), 13 queries , Memcache On.

Powered by Discuz! X3.2

© 2001-2013 Comsenz Inc.

快速回复 返回顶部 返回列表