Where to Put observeOn in Rxjava2

Where to put observeOn matters!

Guowei Lv

1 minute read

Me: Why I have put the observeOn(AndroidSchedulars.mainThread()) but still onNext() is NOT called in android main thread?!

Rx Master: Show me your code.

Me: Here you are my master. I just want to wait for 5 seconds then call an api:

    override fun onCreate(savedInstanceState: Bundle?) {

        fun networkObservable(): Observable<String> {
            return Observable.just("test").subscribeOn(Schedulers.io())

        Observable.timer(5, TimeUnit.SECONDS)
            .flatMap { _ -> networkObservable() }
            .subscribe(Consumer {
                Log.d("testtest", it)
                Log.d("testtest", Thread.currentThread().name)

Rx Master: … Read the Doc of observeOn:

Modifies an ObservableSource to perform its emissions and notifications on a specified {@link Scheduler}

Me: Ummm… That means the emmissions and notifications(onNext, onError, onComplete) of the timer observable is on main thread. And… OH yes, the flatMap then returns a new observable, and that observable is emmiting on io thread. So I guess I could just move observeOn after flatMap?

Rx Master: Indeed, my young follower. You learned well.


comments powered by Disqus