使用 Java 和 Netty 实现客户端与多个远程服务器建立连接，并完成数据的接收与发送。
pom依赖
基于netty-4.1.58开发。
代码部分
FoFParseClientFactory连接工厂代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53
|
/**
 * Process-wide factory that opens Netty client connections to remote mail-parse
 * servers and keeps track of every channel it has opened.
 *
 * <p>All connections share one {@link Bootstrap} template and one
 * {@link EventLoopGroup}. The class is a singleton; obtain it via
 * {@link #getInstance()}.
 */
public final class FoFParseClientFactory {

    /**
     * Connect timeout applied to every outgoing connection, in milliseconds.
     *
     * <p>The previous code set {@code ChannelOption.SO_TIMEOUT} to 60, but that option
     * is not recognised by the NIO socket transport, so it had no effect.
     * NOTE(review): 60 seconds is assumed to be the intended value - confirm.
     */
    private static final int CONNECT_TIMEOUT_MILLIS = 60 * 1000;

    /** Template bootstrap; never connected directly, only cloned per connection. */
    private static final Bootstrap bootstrap = new Bootstrap();

    private static final EventLoopGroup group = new NioEventLoopGroup();

    /**
     * Channels opened by this factory. Callers add from their own threads while
     * {@link #disConnect(Channel)} may be invoked from channel handlers, and
     * {@link #shutdown()} iterates while channels are closing, so a copy-on-write
     * list is used instead of a plain {@code ArrayList}.
     */
    private static final List<Channel> channelList =
            new java.util.concurrent.CopyOnWriteArrayList<Channel>();

    // Must be declared after the static fields above: the constructor configures them.
    private static final FoFParseClientFactory instance = new FoFParseClientFactory();

    private FoFParseClientFactory() {
        bootstrap.group(group)
                .channel(NioSocketChannel.class)
                .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, CONNECT_TIMEOUT_MILLIS)
                .option(ChannelOption.TCP_NODELAY, true);
    }

    /**
     * @return the single shared factory instance
     */
    public static FoFParseClientFactory getInstance() {
        return instance;
    }

    /**
     * Connects to a mail-parse server and registers the resulting channel.
     *
     * <p>Blocks the calling thread until the connection attempt completes.
     *
     * @param host            remote host name or address
     * @param port            remote TCP port
     * @param userId          user id handed to the channel initializer
     * @param messageListener callback handed to the channel initializer
     * @throws Exception if the remote end is not reachable or the connection attempt
     *                   fails; the underlying failure is attached as the cause
     */
    public void newEmailParseTask(String host, int port, String userId, IMessageListener messageListener) throws Exception {
        if (!TcpHelper.isRemoteAlive(host, port)) {
            throw new Exception("邮件解析服务器未启动, 连接失败.");
        }

        try {
            // Clone the template so that concurrent callers never overwrite each
            // other's handler on the shared bootstrap before connect() reads it.
            Bootstrap connectionBootstrap = bootstrap.clone()
                    .handler(new FoFNewNioChannelInitializer(userId, messageListener));
            ChannelFuture channelFuture = connectionBootstrap.connect(host, port);
            channelFuture.sync();
            channelList.add(channelFuture.channel());
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers further up can observe it.
            Thread.currentThread().interrupt();
            throw new Exception("与邮件服务器连接失败.", e);
        } catch (Exception e) {
            throw new Exception("与邮件服务器连接失败.", e);
        }
    }

    /**
     * Stops tracking the given channel. Does not close it.
     *
     * @param channel channel to remove from the tracked list
     */
    public void disConnect(Channel channel) {
        channelList.remove(channel);
    }

    /**
     * Closes every tracked channel, clears the list and shuts down the event loop
     * group, waiting uninterruptibly for the group to terminate.
     */
    public void shutdown() {
        // Iteration works on a snapshot, so removals triggered by the closing
        // channels cannot invalidate this loop.
        for (Channel channel : channelList) {
            channel.close();
        }

        channelList.clear();
        group.shutdownGracefully().syncUninterruptibly();
    }
}
|
FoFNewNioChannelInitializer连接通道初始化代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33
|
/**
 * Channel initializer that installs the text codecs and the
 * {@link FoFParseResponseHandler} on every newly created client channel.
 */
public class FoFNewNioChannelInitializer extends ChannelInitializer<NioSocketChannel> {

    /** User id passed on to the response handler; may be {@code null}. */
    private String userId;

    /** Listener passed on to the response handler; may be {@code null}. */
    private IMessageListener messageListener;

    /** Creates an initializer with neither a user id nor a listener. */
    public FoFNewNioChannelInitializer() {
    }

    /**
     * @param userId user id passed on to the response handler
     */
    public FoFNewNioChannelInitializer(String userId) {
        this(userId, null);
    }

    /**
     * @param userId          user id passed on to the response handler
     * @param messageListener listener passed on to the response handler
     */
    public FoFNewNioChannelInitializer(String userId, IMessageListener messageListener) {
        this.userId = userId;
        this.messageListener = messageListener;
    }

    /**
     * Appends, in order: a line encoder using the Windows line separator, a UTF-8
     * string encoder, a UTF-8 string decoder, and the response handler.
     *
     * <p>NOTE(review): no frame decoder is installed ahead of the string decoder, so
     * inbound text presumably arrives in whatever chunks the transport delivers -
     * confirm this matches the server's framing.
     */
    @Override
    protected void initChannel(NioSocketChannel ch) throws Exception {
        ch.pipeline()
                .addLast(new LineEncoder(LineSeparator.WINDOWS))
                .addLast(new StringEncoder(CharsetUtil.UTF_8))
                .addLast(new StringDecoder(CharsetUtil.UTF_8))
                .addLast(new FoFParseResponseHandler(userId, messageListener));
    }
}
|
FoFParseResponseHandler数据包解析代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71
|
/**
 * Inbound handler that sends the mail-parse request once the channel becomes active
 * and forwards decoded responses and lifecycle events to an optional
 * {@link IMessageListener}.
 */
public class FoFParseResponseHandler extends ChannelInboundHandlerAdapter {

    /** User id the request is built for; a blank value causes the channel to be closed. */
    private String userId;

    /** Optional callback; every use is null-checked. */
    private IMessageListener messageListener;

    public FoFParseResponseHandler() {
    }

    /**
     * @param userId user id the parse request is built for
     */
    public FoFParseResponseHandler(String userId) {
        this.userId = userId;
    }

    /**
     * @param userId          user id the parse request is built for
     * @param messageListener callback notified of connect/read/write/close events
     */
    public FoFParseResponseHandler(String userId, IMessageListener messageListener) {
        this.userId = userId;
        this.messageListener = messageListener;
    }

    /**
     * Notifies the listener of the connection, then writes the parse request.
     * If the user id is blank the channel is closed and nothing is sent.
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        if (messageListener != null) {
            messageListener.onConnect(userId);
        }

        if (StringUtils.isBlank(userId)) {
            ctx.close();
            // Without this return the request was still built and written to a
            // channel that had just been asked to close.
            return;
        }

        String message = MessageHelper.emailParseRequest(userId);
        // Encode once, and explicitly as UTF-8 rather than the platform default,
        // so the bytes on the wire do not depend on the host's locale settings.
        byte[] payload = message.getBytes(java.nio.charset.StandardCharsets.UTF_8);
        ctx.writeAndFlush(Unpooled.copiedBuffer(payload));

        // Invoked right after the write is queued, not when it completes.
        if (messageListener != null) {
            messageListener.onWrite(payload, payload.length);
        }
    }

    /**
     * Forwards the message down the pipeline, then parses it and hands the result to
     * the listener. Closes the channel when the listener answers
     * {@code MessageCallBackAction.SHUTDOWN}.
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        super.channelRead(ctx, msg);

        if (!(msg instanceof String)) {
            // Only decoded text can be parsed; anything else has already been forwarded.
            return;
        }

        EmailParseResponse response = MessageHelper.emailParseResponse((String) msg);
        if (response == null || messageListener == null) {
            return;
        }

        if (messageListener.onRead(userId, response) == MessageCallBackAction.SHUTDOWN) {
            ctx.close();
        }
    }

    /**
     * Forwards the exception, untracks the channel and closes it.
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        super.exceptionCaught(ctx, cause);
        FoFParseClientFactory.getInstance().disConnect(ctx.channel());
        // The channel was previously only untracked, leaving the socket open with
        // nothing left to close it at shutdown.
        ctx.close();
    }

    /**
     * Notifies the listener that the connection is gone and untracks the channel.
     */
    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        super.handlerRemoved(ctx);
        if (messageListener != null) {
            messageListener.onClose(userId);
        }
        FoFParseClientFactory.getInstance().disConnect(ctx.channel());
    }

    /**
     * @param messageListener callback to use from now on; may be {@code null}
     */
    public void setMessageListener(IMessageListener messageListener) {
        this.messageListener = messageListener;
    }
}
|
客户端使用代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47
| public BusinessResult<String> startParseTask() { String userId = getUserId(); if (StringUtils.isBlank(userId)) { log.error("获取当前登陆用户失败, 启动邮件解析任务失败."); return new BusinessResult<String>().fail(BusinessCode.AUTH_USER_NOT_EXIST.getCode(), "当前用户不存在, 发送邮件解析指令失败!"); }
if (StringUtils.isBlank(remoteHost) || remotePort <= 0 || remotePort > 65535) { log.error("当前没有配置邮件解析服务器远程地址."); return new BusinessResult<String>().fail(BusinessCode.SERVICE_UNAVAILABLE.getCode(), "当前没有配置邮件解析服务器远程地址!"); }
try { FoFParseClientFactory.getInstance().newEmailParseTask(remoteHost, remotePort, userId, new IMessageListener() { @Override public void onConnect(String userId) { log.info("与邮件服务器建立连接成功."); }
@Override public MessageCallBackAction onRead(String userId, EmailParseResponse response) { log.info("接收到邮件解析数据:{}, {}", userId, response); if (response.isDone()) { return MessageCallBackAction.SHUTDOWN; }
return MessageCallBackAction.CONTINUE; }
@Override public void onWrite(byte[] data, int length) { log.info("发送数据到邮件解析服务器成功, {}, {}", length, new String(data)); }
@Override public void onClose(String userId) { log.info("与邮件服务器断开连接."); } });
log.info("发送邮件解析任务成功."); return new BusinessResult<String>().success(); } catch (Exception e) { return new BusinessResult<String>().fail(BusinessCode.SYSTEM_EXCEPTION.getCode(), e.getMessage()); } }
|