netty客户端与多个远程服务器连接

使用 Java 和 Netty 实现一个客户端同时与多个远程服务器建立连接，并收发数据。

pom依赖

基于 netty-4.1.58 开发（例如 Maven 依赖 io.netty:netty-all:4.1.58.Final）。

代码部分

FoFParseClientFactory连接工厂代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
 /**
  * <p>Title: FoFParseClientFactory</p>
  * <p>Description: Process-wide singleton that opens Netty client connections to
  * mail-parse servers and keeps track of every channel it has opened so that
  * they can all be closed together in {@link #shutdown()}.</p>
  * <p>The channel registry is touched both by caller threads and by Netty
  * event-loop threads (through {@link #disConnect(Channel)}), so it is backed by
  * a copy-on-write list.</p>
  * <p>Copyright: CopyRight (c) 2020-2035</p>
  * <p>Company: lehoon Co. LTD.</p>
  * <p>Author: lehoon</p>
  * <p>Date: 2021/12/3 14:25</p>
  */
public final class FoFParseClientFactory {
    /** Template bootstrap; never connected directly, only cloned per connection. */
    private static final Bootstrap bootstrap = new Bootstrap();
    /** Event loop group shared by every connection created by this factory. */
    private static final EventLoopGroup group = new NioEventLoopGroup();
    /** Channels opened by this factory that have not been reported as disconnected. */
    private static final List<Channel> channelList = new java.util.concurrent.CopyOnWriteArrayList<Channel>();
    private static final FoFParseClientFactory instance = new FoFParseClientFactory();

    private FoFParseClientFactory() {
        // NOTE(review): SO_TIMEOUT is, to my knowledge, only honoured by the blocking
        // (OIO) transport and is ignored with a warning by NioSocketChannel. Confirm
        // whether ChannelOption.CONNECT_TIMEOUT_MILLIS was intended instead.
        bootstrap.group(group)
                .channel(NioSocketChannel.class)
                .option(ChannelOption.SO_TIMEOUT, 60)
                .option(ChannelOption.TCP_NODELAY, true);
    }

    /**
     * @return the single shared factory instance
     */
    public static FoFParseClientFactory getInstance() {
        return instance;
    }

    /**
     * Connects to a mail-parse server and registers the resulting channel.
     * Blocks the calling thread until the connect attempt has completed.
     *
     * @param host            remote host name or address
     * @param port            remote TCP port
     * @param userId          user the parse task is started for; handed to the channel handler
     * @param messageListener callback handed to the channel handler
     * @throws Exception if the remote endpoint is not reachable or the connect attempt fails;
     *                   the underlying failure is attached as the cause
     */
    public void newEmailParseTask(String host, int port, String userId, IMessageListener messageListener) throws Exception {
        if (!TcpHelper.isRemoteAlive(host, port)) {
            throw new Exception("邮件解析服务器未启动, 连接失败.");
        }

        try {
            // Clone the template so that concurrent callers cannot overwrite each
            // other's handler between handler(...) and connect(...).
            Bootstrap connector = bootstrap.clone()
                    .handler(new FoFNewNioChannelInitializer(userId, messageListener));
            ChannelFuture channelFuture = connector.connect(host, port);
            channelFuture.sync();

            Channel channel = channelFuture.channel();
            channelList.add(channel);
            // The channel may already have been closed (and disConnect called) before
            // it was added above; do not keep a dead channel in the registry.
            if (!channel.isOpen()) {
                channelList.remove(channel);
            }
        } catch (InterruptedException e) {
            // Preserve the interrupt status for the caller's thread.
            Thread.currentThread().interrupt();
            throw new Exception("与邮件服务器连接失败.", e);
        } catch (Exception e) {
            throw new Exception("与邮件服务器连接失败.", e);
        }
    }

    /**
     * Removes a channel from the registry. Does not close the channel.
     *
     * @param channel channel to forget; a channel that is not registered is ignored
     */
    public void disConnect(Channel channel) {
        channelList.remove(channel);
    }

    /**
     * Closes every registered channel, empties the registry and shuts the event
     * loop group down, waiting uninterruptibly for the shutdown to complete.
     */
    public void shutdown() {
        // Iterating a copy-on-write list works on a snapshot, so removals triggered
        // by the close callbacks cannot break this loop.
        for (Channel channel : channelList) {
            channel.close();
        }

        channelList.clear();
        group.shutdownGracefully().syncUninterruptibly();
    }
}

FoFNewNioChannelInitializer连接通道初始化代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
 /**
  * <p>Title: FoFNewNioChannelInitializer</p>
  * <p>Description: Builds the handler pipeline of a newly created client channel:
  * a line encoder using the Windows line separator, UTF-8 string encoder and
  * decoder, and finally a {@code FoFParseResponseHandler} bound to the user id
  * and listener given to this initializer.</p>
  * <p>Copyright: CopyRight (c) 2020-2035</p>
  * <p>Company: lehoon Co. LTD.</p>
  * <p>Author: lehoon</p>
  * <p>Date: 2021/12/3 14:35</p>
  */
public class FoFNewNioChannelInitializer extends ChannelInitializer<NioSocketChannel> {
    /** User id passed on to the response handler; may be {@code null}. */
    private final String userId;
    /** Listener passed on to the response handler; may be {@code null}. */
    private final IMessageListener messageListener;

    /** Creates an initializer without user id and without listener. */
    public FoFNewNioChannelInitializer() {
        this(null, null);
    }

    /**
     * Creates an initializer without listener.
     *
     * @param userId user id handed to the response handler
     */
    public FoFNewNioChannelInitializer(String userId) {
        this(userId, null);
    }

    /**
     * @param userId          user id handed to the response handler
     * @param messageListener listener handed to the response handler
     */
    public FoFNewNioChannelInitializer(String userId, IMessageListener messageListener) {
        this.userId = userId;
        this.messageListener = messageListener;
    }

    /**
     * Installs the codecs and the response handler, in this order.
     *
     * @param ch the channel being initialized
     */
    @Override
    protected void initChannel(NioSocketChannel ch) throws Exception {
        // NOTE(review): there is no inbound frame decoder in front of the
        // StringDecoder, so one read may carry a partial or several responses.
        // Confirm against the server protocol whether a line/length based frame
        // decoder is required.
        ch.pipeline()
                .addLast(new LineEncoder(LineSeparator.WINDOWS))
                .addLast(new StringEncoder(CharsetUtil.UTF_8))
                .addLast(new StringDecoder(CharsetUtil.UTF_8))
                .addLast(new FoFParseResponseHandler(userId, messageListener));
    }
}

FoFParseResponseHandler数据包解析代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
/**
 * <p>Title: FoFParseResponseHandler</p>
 * <p>Description: Inbound handler of one mail-parse connection. When the channel
 * becomes active it sends the parse request for the configured user; every
 * inbound message is converted into an {@code EmailParseResponse} and handed to
 * the listener, which decides whether the connection is kept open.</p>
 * <p>Copyright: CopyRight (c) 2020-2035</p>
 * <p>Company: lehoon Co. LTD.</p>
 * <p>Author: lehoon</p>
 * <p>Date: 2021/12/3 14:47</p>
 */
public class FoFParseResponseHandler extends ChannelInboundHandlerAdapter {
    /** User the parse request is sent for. */
    private String userId;
    /** Optional callback; every notification is skipped while this is {@code null}. */
    private IMessageListener messageListener;

    public FoFParseResponseHandler() {
    }

    /**
     * @param userId user the parse request is sent for
     */
    public FoFParseResponseHandler(String userId) {
        this.userId = userId;
    }

    /**
     * @param userId          user the parse request is sent for
     * @param messageListener callback notified about connection events and responses
     */
    public FoFParseResponseHandler(String userId, IMessageListener messageListener) {
        this.userId = userId;
        this.messageListener = messageListener;
    }

    /**
     * Notifies the listener about the new connection and sends the parse request.
     * If no user id is configured the channel is closed and nothing is sent.
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        if (messageListener != null) messageListener.onConnect(userId);
        if (StringUtils.isBlank(userId)) {
            ctx.close();
            // Without a user id there is no request to send; stop here instead of
            // writing to a channel that is being closed.
            return;
        }

        String message = MessageHelper.emailParseRequest(userId);
        // Encode once, with the same charset the pipeline's string codecs use,
        // instead of relying on the platform default charset.
        byte[] payload = message.getBytes(java.nio.charset.StandardCharsets.UTF_8);
        ByteBuf byteBuf = Unpooled.copiedBuffer(payload);
        ctx.writeAndFlush(byteBuf);
        // NOTE(review): the listener is informed right after the write was queued,
        // not after it completed; confirm whether that is the intended meaning of onWrite.
        if (messageListener != null) messageListener.onWrite(payload, payload.length);
    }

    /**
     * Forwards the message down the pipeline, then parses it and passes the parsed
     * response to the listener. Closes the channel when the listener answers with
     * {@code MessageCallBackAction.SHUTDOWN}.
     *
     * @param msg inbound message; cast to {@code String} before parsing
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        super.channelRead(ctx, msg);
        EmailParseResponse response = MessageHelper.emailParseResponse((String) msg);
        if (response == null) {
            return;
        }

        if (messageListener == null) return;
        if (messageListener.onRead(userId, response) == MessageCallBackAction.SHUTDOWN) {
            ctx.close();
        }
    }

    /**
     * Forwards the exception down the pipeline, untracks the channel and closes it.
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        super.exceptionCaught(ctx, cause);
        FoFParseClientFactory.getInstance().disConnect(ctx.channel());
        // The factory no longer tracks this channel, so close it here; otherwise it
        // would stay open without anything left that could close it explicitly.
        ctx.close();
    }

    /**
     * Notifies the listener that the connection is gone and untracks the channel.
     */
    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        super.handlerRemoved(ctx);
        try {
            if (messageListener != null) {
                messageListener.onClose(userId);
            }
        } finally {
            // Untrack the channel even if the listener callback throws.
            FoFParseClientFactory.getInstance().disConnect(ctx.channel());
        }
    }

    /**
     * @param messageListener new callback, or {@code null} to disable notifications
     */
    public void setMessageListener(IMessageListener messageListener) {
        this.messageListener = messageListener;
    }
}

客户端使用代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
/**
 * Starts a mail-parse task for the current user by opening a connection to the
 * configured mail-parse server.
 *
 * @return a success result once the connection has been requested successfully;
 *         a failure result when no current user id is available, when the
 *         remote address is not configured (blank host or port outside 1-65535),
 *         or when opening the connection throws
 */
public BusinessResult<String> startParseTask() {
    String userId = getUserId();
    if (StringUtils.isBlank(userId)) {
        log.error("获取当前登陆用户失败, 启动邮件解析任务失败.");
        return new BusinessResult<String>().fail(BusinessCode.AUTH_USER_NOT_EXIST.getCode(), "当前用户不存在, 发送邮件解析指令失败!");
    }

    boolean portInvalid = remotePort <= 0 || remotePort > 65535;
    if (StringUtils.isBlank(remoteHost) || portInvalid) {
        log.error("当前没有配置邮件解析服务器远程地址.");
        return new BusinessResult<String>().fail(BusinessCode.SERVICE_UNAVAILABLE.getCode(), "当前没有配置邮件解析服务器远程地址!");
    }

    // Listener that only logs connection events and asks for the connection to be
    // shut down once the server reports the parse task as done.
    IMessageListener listener = new IMessageListener() {
        @Override
        public void onConnect(String userId) {
            log.info("与邮件服务器建立连接成功.");
        }

        @Override
        public MessageCallBackAction onRead(String userId, EmailParseResponse response) {
            log.info("接收到邮件解析数据:{}, {}", userId, response);
            return response.isDone() ? MessageCallBackAction.SHUTDOWN : MessageCallBackAction.CONTINUE;
        }

        @Override
        public void onWrite(byte[] data, int length) {
            // NOTE(review): decodes with the platform default charset; confirm it
            // matches the charset the sender used to encode the request.
            log.info("发送数据到邮件解析服务器成功, {}, {}", length, new String(data));
        }

        @Override
        public void onClose(String userId) {
            log.info("与邮件服务器断开连接.");
        }
    };

    try {
        FoFParseClientFactory.getInstance().newEmailParseTask(remoteHost, remotePort, userId, listener);

        // Start marker
        log.info("发送邮件解析任务成功.");
        return new BusinessResult<String>().success();
    } catch (Exception e) {
        // Record the failure with its stack trace; previously it was only turned
        // into a result message and never logged.
        log.error("启动邮件解析任务失败.", e);
        return new BusinessResult<String>().fail(BusinessCode.SYSTEM_EXCEPTION.getCode(), e.getMessage());
    }
}
文章目录
  1. 1. pom依赖
  2. 2. 代码部分
    1. 2.1. FoFParseClientFactory连接工厂代码
    2. 2.2. FoFNewNioChannelInitializer连接通道初始化代码
    3. 2.3. FoFParseResponseHandler数据包解析代码
    4. 2.4. 客户端使用代码