RxIo 链式调用
RxIo 除了支持单个操作符或者网络请求外,也支持多个操作符链式重复调用,灵活搭配。
代码示例
@Test
public void testRxIoFullCall() throws Exception {
final CountDownLatch latch = new CountDownLatch(1);
final EventLoopGroup group = new EventLoopGroup(4);
group.start();
final Executor executor = getNamedExecutor("ExecutorOnThread");
// 先请求url1,再请求url2,最后做逻辑处理
// 应用场景:传单通过微信api获取用户tokenid,再根据tokenid获取用户头像等
long start = System.currentTimeMillis();
final String url1 = "http://www.baidu.com";
final String url2 = "http://www.163.com";
// 先http异步请求,再配置线程池对请求回来的网络数据进行处理
RxIo.http(group, url1).map(new IoFunction<HttpAnswer, String>() {
// 进行map数据转换,一般是作业务逻辑处理
@Override
public String call(HttpAnswer resp) throws IOException {
System.err.println("----------------- Map1 " + Thread.currentThread() + url1 + " ----------------");
System.out.println(resp.getHeaders().toString());
return resp.getContent();
}
}).request(new IoFunction<String, RxIo<HttpAnswer>>() {
// 继续异步请求外网
@Override
public RxIo<HttpAnswer> call(String t) throws Exception {
System.err.println("----------------- Request " + Thread.currentThread() + url1 + " ----------------");
// 异步外网请求会在EventLoop中执行,所以接下来的业务逻辑处理还需要在线程池处理的话依然要开启executOn
return RxIo.http(group, url2);
}
}).map(new IoFunction<HttpAnswer, String>() {
@Override
public String call(HttpAnswer resp) throws Exception {
System.err.println("----------------- Map2 " + Thread.currentThread() + url2 + " ----------------");
return resp.getHeaders().toString();
}
}).match(new IoFunction<String, Errno>() {
// 判断请求的数据是否正常,不正常返回Errno.Error,onError接口会做最终的异常处理
@Override
public Errno call(String t) throws Exception {
if (t == null) {
return Errno.ERROR;
}
return Errno.OK;
}
}).subscribe(new IoSubscriber<String>() {
// 真正的业务逻辑处理,包括数据输出或者降级处理
@Override
public void onNext(String response) {
System.err.println("----------------- Subscribe " + Thread.currentThread() + url2 + " ----------------");
System.out.println(response);
latch.countDown();
group.shutdown();
((ExecutorService) executor).shutdown();
}
@Override
public void onCompleted() {
System.out.println("execute complete");
latch.countDown();
group.shutdown();
((ExecutorService) executor).shutdown();
}
@Override
public void onError(Throwable e) {
System.err.println("exception caught");
e.printStackTrace();
latch.countDown();
group.shutdown();
((ExecutorService) executor).shutdown();
}
}).start();
latch.await();
long exeTime = System.currentTimeMillis() - start;
System.err.println("----------------- " + Thread.currentThread() + " execute time:" + exeTime + " ----------------");
}