响应式异步编程开发

服务异步请求是相对于同步而言,尤其是公司业务架构很多外网请求都是同步请求调用,例如请求微信的外网接口,代码会用HttpUrlConnection获取外网数据,在数据返回之前会一直同步阻塞等待,一旦外网请求出现延迟会严重影响服务性能进而影响整微服务响应,导致各种服务异常报警。

基于此,底层对于外网的请求改造成支持异步网络请求,异步的主流实现目前有回调、Promise、协程、响应式等思想,著名的node.js采用的便是回调的方式,但为了避免各种回调地狱,且便于提供更大的灵活空间,底层的异步设计思想参照RxJava采用响应式流式调用,即基于RxIo提供的响应式异步编程开发模型,有一定的学习成本,但也提供最大的灵活性。

服务端代码示例

package cloud.apposs.netkit;

import cloud.apposs.netkit.filterchain.http.client.HttpAnswer;
import cloud.apposs.netkit.filterchain.line.TextLineFilter;
import cloud.apposs.netkit.rxio.IoAction;
import cloud.apposs.netkit.rxio.RxIo;
import cloud.apposs.netkit.rxio.io.http.IoHttp;
import cloud.apposs.netkit.server.ServerConfig;
import cloud.apposs.netkit.server.ServerHandlerAdaptor;
import cloud.apposs.netkit.server.ServerHandlerContext;
import cloud.apposs.netkit.server.TcpServer;

public class TestHttpRawServer {
    public static void main(String[] args) throws Exception {
        ServerConfig config = new ServerConfig();
        config.setHost("0.0.0.0");
        config.setPort(8882);
        config.setNumOfGroup(16);
        TcpServer server = new TcpServer(config);
        server.getFilterChain().addFilter(new TextLineFilter("\r\n\r\n"));
        server.setHandler(new HttpSlow());
        server.start();
    }

    static class HttpSlow extends ServerHandlerAdaptor {
        @Override
        public void channelRead(final ServerHandlerContext context,
                                final Object message) throws Exception {
            System.out.println(Thread.currentThread());

            final EventLoopGroup group = context.getLoopGroup();
            // 异步通过http获取信息,此方法会快速返回并释放netkit底层线程资源
            RxIo<HttpAnswer> request = RxIo.create(group, new IoHttp("http://www.baidu.com"));
            request.subscribe(new IoAction<HttpAnswer>() {
                @Override
                public void call(HttpAnswer response) throws Exception {
                    // 在有数据响应后异步输出数据,即通过EventLoop进行数据输出
                    context.write(response.getContent());
                }
            }).start();
        }

        @Override
        public void channelSend(ServerHandlerContext context, WriteRequest request) {
            context.close(true);
        }
    }
}