RxIo 链式调用

RxIo 除了支持单个操作符或者网络请求外,也支持多个操作符链式重复调用,灵活搭配。

代码示例

@Test
public void testRxIoFullCall() throws Exception {
    final CountDownLatch latch = new CountDownLatch(1);
    final EventLoopGroup group = new EventLoopGroup(4);
    group.start();
    final Executor executor = getNamedExecutor("ExecutorOnThread");

    // 先请求url1,再请求url2,最后做逻辑处理
    // 应用场景:传单通过微信api获取用户tokenid,再根据tokenid获取用户头像等
    long start = System.currentTimeMillis();
    final String url1 = "http://www.baidu.com";
    final String url2 = "http://www.163.com";
    // 先http异步请求,再配置线程池对请求回来的网络数据进行处理
    RxIo.http(group, url1).map(new IoFunction<HttpAnswer, String>() {
        // 进行map数据转换,一般是作业务逻辑处理
        @Override
        public String call(HttpAnswer resp) throws IOException {
            System.err.println("----------------- Map1 " + Thread.currentThread() + url1 + " ----------------");
            System.out.println(resp.getHeaders().toString());
            return resp.getContent();
        }
    }).request(new IoFunction<String, RxIo<HttpAnswer>>() {
        // 继续异步请求外网
        @Override
        public RxIo<HttpAnswer> call(String t) throws Exception {
            System.err.println("----------------- Request " + Thread.currentThread() + url1 + " ----------------");
            // 异步外网请求会在EventLoop中执行,所以接下来的业务逻辑处理还需要在线程池处理的话依然要开启executOn
            return RxIo.http(group, url2);
        }
    }).map(new IoFunction<HttpAnswer, String>() {
        @Override
        public String call(HttpAnswer resp) throws Exception {
            System.err.println("----------------- Map2 " + Thread.currentThread() + url2 + " ----------------");
            return resp.getHeaders().toString();
        }
    }).match(new IoFunction<String, Errno>() {
        // 判断请求的数据是否正常,不正常返回Errno.Error,onError接口会做最终的异常处理
        @Override
        public Errno call(String t) throws Exception {
            if (t == null) {
                return Errno.ERROR;
            }
            return Errno.OK;
        }
    }).subscribe(new IoSubscriber<String>() {
        // 真正的业务逻辑处理,包括数据输出或者降级处理
        @Override
        public void onNext(String response) {
            System.err.println("----------------- Subscribe " + Thread.currentThread() + url2 + " ----------------");
            System.out.println(response);
            latch.countDown();
            group.shutdown();
            ((ExecutorService) executor).shutdown();
        }

        @Override
        public void onCompleted() {
            System.out.println("execute complete");
            latch.countDown();
            group.shutdown();
            ((ExecutorService) executor).shutdown();
        }

        @Override
        public void onError(Throwable e) {
            System.err.println("exception caught");
            e.printStackTrace();
            latch.countDown();
            group.shutdown();
            ((ExecutorService) executor).shutdown();
        }
    }).start();
    latch.await();
    long exeTime = System.currentTimeMillis() - start;
    System.err.println("----------------- " + Thread.currentThread() + " execute time:" + exeTime + " ----------------");
}