最近在升级美恰SDK的过程中,客服妹子想要在app中提示用户的未读消息数,以方便更快的给到用户反馈,于是clone了他们的demo看了一下,看完之后觉得很震惊,因为他们用的一些技术都是目前比较火的库,包括rxjava,retrfit,okhttp,leakcanary,glide,picasso等等,像他们这种第三方服务商,用户的多样性还是很大的,但是他们也都毅然地选择了拥抱新技术,我觉得这是很值得学习和借鉴的。
回到刚刚的需求,在app中获取用户的未读消息数,我们在拿到消息数后怎么通知呢,这就要用到消息通知机制了,有人选择Android原生的android.database.Observable,也有人选择EventBus,或者otto,大家用的比较多的应该是EventBus吧,我在知乎上看到以前关于RxJava和EventBus的区别?,可以参读一下,大家支持使用Rxjava的还是占多数的,这里美恰团队也是使用的Rxjava来实现的事件总线,所以我也打算在项目中使用RxJava了,当然RxJava的功能远不止这么简单,那就从用RxJava实现事件总线开始吧,因为RxJava也是基于观察者模式,天生就具备这种事件通知的功能。
首先我们看一下美恰的实现方式:
public class RxBus {
private Subject<Object, Object> mBus;
private static RxBus sInstance;
private RxBus() {
mBus = new SerializedSubject<>(PublishSubject.create());
}
public static RxBus getInstance() {
if (sInstance == null) {
// [1]
synchronized (RxBus.class) {
if (sInstance == null) {
//单例模式之双重检测:线程一在此之前线程二到达了位置[1],如果此处不二次判断,那么线程二执行到这里的时候还会重新new
sInstance = new RxBus();
}
}
}
return sInstance;
}
private Subject<Object, Object> getBus() {
return mBus;
}
public static Observable<Object> toObserverable() {
return getInstance().getBus();
}
public static void send(Object obj) {
getInstance().getBus().onNext(obj);
}
public static boolean hasObservers() {
return getInstance().getBus().hasObservers();
}
}
下面是我的实现方式:
public class RxBus {
private static volatile RxBus mInstance;
private final Subject bus;
private Subject getBus() {
return bus;
}
//Subject是非线程安全的,要避免该问题,需要将 Subject转换为一个 SerializedSubject
//PublishSubject只会把在订阅发生的时间点之后来自原始Observable的数据发射给观察者
public RxBus() {
bus = new SerializedSubject<>(PublishSubject.create());
}
public static RxBus getInstance() {
RxBus rxBus = mInstance;
if (mInstance == null) {
synchronized (RxBus.class) {
rxBus = mInstance;
if (mInstance == null) {
rxBus = new RxBus();
mInstance = rxBus;
}
}
}
return rxBus;
}
public static void post(Object o) {
if (hasObservers()){
getInstance().getBus().onNext(o);
}
}
public static <T> Observable<T> toObserverable(Class<T> eventType) {
return getInstance().getBus().ofType(eventType);
}
public static boolean hasObservers() {
return getInstance().getBus().hasObservers();
}
}
使用方法
发送事件
RxBus.post(new UnreadChatMessageEvent());
监听事件
mSubscription = RxBus.toObserverable(IncreaseEvent.class).subscribe(new Action1<IncreaseEvent>() { @Override public void call(IncreaseEvent increaseEvent) { tv_increase.setText(increaseEvent.getNum() + ""); }
});
这里有点需要注意,在生命周期结束的地方取消订阅事件,防止RxJava可能会引起的内存泄漏问题
@Override
protected void onDestroy() {
super.onDestroy();
if (mSubscription != null && !mSubscription.isUnsubscribed()) {
mSubscription.unsubscribe();
}
}
改进
- 单例加了双重校验锁,更多单例讲解请看Android设计模式之单例模式
根据传递的 eventType 类型返回特定类型(eventType)的 被观察者,我们可以看下
ofType
的源码实现:public final <R> Observable<R> ofType(final Class<R> klass) { return filter(new Func1<T, Boolean>() { @Override public final Boolean call(T t) { return klass.isInstance(t); } }).cast(klass);
}
其内部就是filter+cast,filter操作符可以使你提供一个指定的测试数据项,只有通过测试的数据才会被“发射”。cast操作符可以将一个Observable转换成指定类型的Observable。