Problem Summary (Part 1)
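I have run into plenty of problems, but most of the time I just searched for an answer and moved on. That makes it easy to know what works without knowing why it works, and this half-understanding has left my knowledge full of holes.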
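- How does a Spring Boot project actually start up? I know Spring Boot bundles an embedded Tomcat, but how does that Tomcat get started?
- NIO programming: how does Netty handle it?
- Distributed locks: in which scenarios are they used, and apart from implementations built on Redis, ZooKeeper and similar systems, are there other ways to implement them?
Take it slowly and sort things out bit by bit.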
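To shield users from the low-level details of NIO communication, Netty wraps the boundary where it interacts with user code, which reduces the amount of code users have to write and makes development easier. ServerBootstrap is the startup helper class for a socket server; with it, users can conveniently create a Netty server.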
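- ServerBootstrap is the startup class for a Netty server. Perhaps surprisingly, its constructor takes no arguments; the configuration is supplied afterwards through chained method calls.
- The Reactor thread pool is the array of EventLoops, that is, the threads that process the Channels registered on a Selector, each running inside a loop. It is nicely encapsulated. This is only a rough summary; the actual implementation is considerably more complex.
- The ServerSocketChannel that gets registered is replaced by Netty's NioServerSocketChannel. A factory class creates it via reflection from the NioServerSocketChannel class passed to channel().
- Next, handlers are added to the ChannelPipeline; which handlers are added determines how the socket data stream is processed.
- What remains is fairly uniform: binding the port, setting the TCP parameters, and other global settings.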
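To make the "details Netty hides" concrete for myself, here is a minimal sketch of the same steps written against plain java.nio: open a Selector, register a ServerSocketChannel, then loop over ready keys. It is not Netty code. The class name PlainNioEchoServer and the helper roundTrip are made up for illustration, the server just echoes bytes back, and error handling is deliberately minimal.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.CancelledKeyException;
import java.nio.channels.ClosedSelectorException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;

/** Hypothetical single-threaded echo server on plain java.nio (not Netty code). */
public class PlainNioEchoServer implements Runnable, AutoCloseable {
    private final Selector selector;
    private final ServerSocketChannel server;

    public PlainNioEchoServer(int port) throws IOException {
        selector = Selector.open();
        server = ServerSocketChannel.open();
        server.configureBlocking(false);
        // The second argument is the accept backlog, the knob SO_BACKLOG controls.
        server.bind(new InetSocketAddress("127.0.0.1", port), 100);
        server.register(selector, SelectionKey.OP_ACCEPT);
    }

    public int port() {
        return server.socket().getLocalPort();
    }

    /** The "reactor" loop: wait for ready channels, then dispatch on the event type. */
    @Override
    public void run() {
        ByteBuffer buf = ByteBuffer.allocate(1024);
        try {
            while (selector.isOpen()) {
                selector.select();
                Iterator<SelectionKey> it = selector.selectedKeys().iterator();
                while (it.hasNext()) {
                    SelectionKey key = it.next();
                    it.remove();
                    if (key.isAcceptable()) {
                        SocketChannel client = server.accept();
                        if (client != null) {
                            client.configureBlocking(false);
                            client.register(selector, SelectionKey.OP_READ);
                        }
                    } else if (key.isReadable()) {
                        echo((SocketChannel) key.channel(), buf);
                    }
                }
            }
        } catch (IOException | ClosedSelectorException | CancelledKeyException e) {
            // close() was called or an I/O error occurred: leave the loop.
        }
    }

    private static void echo(SocketChannel client, ByteBuffer buf) throws IOException {
        buf.clear();
        if (client.read(buf) < 0) { // peer closed the connection
            client.close();
            return;
        }
        buf.flip();
        while (buf.hasRemaining()) {
            client.write(buf); // simplification: a real server would wait for OP_WRITE
        }
    }

    @Override
    public void close() throws IOException {
        selector.close(); // also wakes up a blocked select()
        server.close();
    }

    /** Starts a server on a free port, sends one message, returns the echoed reply. */
    public static String roundTrip(String message) {
        byte[] bytes = message.getBytes(StandardCharsets.UTF_8);
        try (PlainNioEchoServer srv = new PlainNioEchoServer(0)) {
            Thread loop = new Thread(srv, "reactor");
            loop.setDaemon(true);
            loop.start();
            try (SocketChannel c = SocketChannel.open(new InetSocketAddress("127.0.0.1", srv.port()))) {
                c.write(ByteBuffer.wrap(bytes));
                ByteBuffer reply = ByteBuffer.allocate(bytes.length);
                while (reply.hasRemaining() && c.read(reply) >= 0) {
                    // keep reading until the whole echo has arrived
                }
                return new String(reply.array(), 0, reply.position(), StandardCharsets.UTF_8);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(roundTrip("ping"));
    }
}
```

Everything in run() is what an EventLoop does on the user's behalf, and the branch on the key type is roughly the job the pipeline handlers take over.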
Sequence diagram of the client side during Netty startup:
Let the code speak:
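import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.timeout.ReadTimeoutHandler;
import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

// Assumption: Log/LogFactory come from Apache Commons Logging.
// NettyConstant, NettyMessageDecoder, NettyMessageEncoder, LoginAuthRespHandler and
// HeartBeatRespHandler are project classes that are not shown here.
public class NettyServer {
    private static final Log LOG = LogFactory.getLog(NettyServer.class);

    public void bind() throws Exception {//2
        // Configure the server-side NIO thread groups
        EventLoopGroup bossGroup = new NioEventLoopGroup();//4
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        ServerBootstrap b = new ServerBootstrap();//3
        b.group(bossGroup, workerGroup)//5
            .channel(NioServerSocketChannel.class)//6
            .option(ChannelOption.SO_BACKLOG, 100)//7
            .handler(new LoggingHandler(LogLevel.INFO))//8
            .childHandler(new ChannelInitializer<SocketChannel>() {//9
                @Override
                public void initChannel(SocketChannel ch) throws IOException {
                    ch.pipeline().addLast(new NettyMessageDecoder(1024 * 1024, 4, 4));
                    ch.pipeline().addLast(new NettyMessageEncoder());
                    ch.pipeline().addLast("readTimeoutHandler", new ReadTimeoutHandler(50));
                    ch.pipeline().addLast(new LoginAuthRespHandler());
                    ch.pipeline().addLast("HeartBeatHandler", new HeartBeatRespHandler());
                }
            });
        // Bind the port and wait synchronously until the bind succeeds
        b.bind(NettyConstant.REMOTEIP, NettyConstant.PORT).sync();//10
        LOG.info("Netty server start ok : " + (NettyConstant.REMOTEIP + " : " + NettyConstant.PORT));
    }

    public static void main(String[] args) throws Exception {
        new NettyServer().bind();//1
    }
}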
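1 Execution starts at marker 1 and runs straight through to marker 10, which calls:
io.netty.bootstrap.AbstractBootstrap.bind(String, int)
This delegates to bind(SocketAddress), whose body is: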
public ChannelFuture bind(SocketAddress localAddress) {
    validate(); // validate the parameters
    if (localAddress == null) {
        throw new NullPointerException("localAddress");
    }
    return doBind(localAddress);
}
The actual work happens in doBind:
private ChannelFuture doBind(final SocketAddress localAddress) {
    final ChannelFuture regFuture = initAndRegister();
    final Channel channel = regFuture.channel();
    if (regFuture.cause() != null) {
        return regFuture;
    }
    final ChannelPromise promise;
    if (regFuture.isDone()) {
        promise = channel.newPromise();
        doBind0(regFuture, channel, localAddress, promise);
    } else {
        // Registration future is almost always fulfilled already, but just in case it's not.
        promise = new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE);
        regFuture.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                doBind0(regFuture, channel, localAddress, promise);
            }
        });
    }
    return promise;
}
final ChannelFuture initAndRegister() {
    Channel channel;
    try {
        channel = createChannel();
    } catch (Throwable t) {
        return VoidChannel.INSTANCE.newFailedFuture(t);
    }
    try {
        init(channel);
    } catch (Throwable t) {
        channel.unsafe().closeForcibly();
        return channel.newFailedFuture(t);
    }
    ChannelPromise regFuture = channel.newPromise();
    channel.unsafe().register(regFuture);
    if (regFuture.cause() != null) {
        if (channel.isRegistered()) {
            channel.close();
        } else {
            channel.unsafe().closeForcibly();
        }
    }
    // If we are here and the promise is not failed, it's one of the following cases:
    // 1) If we attempted registration from the event loop, the registration has been completed at this point.
    //    i.e. It's safe to attempt bind() or connect() now because the channel has been registered.
    // 2) If we attempted registration from the other thread, the registration request has been successfully
    //    added to the event loop's task queue for later execution.
    //    i.e. It's safe to attempt bind() or connect() now:
    //         because bind() or connect() will be executed *after* the scheduled registration task is executed
    //         because register(), bind(), and connect() are all bound to the same thread.
    return regFuture;
}
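The comment at the end of initAndRegister() relies on one property: tasks handed to the same single thread run in the order they were queued, so a bind() queued after register() cannot overtake it. A small sketch of that property, using a JDK single-threaded executor as a stand-in for one EventLoop (the class name SingleThreadOrdering is my own, and this is not how Netty implements its event loop):

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Hypothetical demo: a single-threaded executor stands in for one EventLoop. */
public class SingleThreadOrdering {
    public static List<String> run() {
        List<String> log = Collections.synchronizedList(new ArrayList<>());
        ExecutorService eventLoop = Executors.newSingleThreadExecutor();
        // Both calls come from the caller's thread, so each one only enqueues a task.
        eventLoop.execute(() -> log.add("register"));
        eventLoop.execute(() -> log.add("bind"));
        eventLoop.shutdown(); // tasks that are already queued still run
        try {
            if (!eventLoop.awaitTermination(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("tasks did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return new ArrayList<>(log);
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [register, bind]
    }
}
```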
@Override
Channel createChannel() {
    EventLoop eventLoop = group().next();
    return channelFactory().newChannel(eventLoop, childGroup);
}
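The channelFactory() used above is where the class passed to channel() gets turned into an instance by reflection. A minimal sketch of that idea, assuming a no-argument constructor for simplicity (the version quoted here passes the event loop and child group along, which this sketch leaves out). ReflectiveFactory is a name I made up, not a Netty class:

```java
import java.lang.reflect.Constructor;

/** Hypothetical stand-in for a reflection-based channel factory. */
public class ReflectiveFactory<T> {
    private final Constructor<? extends T> constructor;

    public ReflectiveFactory(Class<? extends T> type) {
        try {
            // Look the constructor up once, so a missing one fails at configuration time.
            this.constructor = type.getDeclaredConstructor();
        } catch (NoSuchMethodException e) {
            throw new IllegalArgumentException(type + " has no no-arg constructor", e);
        }
    }

    /** Creates a fresh instance on every call. */
    public T newInstance() {
        try {
            return constructor.newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("cannot instantiate " + constructor.getDeclaringClass(), e);
        }
    }

    public static void main(String[] args) {
        ReflectiveFactory<CharSequence> factory = new ReflectiveFactory<>(StringBuilder.class);
        CharSequence first = factory.newInstance();
        CharSequence second = factory.newInstance();
        System.out.println(first.getClass().getName() + ", distinct instances: " + (first != second));
    }
}
```

The benefit of this design is that the bootstrap only needs a Class object at configuration time and can create the channel later, when bind() actually runs.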
@Override
public EventExecutor next() {
    return children[Math.abs(childIndex.getAndIncrement() % children.length)];
}
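By default, the number of children is Runtime.getRuntime().availableProcessors() * 2.
This is the number of EventLoop threads in the group, not a limit on how many client connections the server can handle at once: each EventLoop serves many Channels through its Selector.
That is normally twice the number of CPU cores. In the Reactor pattern, the mainReactor role usually needs only one thread, while the subReactor role is the hard-working worker. One boss (mainReactor) is generally enough, but several subReactors are needed.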
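The next() method above is a plain round-robin pick. The Math.abs around the remainder matters because the int counter eventually overflows to a negative value, and a negative remainder would be an invalid array index. A standalone sketch of the same expression (RoundRobinChooser and its startIndex parameter are hypothetical, added only so the overflow case is easy to try):

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical round-robin chooser mirroring the expression used in next(). */
public class RoundRobinChooser<T> {
    private final T[] children;
    private final AtomicInteger childIndex;

    public RoundRobinChooser(T[] children, int startIndex) {
        if (children.length == 0) {
            throw new IllegalArgumentException("no children");
        }
        this.children = children.clone();
        this.childIndex = new AtomicInteger(startIndex);
    }

    public T next() {
        // The remainder turns negative once the counter has overflowed;
        // Math.abs folds it back into the valid index range [0, length).
        return children[Math.abs(childIndex.getAndIncrement() % children.length)];
    }

    public static void main(String[] args) {
        // Default sizing rule quoted in the text above.
        int defaultThreads = Runtime.getRuntime().availableProcessors() * 2;
        System.out.println("default children: " + defaultThreads);

        RoundRobinChooser<String> chooser =
                new RoundRobinChooser<>(new String[] {"loop-0", "loop-1", "loop-2"}, 0);
        for (int i = 0; i < 4; i++) {
            System.out.println(chooser.next());
        }
    }
}
```

Around the overflow point the rotation is no longer perfectly even, but the index always stays valid, which is all the expression needs to guarantee.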
Channel initialization:
@Override
void init(Channel channel) throws Exception {
    final Map<ChannelOption<?>, Object> options = options();
    synchronized (options) {
        channel.config().setOptions(options);
    }
    final Map<AttributeKey<?>, Object> attrs = attrs();
    synchronized (attrs) {
        for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
            @SuppressWarnings("unchecked")
            AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
            channel.attr(key).set(e.getValue());
        }
    }
    ChannelPipeline p = channel.pipeline();
    if (handler() != null) {
        p.addLast(handler());
    }
    final ChannelHandler currentChildHandler = childHandler;
    final Entry<ChannelOption<?>, Object>[] currentChildOptions;
    final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
    synchronized (childOptions) {
        currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size()));
    }
    synchronized (childAttrs) {
        currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size()));
    }
    p.addLast(new ChannelInitializer<Channel>() {
        @Override
        public void initChannel(Channel ch) throws Exception {
            ch.pipeline().addLast(new ServerBootstrapAcceptor(currentChildHandler, currentChildOptions,
                    currentChildAttrs));
        }
    });
}
The actual binding code:
private static void doBind0(
        final ChannelFuture regFuture, final Channel channel,
        final SocketAddress localAddress, final ChannelPromise promise) {
    // This method is invoked before channelRegistered() is triggered. Give user handlers a chance to set up
    // the pipeline in its channelRegistered() implementation.
    channel.eventLoop().execute(new Runnable() {
        @Override
        public void run() {
            if (regFuture.isSuccess()) {
                channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
            } else {
                promise.setFailure(regFuture.cause());
            }
        }
    });
}
Finally, the code behind the sync() call:
io.netty.channel.DefaultChannelPromise.sync();
@Override
public ChannelPromise sync() throws InterruptedException {
    super.sync();
    return this;
}

@Override
public Promise<V> sync() throws InterruptedException {
    await();
    rethrowIfFailed();
    return this;
}

@Override
public Promise<V> await() throws InterruptedException {
    if (isDone()) {
        return this;
    }
    if (Thread.interrupted()) {
        throw new InterruptedException(toString());
    }
    synchronized (this) {
        while (!isDone()) {
            checkDeadLock();
            incWaiters();
            try {
                wait();
            } finally {
                decWaiters();
            }
        }
    }
    return this;
}

@Override
public boolean isDone() {
    return isDone0(result);
}

private static boolean isDone0(Object result) {
    return result != null && result != UNCANCELLABLE;
}

private static final Signal SUCCESS = Signal.valueOf(DefaultPromise.class.getName() + ".SUCCESS");
private static final Signal UNCANCELLABLE = Signal.valueOf(DefaultPromise.class.getName() + ".UNCANCELLABLE");
private static final CauseHolder CANCELLATION_CAUSE_HOLDER = new CauseHolder(new CancellationException());
// The volatile field records the completion state (result) of the operation
private volatile Object result;
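To check my understanding of the await()/sync() code above, here is a stripped-down sketch of the same idea: one volatile result field where null means "not done yet", a lock-free fast path in isDone(), and wait()/notifyAll() for the blocking path. TinyPromise and its members are hypothetical names; it leaves out the deadlock check, waiter counting, cancellation and listeners, so it is nowhere near a replacement for DefaultPromise.

```java
import java.util.Objects;

/** Hypothetical, stripped-down promise (not Netty's DefaultPromise). */
public class TinyPromise<V> {
    /** Wraps a failure so it can share the single result field with success values. */
    private static final class Failure {
        final Exception cause;
        Failure(Exception cause) { this.cause = cause; }
    }

    private static final Object NULL_SUCCESS = new Object(); // "done, value is null"

    // null means "not completed yet"; volatile lets isDone() read it without a lock.
    private volatile Object result;

    public boolean isDone() {
        return result != null;
    }

    public synchronized boolean trySuccess(V value) {
        if (result != null) {
            return false; // already completed
        }
        result = (value == null) ? NULL_SUCCESS : value;
        notifyAll(); // wake every thread parked in await()
        return true;
    }

    public synchronized boolean tryFailure(Exception cause) {
        Objects.requireNonNull(cause, "cause");
        if (result != null) {
            return false;
        }
        result = new Failure(cause);
        notifyAll();
        return true;
    }

    public TinyPromise<V> await() throws InterruptedException {
        if (isDone()) {
            return this; // fast path, no lock needed
        }
        if (Thread.interrupted()) {
            throw new InterruptedException();
        }
        synchronized (this) {
            while (!isDone()) { // the loop guards against spurious wakeups
                wait();
            }
        }
        return this;
    }

    /** Like await(), but rethrows the failure cause if the operation failed. */
    public TinyPromise<V> sync() throws Exception {
        await();
        Object r = result;
        if (r instanceof Failure) {
            throw ((Failure) r).cause;
        }
        return this;
    }

    @SuppressWarnings("unchecked")
    public V getNow() {
        Object r = result;
        return (r == null || r == NULL_SUCCESS || r instanceof Failure) ? null : (V) r;
    }

    public static void main(String[] args) throws Exception {
        TinyPromise<String> bound = new TinyPromise<>();
        new Thread(() -> bound.trySuccess("bound")).start();
        System.out.println(bound.sync().getNow()); // prints bound
    }
}
```

This is why b.bind(...).sync() in the server code blocks the main thread: the bind itself runs on the event loop thread, and the caller simply parks in wait() until that thread completes the promise.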