RxJava中操作符组合的复用利器compose()

首先我们看一个Retrofit+RxJava做网络请求的例子

接口:

1
2
@GET("/api/v1/users")
Observable<Response<Profile>> getProfile();

调用如下

1
2
3
4
5
6
7
8
9
getApi().getProfile()
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.flatMap(new Func1<Response<Profile>, Observable<?>>() {
@Override
public Observable<?> call(final Response<Profile> response) {
return flatResponse(response);
}
});

Response是为了统一处理返回结果而定义的泛型类

1
2
3
4
5
6
7
8
9
10
public class Response<T> {
public int code;
public String message;
public T data;
public boolean isSuccess() {
return code == 200;
}
}

flatResponse是一个分割操作,旨在将Response<T>转换成Observable<T>

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
/**
* 对网络接口返回的Response进行分割操作
*
* @param response
* @param <T>
* @return
*/
public <T> Observable<T> flatResponse(final Response<T> response) {
return Observable.create(new Observable.OnSubscribe<T>() {
@Override
public void call(Subscriber<? super T> subscriber) {
if (response.isSuccess()) {
if (!subscriber.isUnsubscribed()) {
subscriber.onNext(response.data);
}
} else {
if (!subscriber.isUnsubscribed()) {
subscriber.onError(new ApiGenerator.APIException(response.code, response.message));
}
return;
}
if (!subscriber.isUnsubscribed()) {
subscriber.onCompleted();
}
}
});
}

这只是一个接口调用,如果我们再增加一个接口getXX(),我们又需要这样

1
2
3
4
5
6
7
8
9
getApi().getXX()
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.flatMap(new Func1<Response<Profile>, Observable<?>>() {
@Override
public Observable<?> call(final Response<Profile> response) {
return flatResponse(response);
}
});

我们发现这样是不是太麻烦了,每次都要重复写这些操作符,那有没有办法将一组操作符重用于多个数据流中呢?当然有,接下来就来讲今天的重头–> compose操作符

compose

在说compose之前要先介绍下Transformer
Transformer实际上就是一个Func1<Observable<T>, Observable<R>>,换言之就是:可以通过它将一种类型的Observable转换成另一种类型的Observable,这里我们就是通过Transformer将Observable<Response<T>>转换成了Observable<T>

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
protected <T> Observable.Transformer<Response<T>, T> applySchedulers() {
return new Observable.Transformer<Response<T>, T>() {
@Override
public Observable<T> call(Observable<Response<T>> responseObservable) {
return responseObservable.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.flatMap(new Func1<Response<T>, Observable<T>>() {
@Override
public Observable<T> call(Response<T> tResponse) {
return flatResponse(tResponse);
}
})
;
}
};
}

我们在新增一个接口的时候就可以通过compose操作符实现Transformer的转换

1
2
3
public Observable<Profile> getProfile() {
return getApi().getProfile().compose(this.<Profile>applySchedulers());
}

这样我们就实现了Observable<Response<Profile>>Observable<Profile>的类型转换,并处理了线程调度、分割返回结果等操作符组合,达到了复用的目的

参考

【译】避免打断链式结构:使用.compose( )操作符
给 Android 开发者的 RxJava 详解
RxJava+Retrofit框架Demo(一)