Tuesday, October 20, 2015

ConnectableObservables (part 1)

Introduction


We have learned how to construct cold (e.g., range) and hot (e.g., UnicastSubject) Observables, but nothing specific so far about how to convert between the two.

Clearly, since Subjects are also Observers, one only has to subscribe a Subject to a cold source and let all the child Subscribers subscribe to the Subject instead.

But why would one do that in the first place? The conversion has one major benefit: it makes side effects in the cold source happen only once (per Subject subscribed to it). From a usage perspective, this means you can reuse the same stream for multiple purposes instead of having multiple, likely independent sequences.

For example, if you wanted to work with adjacent elements of the same stream, you could publish it, observe different parts of it through different subscriptions and combine the results:


Observable<Integer> source = Observable.range(1, 10);

ConnectableObservable<Integer> published = source.publish();

Observable<Integer> first = published;
Observable<Integer> second = published.skip(1);

Observable<String> both = first.zipWith(second, 
    (a, b) -> a + "+" + b);

both.subscribe(System.out::println);   // prints 1+2, 2+3, ..., 9+10

published.connect();                   // nothing is emitted before this call


Now let's see what ConnectableObservables should do.


ConnectableObservable requirements

ConnectableObservable is an abstract class that extends Observable and requires one extra method to be implemented.

By extending Observable, it faces the same construction difficulties as Subjects do: the constructor requires an OnSubscribe callback, which can't access the instance's fields and methods at construction time, so one needs a factory method and an intermediate state object.

The second requirement, also coming from Observable, is that subscribing should be thread safe and the implementation should allow it to happen at any time: before, during and after the ConnectableObservable "runs".

It may come as a surprise that the extra abstract method isn't connect() but connect(Action1<? super Subscription>). The reason is the possibility of synchronous unsubscription with a ConnectableObservable. But when does this come into play?

There are two cases where this feature is essential: one is public-facing and the other is hidden away inside certain operators.

The problem with connect() is this: if it connects to an underlying cold and synchronous Observable, that source may run to completion before connect() returns, or it may never terminate, in which case connect() never returns at all. If no Subscribers were subscribed at that point, those values may be gone forever. In addition, given an infinite synchronous stream, you may want to unsubscribe it after a while via the Subscription returned by connect(), but again, connect() never returns it.

The second case comes up quite often with multicasting operator overloads such as publish(Func1) and replay(Func1). These operators create a ConnectableObservable behind the scenes, run it through the provided Func1 and return a plain Observable which, when subscribed to, connects the ConnectableObservable. Now if the source is synchronous and you want to take only a few elements of the returned Observable, the child subscription would never get hold of a Subscription, and the whole stream would just keep running.

The solution is to implement the second, callback-based version of connect. It calls the callback with a Subscription before it connects, which allows the connection to be unsubscribed synchronously while the source is still emitting.
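To make the difference tangible, here is a minimal plain-Java model. It is not RxJava code. The hypothetical SyncSource emits an endless sequence on the caller's thread, and its connect(Consumer<AtomicBoolean>) hands out the cancellation handle before the loop starts. The AtomicBoolean is an assumed stand-in for rx.Subscription, where true means unsubscribed. If connect() returned the handle instead, the loop below would never let it return.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.function.IntConsumer;

// Hypothetical model (not RxJava): an infinite, synchronous source.
// An AtomicBoolean stands in for rx.Subscription (true == unsubscribed).
final class SyncSource {
    private final IntConsumer consumer;

    SyncSource(IntConsumer consumer) {
        this.consumer = consumer;
    }

    // Callback-style connect: the cancellation handle is handed out
    // *before* emission starts, so the consumer can stop the loop while
    // connect() is still running on the same thread.
    void connect(Consumer<AtomicBoolean> onConnect) {
        AtomicBoolean cancelled = new AtomicBoolean();
        onConnect.accept(cancelled);
        for (int i = 0; !cancelled.get(); i++) {
            consumer.accept(i);
        }
    }
}

final class SyncSourceDemo {
    // Takes the first n values of the otherwise endless source.
    static List<Integer> takeFirst(int n) {
        List<Integer> received = new ArrayList<>();
        if (n <= 0) {
            return received;
        }
        AtomicBoolean[] handle = new AtomicBoolean[1];
        SyncSource source = new SyncSource(v -> {
            received.add(v);
            if (received.size() == n) {
                handle[0].set(true);   // synchronous cancellation
            }
        });
        source.connect(h -> handle[0] = h);
        return received;
    }
}
```

This is the same situation publish(Func1) faces when a take-like operator downstream wants to stop a synchronous source early.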

Finally, connection and disconnection have to be idempotent. Calling connect() twice on a running stream should do nothing, and calling unsubscribe() twice on such a stream should disconnect it only once. One extra thing to be careful with: if one unsubscribes a stream and then connects again, the Subscription from the first connection should not affect the state of the second connection.

To summarize, ConnectableObservable has to

  • be thread safe when subscribing to it at any time and from any thread,
  • allow synchronous unsubscription at any time and from any thread and
  • be idempotent with respect to connect and disconnect (unsubscribe).

A basic implementation

Given what we know about ConnectableObservables and Subjects so far, it may seem trivial to implement the former with the help of the latter. Let's implement a ConnectableObservable that takes a Subject of your choosing and "publishes" a source Observable's values through it.

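import java.util.concurrent.atomic.AtomicReference;

import rx.Observable;
import rx.Subscriber;
import rx.Subscription;
import rx.functions.Action1;
import rx.observables.ConnectableObservable;
import rx.subjects.Subject;
import rx.subscriptions.Subscriptions;

public final class Multicast<T>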
extends ConnectableObservable<T> {
    
    final Observable<T> source;
    final Subject<T, T> subject;
    
    final AtomicReference<Subscription> subscription;         // (1)
    
    public Multicast(Observable<T> source, 
            Subject<T, T> subject) {
        super(s -> {
            subject.subscribe(s);                             // (2)
        });
        this.source = source;
        this.subject = subject;
        this.subscription = new AtomicReference<>();
    }
    
    @Override
    public void connect(
        Action1<? super Subscription> connection) {
        // implement
    }
}

So far, nothing special. We take a source Observable and a Subject, and we hold the current connection in an AtomicReference instance (1). The OnSubscribe logic in this case is simple and, unlike with UnicastSubject, there is no need for the factory approach: each incoming Subscriber is subscribed to the subject directly (2).

The body of the connect() method is a bit more involved but not too complicated:


@Override
public void connect(Action1<? super Subscription> connection) {
    for (;;) {
        Subscription s = subscription.get();                   // (1)
        if (s != null) {
            connection.call(s);                                // (2)
            return;
        }
        
        Subscriber<T> subscriber = new Subscriber<T>() {       // (3)
            @Override
            public void onNext(T t) {
                subject.onNext(t);
            }
            
            @Override
            public void onError(Throwable e) {
                subject.onError(e);
            }
            
            @Override
            public void onCompleted() {
                subject.onCompleted();
            }
        };
        
        subscriber.add(Subscriptions.create(() -> {            // (4)
            subscription.set(null);
        }));
        
        if (subscription.compareAndSet(null, subscriber)) {    // (5)
            connection.call(subscriber);                       // (6)
            
            source.subscribe(subscriber);                      // (7)
            
            return;
        }
    }
}

The implementation is basically a CAS loop:

  1. We keep the current connection's Subscription in the subscription field and if it is not null, it means there is an active connection.
  2. Given an active connection, we simply call the action with it.
  3. Otherwise, there is no active connection and we have to establish one. You may wonder why we don't subscribe the subject directly to the source. The reason is the requirement of synchronous unsubscription: the call to subscribe() returns a Subscription too late, so we need one before that. The Subscriber we create will forward events and also provide this unsubscription possibility (remember, Subscriber implements Subscription).
  4. When the connection, i.e., our Subscriber, is unsubscribed, we have to set the subscription field back to null, allowing the next connect() to happen.
  5. To make connect idempotent, we CAS our Subscriber in place of a null value. If this fails due to a concurrent call to connect(), the loop resumes at (1).
  6. If the CAS succeeded, we first call the callback with our Subscriber which will allow synchronous cancellation of the connection.
  7. Finally, we subscribe our Subscriber to the source and quit.
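The steps above can be exercised without RxJava using a small model. This is a sketch under stated assumptions: the hypothetical Handle class stands in for the forwarding Subscriber, and the sourceSubscriptions counter replaces the actual source.subscribe() call. The comments map to the numbered steps of the basic Multicast.connect().

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

// Hypothetical model of the basic Multicast.connect() CAS loop. Handle
// stands in for the forwarding Subscriber; sourceSubscriptions counts how
// often the (imaginary) source was subscribed to.
final class BasicConnector {
    final AtomicReference<Handle> current = new AtomicReference<>();
    final AtomicInteger sourceSubscriptions = new AtomicInteger();

    final class Handle {
        private final AtomicBoolean unsubscribed = new AtomicBoolean();

        void unsubscribe() {
            if (unsubscribed.compareAndSet(false, true)) {
                current.set(null);             // (4) allow the next connect()
            }
        }
    }

    void connect(Consumer<Handle> connection) {
        for (;;) {
            Handle h = current.get();          // (1)
            if (h != null) {
                connection.accept(h);          // (2) already connected
                return;
            }
            Handle fresh = new Handle();       // (3)
            if (current.compareAndSet(null, fresh)) {  // (5)
                connection.accept(fresh);      // (6) hand out the handle first
                sourceSubscriptions.incrementAndGet(); // (7) "subscribe" to source
                return;
            }
            // CAS lost to a concurrent connect(): retry from (1)
        }
    }
}
```

A second connect() receives the same handle and does not subscribe to the source again. After an unsubscribe, the next connect() starts a new connection.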

Limitations of the basic implementation

The basic implementation seems to work but has some limitations.

Side note: With this blog, I hope to teach the reader how to detect bugs and shortcomings in operators; this is why some examples are not prepared for everything up front.

The first limitation is that if the source terminates, the subscription field may be cleared only some time later (through a SafeSubscriber) or not at all. The solution is to clear it in the onError and onCompleted methods of our Subscriber, but we can't use set(null) there. We have to clear it conditionally, both there and in the regular unsubscription path, because the field may already have been cleared by the other party (a termination vs. unsubscription race). In short, the methods should be changed like this:


    // ...
    @Override
    public void onError(Throwable e) {
        subject.onError(e);
        subscription.compareAndSet(this, null);
    }

    @Override
    public void onCompleted() {
        subject.onCompleted();
        subscription.compareAndSet(this, null);
    }
    // ...

subscriber.add(Subscriptions.create(() -> {
    subscription.compareAndSet(subscriber, null);
}));

In all three places, the clearing only happens if the current connection is still the known subscriber. This way, if there is a termination by any means followed by a reconnection, an unsubscribe() call to an old connection won't affect the new connection.
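The difference between the two clearing strategies fits in a few lines of plain Java. This is a minimal sketch: the hypothetical ConditionalSlot and Conn classes model only the subscription field and the three clearing call sites, not a real Subscriber. clear() is the conditional compareAndSet(this, null). clearUnconditionally() mimics the original set(null), for comparison.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical model of the conditional clearing: each connection clears
// the shared slot only if it is still the current one.
final class ConditionalSlot {
    final AtomicReference<Conn> current = new AtomicReference<>();

    final class Conn {
        // Fixed version, used from onError, onCompleted and the unsubscribe
        // action: a no-op if a newer connection already took the slot.
        void clear() {
            current.compareAndSet(this, null);
        }

        // The basic version's set(null), kept only for comparison: it wipes
        // whatever connection is current, even a newer one.
        void clearUnconditionally() {
            current.set(null);
        }
    }

    Conn connect() {
        for (;;) {
            Conn c = current.get();
            if (c != null) {
                return c;                       // already connected
            }
            Conn fresh = new Conn();
            if (current.compareAndSet(null, fresh)) {
                return fresh;                   // this call established it
            }
        }
    }
}
```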

The second limitation is that once the source runs to termination, the Subject reaches its terminal state as well. New connection attempts will disconnect immediately and child Subscribers will only receive a terminal event (with the standard Subjects of RxJava).

Most likely this isn't what the business logic dictates, therefore, we have to change the parametrization of the Multicast so we can get a fresh Subject for any new connection. I'll show an implementation of this in the next subsection but before that, let's see the final limitation of the basic implementation.

The final limitation is that there is no request coordination: our Subscriber and the Subject itself run in unbounded mode and ignore all backpressure requests. Since the standard RxJava 1.x Subjects don't support backpressure, we may run into MissingBackpressureExceptions somewhere downstream. Although 2.x Subjects are backpressure-aware, a 2.x PublishSubject will still throw MissingBackpressureException if a child subscriber can't keep up, and a 2.x ReplaySubject does effectively unbounded buffering (similar to onBackpressureBuffer).

The resolution is a larger step up on the complexity ladder and will be detailed in the next part of this series about ConnectableObservables.


Fresh Subject on connect

The lack of reusability in the basic implementation can be solved by using a supplier function instead of a Subject instance and calling it just before the connection happens.

This, however, creates another problem. Because the subject doesn't exist yet when the constructor sets the OnSubscribe callback, we somehow have to remember the Subscribers that attempted to subscribe while there was no connection, and then subscribe them to the actual Subject once there is one.

First, we now have to manage a more complex state. I'll create a Connection class that represents the state of a connection:


static final class Connection<T> {
    Subject<T, T> subject;                           // (1)
    List<Subscriber<? super T>> subscribers;         // (2)
    boolean connect;                                 // (3)
    final SerialSubscription parent;                 // (4)
    
    public Connection() {
        this.subscribers = new ArrayList<>();
        this.parent = new SerialSubscription();
    }
    
    public void setSubject(Subject<T, T> subject) {  // (5)
        // implement
        
    }
    
    public void subscribe(Subscriber<? super T> s) { // (6)
        // implement
    }
    
    public boolean tryConnect() {                    // (7)
        // implement
    }
}

Let's see its parts:


  1. We have to store a Subject so subscribers can be subscribed to it any time.
  2. Since the subject doesn't exist until connect() is called, we have to store the early birds in a list and subscribe them all once the subject becomes available.
  3. The connection has to happen once per Connection object (termination or unsubscription then has to create an entirely new Connection object, see later).
  4. We have to keep a reference to the subscription to the source Observable. However, we can't just store a Subscriber, because the connection process may take longer, during which a concurrent connect() might find that reference still null (unlike the basic example, where the Subscription was established atomically). The container is non-null and ensures proper unsubscription on arrival if necessary.
  5. We have to set a Subject once available and subscribe all early bird Subscribers to it.
  6. We also have to provide a way for the OnSubscribe in the constructor to add new subscribers properly, depending on the current state of the connection.
  7. Finally, connection has to happen once per Connection object which is managed by the tryConnect() method.
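Point 4's "unsubscription on arrival" is the reason a SerialSubscription works as the container here. A minimal plain-Java sketch of that behavior follows. It assumes a hypothetical SerialHandle class that holds a Runnable cancel action instead of an rx.Subscription. The class mimics, but is not, RxJava's SerialSubscription.

```java
// Hypothetical stand-in for rx.subscriptions.SerialSubscription: a
// non-null container created up front, into which the real resource is
// placed later. If the container was already unsubscribed, a
// late-arriving resource is cancelled "on arrival".
final class SerialHandle {
    private Runnable resource;      // cancel action of the current resource
    private boolean unsubscribed;

    void set(Runnable cancel) {
        Runnable toCancel;
        synchronized (this) {
            if (unsubscribed) {
                toCancel = cancel;    // disconnect already happened
            } else {
                toCancel = resource;  // replace, cancelling the old one
                resource = cancel;
            }
        }
        if (toCancel != null) {
            toCancel.run();           // run outside the lock
        }
    }

    void unsubscribe() {
        Runnable r;
        synchronized (this) {
            if (unsubscribed) {
                return;               // idempotent
            }
            unsubscribed = true;
            r = resource;
            resource = null;
        }
        if (r != null) {
            r.run();
        }
    }

    synchronized boolean isUnsubscribed() {
        return unsubscribed;
    }
}
```

A concurrent connect() can receive this container immediately, even though the parent Subscriber it will eventually hold does not exist yet.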

The implementation of methods (5-7) is relatively simple but needs some short explanation:

public void setSubject(Subject<T, T> subject) {
    List<Subscriber<? super T>> list;
    synchronized (this) {
        this.subject = subject;
        list = subscribers;
        subscribers = null;
    }
    for (Subscriber<? super T> s : list) {
        subject.subscribe(s);
    }
}

In this method, the subject is set while holding a lock on this. The reason is to prevent concurrent subscribe() calls (see below) from happening while the Subject is being set. This way, early-bird Subscribers will be subscribed to the subject in this method, whereas late subscribers will be subscribed directly to the subject once the lock is released, skipping the list entirely. Subscribing the early birds outside the lock reduces the likelihood of deadlock and also doesn't block concurrent subscribers while the loop is running.

Next comes the subscribe() method.


public void subscribe(Subscriber<? super T> s) {
    Subject<T, T> subject;
    synchronized (this) {
        subject = this.subject;
        if (subject == null) {
            subscribers.add(s);
            return;
        }
    }
    subject.subscribe(s);
}

What happens here is that, atomically, if the subject is still null (i.e., connect() hasn't been called yet), we add the subscriber to the inner list. If, however, the subject is non-null, we subscribe the Subscriber directly to it. One optimization here would be to make the subject field volatile and use double-checked locking (since subject is set only once).
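The volatile plus double-checked locking optimization could look roughly like the sketch below. To keep it self-contained, the Subject is reduced to a hypothetical Consumer<S> that "subscribes" whatever it receives, and S stands in for Subscriber. The class name EarlyBirdConnection is an illustration only. setSubject() is assumed to be called at most once, as in the original Connection.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical model of Connection with the volatile + double-checked
// locking optimization. The "subject" is a Consumer that accepts
// subscribers; S stands in for Subscriber<? super T>.
final class EarlyBirdConnection<S> {
    private volatile Consumer<S> subject;   // set once, then read lock-free
    private List<S> subscribers = new ArrayList<>();

    // Must be called at most once per connection.
    void setSubject(Consumer<S> subject) {
        List<S> list;
        synchronized (this) {
            this.subject = subject;
            list = subscribers;
            subscribers = null;
        }
        for (S s : list) {                  // early birds, outside the lock
            subject.accept(s);
        }
    }

    void subscribe(S s) {
        Consumer<S> subj = subject;         // first check, no lock
        if (subj == null) {
            synchronized (this) {
                subj = subject;             // second check, under the lock
                if (subj == null) {
                    subscribers.add(s);     // no subject yet: remember it
                    return;
                }
            }
        }
        subj.accept(s);                     // late subscriber: go direct
    }
}
```

Once the subject is set, late subscribers never touch the lock. The lock is still needed for the window in which setSubject() swaps the list out.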

Finally, the tryConnect() method is pretty simple:

public boolean tryConnect() {
    synchronized (this) {
        if (!connect) {
            connect = true;
            return true;
        }
        return false;
    }
}

Atomically check whether the connect flag is false and switch it to true. If the switch happened, return true, otherwise return false. The former will trigger the connection logic, while the latter will simply "return" the parent SerialSubscription in connect() (detailed later).
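The synchronized block is one way to get this once-only switch. A lock-free alternative, which this post does not use, is a single compareAndSet on an AtomicBoolean. The sketch below assumes a hypothetical ConnectOnce class and includes a small race helper that lets several threads compete for the same flag.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical lock-free variant of Connection.tryConnect(): exactly one
// caller flips the flag from false to true and wins the right to connect.
final class ConnectOnce {
    private final AtomicBoolean connected = new AtomicBoolean();

    boolean tryConnect() {
        return connected.compareAndSet(false, true);
    }

    // Lets `threads` callers race on one fresh ConnectOnce and returns how
    // many of them won; with the CAS above this should always be 1.
    static int race(int threads) {
        ConnectOnce once = new ConnectOnce();
        AtomicInteger winners = new AtomicInteger();
        CountDownLatch start = new CountDownLatch(1);
        List<Thread> workers = new ArrayList<>();
        for (int i = 0; i < threads; i++) {
            Thread t = new Thread(() -> {
                try {
                    start.await();          // release all threads together
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
                if (once.tryConnect()) {
                    winners.incrementAndGet();
                }
            });
            workers.add(t);
            t.start();
        }
        start.countDown();
        for (Thread t : workers) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return winners.get();
    }
}
```

In the Connection class the lock is already there for setSubject() and subscribe(), so reusing it for tryConnect() is a reasonable choice as well.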

I'll call the new ConnectableObservable MulticastSupplier and it will have the following skeleton:


public final class MulticastSupplier<T> 
extends ConnectableObservable<T> {
    public static <T> MulticastSupplier<T> create(        // (1)
            Observable<T> source, 
            Supplier<Subject<T, T>> subjectSupplier) {
        AtomicReference<Connection<T>> conn = 
            new AtomicReference<>(new Connection<>());    // (2)
        
        return new MulticastSupplier<>(
            source, subjectSupplier, conn);
    }
    

    
    final Observable<T> source;
    final Supplier<Subject<T, T>> subjectSupplier;
    final AtomicReference<Connection<T>> connection;      // (3)
    
    
    protected MulticastSupplier(Observable<T> source, 
            Supplier<Subject<T, T>> subjectSupplier,
            AtomicReference<Connection<T>> connection) {
        super(s -> {
            Connection<T> conn = connection.get();        // (4)
            conn.subscribe(s);
        });
        this.source = source;
        this.subjectSupplier = subjectSupplier;
        this.connection = connection; 
    }

    void replaceConnection(Connection<T> conn) {          // (5)
        Connection<T> next = new Connection<>();
        connection.compareAndSet(conn, next);
    }
    
    @Override
    public void connect(Action1<? super Subscription> connection) {
        // implement
    }
}

Let's see why it looks the way it does:

  1. Since the constructor doesn't allow access to instance fields before super() is called, we have to create the Connection state before instantiating MulticastSupplier, so both its body and the OnSubscribe callback can access it.
  2. Since the connection is not constant (can be reconnected any number of times), we have to use a holder for the connection, an AtomicReference in this case. It will come in handy when the state changes have to happen atomically. In addition, since Subscribers have to be remembered before a connection is established, we can't use a null state anymore.
  3. The same AtomicReference has to be accessible from connect() later on.
  4. The OnSubscribe callback is slightly modified: we retrieve the current Connection instance and call subscribe() on it. If its Subject is already set, the Subscriber goes straight to the underlying Subject; otherwise it is remembered.
  5. Finally, if either a disconnect or a source termination happens, we have to replace the old connection with a fresh one so connect() can start again. The implementation first creates a new (empty) Connection and tries to CAS it in, replacing the old Connection only if it is still the one the caller knows about. This prevents old Subscriptions from disconnecting newer connections.

Now let's see the connect() implementation:


@Override
public void connect(
        Action1<? super Subscription> connection) {

    Connection<T> conn = this.connection.get();          // (1)
    
    if (conn.tryConnect()) {                             // (2)
        Subject<T, T> subject = subjectSupplier.get();
        
        Subscriber<T> parent = new Subscriber<T>() {     // (3)
            @Override
            public void onNext(T t) {
                subject.onNext(t);
            }
            
            @Override
            public void onError(Throwable e) {           // (4)
                subject.onError(e);
                replaceConnection(conn);
            }
            
            @Override
            public void onCompleted() {
                subject.onCompleted();
                replaceConnection(conn);
            }
        };
        
        conn.parent.set(parent);                         // (5)
        
        parent.add(Subscriptions.create(() -> {          // (6)
            replaceConnection(conn);
        }));
        
        conn.setSubject(subject);                        // (7)
        
        connection.call(conn.parent);                    // (8)
        
        source.subscribe(parent);                        // (9)
    } else {
        connection.call(conn.parent);                    // (10)
    }
}

The implementation no longer has a CAS loop because the atomic connection requirement is handled a bit differently:


  1. First, we retrieve the current Connection object from the AtomicReference.
  2. If not connected, set the state to connected and perform the connection logic, otherwise, go to (10).
  3. Once we have a Subject, we create our wrapper Subscriber that references the Subject, as before.
  4. We'd like to disconnect eagerly once a terminal event has been received; therefore, we call replaceConnection() with the known connection object. Depending on what kind of races one wishes to tolerate, one can swap the order of the call on the subject and the replacement: this way, subscribers racing with the termination event will be added to the next connection instead of the current one.
  5. We then set the parent onto the connection. If there was a concurrent disconnect, this will unsubscribe the parent Subscriber immediately. Depending on how a connect-disconnect race should be handled, one can check isUnsubscribed() and either quit without subscribing to the source, handing an unsubscribed Subscription to the callback, or retry the connection attempt. The latter requires a loop similar to the one in the basic example.
  6. We set the unsubscribe action to replace the connection.
  7. We set the subject on the current connection, which may trigger the early birds' subscription to the Subject.
  8. Before the parent Subscriber is connected, we call the callback with the SerialSubscription (not the Subscriber!) to allow synchronous cancellation.
  9. Then we subscribe the parent Subscriber to the source Observable.
  10. If tryConnect() returned false, we simply call the callback with the SerialSubscription of the current connection. Note that this has to be non-null, hence the need for the indirection around the parent Subscriber of (3).
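The connection lifecycle of MulticastSupplier can also be modeled in plain Java. The sketch below covers only that lifecycle: a fresh "subject" per Connection, idempotent connect(), and the CAS in replaceConnection(). It is single-threaded, and a List<Integer> obtained from a Supplier stands in for the Subject. The class name ReconnectModel is hypothetical. Source subscription and the SerialSubscription are left out.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

// Hypothetical, single-threaded model of MulticastSupplier's connection
// lifecycle. A List<Integer> stands in for the Subject; it is obtained
// from the supplier only when a Connection actually connects.
final class ReconnectModel {
    static final class Connection {
        private boolean connected;
        List<Integer> subject;            // null until connected

        synchronized boolean tryConnect() {
            if (!connected) {
                connected = true;
                return true;
            }
            return false;
        }
    }

    final AtomicReference<Connection> connection =
        new AtomicReference<>(new Connection());
    private final Supplier<List<Integer>> subjectSupplier;

    ReconnectModel(Supplier<List<Integer>> subjectSupplier) {
        this.subjectSupplier = subjectSupplier;
    }

    // Called on termination or disconnect: swap in a fresh Connection only
    // if conn is still current, so stale handles cannot affect newer ones.
    void replaceConnection(Connection conn) {
        connection.compareAndSet(conn, new Connection());
    }

    // Returns the subject of the current connection, creating a fresh one
    // if this call is the one that wins tryConnect().
    List<Integer> connect() {
        Connection conn = connection.get();
        if (conn.tryConnect()) {
            conn.subject = subjectSupplier.get();
        }
        return conn.subject;
    }
}
```

Because a reconnect pulls a new subject from the supplier, a terminated Subject from the previous run is never reused. That reuse was the second limitation of the basic Multicast.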


Conclusion

In this blog post, I've detailed the requirements of ConnectableObservables and shown two simple ways of implementing one.

However, one would expect request coordination from a ConnectableObservable, which neither Multicast nor MulticastSupplier supports.

Looking at them wasn't in vain, because they feature construction approaches that will come in handy with the next part of this mini-series.

So far, operators and classes were low to medium complexity due to the fact that the event and method call "streams" were not really stepping on each other. Next, however, we step up on the complexity ladder and look at how one can coordinate requests within a ConnectableObservable.

This is, in my opinion, a master-level implementation task and if understood, it opens the door to the most complex operator implementations in RxJava. Stay tuned!

