响应式异步编程开发
服务异步请求是相对于同步而言,尤其是公司业务架构很多外网请求都是同步请求调用,例如请求微信的外网接口,代码会用HttpUrlConnection获取外网数据,在数据返回之前会一直同步阻塞等待,一旦外网请求出现延迟会严重影响服务性能进而影响整微服务响应,导致各种服务异常报警。
基于此,底层对于外网的请求改造成支持异步网络请求,异步的主流实现目前有回调、Promise、协程、响应式等思想,著名的node.js采用的便是回调的方式,但为了避免各种回调地狱,且便于提供更大的灵活空间,底层的异步设计思想参照RxJava
采用响应式流式调用,即基于RxIo
提供的响应式异步编程开发模型,有一定的学习成本,但也提供最大的灵活性。
服务端代码示例
package cloud.apposs.netkit;
import cloud.apposs.netkit.filterchain.http.client.HttpAnswer;
import cloud.apposs.netkit.filterchain.line.TextLineFilter;
import cloud.apposs.netkit.rxio.IoAction;
import cloud.apposs.netkit.rxio.RxIo;
import cloud.apposs.netkit.rxio.io.http.IoHttp;
import cloud.apposs.netkit.server.ServerConfig;
import cloud.apposs.netkit.server.ServerHandlerAdaptor;
import cloud.apposs.netkit.server.ServerHandlerContext;
import cloud.apposs.netkit.server.TcpServer;
public class TestHttpRawServer {
public static void main(String[] args) throws Exception {
ServerConfig config = new ServerConfig();
config.setHost("0.0.0.0");
config.setPort(8882);
config.setNumOfGroup(16);
TcpServer server = new TcpServer(config);
server.getFilterChain().addFilter(new TextLineFilter("\r\n\r\n"));
server.setHandler(new HttpSlow());
server.start();
}
static class HttpSlow extends ServerHandlerAdaptor {
@Override
public void channelRead(final ServerHandlerContext context,
final Object message) throws Exception {
System.out.println(Thread.currentThread());
final EventLoopGroup group = context.getLoopGroup();
// 异步通过http获取信息,此方法会快速返回并释放netkit底层线程资源
RxIo<HttpAnswer> request = RxIo.create(group, new IoHttp("http://www.baidu.com"));
request.subscribe(new IoAction<HttpAnswer>() {
@Override
public void call(HttpAnswer response) throws Exception {
// 在有数据响应后异步输出数据,即通过EventLoop进行数据输出
context.write(response.getContent());
}
}).start();
}
@Override
public void channelSend(ServerHandlerContext context, WriteRequest request) {
context.close(true);
}
}
}