Java Dubbo协议下的服务端线程怎样使用
发布时间:2023-03-21 11:01:36 所属栏目:教程 来源:
导读:Provider端线程模型
在了解服务端线程模型之前,先了解一下dubbo对Channel上的操作抽象,dubbo将Channel上的操作成了5中行为,分别是:建立连接、断开连接、发送消息、接收消息、异常捕获,Channel上的操作的接口为
在了解服务端线程模型之前,先了解一下dubbo对Channel上的操作抽象,dubbo将Channel上的操作成了5中行为,分别是:建立连接、断开连接、发送消息、接收消息、异常捕获,Channel上的操作的接口为
Provider端线程模型 在了解服务端线程模型之前,先了解一下dubbo对Channel上的操作抽象,dubbo将Channel上的操作成了5中行为,分别是:建立连接、断开连接、发送消息、接收消息、异常捕获,Channel上的操作的接口为org.apache.dubbo.remoting.ChannelHandler,该接口是SPI的,用户可以自己扩展,接口代码如下: 该接口抽象的五种Channel上的行为解释如下: 建立连接:connected,主要是的职责是在channel记录read、write的时间,以及处理建立连接后的回调逻辑,比如dubbo支持在断开后自定义回调的hook(onconnect),即在该操作中执行。 断开连接:disconnected,主要是的职责是在channel移除read、write的时间,以及处理端开连接后的回调逻辑,比如dubbo支持在断开后自定义回调的hook(ondisconnect),即在该操作中执行。 发送消息:sent,包括发送请求和发送响应。记录write的时间。 接收消息:received,包括接收请求和接收响应。记录read的时间。 异常捕获:caught,用于处理在channel上发生的各类异常。 dubbo框架的线程模型与以上这五种行为息息相关,dubbo协议Provider端线程模型提供了五种实现,虽说都是五种但是别把二者混淆,线程模型的顶级接口是org.apache.dubbo.remoting.dispatcher,该接口也是SPI的,提供的五种实现分别是Alldispatcher、Directdispatcher、MessageOnlydispatcher、Executiondispatcher、Connectionordereddispatcher,默认的使用的是Alldispatcher。 Java Dubbo协议下的服务端线程如何使用 org.apache.dubbo.remoting.ChannelHandler作为Channel上的行为的顶级接口对应dubbo协议Provider端的5种线程模型同样也提供了对应的5种实现,分别是AllChannelHandler、DirectChannelHandler、MessageOnlyChannelHandler、ExecutionChannelHandler、ConnectionorderedChannelHandler,这里Channel上行为的具体实现不展开讨论。 Java Dubbo协议下的服务端线程如何使用 Channel上行为和线程模型之间使用策略可以参考org.apache.dubbo.remoting.transport.dispatcher.ChannelHandlers的源代码,这里不做详细的介绍,下面的各个章节只针对5种线程模型做简单的介绍。 Alldispatcher IO线程上的操作: 接口响应序列化 sent操作 dubbo线程池上的操作: received、connected、disconnected、caught都是在dubbo线程池上执行 服务端反序列化操作的dubbo线程池上执行 Alldispatcher代码如下,Alldispatcher的dispatch方法实例化了AllChannelHandler,AllChannelHandler实现了received、connected、disconnected、caught操作在dubbo线程池中,代码如下: public class Alldispatcher implements dispatcher { public static final String NAME = "all"; @Override public ChannelHandler dispatch(ChannelHandler handler, URL url) { return new AllChannelHandler(handler, url); } } public class AllChannelHandler extends WrappedChannelHandler { public AllChannelHandler(ChannelHandler handler, URL url) { super(handler, url); } @Override public void connected(Channel channel) throws RemotingException { ExecutorService executor = getSharedExecutorService(); try { executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CONNECTED)); } catch (Throwable t) { throw new ExecutionException("connect event", channel, getClass() + " error when process connected event .", t); } } @Override public void disconnected(Channel channel) throws RemotingException { ExecutorService executor = getSharedExecutorService(); try { executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.disCONNECTED)); } catch (Throwable t) { throw new ExecutionException("disconnect event", channel, getClass() + " error when process disconnected event .", t); } } @Override public void received(Channel channel, Object message) throws RemotingException { ExecutorService executor = getPreferredExecutorService(message); try { executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message)); } catch (Throwable t) { if(message instanceof Request && t instanceof RejectedExecutionException){ sendFeedback(channel, (Request) message, t); return; } throw new ExecutionException(message, channel, getClass() + " error when process received event .", t); } } @Override public void caught(Channel channel, Throwable exception) throws RemotingException { ExecutorService executor = getSharedExecutorService(); try { executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CAUGHT, exception)); } catch (Throwable t) { throw new ExecutionException("caught event", channel, getClass() + " error when process caught event .", t); } } } Directdispatcher 该线程模型Channel上的所有行为均在IO线程中执行,并没有在dubbo线程池中执行 Directdispatcher与Alldispatcher相似,实例化了DirectChannelHandler,DirectChannelHandler只实现了received行为,但是received中获取的线程池如果是ThreadlessExecutor才会提交task,否则也是在ChannelHandler中执行received行为,ThreadlessExecutor和普通线程池最大的区别是不会管理任何线程,这里不展开讨论。 public class Directdispatcher implements dispatcher { public static final String NAME = "direct"; @Override public ChannelHandler dispatch(ChannelHandler handler, URL url) { return new DirectChannelHandler(handler, url); } } public class DirectChannelHandler extends WrappedChannelHandler { public DirectChannelHandler(ChannelHandler handler, URL url) { super(handler, url); } @Override public void received(Channel channel, Object message) throws RemotingException { ExecutorService executor = getPreferredExecutorService(message); if (executor instanceof ThreadlessExecutor) { try { executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message)); } catch (Throwable t) { throw new ExecutionException(message, channel, getClass() + " error when process received event .", t); } } else { handler.received(channel, message); } } } Executiondispatcher 在IO线程中执行的操作有: sent、connected、disconnected、caught操作在IO线程上执行。 序列化响应在IO线程上执行。 在dubbo线程中执行的操作有: received都是在dubbo线程上执行的。 反序列化请求的行为在dubbo中做的。 同样的,我们可以直接看ExecutionChannelHandler源码,逻辑是当message的类型是Request时received行为在dubbo线程池执行。感兴趣的可以自己看源码,这里不做介绍。 MessageOnlydispatcher Message Only dispatcher所有的received行为和反序列化都是在dubbo线程池中执行的 public class MessageOnlyChannelHandler extends WrappedChannelHandler { public MessageOnlyChannelHandler(ChannelHandler handler, URL url) { super(handler, url); } @Override public void received(Channel channel, Object message) throws RemotingException { ExecutorService executor = getPreferredExecutorService(message); try { executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message)); } catch (Throwable t) { if(message instanceof Request && t instanceof RejectedExecutionException){ sendFeedback(channel, (Request) message, t); return; } throw new ExecutionException(message, channel, getClass() + " error when process received event .", t); } } } Connectionordereddispatcher 该线程模型与Alldispatcher类似,sent操作和相应的序列化是在IO线程上执行;connected、disconnected、received、caught操作在dubbo线程池上执行,他们的区别是在connected、disconnected行为上Connectionordereddispatcher做了线程池隔离,并且在dubbo connected thread pool中提供了链接限制、告警灯能力,我们直接看ConnectionorderedChannelHandler源码,代码如下: public class ConnectionorderedChannelHandler extends WrappedChannelHandler { protected final ThreadPoolExecutor connectionExecutor; private final int queueWarningLimit; public ConnectionorderedChannelHandler(ChannelHandler handler, URL url) { super(handler, url); String threadName = url.getParameter(THREAD_NAME_KEY, DEFAULT_THREAD_NAME); connectionExecutor = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(url.getPositiveParameter(CONNECT_QUEUE_CAPACITY, Integer.MAX_VALUE)), new NamedThreadFactory(threadName, true), new AbortPolicyWithReport(threadName, url) ); // FIXME There's no place to release connectionExecutor! queueWarningLimit = url.getParameter(CONNECT_QUEUE_WARNING_SIZE, DEFAULT_CONNECT_QUEUE_WARNING_SIZE); } @Override public void connected(Channel channel) throws RemotingException { try { checkQueueLength(); connectionExecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CONNECTED)); } catch (Throwable t) { throw new ExecutionException("connect event", channel, getClass() + " error when process connected event .", t); } } @Override public void disconnected(Channel channel) throws RemotingException { try { checkQueueLength(); connectionExecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.disCONNECTED)); } catch (Throwable t) { throw new ExecutionException("disconnected event", channel, getClass() + " error when process disconnected event .", t); } } @Override public void received(Channel channel, Object message) throws RemotingException { ExecutorService executor = getPreferredExecutorService(message); try { executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message)); } catch (Throwable t) { if (message instanceof Request && t instanceof RejectedExecutionException) { sendFeedback(channel, (Request) message, t); return; } throw new ExecutionException(message, channel, getClass() + " error when process received event .", t); } } @Override public void caught(Channel channel, Throwable exception) throws RemotingException { ExecutorService executor = getSharedExecutorService(); try { executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CAUGHT, exception)); } catch (Throwable t) { throw new ExecutionException("caught event", channel, getClass() + " error when process caught event .", t); } } private void checkQueueLength() { if (connectionExecutor.getQueue().size() > queueWarningLimit) { logger.warn(new IllegalThreadStateException("connectionordered channel handler queue size: " + connectionExecutor.getQueue().size() + " exceed the warning limit number :" + queueWarningLimit)); } } } (编辑:汽车网) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |
推荐文章
站长推荐