Netty - Components - EventLoop

suaxi
2026-07-04

3. Components

3.1 EventLoop

(1) Concepts

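An EventLoop is essentially a single-threaded executor that also maintains a Selector. It extends two interfaces:

  • java.util.concurrent.ScheduledExecutorService, so it offers every method of a scheduled thread pool
  • Netty's OrderedEventExecutor, through which it inherits inEventLoop(Thread thread) to check whether a thread belongs to this EventLoop, and parent() to find out which EventLoopGroup this EventLoop belongs to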
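
The "single thread plus ownership check" idea can be sketched with the JDK alone. The class below is a hypothetical illustration, not Netty's implementation: MiniEventLoop and its methods are names invented for this example, and it leaves out the Selector entirely.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;

// Hypothetical illustration class; this is NOT how Netty implements EventLoop.
class MiniEventLoop {
    private final ScheduledExecutorService executor;
    private volatile Thread loopThread;

    MiniEventLoop(String name) {
        // One dedicated thread runs every task submitted to this loop.
        this.executor = Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r, name);
            loopThread = t; // remember the loop's own thread
            return t;
        });
    }

    // Same idea as inEventLoop(Thread): is the given thread this loop's thread?
    boolean inEventLoop(Thread thread) {
        return thread == loopThread;
    }

    ScheduledExecutorService executor() {
        return executor;
    }

    public static void main(String[] args) {
        MiniEventLoop loop = new MiniEventLoop("mini-loop-1");
        // Ask the question from inside a task running on the loop thread...
        boolean inside = CompletableFuture
                .supplyAsync(() -> loop.inEventLoop(Thread.currentThread()), loop.executor())
                .join();
        // ...and from the main thread, which is not the loop thread.
        boolean outside = loop.inEventLoop(Thread.currentThread());
        System.out.println("inside=" + inside + ", outside=" + outside);
        // prints "inside=true, outside=false"
        loop.executor().shutdown();
    }
}
```

A check like this lets code decide whether it can touch loop-owned state directly or must submit a task to the loop instead.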

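An EventLoopGroup is a set of EventLoops. A Channel is bound to one of those EventLoops through the group's register method, and from then on every event on that Channel is handled by the same EventLoop. The group can:

  • iterate over its EventLoops (it implements the Iterable interface)
  • return the next EventLoop through next()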
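
The two capabilities above can be sketched without Netty. This is a minimal illustration that assumes round-robin selection, which is how Netty's default chooser hands out loops. MiniLoopGroup is a hypothetical name, and plain strings stand in for the EventLoops.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical illustration class; strings stand in for real event loops.
class MiniLoopGroup implements Iterable<String> {
    private final List<String> loops = new ArrayList<>();
    private final AtomicInteger index = new AtomicInteger();

    MiniLoopGroup(int size) {
        for (int i = 0; i < size; i++) {
            loops.add("loop-" + i);
        }
    }

    // Round-robin selection: each call returns the next member, wrapping around.
    String next() {
        return loops.get(Math.floorMod(index.getAndIncrement(), loops.size()));
    }

    @Override
    public Iterator<String> iterator() {
        return loops.iterator();
    }

    public static void main(String[] args) {
        MiniLoopGroup group = new MiniLoopGroup(2);
        for (String loop : group) {
            System.out.println("member: " + loop);
        }
        // Three "channels" register in turn; each stays with the loop it was given.
        for (int ch = 1; ch <= 3; ch++) {
            System.out.println("channel-" + ch + " -> " + group.next());
        }
        // prints:
        // member: loop-0
        // member: loop-1
        // channel-1 -> loop-0
        // channel-2 -> loop-1
        // channel-3 -> loop-0
    }
}
```

With two loops and three channels, the third channel shares a loop with the first, which is why one EventLoop usually serves many Channels.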


(2) EventLoop task-processing demo

package com.sw.netty._02_EventLoop;

import io.netty.channel.nio.NioEventLoopGroup;
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.TimeUnit;

@Slf4j
public class EventLoopTest {
    public static void main(String[] args) {
        // 1. Create the event loop group
        NioEventLoopGroup eventExecutors = new NioEventLoopGroup();

        // 2. Run an ordinary task
        // eventExecutors.next().execute(() -> log.info("execute normal task"));

        // 3. Run a scheduled task: first run after 1 second, then every 2 seconds
        eventExecutors.next().scheduleAtFixedRate(() -> log.info("execute schedule task"), 1L, 2L, TimeUnit.SECONDS);
        log.info("main");
    }
}
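
Because scheduleAtFixedRate comes from ScheduledExecutorService, its arguments (initial delay, period, unit) behave the same on a plain JDK executor. The sketch below assumes a JDK single-thread scheduler as a stand-in for the EventLoop. FixedRateSketch and runPeriodic are hypothetical names, and the delays are shortened to milliseconds so the example finishes quickly. It also shows that the returned future can cancel the task, which the demo above never does.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration class using only the JDK.
class FixedRateSketch {
    // Runs a periodic task until it has fired `runs` times, then cancels it.
    // Returns the elapsed time in milliseconds.
    static long runPeriodic(int runs, long initialDelayMs, long periodMs) {
        ScheduledExecutorService loop = Executors.newSingleThreadScheduledExecutor();
        CountDownLatch latch = new CountDownLatch(runs);
        long start = System.nanoTime();
        ScheduledFuture<?> future = loop.scheduleAtFixedRate(
                latch::countDown, initialDelayMs, periodMs, TimeUnit.MILLISECONDS);
        try {
            latch.await(); // block until the task has run `runs` times
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            future.cancel(false); // stop further executions
            loop.shutdown();      // let the JVM exit
        }
        return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
    }

    public static void main(String[] args) {
        // Runs are due at roughly 100 ms, 300 ms and 500 ms after scheduling.
        long elapsed = runPeriodic(3, 100, 200);
        System.out.println("3 runs took about " + elapsed + " ms");
    }
}
```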


(3) EventLoop I/O event handling demo

Server

package com.sw.netty._02_EventLoop;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.DefaultEventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import lombok.extern.slf4j.Slf4j;

import java.nio.charset.Charset;

@Slf4j
public class Server {
    public static void main(String[] args) {
        // Separate, non-NIO group for handlers that should not occupy the NIO worker threads
        DefaultEventLoopGroup defaultGroup = new DefaultEventLoopGroup();
        new ServerBootstrap()
                // First group (boss) handles accept events; second group (worker) handles read/write events
                .group(new NioEventLoopGroup(), new NioEventLoopGroup())
                .channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer<NioSocketChannel>() {
                    @Override
                    protected void initChannel(NioSocketChannel ch) throws Exception {
                        // handler-01 runs on the NIO worker EventLoop the channel is bound to
                        ch.pipeline().addLast("handler-01", new ChannelInboundHandlerAdapter() {
                                    @Override
                                    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                        log.info("handling a normal request");
                                        ByteBuf bf = (ByteBuf) msg;
                                        log.info(bf.toString(Charset.defaultCharset()));

                                        // Pass the message on to the next handler
                                        ctx.fireChannelRead(msg);
                                    }
                                })
                                // handler-02 runs on defaultGroup, so slow work here does not block the NIO worker
                                .addLast(defaultGroup, "handler-02", new ChannelInboundHandlerAdapter() {
                                    @Override
                                    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                        log.info("handling a long-running request");
                                        ByteBuf bf = (ByteBuf) msg;
                                        log.info(bf.toString(Charset.defaultCharset()));
                                        // Last handler to use the buffer: release it to avoid a leak
                                        bf.release();
                                    }
                                });
                    }
                })
                .bind(8088);
    }
}
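
The effect of binding handler-02 to a separate group can be sketched with two JDK executors. When the next handler belongs to a different executor, Netty submits the call to that executor instead of running it on the current thread. The sketch below imitates only that hand-off. HandOffSketch, process, and the thread names are hypothetical, and CompletableFuture is used here purely for brevity; it is not how Netty wires its pipeline.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical illustration class using only the JDK.
class HandOffSketch {
    // Runs step 1 on an "NIO worker" thread and step 2 on a separate worker thread.
    // Returns the names of the threads that executed step 1 and step 2.
    static String[] process() {
        ExecutorService ioLoop =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "nio-worker"));
        ExecutorService bizLoop =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "default-worker"));
        try {
            return CompletableFuture
                    // Step 1 (like handler-01): runs on the I/O thread.
                    .supplyAsync(() -> Thread.currentThread().getName(), ioLoop)
                    // Step 2 (like handler-02): handed off to the other executor.
                    .thenApplyAsync(
                            first -> new String[] {first, Thread.currentThread().getName()},
                            bizLoop)
                    .join();
        } finally {
            ioLoop.shutdown();
            bizLoop.shutdown();
        }
    }

    public static void main(String[] args) {
        String[] threads = process();
        System.out.println("handler-01 ran on " + threads[0]);
        System.out.println("handler-02 ran on " + threads[1]);
        // prints:
        // handler-01 ran on nio-worker
        // handler-02 ran on default-worker
    }
}
```

While step 2 is busy, the I/O thread is free to serve other channels, which is the reason for giving a slow handler its own group.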


Client

package com.sw.netty._02_EventLoop;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringEncoder;

import java.net.InetSocketAddress;

public class Client {
    public static void main(String[] args) throws InterruptedException {
        // 1. Bootstrap (client launcher)
        Channel channel = new Bootstrap()
                // 2. EventLoop group
                .group(new NioEventLoopGroup())
                // 3. Channel implementation
                .channel(NioSocketChannel.class)
                // 4. Add handlers
                .handler(new ChannelInitializer<NioSocketChannel>() {
                    @Override
                    protected void initChannel(NioSocketChannel ch) throws Exception {
                        // Called after the connection is established
                        ch.pipeline().addLast(new StringEncoder());
                    }
                })
                .connect(new InetSocketAddress("localhost", 8088))
                .sync()
                .channel();
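        // The client sends nothing on its own; call channel.writeAndFlush("...") (for example from a debugger paused here) to trigger the server handlers
        System.out.println("test");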
    }
}