RocketMQ Client SDKs

The open-source edition of RocketMQ has client SDKs for Java, C++, and Go. The commercial edition has SDKs for Java, C/C++, .NET, and Node.js.

Open-source SDKs

Java

1. Add the dependencies with Maven

<dependency>
    <groupId>com.alibaba.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>3.5.8</version>
</dependency>
<dependency>
    <groupId>com.alibaba.rocketmq</groupId>
    <artifactId>rocketmq-all</artifactId>
    <version>3.5.8</version>
    <type>pom</type>
</dependency>

2. Or download the jar files
The client needs three jars: rocketmq-client-3.5.8.jar, rocketmq-common-3.5.8.jar, and rocketmq-remoting-3.5.8.jar.
3. Sample code: producer

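import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
import com.alibaba.rocketmq.client.producer.SendResult;
import com.alibaba.rocketmq.common.message.Message;
import com.alibaba.rocketmq.remoting.common.RemotingHelper;

public class Producer {
    public static void main(String[] args) throws MQClientException, InterruptedException {
        DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
        // Multiple NameServer addresses are separated by semicolons
        producer.setNamesrvAddr("192.168.1.2:9876;192.168.1.3:9876");
        producer.start();
        for (int i = 0; i < 10000000; i++) {
            // A failed send is logged and the loop moves on to the next message
            try {
                Message msg = new Message("TopicTest", // topic
                        "TagA", // tag
                        "OrderID188", // key
                        "Hello MetaQ".getBytes(RemotingHelper.DEFAULT_CHARSET)); // body
                SendResult sendResult = producer.send(msg);
                System.out.println(sendResult);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        producer.shutdown();
    }
}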
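The producer above passes two NameServer addresses in one string, separated by a semicolon. A minimal sketch of checking such a string before handing it to `setNamesrvAddr` might look like the following. The class `NamesrvAddrs` and its `parse` method are hypothetical helpers written for this illustration; they are not part of the RocketMQ client.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of rocketmq-client.
final class NamesrvAddrs {
    private NamesrvAddrs() {
    }

    /** Splits a semicolon-separated "host:port;host:port" string and validates each entry. */
    static List<String> parse(String namesrvAddr) {
        List<String> result = new ArrayList<>();
        for (String entry : namesrvAddr.split(";")) {
            String addr = entry.trim();
            if (addr.isEmpty()) {
                continue; // tolerate a trailing or doubled semicolon
            }
            int colon = addr.lastIndexOf(':');
            if (colon <= 0 || colon == addr.length() - 1) {
                throw new IllegalArgumentException("expected host:port, got: " + addr);
            }
            int port;
            try {
                port = Integer.parseInt(addr.substring(colon + 1));
            } catch (NumberFormatException e) {
                throw new IllegalArgumentException("port is not a number: " + addr, e);
            }
            if (port < 1 || port > 65535) {
                throw new IllegalArgumentException("port out of range: " + addr);
            }
            result.add(addr);
        }
        if (result.isEmpty()) {
            throw new IllegalArgumentException("no NameServer address given");
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(parse("192.168.1.2:9876;192.168.1.3:9876"));
    }
}
```

Validating up front turns a typo in the address list into an immediate, readable error instead of a connection failure later on.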

Sample code: consumer

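import java.util.List;

import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
import com.alibaba.rocketmq.common.message.MessageExt;
import com.alibaba.rocketmq.common.protocol.heartbeat.MessageModel;

public class Consumer {
    public static void main(String[] args) throws InterruptedException, MQClientException {
        /**
         * Create one Consumer per application and let the application own it,
         * for example as a global object or a singleton.
         * Note: the application must make sure ConsumerGroupName is unique.
         */
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerGroupName");
        consumer.setNamesrvAddr("192.168.1.182:9876");
        consumer.setInstanceName("Consumer1");
        consumer.setMessageModel(MessageModel.BROADCASTING);
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

        // Subscribe to every tag of the topic
        consumer.subscribe("topic_test", "*");
        consumer.registerMessageListener(
                new MessageListenerConcurrently() {
                    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                        // Handle every message in the batch, not only the first one
                        for (MessageExt msg : msgs) {
                            System.out.println(msg.getQueueId() + "========" + msg.getMsgId() + "========" + new String(msg.getBody()));
                        }
                        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                    }
                });
        /**
         * The Consumer must be initialized with start() before use; calling it once is enough.
         */
        consumer.start();
        System.out.println("ConsumerStarted.");
    }
}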
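The comments in the consumer sample ask for one consumer per application, held as a global object or singleton and started exactly once. One way to get that behavior is a small lazy holder, sketched below in plain Java. `StartOnce` is a hypothetical helper made up for this illustration. In real use the factory would build the `DefaultMQPushConsumer`, subscribe, register the listener, and call `start()`; here a string stands in for the consumer so the sketch runs without the RocketMQ jars.

```java
import java.util.Objects;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Hypothetical helper: creates and "starts" a shared object at most once.
final class StartOnce<T> {
    private final Supplier<T> factory;
    private T instance;

    StartOnce(Supplier<T> factory) {
        this.factory = Objects.requireNonNull(factory, "factory");
    }

    /** Returns the shared instance, running the factory on the first call only. */
    synchronized T get() {
        if (instance == null) {
            instance = Objects.requireNonNull(factory.get(), "factory returned null");
        }
        return instance;
    }

    public static void main(String[] args) {
        AtomicInteger starts = new AtomicInteger();
        // Stand-in for: new DefaultMQPushConsumer(...), subscribe(...), start()
        StartOnce<String> holder = new StartOnce<>(() -> "consumer-" + starts.incrementAndGet());

        String first = holder.get();
        String second = holder.get();
        System.out.println(first == second); // same object on both calls
        System.out.println(starts.get());    // number of times the factory ran
    }
}
```

The `synchronized` accessor keeps the sketch simple and thread-safe; an application that already uses a dependency-injection container would normally let the container own the single instance instead.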

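C/C++

Several community-maintained versions are available on GitHub:
1. Available repositories
a. https://github.com/hooligan520/rocketmq-client4cpp-linuxhooligan is a working version that mainly supports Linux. This is the recommended one.
b. https://github.com/lehoon/rocketmq-client4cpp-linux mainly converts the source files to UTF-8 and will continue to be maintained and updated.
2. After downloading the code, run sh build.sh; the compiled library is written to the release folder.
3. Sample code: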

// Create the RocketMQ producer with the producer group name
RMQ_DEBUG("producer.new: %s", group.c_str());
DefaultMQProducer producer(group);

// Set the NameServer address
RMQ_DEBUG("producer.setNamesrvAddr: %s", namesrv.c_str());
producer.setNamesrvAddr(namesrv);

// Start the producer
RMQ_DEBUG("producer.start");
producer.start();

std::string tags[] = { "TagA", "TagB", "TagC", "TagD", "TagE" };

int _min = 0, _max = 0;
int _failCnt = 0;
TimeCount tc;
TimeCount tcTotal;
tcTotal.begin();

int nNow = time(NULL);
char key[64];
char value[1024];

// Build the payload text appended to every message body
std::string str;
for (int i = 0; i < size; i += 8)
{
    str.append("hello baby");
}

for (int i = 0; i < count; i++)
{
    try
    {
        tc.begin();

        snprintf(key, sizeof(key), "KEY_%d_%d", nNow, i);
        snprintf(value, sizeof(value), "%011d_%s", i, str.c_str());
        Message msg(topic,       // topic
                    tags[i % 5], // tag
                    key,         // key
                    value,       // body
                    strlen(value) + 1);

        // Send the message synchronously
        SendResult sendResult = producer.send(msg);

        tc.end();

        int cost = tc.countMsec();
        _min = (_min == 0) ? cost : (std::min(cost, _min));
        _max = (_max == 0) ? cost : (std::max(cost, _max));

        if (print_screen)
        {
            printf("cost:%dms, result:{sendStatus=%d,msgId=%s,messageQueue=[topic=%s,brokerName=%s,queueId=%d],queueOffset=%d}\n",
                cost,
                sendResult.getSendStatus(), sendResult.getMsgId().c_str(),
                sendResult.getMessageQueue().getTopic().c_str(),
                sendResult.getMessageQueue().getBrokerName().c_str(),
                sendResult.getMessageQueue().getQueueId(),
                sendResult.getQueueOffset());
        }
    }
    catch (MQClientException& e)
    {
        std::cout << e << std::endl;
        _failCnt++;
    }
}
tcTotal.end();

// Guard against division by zero for runs shorter than 1 ms or with count == 0
int totalMsec = tcTotal.countMsec();
printf("statistics: num=%d, fail=%d, total_cost=%ds, tps=%d, avg=%dms, min=%dms, max=%dms\n",
    count, _failCnt, tcTotal.countSec(),
    totalMsec > 0 ? (int)((double)count * 1000 / totalMsec) : 0,
    count > 0 ? totalMsec / count : 0, _min, _max);

// Stop the producer
producer.shutdown();
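The C++ sample tracks per-message latency and prints throughput, average, minimum, and maximum at the end. The same arithmetic can be sketched in Java as below. `SendStats` is a hypothetical class written for this illustration, not part of any RocketMQ client. It uses floating-point division and zero guards, because an integer expression such as total milliseconds divided by 1000 truncates to 0 for runs shorter than one second. Unlike the C++ sample, this sketch counts only successful sends toward throughput.

```java
import java.util.Locale;

// Hypothetical helper for send-latency bookkeeping.
final class SendStats {
    private long successes;
    private long failures;
    private long totalCostMs;
    private long minMs = Long.MAX_VALUE;
    private long maxMs = Long.MIN_VALUE;

    /** Records one successful send that took costMs milliseconds. */
    void recordSuccess(long costMs) {
        successes++;
        totalCostMs += costMs;
        minMs = Math.min(minMs, costMs);
        maxMs = Math.max(maxMs, costMs);
    }

    void recordFailure() {
        failures++;
    }

    long failures() {
        return failures;
    }

    long minMs() {
        return successes == 0 ? 0 : minMs;
    }

    long maxMs() {
        return successes == 0 ? 0 : maxMs;
    }

    /** Average latency of successful sends, in milliseconds. */
    double avgMs() {
        return successes == 0 ? 0.0 : (double) totalCostMs / successes;
    }

    /** Successful sends per second over the given wall-clock duration. */
    double tps(long wallClockMs) {
        return wallClockMs <= 0 ? 0.0 : successes * 1000.0 / wallClockMs;
    }

    public static void main(String[] args) {
        SendStats stats = new SendStats();
        stats.recordSuccess(2);
        stats.recordSuccess(4);
        stats.recordSuccess(6);
        stats.recordFailure();
        System.out.println(String.format(Locale.ROOT,
                "fail=%d, tps=%.1f, avg=%.1fms, min=%dms, max=%dms",
                stats.failures(), stats.tps(12), stats.avgMs(), stats.minMs(), stats.maxMs()));
    }
}
```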

Go

1. The Go client was open-sourced on GitHub by Didapinche (嘀嗒拼车).
2. Import the go_rocket_mq library.
3. Sample code:

conf := &rocketmq.Config{
	Nameserver:   "192.168.1.234:9876",
	ClientIp:     "192.168.1.23",
	InstanceName: "DEFAULT",
}
consumer, err := rocketmq.NewDefaultConsumer("C_TEST", conf)
if err != nil {
	log.Panic(err)
}
consumer.Subscribe("test2", "*")
consumer.RegisterMessageListener(func(msgs []*rocketmq.MessageExt) error {
	for i, msg := range msgs {
		log.Print(i, string(msg.Body))
	}
	return nil
})
consumer.Start()

Commercial SDKs

The commercial edition runs on Alibaba Cloud. Reference: https://help.aliyun.com/product/29530.html?spm=5176.doc29561.3.1.FiDeyn

Java SDK

Reference: https://help.aliyun.com/document_detail/29546.html?spm=5176.doc42419.6.571.RM2pFd

C/C++ SDK

Reference: https://help.aliyun.com/document_detail/29555.html?spm=5176.doc29546.6.581.qYwUZd

.NET SDK

Reference: https://help.aliyun.com/document_detail/29561.html?spm=5176.doc29555.6.587.D9x3wx

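Node.js SDK

An open-source version maintained by a community developer is available on GitHub: https://github.com/XadillaX/aliyun-ons
It depends on the official C/C++ library, so install the C/C++ library first.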
