RxIo lock 操作符
lock 异步锁操作符是在微服务开发中针对某些增、删操作时进行加锁操作,底层采用的是队列先进先出的方式来实现锁操作中的原子性,并辅以线程池进行异步操作,保证即使代码上加锁了也依然不会阻塞整个服务运行。
代码示例
@Test
public void testThreadRxIoLock() throws Exception {
int threadCount = 10;
CountDownLatch latch = new CountDownLatch(threadCount);
Actor actor = new Actor(2);
// 模拟同一个用户多次进行订单操作下的加锁
ActorLock lockKey = Actor.createLock(854);
List<Thread> orderTheadList = new ArrayList<Thread>();
for (int i = 0; i < threadCount; i++) {
orderTheadList.add(new MyThread(i, lockKey, actor, latch));
}
for (int i = 0; i < orderTheadList.size(); i++) {
orderTheadList.get(i).start();
Thread.sleep(100);
}
latch.await();
}
static class MyThread extends Thread {
private int index;
private ActorLock lockKey;
private Actor actor;
private CountDownLatch latch;
public MyThread(int index, ActorLock lockKey, Actor actor, CountDownLatch latch) {
this.index = index;
this.lockKey = lockKey;
this.actor = actor;
this.latch = latch;
}
public int getIndex() {
return index;
}
@Override
public void run() {
Random random = new Random();
int time = random.nextInt(2000);
// 模拟EventLoop多线程下RxIo响应式编程
RxIo.lock(lockKey, actor, new RxIo.OnSubscribe<String>() {
@Override
public void call(SafeIoSubscriber<? super String> t) throws Exception {
// 模拟即使有请求进来,但耗时比较久,其他同lockkey的请求也是要阻塞保证串行执行
try {
Thread.sleep(time);
} catch (InterruptedException e) {
}
// 模拟HTTP请求结束会调用此方法触发数据的发送
t.onNext("This is a Emitter Lock String");
}
}).subscribe(new IoAction<String>() {
@Override
public void call(String s) throws Exception {
// 输出时会按顺序输出0:MsgXX,1:MsgXX,即使同lockkey队列方法中执行的耗时不同,依然也是顺序执行
System.out.println(index + ";Msg=" + s + ";LockKey=" + lockKey + ";SleepTime=" + time);
latch.countDown();
}
}).start();
}
@Override
public String toString() {
return String.valueOf(this.index);
}
}