Problem Summary (Part 1)

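I have run into plenty of problems, but most of the time I just searched for an answer and moved on. That makes it easy to know what works without knowing why it works, and this half-understanding has left my knowledge full of gaps.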

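  1. How does a Spring Boot project actually start? I know Spring Boot embeds Tomcat, but how does that Tomcat get started?
  2. NIO programming: how does Netty handle it?
  3. Where are distributed locks used, and besides implementing them with Redis, ZooKeeper, and similar frameworks, are there other ways to implement them?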

Take it slowly and sort things out bit by bit.

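To shield users from the low-level details of NIO communication, Netty wraps the boundary where it meets user code, with the goal of reducing development effort and difficulty. ServerBootstrap is the startup helper class for the socket server side; through it, users can conveniently create a Netty server.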

[Figure: sequence diagram of Netty server startup]

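  1. ServerBootstrap is the startup class for a Netty server. Surprisingly, its constructor takes no arguments; everything is configured afterwards through chained method calls.
  2. The Reactor thread pool is an array of EventLoops. Each EventLoop is in effect a thread that processes the Channels registered on its Selector, running inside a loop. It is nicely encapsulated. This is only a rough summary; the actual implementation is fairly complex.
  3. The JDK ServerSocketChannel that would normally be registered is replaced by Netty's NioServerSocketChannel. A factory class creates it through reflection from the NioServerSocketChannel class passed to channel().
  4. Next, handlers are added to the ChannelPipeline; which handlers are added determines how the socket data stream is processed.
  5. The rest is routine global setup: binding the port, setting TCP options, and so on.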
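
Step 3 depends on a factory that builds the channel object from its class through reflection. The idea can be sketched without Netty at all. Note that `ReflectiveFactory` and `FakeServerChannel` are hypothetical names made up for this illustration, not Netty classes, and the real factory may differ in detail.

```java
import java.lang.reflect.Constructor;

class ReflectiveFactoryDemo {
    public static void main(String[] args) {
        ReflectiveFactory<FakeServerChannel> factory =
                new ReflectiveFactory<>(FakeServerChannel.class);
        // Each call creates a fresh instance through reflection.
        System.out.println(factory.newInstance());
    }
}

// Hypothetical stand-in for a channel class; not a Netty type.
class FakeServerChannel {
    @Override
    public String toString() {
        return "FakeServerChannel";
    }
}

// Hypothetical factory: remembers a class and instantiates it reflectively on demand.
class ReflectiveFactory<T> {
    private final Class<? extends T> clazz;

    ReflectiveFactory(Class<? extends T> clazz) {
        if (clazz == null) {
            throw new NullPointerException("clazz");
        }
        this.clazz = clazz;
    }

    T newInstance() {
        try {
            // Look up the no-argument constructor and invoke it.
            Constructor<? extends T> ctor = clazz.getDeclaredConstructor();
            return ctor.newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Unable to create an instance of " + clazz, e);
        }
    }
}
```

The point of the design is that the bootstrap only stores a class at configuration time and defers creating the channel until bind is called.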

The client-side startup sequence:

[Figure: sequence diagram of Netty client startup]

Let the code speak:

public class NettyServer {
	private static final Log LOG = LogFactory.getLog(NettyServer.class);
	public void bind() throws Exception {//2
		// Configure the server's NIO thread groups
		EventLoopGroup bossGroup = new NioEventLoopGroup();//4
		EventLoopGroup workerGroup = new NioEventLoopGroup();
		ServerBootstrap b = new ServerBootstrap();//3
		b.group(bossGroup, workerGroup)//5
		.channel(NioServerSocketChannel.class)//6
		.option(ChannelOption.SO_BACKLOG, 100)//7
		.handler(new LoggingHandler(LogLevel.INFO))//8
		.childHandler(new ChannelInitializer<SocketChannel>() {//9
					@Override
					public void initChannel(SocketChannel ch) throws IOException {
						ch.pipeline().addLast(new NettyMessageDecoder(1024 * 1024, 4, 4));
						ch.pipeline().addLast(new NettyMessageEncoder());
						ch.pipeline().addLast("readTimeoutHandler", new ReadTimeoutHandler(50));
						ch.pipeline().addLast(new LoginAuthRespHandler());
						ch.pipeline().addLast("HeartBeatHandler", new HeartBeatRespHandler());
					}
				});

		// Bind the port and wait synchronously until the bind succeeds
		b.bind(NettyConstant.REMOTEIP, NettyConstant.PORT).sync();//10
		LOG.info("Netty server start ok : " + (NettyConstant.REMOTEIP + " : " + NettyConstant.PORT));
	}
	public static void main(String[] args) throws Exception {
		new NettyServer().bind();//1
	}
}

Execution starts at marker 1 and runs in order through to marker 10, which calls:

 io.netty.bootstrap.AbstractBootstrap.bind(String, int)

bind(String, int) wraps the host and port in a socket address and hands off to bind(SocketAddress):

 public ChannelFuture bind(SocketAddress localAddress) {
        validate(); // validate the bootstrap parameters
        if (localAddress == null) {
            throw new NullPointerException("localAddress");
        }
        return doBind(localAddress);
    }

The actual work happens in doBind:

    private ChannelFuture doBind(final SocketAddress localAddress) {
        final ChannelFuture regFuture = initAndRegister();
        final Channel channel = regFuture.channel();
        if (regFuture.cause() != null) {
            return regFuture;
        }

        final ChannelPromise promise;
        if (regFuture.isDone()) {
            promise = channel.newPromise();
            doBind0(regFuture, channel, localAddress, promise);
        } else {
            // Registration future is almost always fulfilled already, but just in case it's not.
            promise = new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE);
            regFuture.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    doBind0(regFuture, channel, localAddress, promise);
                }
            });
        }

        return promise;
    }

doBind first calls initAndRegister, which in turn uses createChannel and next:

    final ChannelFuture initAndRegister() {
        Channel channel;
        try {
            channel = createChannel();
        } catch (Throwable t) {
            return VoidChannel.INSTANCE.newFailedFuture(t);
        }

        try {
            init(channel);
        } catch (Throwable t) {
            channel.unsafe().closeForcibly();
            return channel.newFailedFuture(t);
        }

        ChannelPromise regFuture = channel.newPromise();
        channel.unsafe().register(regFuture);
        if (regFuture.cause() != null) {
            if (channel.isRegistered()) {
                channel.close();
            } else {
                channel.unsafe().closeForcibly();
            }
        }

        // If we are here and the promise is not failed, it's one of the following cases:
        // 1) If we attempted registration from the event loop, the registration has been completed at this point.
        //    i.e. It's safe to attempt bind() or connect() now beause the channel has been registered.
        // 2) If we attempted registration from the other thread, the registration request has been successfully
        //    added to the event loop's task queue for later execution.
        //    i.e. It's safe to attempt bind() or connect() now:
        //         because bind() or connect() will be executed *after* the scheduled registration task is executed
        //         because register(), bind(), and connect() are all bound to the same thread.

        return regFuture;
    }
    
     @Override
    Channel createChannel() {
        EventLoop eventLoop = group().next();
        return channelFactory().newChannel(eventLoop, childGroup);
    }

 @Override
    public EventExecutor next() {
        return children[Math.abs(childIndex.getAndIncrement() % children.length)];
    }
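
The default number of children is Runtime.getRuntime().availableProcessors() * 2. Note that this is the number of EventLoop threads in the group, not a limit on how many client connections the server can handle at once: each EventLoop multiplexes many Channels through its Selector.
In the Reactor pattern, the mainReactor role (the boss) usually needs only one thread, since it only accepts connections. The subReactor role is the hard-working worker that does the actual I/O, so it needs several threads, typically twice the number of CPUs.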
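
The indexing expression in next() is a plain round-robin over the children array. A minimal sketch of the same expression outside Netty follows; `RoundRobinChooser` and its `startIndex` parameter are hypothetical, added only so the counter overflow case can be exercised.

```java
import java.util.concurrent.atomic.AtomicInteger;

class RoundRobinDemo {
    public static void main(String[] args) {
        RoundRobinChooser<String> chooser =
                new RoundRobinChooser<>(new String[] {"loop-0", "loop-1", "loop-2"});
        for (int i = 0; i < 5; i++) {
            System.out.println(chooser.next());
        }
    }
}

// Hypothetical chooser that mirrors the indexing expression used in next() above.
class RoundRobinChooser<T> {
    private final T[] children;
    private final AtomicInteger childIndex;

    RoundRobinChooser(T[] children) {
        this(children, 0);
    }

    // startIndex exists only for illustration, e.g. to start close to Integer.MAX_VALUE.
    RoundRobinChooser(T[] children, int startIndex) {
        if (children.length == 0) {
            throw new IllegalArgumentException("children must not be empty");
        }
        this.children = children.clone();
        this.childIndex = new AtomicInteger(startIndex);
    }

    T next() {
        // The counter wraps to negative values after Integer.MAX_VALUE; the remainder of a
        // negative number is negative in Java, so Math.abs keeps the index inside the array.
        return children[Math.abs(childIndex.getAndIncrement() % children.length)];
    }
}
```

Because the remainder is always smaller in magnitude than the array length, taking its absolute value never overflows, so the index stays valid even after the counter wraps.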

Channel initialization:

     @Override
    void init(Channel channel) throws Exception {
        final Map<ChannelOption<?>, Object> options = options();
        synchronized (options) {
            channel.config().setOptions(options);
        }

        final Map<AttributeKey<?>, Object> attrs = attrs();
        synchronized (attrs) {
            for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
                @SuppressWarnings("unchecked")
                AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
                channel.attr(key).set(e.getValue());
            }
        }

        ChannelPipeline p = channel.pipeline();
        if (handler() != null) {
            p.addLast(handler());
        }

        final ChannelHandler currentChildHandler = childHandler;
        final Entry<ChannelOption<?>, Object>[] currentChildOptions;
        final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
        synchronized (childOptions) {
            currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size()));
        }
        synchronized (childAttrs) {
            currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size()));
        }

        p.addLast(new ChannelInitializer<Channel>() {
            @Override
            public void initChannel(Channel ch) throws Exception {
                ch.pipeline().addLast(new ServerBootstrapAcceptor(currentChildHandler, currentChildOptions,
                        currentChildAttrs));
            }
        });
    }
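
init() appends handlers to the channel's pipeline with addLast, and inbound data then passes through them in the order they were added. A toy model of that ordering is sketched below. `MiniPipeline` is a hypothetical class that treats each handler as a simple String transformation, which is far simpler than Netty's ChannelPipeline.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.UnaryOperator;

class MiniPipelineDemo {
    public static void main(String[] args) {
        MiniPipeline pipeline = new MiniPipeline();
        pipeline.addLast(String::trim);         // first stage: strip surrounding whitespace
        pipeline.addLast(String::toUpperCase);  // second stage: normalize case
        System.out.println(pipeline.fireRead("  ping "));
    }
}

// Hypothetical, heavily simplified pipeline: an ordered list of message transformations.
class MiniPipeline {
    private final List<UnaryOperator<String>> handlers = new ArrayList<>();

    // Appends a handler to the tail and returns this, so calls can be chained.
    MiniPipeline addLast(UnaryOperator<String> handler) {
        handlers.add(handler);
        return this;
    }

    // Passes the message through every handler, from the first added to the last.
    String fireRead(String msg) {
        String current = msg;
        for (UnaryOperator<String> handler : handlers) {
            current = handler.apply(current);
        }
        return current;
    }
}
```

This is why the decoder in NettyServer is added before the handlers that consume decoded messages.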

The actual bind code:

 private static void doBind0(
            final ChannelFuture regFuture, final Channel channel,
            final SocketAddress localAddress, final ChannelPromise promise) {

        // This method is invoked before channelRegistered() is triggered.  Give user handlers a chance to set up
        // the pipeline in its channelRegistered() implementation.
        channel.eventLoop().execute(new Runnable() {
            @Override
            public void run() {
                if (regFuture.isSuccess()) {
                    channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
                } else {
                    promise.setFailure(regFuture.cause());
                }
            }
        });
    }
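
doBind0 does not call bind directly; it submits the call as a task to the channel's event loop. As the source comment in initAndRegister explains, registration was handed to that same thread earlier, so the bind task runs after it. The ordering guarantee can be sketched with a JDK single-threaded executor standing in for the event loop. This is an assumption made for illustration; Netty's EventLoop is a different class.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class EventLoopOrderingDemo {
    // Submits "register" and then "bind" to one single-threaded executor
    // and returns the order in which the two tasks actually ran.
    static List<String> runTasks() {
        List<String> order = Collections.synchronizedList(new ArrayList<>());
        ExecutorService eventLoop = Executors.newSingleThreadExecutor();
        try {
            eventLoop.execute(() -> order.add("register"));
            eventLoop.execute(() -> order.add("bind"));
            eventLoop.shutdown(); // already submitted tasks still run
            if (!eventLoop.awaitTermination(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("tasks did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        } finally {
            eventLoop.shutdownNow();
        }
        return new ArrayList<>(order);
    }

    public static void main(String[] args) {
        System.out.println(runTasks());
    }
}
```

A single-threaded executor runs tasks sequentially in submission order, so no extra locking is needed to keep the bind behind the registration.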

Finally, the code behind the sync() call:

io.netty.channel.DefaultChannelPromise.sync();

 @Override
    public ChannelPromise sync() throws InterruptedException {
        super.sync();
        return this;
    }
    
     @Override
    public Promise<V> sync() throws InterruptedException {
        await();
        rethrowIfFailed();
        return this;
    }

@Override
    public Promise<V> await() throws InterruptedException {
        if (isDone()) {
            return this;
        }

        if (Thread.interrupted()) {
            throw new InterruptedException(toString());
        }

        synchronized (this) {
            while (!isDone()) {
                checkDeadLock();
                incWaiters();
                try {
                    wait();
                } finally {
                    decWaiters();
                }
            }
        }
        return this;
    }

  @Override
    public boolean isDone() {
        return isDone0(result);
    }

    private static boolean isDone0(Object result) {
        return result != null && result != UNCANCELLABLE;
    }
    
    private static final Signal SUCCESS = Signal.valueOf(DefaultPromise.class.getName() + ".SUCCESS");
    private static final Signal UNCANCELLABLE = Signal.valueOf(DefaultPromise.class.getName() + ".UNCANCELLABLE");
    private static final CauseHolder CANCELLATION_CAUSE_HOLDER = new CauseHolder(new CancellationException());
    // volatile so that every thread sees the latest completion state of the promise
    private volatile Object result;
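
The pattern behind sync() is a volatile result field plus wait/notifyAll on the promise itself. A stripped-down sketch follows. `SimplePromise` is a hypothetical class, not Netty's DefaultPromise: it leaves out the deadlock check, the waiter counting, cancellation, and listeners, and it assumes results are never null.

```java
class SimplePromiseDemo {
    public static void main(String[] args) throws InterruptedException {
        SimplePromise<String> promise = new SimplePromise<>();
        // Another thread completes the promise; main blocks in sync() until then.
        new Thread(() -> promise.setSuccess("bound")).start();
        System.out.println(promise.sync());
    }
}

// Hypothetical, heavily simplified promise built on a volatile field and wait/notifyAll.
class SimplePromise<V> {
    // Wrapper that distinguishes a failure cause from a normal result.
    private static final class CauseHolder {
        final Throwable cause;
        CauseHolder(Throwable cause) { this.cause = cause; }
    }

    // volatile so isDone() can be checked without taking the lock.
    private volatile Object result;

    boolean isDone() {
        return result != null;
    }

    synchronized void setSuccess(V value) {
        if (value == null) throw new NullPointerException("value"); // simplification
        complete(value);
    }

    synchronized void setFailure(Throwable cause) {
        if (cause == null) throw new NullPointerException("cause");
        complete(new CauseHolder(cause));
    }

    // Must be called while holding the lock on this.
    private void complete(Object newResult) {
        if (result != null) throw new IllegalStateException("already completed");
        result = newResult;
        notifyAll(); // wake every thread blocked in await()
    }

    void await() throws InterruptedException {
        if (isDone()) return;        // fast path, no locking needed
        synchronized (this) {
            while (!isDone()) {      // loop guards against spurious wakeups
                wait();
            }
        }
    }

    // Waits for completion, then returns the value or rethrows the failure.
    V sync() throws InterruptedException {
        await();
        Object r = result;
        if (r instanceof CauseHolder) {
            throw new IllegalStateException("promise failed", ((CauseHolder) r).cause);
        }
        @SuppressWarnings("unchecked")
        V value = (V) r;
        return value;
    }
}
```

In NettyServer, the sync() call at marker 10 blocks the main thread in this way until the event loop thread completes the bind promise.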
    
