Demystify RxJava (2)

About Disposable

Guowei Lv

In this article, we focus on how the dispose system works in RxJava.

First, let’s take a look at the Disposable interface.

public interface Disposable {
    void dispose();
    boolean isDisposed();
}

Not much is going on here: it simply says that a Disposable can be disposed, and that we can ask whether it already has been.
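To make the contract concrete, here is a minimal sketch of what an implementation could look like. It is not taken from RxJava: the interface is redeclared locally so the snippet compiles with only the JDK, and RunOnceDisposable and DisposableDemo are hypothetical names made up for illustration. The sketch also assumes that calling dispose() more than once should be harmless.

```java
import java.util.concurrent.atomic.AtomicBoolean;

class DisposableDemo {
    // Local copy of the two-method contract shown above, so no RxJava dependency is needed.
    interface Disposable {
        void dispose();
        boolean isDisposed();
    }

    // Hypothetical implementation: wraps a cleanup action and runs it at most once.
    static final class RunOnceDisposable implements Disposable {
        private final AtomicBoolean disposed = new AtomicBoolean();
        private final Runnable cleanup;

        RunOnceDisposable(Runnable cleanup) {
            this.cleanup = cleanup;
        }

        @Override
        public void dispose() {
            // compareAndSet lets only the first caller through, even across threads.
            if (disposed.compareAndSet(false, true)) {
                cleanup.run();
            }
        }

        @Override
        public boolean isDisposed() {
            return disposed.get();
        }
    }

    public static void main(String[] args) {
        int[] cleanups = {0};
        Disposable d = new RunOnceDisposable(() -> cleanups[0]++);
        System.out.println(d.isDisposed());                     // false
        d.dispose();
        d.dispose();                                            // second call is a no-op
        System.out.println(d.isDisposed() + " " + cleanups[0]); // true 1
    }
}
```

The AtomicBoolean is one simple way to get the "at most once" behavior; the rest of this article looks at the AtomicReference-based approach that RxJava uses instead.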

The first example we are going to examine is:

Observable.interval(1, TimeUnit.SECONDS)

If you click through the code, you will find that the implementation is actually the class ObservableInterval.

First let’s look at the constructor of this class:

public final class ObservableInterval extends Observable<Long> {
    final Scheduler scheduler;
    final long initialDelay;
    final long period;
    final TimeUnit unit;

    public ObservableInterval(long initialDelay, long period, TimeUnit unit, Scheduler scheduler) {
        this.initialDelay = initialDelay;
        this.period = period;
        this.unit = unit;
        this.scheduler = scheduler;
    }
    ...

Typical constructor stuff: it just sets up some internal fields.

The interesting part happens on subscription, when this method runs:

/** Simplified */
@Override
public void subscribeActual(Observer<? super Long> observer) {
    // what is this IntervalObserver???
    IntervalObserver is = new IntervalObserver(observer);
    // Well, it must implement Disposable if it can be passed to `onSubscribe()`
    observer.onSubscribe(is);

    // OK, the work is scheduled here
    Scheduler sch = scheduler;
    Disposable d = sch.schedulePeriodicallyDirect(is, initialDelay, period, unit);
    // What is setResource() and why do we need to call it here?
    is.setResource(d);
}

Well, it is only a few lines, but they are quite dense, and there are a lot of things we don’t understand yet.

Let’s start with the IntervalObserver class, and here is the signature:

static final class IntervalObserver
    extends AtomicReference<Disposable>
    implements Disposable, Runnable {

Wow, a lot is going on here. Let’s look at them one by one.

First, it extends AtomicReference<Disposable>. This means that the class holds a reference to a Disposable object, that the reference can be changed to point to another object at runtime, and that reads and updates of the reference are atomic and thread safe. This behavior is important; we will see why later.

Second, it implements Disposable. Wait, we just said that it holds a reference to a Disposable, but now it is itself a Disposable as well. Isn’t this kind of confusing? Actually, this is a typical Proxy or Delegate pattern. In practice, if no one points you in that direction, it is very hard to wrap your head around such code. But now you know, so no big deal.

Last, it is also a Runnable. Notice that the IntervalObserver object is also passed to the Scheduler, so it is no surprise that it implements the Runnable interface.

Notice that even though the class name ends in Observer, it does not implement RxJava’s Observer interface. Keep this in mind!

Now that we have a rough idea of things, let’s dig deeper.

Let’s first look at the methods related to the Disposable interface.

@Override
public void dispose() {
    DisposableHelper.dispose(this);
}

@Override
public boolean isDisposed() {
    return get() == DisposableHelper.DISPOSED;
}

It delegates the work to the DisposableHelper class, so let’s look at that next:

/**
 * Atomically disposes the Disposable in the field if not already disposed.
 * @param field the target field
 * @return true if the current thread managed to dispose the Disposable
 */
public static boolean dispose(AtomicReference<Disposable> field) {
    Disposable current = field.get();
    Disposable d = DISPOSED;
    if (current != d) {
        current = field.getAndSet(d);
        if (current != d) {
            if (current != null) {
                current.dispose();
            }
            return true;
        }
    }
    return false;
}

Let’s look at two cases:

1. The AtomicReference field is not set yet, so the underlying disposable it refers to is null. In this case, the field is simply set to the DISPOSED constant.
2. The AtomicReference field holds an underlying disposable that is not the DISPOSED constant. In this case, the field is set to DISPOSED, and the previously referenced disposable is disposed.

So basically this helper function knows how to correctly dispose an AtomicReference<Disposable>. If the field already holds DISPOSED, nothing happens and the function returns false.
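The cases can be tried out in isolation. The sketch below copies the logic of dispose() into a JDK-only class so that it runs without RxJava. The local Disposable interface, the DISPOSED sentinel object, and the Recording class are stand-ins invented for this example, not RxJava types.

```java
import java.util.concurrent.atomic.AtomicReference;

class DisposeCasesDemo {
    interface Disposable {
        void dispose();
        boolean isDisposed();
    }

    // Stand-in for DisposableHelper.DISPOSED: a sentinel that is always disposed.
    static final Disposable DISPOSED = new Disposable() {
        @Override public void dispose() { }
        @Override public boolean isDisposed() { return true; }
    };

    // Same logic as the helper quoted above, copied here so the sketch runs on its own.
    static boolean dispose(AtomicReference<Disposable> field) {
        Disposable current = field.get();
        if (current != DISPOSED) {
            current = field.getAndSet(DISPOSED);
            if (current != DISPOSED) {
                if (current != null) {
                    current.dispose();
                }
                return true;
            }
        }
        return false;
    }

    // Hypothetical test double that remembers whether dispose() was called.
    static final class Recording implements Disposable {
        private volatile boolean disposed;
        @Override public void dispose() { disposed = true; }
        @Override public boolean isDisposed() { return disposed; }
    }

    public static void main(String[] args) {
        // Case 1: the field is still empty (null).
        AtomicReference<Disposable> empty = new AtomicReference<>();
        System.out.println(dispose(empty) + " " + (empty.get() == DISPOSED)); // true true

        // Case 2: the field holds a live disposable, which gets disposed too.
        Recording inner = new Recording();
        AtomicReference<Disposable> holding = new AtomicReference<>(inner);
        System.out.println(dispose(holding) + " " + inner.isDisposed());      // true true

        // Disposing again changes nothing and reports false.
        System.out.println(dispose(holding));                                 // false
    }
}
```

The getAndSet() call is what makes this safe under a race: whichever thread swaps in DISPOSED first gets the old value back and is the only one that disposes it.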

Let’s now look at another important helper function, setOnce():

/**
 * Atomically sets the field to the given non-null Disposable and returns true
 * or returns false if the field is non-null.
 * If the target field contains the common DISPOSED instance, the supplied disposable
 * is disposed. If the field contains other non-null Disposable, an IllegalStateException
 * is signalled to the RxJavaPlugins.onError hook.
 * 
 * @param field the target field
 * @param d the disposable to set, not null
 * @return true if the operation succeeded, false
 */
public static boolean setOnce(AtomicReference<Disposable> field, Disposable d) {
    ObjectHelper.requireNonNull(d, "d is null");
    if (!field.compareAndSet(null, d)) {
        d.dispose();
        if (field.get() != DISPOSED) {
            reportDisposableSet();
        }
        return false;
    }
    return true;
}

The official doc is already very clear. I just want to highlight one sentence: “If the target field contains the common DISPOSED instance, the supplied disposable is disposed.”

This behavior is important; we will see why later.
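Here is a small sketch of that highlighted behavior, again with JDK-only stand-ins rather than the real RxJava classes. The local Disposable interface, the DISPOSED sentinel, and Recording are hypothetical, and the branch that reports an error for a field holding some other live Disposable is deliberately left out.

```java
import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference;

class SetOnceDemo {
    interface Disposable {
        void dispose();
        boolean isDisposed();
    }

    // Stand-in for DisposableHelper.DISPOSED.
    static final Disposable DISPOSED = new Disposable() {
        @Override public void dispose() { }
        @Override public boolean isDisposed() { return true; }
    };

    // Simplified copy of setOnce(): stores d only if the field is still empty.
    static boolean setOnce(AtomicReference<Disposable> field, Disposable d) {
        Objects.requireNonNull(d, "d is null");
        if (!field.compareAndSet(null, d)) {
            // Too late: the field is already occupied, so the newcomer is disposed.
            // (The real helper also reports an error here unless the field holds
            // DISPOSED; that part is omitted from this sketch.)
            d.dispose();
            return false;
        }
        return true;
    }

    // Hypothetical test double that remembers whether dispose() was called.
    static final class Recording implements Disposable {
        private volatile boolean disposed;
        @Override public void dispose() { disposed = true; }
        @Override public boolean isDisposed() { return disposed; }
    }

    public static void main(String[] args) {
        // Normal order: the field is empty, so the resource is stored and left alone.
        AtomicReference<Disposable> field = new AtomicReference<>();
        Recording task = new Recording();
        System.out.println(setOnce(field, task) + " " + task.isDisposed());                 // true false

        // Reversed order: the field was disposed before the resource arrived.
        AtomicReference<Disposable> disposedEarly = new AtomicReference<>(DISPOSED);
        Recording lateTask = new Recording();
        System.out.println(setOnce(disposedEarly, lateTask) + " " + lateTask.isDisposed()); // false true
    }
}
```

The second case is the interesting one: a resource that shows up after disposal is cleaned up on the spot instead of leaking.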

Next, let’s look at the only method related to the Runnable interface:

@Override
public void run() {
    if (get() != DisposableHelper.DISPOSED) {
        downstream.onNext(count++);
    }
}

This is where the counting is done: as long as it has not been disposed, each run sends the current count downstream and then increments it. This makes sense.

Now that we have examined the IntervalObserver class in detail, let’s step back a little and take one more look at the subscribeActual method of ObservableInterval:

/** Simplified */
@Override
public void subscribeActual(Observer<? super Long> observer) {
    IntervalObserver is = new IntervalObserver(observer);
    observer.onSubscribe(is);

    Scheduler sch = scheduler;
    Disposable d = sch.schedulePeriodicallyDirect(is, initialDelay, period, unit);
    is.setResource(d);
}

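I hope it makes more sense this time. One detail the simplified listing does not show: in the RxJava source, setResource() is a one-line method that calls DisposableHelper.setOnce(this, d), which is where the setOnce() behavior highlighted earlier comes into play.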

Now there is only one question left unanswered: why do we need this AtomicReference thing? In other words, why do we need to be able to change the internal Disposable at runtime?

Well, I don’t know 100% why, but this seems to be a pattern that RxJava follows. When the Observable is subscribed to, the observer is immediately given a Disposable. Even if the situation changes later, for example when some task gets scheduled, we do not need to notify the observer again. It’s like saying: I will give you a Disposable object from the beginning, you can keep it and use it whenever you like, and I will handle all the cases internally.

Exercise: What if we dispose immediately in onSubscribe()?

 Observable.interval(1, TimeUnit.SECONDS)
      .subscribe(object : Observer<Long> {
        override fun onSubscribe(d: Disposable) {
          d.dispose()
        }

        override fun onNext(t: Long) {
          Log.d("test", t.toString())
        }

        override fun onError(e: Throwable) {
          Log.d("test", "OnError" + e.message)
        }

        override fun onComplete() {
          Log.d("test", "onComplete")
        }
      })

Well, the answer is of course that onNext will never get called. Trace through the code and see how this is done.
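If you want to step through that sequence without an Android project, here is one possible JDK-only sketch of it. Everything in it is a stand-in and an assumption of this example: Ticker plays the role of IntervalObserver, a ScheduledExecutorService replaces the RxJava Scheduler, a Consumer replaces the observer’s onSubscribe(), and the local Disposable interface and DISPOSED sentinel mimic the RxJava ones.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

class EarlyDisposeDemo {
    interface Disposable {
        void dispose();
        boolean isDisposed();
    }

    // Stand-in for DisposableHelper.DISPOSED.
    static final Disposable DISPOSED = new Disposable() {
        @Override public void dispose() { }
        @Override public boolean isDisposed() { return true; }
    };

    // Hypothetical stand-in for IntervalObserver: a Disposable that also holds one, plus a Runnable.
    static final class Ticker extends AtomicReference<Disposable> implements Disposable, Runnable {
        private static final long serialVersionUID = 1L;
        final AtomicLong ticks = new AtomicLong();

        @Override
        public void run() {
            if (get() != DISPOSED) {
                ticks.incrementAndGet(); // the real class calls downstream.onNext(count++)
            }
        }

        @Override
        public void dispose() {
            Disposable old = getAndSet(DISPOSED);
            if (old != null && old != DISPOSED) {
                old.dispose();
            }
        }

        @Override
        public boolean isDisposed() {
            return get() == DISPOSED;
        }

        // Same idea as setOnce(): if we were disposed in the meantime, dispose the newcomer.
        void setResource(Disposable d) {
            if (!compareAndSet(null, d)) {
                d.dispose();
            }
        }
    }

    // Mirrors the three steps of subscribeActual(); returns the scheduled task for inspection.
    static Future<?> subscribe(ScheduledExecutorService exec, Ticker ticker,
                               Consumer<Disposable> onSubscribe) {
        onSubscribe.accept(ticker); // 1. the handle is handed out before any work exists
        Future<?> task = exec.scheduleAtFixedRate(ticker, 10, 10, TimeUnit.MILLISECONDS); // 2. schedule
        ticker.setResource(new Disposable() { // 3. attach the real resource afterwards
            @Override public void dispose() { task.cancel(false); }
            @Override public boolean isDisposed() { return task.isCancelled(); }
        });
        return task;
    }

    public static void main(String[] args) {
        ScheduledExecutorService exec = Executors.newSingleThreadScheduledExecutor();
        try {
            Ticker ticker = new Ticker();
            // Same as calling d.dispose() inside onSubscribe() in the exercise.
            Future<?> task = subscribe(exec, ticker, Disposable::dispose);
            System.out.println(task.isCancelled()); // true
            System.out.println(ticker.ticks.get()); // 0
        } finally {
            exec.shutdownNow();
        }
    }
}
```

In this sketch the early dispose() only flips the reference to DISPOSED, because there is nothing to cancel yet. The task still gets scheduled in step 2, but step 3 finds the reference already occupied and cancels the task right away, and the check in run() covers any tick that might slip through in between.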
