2015. május 13., szerda

Operator concurrency primitives: producers (part 4)

Introduction


After implementing a relatively complicated producer, it is time to do something simpler and work out an optimization to the introductory RangeProducer: an emission fast-path for essentially unlimited requests.

In RxJava, initial request values of Long.MAX_VALUE is treated like an infinite request, and triggers many fast-path emissions, just like in the old times before backpressure. In such cases, there is no real need to account for requests and value production any more (but unsubscription still has to be handled).

A Fast-path for the RangeProducer

I'm going to omit all but the request() method in the listing as the rest of the RangeProducer class is not affected.


    // ... same as before
    @Override
    public void request(long n) {
        if (n < 0) {
            throw new IllegalArgumentException();
        }
        if (n == 0) {
            return;
        }
        if (BackpressureUtils.getAndAddRequest(this, n) != 0) {
            return;
        }
        if (n == Long.MAX_VALUE) {                                // (1)
            if (child.isUnsubscribed()) {
                return;
            }
            int i = index;                                        // (2)
            int k = remaining;
            while (k != 0) {
                child.onNext(i);
                if (child.isUnsubscribed()) {                     // (3)
                    return;
                }
                i++;                                              // (4)
                k--;
            }
            if (!child.isUnsubscribed()) {
                child.onCompleted();
            }
            return;                                               // (5)
        }

        long r = n;
        for (;;) {
    // ... same as before


The fast-path works as follows:

  1. We check if the requested amount is Long.MAX_VALUE once the requester thread has successfully made the transition from 0 to it. If n is less than this, we continue with the original slow-path.
  2. We read the status of the producer into local variables. Note that if emissions happened on the slow path, the values will reflect the continuation point and nicely resume from there in case of an infinite request comes in (and wins).
  3. We eagerly check if the child subscriber is still interested in receiving values.
  4. We increment the index counter and decrement the remaining counter.
  5. After all values and the completion event has been emitted, we simply quit and do not adjust the underlying request amount at all. This makes sure calls to request after a finished run can never reach the slow- and fast-path loops due to BackpressureUtils.getAndAddRequest won't ever transition from 0 to some value.
Note that a small request followed by a request of Long.MAX_VALUE doesn't really happen in RxJava. Operators either start with backpressure on or off, so one doesn't need to worry about getting stuck in the slow path because a concurrent infinite request arrives during the iteration and r = addAndGet(-e); may decrement it to below the max value.

Implementing an array-backed producer

RxJava features the from() operator with an overload where you can pass in an array of some T, but internally, the array gets wrapped into a list and then iterated over in the producer. This can seem unnecessary since given the known-length array, one could get rid of the creation of Iterators and index into the array directly. (You may think JIT will optimize this out and make the iterator stack-allocated, but the arbitrary code down the onNext() path might prevent this). Alternatively, since from() doesn't support primitive arrays, one may need to write an operator that can do it.

The structure of the RangeProducer is a fine candidate for this purpose: we can use index to keep track the current position in the array and check it against the length of the array to determine when to quit.


public final class ArrayProducer 
extends AtomicLong implements Producer {
    /** */
    private static final long serialVersionUID = 1L;
    final Subscriber child;
    final int[] array;                                        // (1)
    int index;
    public ArrayProducer(Subscriber child, 
            int[] array) {
        this.child = child;
        this.array = array;
    }
    @Override
    public void request(long n) {
        if (n < 0) {
            throw new IllegalArgumentException();
        }
        if (n == 0) {
            return;
        }
        if (BackpressureUtils
                .getAndAddRequest(this, n) != 0) {
            return;
        }
        final int[] a = this.array;
        final int k = a.length;                               // (2)
        if (n == Long.MAX_VALUE) {
            if (child.isUnsubscribed()) {
                return;
            }
            int i = index;
            while (i != k) {                                  // (3)
                child.onNext(a[i]);
                if (child.isUnsubscribed()) {
                    return;
                }
                i++;
            }
            if (!child.isUnsubscribed()) {
                child.onCompleted();
            }
            return;
        }
        long r = n;
        for (;;) {
            if (child.isUnsubscribed()) {
                return;
            }
            int i = index;
            int e = 0;
            
            while (r > 0 && i != k) {
                child.onNext(a[i]);
                if (child.isUnsubscribed()) {
                    return;
                }
                i++;
                if (i == k) {                               // (4)
                    child.onCompleted();
                    return;
                }
                e++;
                r--;
            }
            index = i;
            
            r = addAndGet(-e);
            
            if (r == 0) {
                return;
            }
        }
    }
}

int[] array = new int[200];
Observable<Integer> source = Observable.create(child -> {
    if (array.length == 0) {
        child.onCompleted();
        return;
    }
    ArrayProducer ap = new ArrayProducer(child, array);
    child.setProducer(ap);
});
source.subscribe(System.out::println);


  1. The index field stays but we need to hold onto the array itself and don't need a remaining counter: the index value will run up to the length of the array.
  2. The condition to complete the run is when the local index reaches the lenght of the array, stored locally in the constant k. Note that we don't want to decrement it this time.
  3. In the fast-path, we loop if index has not yet reached the end of the array.
  4. On the slow-path, however, after incrementing the local index, we immediately check if the end of the array has been reached and emit an onCompleted(). Note that the slow path still wouldn't work with an empty array here.

Conclusion

In this blog post, I've shown how one can add fast-path to simpler producers such as RangeProducer and how it can be turned into a producer that is capable of emitting elements from a (primitive) array without the extra work and allocation.

So far, I was dealing with producers that either knew exactly how many elements they whish to emit or producers that didn't really know or care about it. However, operators exist that have to deal with multiple sources and as such, with multiple other producers while pretending the child subscriber is only dealing with a single one. In the next post about producers, I'm going to describe what I call the producer-arbiter producer which gives the opportunity to switch producers in a way that backpressure request accounting is not violated in the process.



Nincsenek megjegyzések:

Megjegyzés küldése