When reposting, please credit the source with a link. This article is from 103style's blog.
Filtering operators and the official introduction
Official introduction to RxJava's filtering operators: Filtering Observables
Operators covered: debounce, distinct, distinctUntilChanged, elementAt, elementAtOrError, filter, first, firstElement, firstOrError, ignoreElement, ignoreElements, last, lastElement, lastOrError, ofType, sample, skip, skipLast, take, takeLast, throttleFirst, throttleLast, throttleLatest, throttleWithTimeout, timeout
debounce
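Drops an event if a newer event arrives before the debounce timeout elapses; an event is emitted only after the source has stayed quiet for the whole timeout (or has completed).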
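The timing rule can be checked by hand against the official example that follows. The sketch below is a plain-Java model written for illustration, not RxJava's implementation: the class name DebounceModel and its method are hypothetical, and it assumes the source completes right after its last item.

```java
import java.util.ArrayList;
import java.util.List;

class DebounceModel {
    // times[i] is when items[i] is emitted, in ms since subscription.
    static List<String> debounce(long[] times, String[] items, long timeoutMs) {
        List<String> out = new ArrayList<>();
        for (int i = 0; i < items.length; i++) {
            boolean isLast = i == items.length - 1;
            // An item survives if nothing newer arrives within the timeout.
            // The last item is flushed when the source completes.
            if (isLast || times[i + 1] - times[i] > timeoutMs) {
                out.add(items[i]);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // Emission times implied by sleeps of 1500, 500, 250 and 2000 ms.
        long[] times = {0, 1500, 2000, 2250, 4250};
        String[] items = {"A", "B", "C", "D", "E"};
        System.out.println(debounce(times, items, 1000)); // prints [A, D, E]
    }
}
```

B and C are dropped because the next item follows them after only 500 ms and 250 ms; A and D are each followed by a gap longer than one second, and E is released on completion.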
Official example:
Observable<String> source = Observable.create(new ObservableOnSubscribe<String>() {
    @Override
    public void subscribe(ObservableEmitter<String> emitter) throws Exception {
        emitter.onNext("A");
        Thread.sleep(1500);
        emitter.onNext("B");
        Thread.sleep(500);
        emitter.onNext("C");
        Thread.sleep(250);
        emitter.onNext("D");
        Thread.sleep(2000);
        emitter.onNext("E");
        emitter.onComplete();
    }
});
source.subscribeOn(Schedulers.io())
        .debounce(1, TimeUnit.SECONDS)
        .blockingSubscribe(new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {
            }
            @Override
            public void onNext(String s) {
                System.out.println("onNext: " + s);
            }
            @Override
            public void onError(Throwable e) {
                e.printStackTrace();
            }
            @Override
            public void onComplete() {
                System.out.println("onComplete");
            }
        });
Output:
onNext: A
onNext: D
onNext: E
onComplete
distinct
Filters out events that have already been emitted, so each distinct value appears only once.
Official example:
Observable.just(2, 3, 4, 4, 2, 1)
        .distinct()
        .subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                System.out.println(integer);
            }
        });
Output:
2
3
4
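1
distinctUntilChanged
Filters out events that are equal to the event immediately before them; a value may appear again later once a different value has come in between.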
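The difference from distinct comes down to how much history is remembered: distinct keeps every value seen so far, while distinctUntilChanged keeps only the previous value. A minimal plain-Java sketch of the two rules (a model for illustration, not RxJava's implementation; DistinctModel and its methods are hypothetical names):

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;

class DistinctModel {
    // distinct: remember every value seen so far.
    static List<Integer> distinct(List<Integer> in) {
        Set<Integer> seen = new HashSet<>();
        List<Integer> out = new ArrayList<>();
        for (Integer v : in) {
            if (seen.add(v)) { // add() returns false for a repeated value
                out.add(v);
            }
        }
        return out;
    }

    // distinctUntilChanged: remember only the previous value.
    static List<Integer> distinctUntilChanged(List<Integer> in) {
        List<Integer> out = new ArrayList<>();
        Integer previous = null;
        boolean first = true;
        for (Integer v : in) {
            if (first || !Objects.equals(previous, v)) {
                out.add(v);
            }
            previous = v;
            first = false;
        }
        return out;
    }

    public static void main(String[] args) {
        List<Integer> in = List.of(1, 1, 2, 1, 2, 3, 3, 4);
        System.out.println(distinct(in));             // prints [1, 2, 3, 4]
        System.out.println(distinctUntilChanged(in)); // prints [1, 2, 1, 2, 3, 4]
    }
}
```

One practical consequence of the model: distinct has to hold on to every value it has seen, which matters for long-running streams, whereas distinctUntilChanged needs constant memory.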
Official example:
Observable.just(1, 1, 2, 1, 2, 3, 3, 4)
        .distinctUntilChanged()
        .subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                System.out.println(integer);
            }
        });
Output:
1
2
1
2
3
4
elementAt
Gets the event at the given zero-based index in the event stream.
Official example:
Observable.range(0, 10)
        .elementAt(5)
        .subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                System.out.println(integer);
            }
        });
Output:
5
elementAtOrError
Like elementAt, but signals onError if the index does not exist.
Official example:
Observable<String> source = Observable.just("Kirk", "Spock", "Chekov", "Sulu");
Single<String> element = source.elementAtOrError(4);
element.subscribe(new Consumer<String>() {
    @Override
    public void accept(String s) throws Exception {
        System.out.println("onSuccess will not be printed!");
    }
}, new Consumer<Throwable>() {
    @Override
    public void accept(Throwable throwable) throws Exception {
        System.out.println("onError: " + throwable);
    }
});
Output:
onError: java.util.NoSuchElementException
filter
Filters events with a custom predicate.
Official example:
Observable.just(1, 2, 3, 4, 5, 6)
        .filter(new Predicate<Integer>() {
            @Override
            public boolean test(Integer integer) throws Exception {
                return integer % 2 == 0;
            }
        })
        .subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                System.out.println(integer);
            }
        });
Output:
2
4
6
first
Gets the first event in the stream and returns a Single; the argument is the default value used when the stream is empty.
Official example:
Observable<String> source = Observable.just("A", "B", "C");
Single<String> firstOrDefault = source.first("D");
firstOrDefault.subscribe(new Consumer<String>() {
    @Override
    public void accept(String s) throws Exception {
        System.out.println(s);
    }
});
Output:
A
firstElement
Gets the first event in the stream and returns a Maybe.
Official example:
Observable<String> source = Observable.just("A", "B", "C");
Maybe<String> firstOrDefault = source.firstElement();
firstOrDefault.subscribe(new Consumer<String>() {
    @Override
    public void accept(String s) throws Exception {
        System.out.println(s);
    }
});
Output:
A
firstOrError
Emits the first event, or signals onError with a NoSuchElementException if the stream is empty.
Official example:
Observable<String> emptySource = Observable.empty();
Single<String> firstOrError = emptySource.firstOrError();
firstOrError.subscribe(
        new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                System.out.println("onSuccess will not be printed!");
            }
        },
        new Consumer<Throwable>() {
            @Override
            public void accept(Throwable throwable) throws Exception {
                System.out.println("onError: " + throwable);
            }
        });
Output:
onError: java.util.NoSuchElementException
ignoreElement
Ignores the single event of a Single (or Maybe) and returns a Completable that only signals completion or an error.
Official example:
Single<Long> source = Single.timer(1, TimeUnit.SECONDS);
Completable completable = source.ignoreElement();
completable.doOnComplete(new Action() {
    @Override
    public void run() throws Exception {
        System.out.println("Done!");
    }
}).blockingAwait();
Output:
Done!
ignoreElements
Ignores all events in the stream and returns a Completable that only signals completion or an error.
Official example:
Observable<Long> source = Observable.intervalRange(1, 5, 1, 1, TimeUnit.SECONDS);
Completable completable = source.ignoreElements();
completable.doOnComplete(new Action() {
    @Override
    public void run() throws Exception {
        System.out.println("Done!");
    }
}).blockingAwait();
Output:
Done!
last
Gets the last event in the stream and returns a Single; the argument is the default value used when the stream is empty.
Official example:
Observable<String> source = Observable.just("A", "B", "C");
Single<String> lastOrDefault = source.last("D");
lastOrDefault.subscribe(new Consumer<String>() {
    @Override
    public void accept(String s) throws Exception {
        System.out.println(s);
    }
});
Output:
C
lastElement
Gets the last event in the stream and returns a Maybe.
Official example:
Observable<String> source = Observable.just("A", "B", "C");
Maybe<String> lastOrDefault = source.lastElement();
lastOrDefault.subscribe(new Consumer<String>() {
    @Override
    public void accept(String s) throws Exception {
        System.out.println(s);
    }
});
Output:
C
lastOrError
Same as firstOrError, but for the last event.
Official example:
Observable<String> emptySource = Observable.empty();
Single<String> lastOrError = emptySource.lastOrError();
lastOrError.subscribe(
        new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                System.out.println("onSuccess will not be printed!");
            }
        },
        new Consumer<Throwable>() {
            @Override
            public void accept(Throwable throwable) throws Exception {
                System.out.println("onError: " + throwable);
            }
        });
Output:
onError: java.util.NoSuchElementException
ofType
Filters events by type.
Official example:
Observable<Number> numbers = Observable.just(1, 4.0, 3, 2.71, 2f, 7);
Observable<Integer> integers = numbers.ofType(Integer.class);
integers.subscribe(new Consumer<Integer>() {
    @Override
    public void accept(Integer integer) throws Exception {
        System.out.println(integer);
    }
});
Output:
1
3
7
sample
Filters the event stream by emitting only the most recent event within each periodic time interval.
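To make the periodic sampling concrete, here is a plain-Java model of the rule applied to the timings of the official example that follows. It is an illustrative sketch, not RxJava's implementation: SampleModel is a hypothetical name, and the model assumes the source completes together with its last item and that whatever arrives after the final tick is dropped.

```java
import java.util.ArrayList;
import java.util.List;

class SampleModel {
    // times[i] is when items[i] is emitted, in ms since subscription.
    // periodMs must be positive.
    static List<String> sample(long[] times, String[] items, long periodMs) {
        List<String> out = new ArrayList<>();
        if (items.length == 0) {
            return out;
        }
        long completeAt = times[times.length - 1];
        int next = 0;
        // The sampler ticks every periodMs until the source completes.
        for (long tick = periodMs; tick <= completeAt; tick += periodMs) {
            String latest = null;
            // Keep only the newest item that arrived since the previous tick.
            while (next < items.length && times[next] < tick) {
                latest = items[next++];
            }
            if (latest != null) {
                out.add(latest);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // Emission times implied by sleeps of 500, 200, 800 and 600 ms.
        long[] times = {0, 500, 700, 1500, 2100};
        String[] items = {"A", "B", "C", "D", "E"};
        System.out.println(sample(times, items, 1000)); // prints [C, D]
    }
}
```

The tick at 1000 ms sees A, B and C and keeps C; the tick at 2000 ms sees only D; E arrives at 2100 ms, after the last tick before completion, so it never appears.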
Official example:
Observable<String> source = Observable.create(new ObservableOnSubscribe<String>() {
    @Override
    public void subscribe(ObservableEmitter<String> emitter) throws Exception {
        emitter.onNext("A");
        Thread.sleep(500);
        emitter.onNext("B");
        Thread.sleep(200);
        emitter.onNext("C");
        Thread.sleep(800);
        emitter.onNext("D");
        Thread.sleep(600);
        emitter.onNext("E");
        emitter.onComplete();
    }
});
source.subscribeOn(Schedulers.io())
        .sample(1, TimeUnit.SECONDS)
        .blockingSubscribe(new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {
            }
            @Override
            public void onNext(String s) {
                System.out.println("onNext: " + s);
            }
            @Override
            public void onError(Throwable e) {
                e.printStackTrace();
            }
            @Override
            public void onComplete() {
                System.out.println("onComplete");
            }
        });
Output:
// C is emitted at 700 ms (500 + 200)
// D is emitted at 1500 ms (500 + 200 + 800)
// E is emitted at 2100 ms (500 + 200 + 800 + 600)
onNext: C
onNext: D
onComplete
skip
Skips the given number of events at the start of the event stream.
Official example:
Observable<Integer> source = Observable.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
source.skip(4)
        .subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                System.out.println(integer);
            }
        });
Output:
5
6
7
8
9
10
skipLast
Skips the given number of events at the end of the event stream.
Official example:
Observable<Integer> source = Observable.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
source.skipLast(4)
        .subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                System.out.println(integer);
            }
        });
Output:
1
2
3
4
5
6
take
Takes the given number of events from the start of the event stream.
Official example:
Observable<Integer> source = Observable.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
source.take(4)
        .subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                System.out.println(integer);
            }
        });
Output:
1
2
3
4
takeLast
Takes the given number of events from the end of the event stream.
Official example:
Observable<Integer> source = Observable.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
source.takeLast(4)
        .subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                System.out.println(integer);
            }
        });
Output:
7
8
9
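10
throttleFirst
The opposite of sample: takes the first event in each time window of the given length. A window opens when an event is emitted, and every later event inside that window is dropped.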
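The gating behaviour can be modelled in a few lines of plain Java and checked against the timings of the official example that follows. This is an illustrative sketch rather than RxJava's implementation; ThrottleFirstModel and gateOpensAt are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

class ThrottleFirstModel {
    // times[i] is when items[i] is emitted, in ms since subscription.
    static List<String> throttleFirst(long[] times, String[] items, long windowMs) {
        List<String> out = new ArrayList<>();
        long gateOpensAt = Long.MIN_VALUE; // the gate starts open
        for (int i = 0; i < items.length; i++) {
            if (times[i] >= gateOpensAt) {
                out.add(items[i]);
                // Emitting an item closes the gate for one window.
                gateOpensAt = times[i] + windowMs;
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // Emission times implied by sleeps of 500, 200, 800 and 600 ms.
        long[] times = {0, 500, 700, 1500, 2100};
        String[] items = {"A", "B", "C", "D", "E"};
        System.out.println(throttleFirst(times, items, 1000)); // prints [A, D]
    }
}
```

A closes the gate until 1000 ms, which swallows B and C; D reopens the window at 1500 ms, so E at 2100 ms is dropped as well.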
Official example:
Observable<String> source = Observable.create(new ObservableOnSubscribe<String>() {
    @Override
    public void subscribe(ObservableEmitter<String> emitter) throws Exception {
        emitter.onNext("A");
        Thread.sleep(500);
        emitter.onNext("B");
        Thread.sleep(200);
        emitter.onNext("C");
        Thread.sleep(800);
        emitter.onNext("D");
        Thread.sleep(600);
        emitter.onNext("E");
        emitter.onComplete();
    }
});
source.subscribeOn(Schedulers.io())
        .throttleFirst(1, TimeUnit.SECONDS)
        .blockingSubscribe(new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {
            }
            @Override
            public void onNext(String s) {
                System.out.println("onNext: " + s);
            }
            @Override
            public void onError(Throwable e) {
                e.printStackTrace();
            }
            @Override
            public void onComplete() {
                System.out.println("onComplete");
            }
        });
Output:
onNext: A
onNext: D
onComplete
throttleLast
Same as sample: takes the last event in each time window of the given length.
Official example:
Observable<String> source = Observable.create(new ObservableOnSubscribe<String>() {
    @Override
    public void subscribe(ObservableEmitter<String> emitter) throws Exception {
        emitter.onNext("A");
        Thread.sleep(500);
        emitter.onNext("B");
        Thread.sleep(200);
        emitter.onNext("C");
        Thread.sleep(800);
        emitter.onNext("D");
        Thread.sleep(600);
        emitter.onNext("E");
        emitter.onComplete();
    }
});
source.subscribeOn(Schedulers.io())
        .throttleLast(1, TimeUnit.SECONDS)
        .blockingSubscribe(new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {
            }
            @Override
            public void onNext(String s) {
                System.out.println("onNext: " + s);
            }
            @Override
            public void onError(Throwable e) {
                e.printStackTrace();
            }
            @Override
            public void onComplete() {
                System.out.println("onComplete");
            }
        });
Output:
onNext: C
onNext: D
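onComplete
throttleLatest
Emits the next event from the stream right away, then, each time the specified timeout elapses, emits the latest event received in the meantime (if any).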
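Because this operator mixes an immediate emission with a periodic one, a small plain-Java model may help when reading the official example that follows. The sketch is a simplified illustration, not RxJava's implementation: ThrottleLatestModel is a hypothetical name, the timer is assumed to stop when it fires with nothing pending, and an item still pending when the source completes is assumed to be dropped.

```java
import java.util.ArrayList;
import java.util.List;

class ThrottleLatestModel {
    // times[i] is when items[i] is emitted, in ms since subscription.
    static List<String> throttleLatest(long[] times, String[] items, long periodMs) {
        List<String> out = new ArrayList<>();
        long timerAt = -1;    // time of the next timer tick, -1 when no timer runs
        String latest = null; // newest item received while the timer is running
        int i = 0;
        while (i < items.length) {
            if (timerAt >= 0 && timerAt <= times[i]) {
                // The timer fires before the next item arrives.
                if (latest != null) {
                    out.add(latest);      // release the pending item
                    latest = null;
                    timerAt += periodMs;  // and start another period
                } else {
                    timerAt = -1;         // nothing pending: the timer stops
                }
                continue;
            }
            if (timerAt < 0) {
                out.add(items[i]);        // no timer running: emit right away
                timerAt = times[i] + periodMs;
            } else {
                latest = items[i];        // timer running: remember the newest item
            }
            i++;
        }
        return out; // a still-pending item is dropped on completion
    }

    public static void main(String[] args) {
        // Emission times implied by sleeps of 500, 200, 800 and 600 ms.
        long[] times = {0, 500, 700, 1500, 2100};
        String[] items = {"A", "B", "C", "D", "E"};
        System.out.println(throttleLatest(times, items, 1000)); // prints [A, C, D]
    }
}
```

A goes out immediately and starts the timer; the tick at 1000 ms releases C, the tick at 2000 ms releases D, and E is still pending when the source completes at 2100 ms.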
Official example:
Observable<String> source = Observable.create(new ObservableOnSubscribe<String>() {
    @Override
    public void subscribe(ObservableEmitter<String> emitter) throws Exception {
        emitter.onNext("A");
        Thread.sleep(500);
        emitter.onNext("B");
        Thread.sleep(200);
        emitter.onNext("C");
        Thread.sleep(800);
        emitter.onNext("D");
        Thread.sleep(600);
        emitter.onNext("E");
        emitter.onComplete();
    }
});
source.subscribeOn(Schedulers.io())
        .throttleLatest(1, TimeUnit.SECONDS)
        .blockingSubscribe(new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {
            }
            @Override
            public void onNext(String s) {
                System.out.println("onNext: " + s);
            }
            @Override
            public void onError(Throwable e) {
                e.printStackTrace();
            }
            @Override
            public void onComplete() {
                System.out.println("onComplete");
            }
        });
Output:
onNext: A
onNext: C
onNext: D
onComplete
throttleWithTimeout
Behaves the same as debounce.
Official example:
Observable<String> source = Observable.create(new ObservableOnSubscribe<String>() {
    @Override
    public void subscribe(ObservableEmitter<String> emitter) throws Exception {
        emitter.onNext("A");
        Thread.sleep(1500);
        emitter.onNext("B");
        Thread.sleep(500);
        emitter.onNext("C");
        Thread.sleep(250);
        emitter.onNext("D");
        Thread.sleep(2000);
        emitter.onNext("E");
        emitter.onComplete();
    }
});
source.subscribeOn(Schedulers.io())
        .throttleWithTimeout(1, TimeUnit.SECONDS)
        .blockingSubscribe(new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {
            }
            @Override
            public void onNext(String s) {
                System.out.println("onNext: " + s);
            }
            @Override
            public void onError(Throwable e) {
                e.printStackTrace();
            }
            @Override
            public void onComplete() {
                System.out.println("onComplete");
            }
        });
Output:
onNext: A
onNext: D
onNext: E
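onComplete
timeout
Relays each event as long as it arrives within the timeout after the previous one; if the gap exceeds the timeout, the stream terminates with a TimeoutException.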
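The gap check can be written out as a plain-Java model and applied to the timings of the official example that follows. This is an illustrative sketch, not RxJava's implementation: TimeoutModel is a hypothetical name, and the strings "TimeoutException" and "onComplete" are just markers for the terminal signal.

```java
import java.util.ArrayList;
import java.util.List;

class TimeoutModel {
    // times[i] is when items[i] is emitted, in ms since subscription.
    static List<String> timeout(long[] times, String[] items, long timeoutMs) {
        List<String> out = new ArrayList<>();
        long previous = 0; // the first window starts at subscription
        for (int i = 0; i < items.length; i++) {
            if (times[i] - previous > timeoutMs) {
                // The gap was too long: the stream ends with an error
                // and the late item is never delivered.
                out.add("TimeoutException");
                return out;
            }
            out.add(items[i]);
            previous = times[i]; // every item restarts the timeout window
        }
        out.add("onComplete");
        return out;
    }

    public static void main(String[] args) {
        // Emission times implied by sleeps of 800, 400 and 1200 ms.
        long[] times = {0, 800, 1200, 2400};
        String[] items = {"A", "B", "C", "D"};
        System.out.println(timeout(times, items, 1000)); // prints [A, B, C, TimeoutException]
    }
}
```

The gaps before B and C are 800 ms and 400 ms, both within the limit; D would arrive 1200 ms after C, so the error is signalled first and D is lost.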
Official example:
Observable<String> source = Observable.create(new ObservableOnSubscribe<String>() {
    @Override
    public void subscribe(ObservableEmitter<String> emitter) throws Exception {
        emitter.onNext("A");
        Thread.sleep(800);
        emitter.onNext("B");
        Thread.sleep(400);
        emitter.onNext("C");
        Thread.sleep(1200);
        emitter.onNext("D");
        emitter.onComplete();
    }
});
source.subscribeOn(Schedulers.io())
        .timeout(1, TimeUnit.SECONDS)
        .blockingSubscribe(new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {
            }
            @Override
            public void onNext(String s) {
                System.out.println("onNext: " + s);
            }
            @Override
            public void onError(Throwable e) {
                e.printStackTrace();
            }
            @Override
            public void onComplete() {
                System.out.println("onComplete");
            }
        });
Output:
onNext: A
onNext: B
onNext: C
java.util.concurrent.TimeoutException: The source did not signal an event for 1 seconds and has been terminated.
That's all.


