用RxJava实现事件总线

最近在升级美恰SDK的过程中,客服妹子想要在app中提示用户的未读消息数,以方便更快的给到用户反馈,于是clone了他们的demo看了一下,看完之后觉得很震惊,因为他们用的一些技术都是目前比较火的库,包括rxjava,retrfit,okhttp,leakcanary,glide,picasso等等,像他们这种第三方服务商,用户的多样性还是很大的,但是他们也都毅然地选择了拥抱新技术,我觉得这是很值得学习和借鉴的。

回到刚刚的需求,在app中获取用户的未读消息数,我们在拿到消息数后怎么通知呢,这就要用到消息通知机制了,有人选择Android原生的android.database.Observable,也有人选择EventBus,或者otto,大家用的比较多的应该是EventBus吧,我在知乎上看到以前关于RxJava和EventBus的区别?,可以参读一下,大家支持使用Rxjava的还是占多数的,这里美恰团队也是使用的Rxjava来实现的事件总线,所以我也打算在项目中使用RxJava了,当然RxJava的功能远不止这么简单,那就从用RxJava实现事件总线开始吧,因为RxJava也是基于观察者模式,天生就具备这种事件通知的功能。

首先我们看一下美恰的实现方式:

public class RxBus {
    private Subject<Object, Object> mBus;
    private static RxBus sInstance;

    private RxBus() {
        mBus = new SerializedSubject<>(PublishSubject.create());
    }

    public static RxBus getInstance() {
        if (sInstance == null) {
            // [1]
            synchronized (RxBus.class) {
                if (sInstance == null) {
                    //单例模式之双重检测:线程一在此之前线程二到达了位置[1],如果此处不二次判断,那么线程二执行到这里的时候还会重新new
                    sInstance = new RxBus();
                }
            }
        }
        return sInstance;
    }

    private Subject<Object, Object> getBus() {
        return mBus;
    }

    public static Observable<Object> toObserverable() {
        return getInstance().getBus();
    }

    public static void send(Object obj) {
           getInstance().getBus().onNext(obj);
    }

    public static boolean hasObservers() {
        return getInstance().getBus().hasObservers();
    }

}

下面是我的实现方式:

public class RxBus {
    private static volatile RxBus mInstance;

    private final Subject bus;

    private Subject getBus() {
        return bus;
    }

    //Subject是非线程安全的,要避免该问题,需要将 Subject转换为一个 SerializedSubject 
    //PublishSubject只会把在订阅发生的时间点之后来自原始Observable的数据发射给观察者
    public RxBus() {    
        bus = new SerializedSubject<>(PublishSubject.create());
    }

    public static RxBus getInstance() {
        RxBus rxBus = mInstance;
        if (mInstance == null) {
            synchronized (RxBus.class) {
                rxBus = mInstance;
                if (mInstance == null) {
                    rxBus = new RxBus();
                    mInstance = rxBus;
                }
            }
        }
        return rxBus;
    }

    public static void post(Object o) {
        if (hasObservers()){
            getInstance().getBus().onNext(o);
        }
    }


    public static <T> Observable<T> toObserverable(Class<T> eventType) {
        return getInstance().getBus().ofType(eventType);
    }

    public static boolean hasObservers() {
        return getInstance().getBus().hasObservers();
    }
}

使用方法

  1. 发送事件

    RxBus.post(new UnreadChatMessageEvent());      
    
  2. 监听事件

    mSubscription = RxBus.toObserverable(IncreaseEvent.class).subscribe(new Action1<IncreaseEvent>() {
      @Override
      public void call(IncreaseEvent increaseEvent) {
          tv_increase.setText(increaseEvent.getNum() + "");
      }
    

    });

这里有点需要注意,在生命周期结束的地方取消订阅事件,防止RxJava可能会引起的内存泄漏问题

   @Override
protected void onDestroy() {
    super.onDestroy();
    if (mSubscription != null && !mSubscription.isUnsubscribed()) {
        mSubscription.unsubscribe();
    }
}

改进

  1. 单例加了双重校验锁,更多单例讲解请看Android设计模式之单例模式
  2. 根据传递的 eventType 类型返回特定类型(eventType)的 被观察者,我们可以看下ofType的源码实现:

    public final <R> Observable<R> ofType(final Class<R> klass) {
     return filter(new Func1<T, Boolean>() {
         @Override
         public final Boolean call(T t) {
             return klass.isInstance(t);
         }
     }).cast(klass);
    

    }

    其内部就是filter+cast,filter操作符可以使你提供一个指定的测试数据项,只有通过测试的数据才会被“发射”。cast操作符可以将一个Observable转换成指定类型的Observable。

参考

http://www.jianshu.com/p/ca090f6e2fe2

Android设计模式之单例模式

https://drakeet.me/rxbus