无法通过合并并行运行两个 Observable。
class MyExample {
//Создание observable
public static Observable<String> createObservable(String name){
return Observable.create(o -> {
Integer count = 1;
while (count++ < 100) {
Thread.sleep(100);//TODO:Имитация работы удаленного сервера
o.onNext(name+"_"+count.toString());
}
o.onComplete();
});
}
public MyExample() {
//Формируем список из observable
List<Observable<String>> ol = new ArrayList<>();
ol.add(createObservable("Первый"));
ol.add(createObservable("Второй"));
Observable<String> rxObservable = Observable.merge(ol)
.subscribeOn(Schedulers.io())
.observeOn(io.reactivex.android.schedulers.AndroidSchedulers.mainThread());
//Слушатель, выводит в консоль все что ему присылают
DisposableObserver<String> observer = new DisposableObserver<String>() {
@Override
public void onNext(String s) {
Log.e("rx", s);
}
@Override
public void onError(Throwable e) {
Log.e("rx", "error");
}
@Override
public void onComplete() {
Log.e("rx", "complete");
}
};
rxObservable.subscribe(observer); //Запускаем
}
}
执行时,这些行Первый_x
和Второй_x
应该在控制台中显示为混合,如工作所描述的那样.megre()
。但是由于某种原因,执行是按顺序进行的,首先出现了一切Первый_x
,然后才出现了一切 Второй_x
。问题是什么?
原来,
subscribeOn()
formerge()
, 告诉订阅指定的线程在哪个线程中Observables
。为了设置它们应该在哪个线程中执行,您需要为Observable
每个subscribeOn()
.答案在这里和@zRrr 的评论中找到。