3. Components
3.1 EventLoop
(1) Concepts
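An EventLoop is essentially a single-threaded executor (which also maintains a Selector). Its parent interfaces fall into two lines:
- java.util.concurrent.ScheduledExecutorService, so it offers every method of a scheduled thread pool
- Netty's own OrderedEventExecutor, through which it provides inEventLoop(Thread thread) to check whether a thread belongs to this EventLoop, and parent() to find the EventLoopGroup that this EventLoop belongs to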
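The "single thread plus thread check" idea can be sketched with the JDK alone. The class below is a hypothetical stand-in, not Netty's implementation: the name SingleThreadLoop, the thread name "loop-0", and the helper checkFromInside are all assumptions made for illustration. It only shows why a check such as inEventLoop(Thread) can be answered by comparing against the one worker thread.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in for an event loop: one worker thread plus a thread check.
public class SingleThreadLoop {
    private final AtomicReference<Thread> worker = new AtomicReference<>();
    private final ScheduledExecutorService executor =
            Executors.newSingleThreadScheduledExecutor(r -> {
                Thread t = new Thread(r, "loop-0");
                worker.set(t); // remember the single thread that runs every task
                return t;
            });

    // Same idea as inEventLoop(Thread): is this the loop's own thread?
    public boolean inEventLoop(Thread thread) {
        return thread == worker.get();
    }

    // Runs the check from inside a task and returns what the task observed.
    public boolean checkFromInside() {
        Callable<Boolean> check = () -> inEventLoop(Thread.currentThread());
        try {
            return executor.submit(check).get(5, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }

    public void shutdown() {
        executor.shutdown();
    }

    public static void main(String[] args) {
        SingleThreadLoop loop = new SingleThreadLoop();
        System.out.println("from task: " + loop.checkFromInside());                   // from task: true
        System.out.println("from main: " + loop.inEventLoop(Thread.currentThread())); // from main: false
        loop.shutdown();
    }
}
```

Because every task runs on the same thread, code inside a task needs no locking for state that only that loop touches.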
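An EventLoopGroup is a set of EventLoops. A Channel is registered through the group's register method, which binds it to one of the EventLoops; from then on, every event on that Channel is handled by the bound EventLoop. The group can:
- Iterate over its EventLoops (it implements the Iterable interface)
- Return the next EventLoop through the next() method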
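The two group capabilities listed above can be illustrated with a small JDK-only sketch. MiniLoopGroup is a hypothetical class, not Netty code, and it assumes round-robin selection in next(), which is how Netty's default chooser behaves as far as I know.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical miniature of an event loop group: a fixed set of single-threaded executors.
public class MiniLoopGroup implements Iterable<ExecutorService> {
    private final List<ExecutorService> loops = new ArrayList<>();
    private final AtomicInteger index = new AtomicInteger();

    public MiniLoopGroup(int size) {
        for (int i = 0; i < size; i++) {
            loops.add(Executors.newSingleThreadExecutor());
        }
    }

    // Round-robin selection (an assumption): successive calls cycle through the loops.
    public ExecutorService next() {
        return loops.get(Math.floorMod(index.getAndIncrement(), loops.size()));
    }

    // Iterable support: lets callers walk over every loop in the group.
    @Override
    public Iterator<ExecutorService> iterator() {
        return loops.iterator();
    }

    public void shutdown() {
        loops.forEach(ExecutorService::shutdown);
    }

    public static void main(String[] args) {
        MiniLoopGroup group = new MiniLoopGroup(2);
        ExecutorService first = group.next();
        ExecutorService second = group.next();
        ExecutorService third = group.next();
        System.out.println(first != second); // true: two different loops
        System.out.println(first == third);  // true: selection wrapped around
        group.shutdown();
    }
}
```

Binding a Channel to whatever next() returns, and never rebinding it, is what keeps all of that Channel's events on one thread.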
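(2) EventLoop task-handling demo
package com.sw.netty._02_EventLoop;

import io.netty.channel.nio.NioEventLoopGroup;
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.TimeUnit;

@Slf4j
public class EventLoopTest {
    public static void main(String[] args) {
        // 1. Create the event loop group
        NioEventLoopGroup eventExecutors = new NioEventLoopGroup();
        // 2. Submit an ordinary task; it runs on one of the group's EventLoop threads
        eventExecutors.next().execute(() -> log.info("execute normal task"));
        // 3. Submit a scheduled task: first run after 1 second, then every 2 seconds
        eventExecutors.next().scheduleAtFixedRate(() -> log.info("execute schedule task"), 1L, 2L, TimeUnit.SECONDS);
        // Logged by the main thread, which does not wait for the tasks above
        log.info("main");
        // The EventLoop threads are non-daemon, so the JVM keeps running until the group is shut down
    }
}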
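The scheduling call in the demo comes from ScheduledExecutorService, so its timing and shutdown behaviour can be tried with the JDK alone. The sketch below is an assumption-laden illustration: FixedRateSketch and runFixedRate are hypothetical names, and the delays are shortened to milliseconds. It waits for a fixed number of runs and then stops the executor, which is the step the demo leaves out (with Netty, the group's shutdownGracefully() plays that role).

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class FixedRateSketch {
    // Schedules a task at a fixed rate, waits for `runs` executions, then shuts down.
    // Returns true if all runs happened before the timeout.
    public static boolean runFixedRate(int runs, long initialDelayMs, long periodMs) {
        ScheduledExecutorService loop = Executors.newSingleThreadScheduledExecutor();
        CountDownLatch latch = new CountDownLatch(runs);
        try {
            // First run after initialDelayMs, then one run every periodMs
            loop.scheduleAtFixedRate(latch::countDown, initialDelayMs, periodMs, TimeUnit.MILLISECONDS);
            // Generous timeout: expected duration plus two seconds of slack
            return latch.await(initialDelayMs + periodMs * runs + 2000, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            // Without this, the non-daemon worker thread would keep the JVM alive
            loop.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("completed: " + runFixedRate(3, 100, 200));
    }
}
```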
(3) EventLoop I/O event handling demo
Server
package com.sw.netty._02_EventLoop;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.DefaultEventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.util.ReferenceCountUtil;
import lombok.extern.slf4j.Slf4j;

import java.nio.charset.Charset;

@Slf4j
public class Server {
    public static void main(String[] args) {
        // Separate non-I/O group for handlers that take a long time to run
        DefaultEventLoopGroup defaultGroup = new DefaultEventLoopGroup();
        new ServerBootstrap()
                // First group accepts connections; second group handles reads and writes
                .group(new NioEventLoopGroup(), new NioEventLoopGroup())
                .channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer<NioSocketChannel>() {
                    @Override
                    protected void initChannel(NioSocketChannel ch) throws Exception {
                        // handler-01 runs on the NIO EventLoop bound to this channel
                        ch.pipeline().addLast("handler-01", new ChannelInboundHandlerAdapter() {
                            @Override
                            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                log.info("handling a request that takes a normal amount of time");
                                ByteBuf bf = (ByteBuf) msg;
                                log.info(bf.toString(Charset.defaultCharset()));
                                // Pass the message on to the next handler
                                ctx.fireChannelRead(msg);
                            }
                        })
                        // handler-02 runs on defaultGroup, so slow work does not block the NIO EventLoop
                        .addLast(defaultGroup, "handler-02", new ChannelInboundHandlerAdapter() {
                            @Override
                            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                try {
                                    log.info("handling a request that takes a long time");
                                    ByteBuf bf = (ByteBuf) msg;
                                    log.info(bf.toString(Charset.defaultCharset()));
                                } finally {
                                    // Last handler to see the buffer, so release it here
                                    ReferenceCountUtil.release(msg);
                                }
                            }
                        });
                    }
                })
                .bind(8088);
    }
}
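The reason for giving handler-02 its own group is that a slow handler would otherwise occupy the NIO thread and delay every other Channel bound to it. The hand-off can be sketched without Netty. OffloadSketch, process, and the thread names "io-loop" and "biz-loop" are hypothetical, and CompletableFuture stands in for the pipeline; this is not how Netty implements it.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class OffloadSketch {
    // Stage 1 runs on the "I/O" thread, stage 2 is handed to a separate "business" thread.
    // The returned string records which thread ran which stage.
    public static String process(String msg) {
        ExecutorService io = Executors.newSingleThreadExecutor(r -> new Thread(r, "io-loop"));
        ExecutorService biz = Executors.newSingleThreadExecutor(r -> new Thread(r, "biz-loop"));
        try {
            return CompletableFuture
                    // Cheap step: stays on the I/O thread
                    .supplyAsync(() -> Thread.currentThread().getName() + " read '" + msg + "'", io)
                    // Potentially slow step: switched to the business thread
                    .thenApplyAsync(s -> s + ", " + Thread.currentThread().getName() + " processed it", biz)
                    .join();
        } finally {
            io.shutdown();
            biz.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(process("hello")); // io-loop read 'hello', biz-loop processed it
    }
}
```

Once the message has been handed over, the I/O thread is free to serve other connections while the business thread does the slow work.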
Client
package com.sw.netty._02_EventLoop;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringEncoder;

import java.net.InetSocketAddress;

public class Client {
    public static void main(String[] args) throws InterruptedException {
        // 1. Bootstrap (client launcher)
        Channel channel = new Bootstrap()
                // 2. EventLoop group
                .group(new NioEventLoopGroup())
                // 3. Channel implementation
                .channel(NioSocketChannel.class)
                // 4. Add handlers
                .handler(new ChannelInitializer<NioSocketChannel>() {
                    @Override
                    protected void initChannel(NioSocketChannel ch) throws Exception {
                        // Called once the channel has been registered with its EventLoop;
                        // StringEncoder converts outbound Strings into ByteBufs
                        ch.pipeline().addLast(new StringEncoder());
                    }
                })
                // 5. Connect and block until the connection is established
                .connect(new InetSocketAddress("localhost", 8088))
                .sync()
                .channel();
        // Send a sample message so that the server handlers have something to log
        channel.writeAndFlush("hello");
        System.out.println("test");
    }
}