RxIo lock 操作符

lock 异步锁操作符是在微服务开发中针对某些增、删操作时进行加锁操作,底层采用的是队列先进先出的方式来实现锁操作中的原子性,并辅以线程池进行异步操作,保证即使代码上加锁了也依然不会阻塞整个服务运行。

代码示例

@Test
public void testThreadRxIoLock() throws Exception {
    int threadCount = 10;
    CountDownLatch latch = new CountDownLatch(threadCount);
    Actor actor = new Actor(2);

    // 模拟同一个用户多次进行订单操作下的加锁
    ActorLock lockKey = Actor.createLock(854);
    List<Thread> orderTheadList = new ArrayList<Thread>();
    for (int i = 0; i < threadCount; i++) {
        orderTheadList.add(new MyThread(i, lockKey, actor, latch));
    }
    for (int i = 0; i < orderTheadList.size(); i++) {
        orderTheadList.get(i).start();
        Thread.sleep(100);
    }
    latch.await();
}

static class MyThread extends Thread {
    private int index;

    private ActorLock lockKey;

    private Actor actor;

    private CountDownLatch latch;

    public MyThread(int index, ActorLock lockKey, Actor actor, CountDownLatch latch) {
        this.index = index;
        this.lockKey = lockKey;
        this.actor = actor;
        this.latch = latch;
    }

    public int getIndex() {
        return index;
    }

    @Override
    public void run() {
        Random random = new Random();
        int time = random.nextInt(2000);
        // 模拟EventLoop多线程下RxIo响应式编程
        RxIo.lock(lockKey, actor, new RxIo.OnSubscribe<String>() {
            @Override
            public void call(SafeIoSubscriber<? super String> t) throws Exception {
                // 模拟即使有请求进来,但耗时比较久,其他同lockkey的请求也是要阻塞保证串行执行
                try {
                    Thread.sleep(time);
                } catch (InterruptedException e) {
                }
                // 模拟HTTP请求结束会调用此方法触发数据的发送
                t.onNext("This is a Emitter Lock String");
            }
        }).subscribe(new IoAction<String>() {
            @Override
            public void call(String s) throws Exception {
                // 输出时会按顺序输出0:MsgXX,1:MsgXX,即使同lockkey队列方法中执行的耗时不同,依然也是顺序执行
                System.out.println(index + ";Msg=" + s + ";LockKey=" + lockKey + ";SleepTime=" + time);
                latch.countDown();
            }
        }).start();
    }

    @Override
    public String toString() {
        return String.valueOf(this.index);
    }
}