博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
netty
阅读量:7125 次
发布时间:2019-06-28

本文共 11809 字,大约阅读时间需要 39 分钟。

netty

概念: Netty是由JBOSS提供的一个java开源框架。Netty提供异步的、事件驱动的网络应用程序框架和工具,用以快速开发高性能、高可靠性的网络服务器和客户端程序。

也就是说,Netty 是一个基于NIO的客户、服务器端编程框架,使用Netty 可以确保你快速和简单的开发出一个网络应用,例如实现了某种协议的客户,服务端应用。Netty相当简化和流线化了网络应用的编程开发过程,例如,TCP和UDP的socket服务开发。

新特性

  • 处理大容量数据流更简单
  • 处理协议编码和单元测试更简单
  • I/O超时和idle状态检测
  • 应用程序的关闭更简单,更安全
  • 更可靠的OutOfMemoryError预防

性能

  • 更好的吞吐量,更低的延迟
  • 更少的资源消耗
  • 最小化不必要的内存拷贝

具体使用见代码及注释

Helloword版

服务端这边绑定了两个端口,可以根据业务区别对待如端口1是做A业务,端2做B业务.

public class Server {    public static void main(String[] args) throws InterruptedException {        //1.创建两个线程组 (只有服务器端需要 )        //一个线程组专门用来管理接收客户端的请求连接的        //一个线程组进行网络通信(读写)        EventLoopGroup receiveGroup = new NioEventLoopGroup();        EventLoopGroup dealGroup = new NioEventLoopGroup();        //创建辅助工具类,用于设置服务器通道的一系列配置        ServerBootstrap serverBootstrap = new ServerBootstrap();        serverBootstrap.group(receiveGroup, dealGroup)//绑定两个线程组        .channel(NioServerSocketChannel.class)   //指定NIO的模式        .option(ChannelOption.SO_BACKLOG, 1024)     //设置tcp缓冲区        .option(ChannelOption.SO_SNDBUF, 32*1024) //设置发送缓冲区大小        .option(ChannelOption.SO_RCVBUF, 32*1024) //设置接收缓冲大小        .option(ChannelOption.SO_KEEPALIVE, true)  //保持连接        .childHandler(new ChannelInitializer
() { @Override protected void initChannel(SocketChannel sc) throws Exception { //3 在这里配置具体数据接收方法的处理 sc.pipeline().addLast(new ServerHandler()); } }); //4 进行绑定 ChannelFuture cf1 = serverBootstrap.bind(8765).sync(); ChannelFuture cf2 = serverBootstrap.bind(8764).sync(); //5 等待关闭 cf1.channel().closeFuture().sync(); cf2.channel().closeFuture().sync(); receiveGroup.shutdownGracefully(); dealGroup.shutdownGracefully(); }}

服务端处理器:

public class ServerHandler extends ChannelHandlerAdapter {    @Override    public void channelActive(ChannelHandlerContext ctx) throws Exception {        System.out.println("server channel active... ");    }    @Override    public void channelRead(ChannelHandlerContext ctx, Object msg)            throws Exception {            ByteBuf buf = (ByteBuf) msg;            byte[] req = new byte[buf.readableBytes()];            buf.readBytes(req);            String body = new String(req, "gbk");            System.out.println("Server :" + body );            String response = "进行返回给客户端的响应:" + body ;            //注意使用了writeAndFlush的话就可以不释放ReferenceCountUtil.release(msg); 否则需要释放ByteBuf容器的数据。            ctx.writeAndFlush(Unpooled.copiedBuffer(response.getBytes()));            //.addListener(ChannelFutureListener.CLOSE);//监听,内容传输完毕后就关闭管道    }    @Override    public void channelReadComplete(ChannelHandlerContext ctx)            throws Exception {        System.out.println("读完了");        ctx.flush();    }    @Override    public void exceptionCaught(ChannelHandlerContext ctx, Throwable t)            throws Exception {        ctx.close();    }}

客户端:

public class Client {public static void main(String[] args) throws Exception{                EventLoopGroup group = new NioEventLoopGroup();        Bootstrap b = new Bootstrap();        b.group(group)        .channel(NioSocketChannel.class)        .handler(new ChannelInitializer
() { @Override protected void initChannel(SocketChannel sc) throws Exception { sc.pipeline().addLast(new ClientHandler()); } }); ChannelFuture cf1 = b.connect("127.0.0.1", 8765).sync(); ChannelFuture cf2 = b.connect("127.0.0.1", 8764).sync(); //发送消息 cf1.channel().writeAndFlush(Unpooled.copiedBuffer("C1:777".getBytes())); Thread.sleep(1000); cf1.channel().writeAndFlush(Unpooled.copiedBuffer("C1:666".getBytes())); cf2.channel().writeAndFlush(Unpooled.copiedBuffer("C2:888".getBytes())); Thread.sleep(2000); cf1.channel().writeAndFlush(Unpooled.copiedBuffer("C1:888".getBytes())); cf2.channel().writeAndFlush(Unpooled.copiedBuffer("C2:666".getBytes())); cf1.channel().closeFuture().sync(); cf2.channel().closeFuture().sync(); group.shutdownGracefully(); }}

客户端处理器:

public class ClientHandler extends ChannelHandlerAdapter {    @Override    public void channelActive(ChannelHandlerContext ctx) throws Exception {        System.out.println("客户端的channelActive()方法");    }    @Override    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {        try {            ByteBuf buf = (ByteBuf) msg;                        byte[] req = new byte[buf.readableBytes()];            buf.readBytes(req);                        String body = new String(req, "gbk");            System.out.println("Client :" + body );        } finally {            ReferenceCountUtil.release(msg);        }    }

TCP拆包粘包问题

TCP是个“流”协议,所谓流,就是没有界限的一串数据。大家可以想想河里的流水,是连成一片的,其间并没有分界线。TCP底层并不了解上层业务数据的具体含义,它会根据TCP缓冲区的实际情况进行包的划分,所以在业务上认为,一个完整的包可能会被TCP拆分成多个包进行发送,也有可能把多个小的包封装成一个大的数据包发送,这就是所谓的TCP粘包和拆包问题。

通俗意义来说可能是三个数据如'A','B','C' 但经过TCP协议流式传输后成了'AB','C'两个数据了,这种就是粘包了数据包之间粘一起了。那么拆包的话有三种方式。

  • 设置每个数据包的大小如200个字节,如果某个数据包不足200个字节可能会出现丢包的情况,即该数据包未从一个端到另一个端,此时需要用空格或者既定的符号补充.
  • 在数据包之间使用一些字符进行分割如$号之类的,解析的时候先处理掉分隔符再拿到各个数据包就好了。(一般用的比较多)
  • 细粒化数据包分为头和尾(将消息分为消息头和消息尾)
  • 其他

两根水管(服务器与客户端)需要相互流通水(数据),那么需要一个转接头(套接字)连接,水流式无法区分一段段的数据,一种方式在流通的过程中设置些标志性物品如记号笔勾一下(分隔符),另一种方式则是设定每一段都是多少容量的水来区分.

使用分隔符解决TCP粘包

可以理解管道流里流的都是ByteBuffer类型的数据,那么使用分隔符(非ByteBuffer类型)的话可能就意味着一个转码与解码的过程。

服务端:

public class Server {    public static void main(String[] args) throws Exception{        //1 创建2个线程,一个是负责接收客户端的连接。一个是负责进行数据传输的        EventLoopGroup pGroup = new NioEventLoopGroup();        EventLoopGroup cGroup = new NioEventLoopGroup();                //2 创建服务器辅助类        ServerBootstrap b = new ServerBootstrap();        b.group(pGroup, cGroup)         .channel(NioServerSocketChannel.class)         .option(ChannelOption.SO_BACKLOG, 1024)         .option(ChannelOption.SO_SNDBUF, 32*1024)         .option(ChannelOption.SO_RCVBUF, 32*1024)         .childHandler(new ChannelInitializer
() { @Override protected void initChannel(SocketChannel sc) throws Exception { //设置特殊分隔符 解决TCP拆包黏包问题, ByteBuf buf = Unpooled.copiedBuffer("$".getBytes()); sc.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, buf)); //设置字符串形式的解码 sc.pipeline().addLast(new StringDecoder()); sc.pipeline().addLast(new ServerHandler()); } }); //4 绑定连接 ChannelFuture cf = b.bind(8765).sync(); //等待服务器监听端口关闭 cf.channel().closeFuture().sync(); pGroup.shutdownGracefully(); cGroup.shutdownGracefully(); }}

服务端处理器:

public class ServerHandler extends ChannelHandlerAdapter{    @Override    public void channelActive(ChannelHandlerContext ctx) throws Exception {        System.out.println(" server channel active... ");    }    @Override    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {        String request = (String)msg;        System.out.println("Server channelRead:" + request);        String response = "服务器响应:" + msg + "$";        ctx.writeAndFlush(Unpooled.copiedBuffer(response.getBytes()));    }    @Override    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {        System.out.println("channelReadComplete");    }    @Override    public void exceptionCaught(ChannelHandlerContext ctx, Throwable t) throws Exception {        System.out.println("exceptionCaught");        ctx.close();    }}

客户端:

public class Client {public static void main(String[] args) throws Exception {                EventLoopGroup group = new NioEventLoopGroup();                Bootstrap b = new Bootstrap();        b.group(group)         .channel(NioSocketChannel.class)         .handler(new ChannelInitializer
() { @Override protected void initChannel(SocketChannel sc) throws Exception { // ByteBuf buf = Unpooled.copiedBuffer("$".getBytes()); sc.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, buf)); sc.pipeline().addLast(new StringDecoder()); sc.pipeline().addLast(new ClientHandler()); } }); ChannelFuture cf = b.connect("127.0.0.1", 8765).sync(); cf.channel().writeAndFlush(Unpooled.wrappedBuffer("数据A$".getBytes())); cf.channel().writeAndFlush(Unpooled.wrappedBuffer("数据B$".getBytes())); //等待客户端端口关闭 cf.channel().closeFuture().sync(); group.shutdownGracefully(); }}

客户端处理器:

public class ClientHandler extends ChannelHandlerAdapter {    @Override    public void channelActive(ChannelHandlerContext ctx) throws Exception {        System.out.println("client channel active... ");    }    @Override    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {        try {            String response = (String)msg;            System.out.println("Client: " + response);        } finally {            ReferenceCountUtil.release(msg);        }    }    @Override    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {        System.out.println("channelReadComplete");    }    @Override    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {        System.out.println("exceptionCaught");        ctx.close();    }}

设置长度大小解决TCP拆包黏包问题

服务端:

public class Server {    public static void main(String[] args) throws Exception{        //1 创建2个线程,一个是负责接收客户端的连接。一个是负责进行数据传输的        EventLoopGroup pGroup = new NioEventLoopGroup();        EventLoopGroup cGroup = new NioEventLoopGroup();                //2 创建服务器辅助类        ServerBootstrap b = new ServerBootstrap();        b.group(pGroup, cGroup)         .channel(NioServerSocketChannel.class)         .option(ChannelOption.SO_BACKLOG, 1024)         .option(ChannelOption.SO_SNDBUF, 32*1024)         .option(ChannelOption.SO_RCVBUF, 32*1024)         .childHandler(new ChannelInitializer
() { @Override protected void initChannel(SocketChannel sc) throws Exception { //设置定长字符串接收 sc.pipeline().addLast(new FixedLengthFrameDecoder(5)); //设置字符串形式的解码 sc.pipeline().addLast(new StringDecoder()); sc.pipeline().addLast(new ServerHandler()); } }); //4 绑定连接 ChannelFuture cf = b.bind(8765).sync(); //等待服务器监听端口关闭 cf.channel().closeFuture().sync(); pGroup.shutdownGracefully(); cGroup.shutdownGracefully(); } }

客户端:

public class Client {    public static void main(String[] args) throws Exception {                EventLoopGroup group = new NioEventLoopGroup();                Bootstrap b = new Bootstrap();        b.group(group)         .channel(NioSocketChannel.class)         .handler(new ChannelInitializer
() { @Override protected void initChannel(SocketChannel sc) throws Exception { sc.pipeline().addLast(new FixedLengthFrameDecoder(5)); sc.pipeline().addLast(new StringDecoder()); sc.pipeline().addLast(new ClientHandler()); } }); ChannelFuture cf = b.connect("127.0.0.1", 8765).sync(); cf.channel().writeAndFlush(Unpooled.wrappedBuffer("aaaaabbbbb".getBytes())); cf.channel().writeAndFlush(Unpooled.copiedBuffer("ccccccc".getBytes())); //等待客户端端口关闭 cf.channel().closeFuture().sync(); group.shutdownGracefully(); }}

服务端与客户端的处理器参照上例以字符串分割的.

新手上路,多多关注...

转载地址:http://uveel.baihongyu.com/

你可能感兴趣的文章
assign,copy,retain -Object-C中纠结的属性(转)
查看>>
发布系统之发布流程和发布salt相关命令
查看>>
从加载的XML文档重建工作流
查看>>
python基础知识~list详解
查看>>
jQuery对象和DOM对象的互换
查看>>
项目包进行分层
查看>>
linux 一些命令
查看>>
poj 3909
查看>>
redis之 3.0集群安装
查看>>
Java类加载机制
查看>>
Angular.js+Bootstrap实现手风琴菜单
查看>>
Android SDK开发包国内下载地址
查看>>
windows环境下SVN服务器限制注释字数
查看>>
2018-2019-1 20165318 20165326 实验五 通讯协议设计
查看>>
C#面向对象课程 类与对象,方法重载,类的静态成员,命名空间和类库12月22日...
查看>>
抽象工厂模式
查看>>
控件移动
查看>>
php mongodb driver : mongodb 笔记
查看>>
获取一篇新闻的全部信息
查看>>
java字符串的替换replace、replaceAll、replaceFirst的区别详解
查看>>