Sorting algorithm, Java stream

When working with Stream, I currently use the sorted method to sort, for example:

    Stream<Integer> stream = Stream.of(3, 2, 1);
    stream.sorted().forEach(System.out::print); // prints 123

What is the sorting algorithm for this method?

Are there more efficient ways to sort using Stream?

It’s probably Timsort, but I can’t guarantee that.

What is the sorting algorithm for this method?

I’m using Java 11.

First, let’s see how the sorted() method is declared in the Stream interface:

Stream<T> sorted();

OK, that doesn’t help us: it’s an abstract method without a default implementation. So, to find the implementation, let’s look at the code of Stream.of(...):

@SafeVarargs
@SuppressWarnings("varargs") // Creating a stream from an array is safe
public static<T> Stream<T> of(T... values) {
    return Arrays.stream(values);
}

Now, the code of Arrays.stream(...):

public static <T> Stream<T> stream(T[] array) {
    return stream(array, 0, array.length);
}

And then:

public static <T> Stream<T> stream(T[] array, int startInclusive, int endExclusive) {
    return StreamSupport.stream(spliterator(array, startInclusive, endExclusive), false);
}

Note that the values are now wrapped in a Spliterator.
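The Spliterator created for an array knows its exact size up front, and that matters later: the sorting operation checks the SIZED flag to decide how to buffer the elements. A quick way to observe this, as a sketch (the class and method names are hypothetical; Spliterator.hasCharacteristics and getExactSizeIfKnown are standard API):

```java
import java.util.Spliterator;
import java.util.stream.Stream;

public class SpliteratorFlags {
    // Returns the source Spliterator of a fresh Stream.of(...) pipeline.
    // spliterator() is a terminal operation, so this stream cannot be reused afterwards.
    static Spliterator<Integer> source() {
        return Stream.of(3, 2, 1).spliterator();
    }

    public static void main(String[] args) {
        Spliterator<Integer> sp = source();
        System.out.println("SIZED:   " + sp.hasCharacteristics(Spliterator.SIZED));
        System.out.println("ORDERED: " + sp.hasCharacteristics(Spliterator.ORDERED));
        System.out.println("size:    " + sp.getExactSizeIfKnown());
    }
}
```

For an array-backed stream this should report SIZED and ORDERED with a size of 3.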

Now the StreamSupport.stream(...):

public static <T> Stream<T> stream(Spliterator<T> spliterator, boolean parallel) {
    Objects.requireNonNull(spliterator);
    return new ReferencePipeline.Head<>(spliterator,
                                        StreamOpFlag.fromCharacteristics(spliterator),
                                        parallel);
}

The class java.util.stream.ReferencePipeline is not public, but its source code is easy to find in the JDK:

abstract class ReferencePipeline<P_IN, P_OUT>
        extends AbstractPipeline<P_IN, P_OUT, Stream<P_OUT>>
        implements Stream<P_OUT>  {

So let’s look at the nested class Head, which is finally the Stream implementation we are looking for:

static class Head<E_IN, E_OUT> extends ReferencePipeline<E_IN, E_OUT> {

The nested class Head extends its enclosing class ReferencePipeline and does not override the sorted method. So let’s look at the superclass (ReferencePipeline):

@Override
public final Stream<P_OUT> sorted() {
    return SortedOps.makeRef(this);
}

The class SortedOps is not public either. Let’s see what its makeRef method does:

static <T> Stream<T> makeRef(AbstractPipeline<?, T, ?> upstream) {
    return new OfRef<>(upstream);
}

OfRef is a private nested class of SortedOps:

private static final class OfRef<T> extends ReferencePipeline.StatefulOp<T, T> {
    /**
     * Comparator used for sorting
     */
    private final boolean isNaturalSort;
    private final Comparator<? super T> comparator;

    /**
     * Sort using natural order of {@literal <T>} which must be
     * {@code Comparable}.
     */
    OfRef(AbstractPipeline<?, T, ?> upstream) {
        super(upstream, StreamShape.REFERENCE,
              StreamOpFlag.IS_ORDERED | StreamOpFlag.IS_SORTED);
        this.isNaturalSort = true;
        // Will throw CCE when we try to sort if T is not Comparable
        @SuppressWarnings("unchecked")
        Comparator<? super T> comp = (Comparator<? super T>) Comparator.naturalOrder();
        this.comparator = comp;
    }

    /**
     * Sort using the provided comparator.
     *
     * @param comparator The comparator to be used to evaluate ordering.
     */
    OfRef(AbstractPipeline<?, T, ?> upstream, Comparator<? super T> comparator) {
        super(upstream, StreamShape.REFERENCE,
              StreamOpFlag.IS_ORDERED | StreamOpFlag.NOT_SORTED);
        this.isNaturalSort = false;
        this.comparator = Objects.requireNonNull(comparator);
    }

    @Override
    public Sink<T> opWrapSink(int flags, Sink<T> sink) {
        Objects.requireNonNull(sink);

        // If the input is already naturally sorted and this operation
        // also naturally sorted then this is a no-op
        if (StreamOpFlag.SORTED.isKnown(flags) && isNaturalSort)
            return sink;
        else if (StreamOpFlag.SIZED.isKnown(flags))
            return new SizedRefSortingSink<>(sink, comparator);
        else
            return new RefSortingSink<>(sink, comparator);
    }

    @Override
    public <P_IN> Node<T> opEvaluateParallel(PipelineHelper<T> helper,
                                             Spliterator<P_IN> spliterator,
                                             IntFunction<T[]> generator) {
        // If the input is already naturally sorted and this operation
        // naturally sorts then collect the output
        if (StreamOpFlag.SORTED.isKnown(helper.getStreamAndOpFlags()) && isNaturalSort) {
            return helper.evaluate(spliterator, false, generator);
        }
        else {
            // @@@ Weak two-pass parallel implementation; parallel collect, parallel sort
            T[] flattenedData = helper.evaluate(spliterator, true, generator).asArray(generator);
            Arrays.parallelSort(flattenedData, comparator);
            return Nodes.node(flattenedData);
        }
    }
}
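For a sequential stream, opWrapSink in the code above wraps the downstream in a sorting sink, which has to receive every element before it can emit the first one. This buffering can be made visible with peek; the sketch below assumes a sequential stream, and the class and method names are hypothetical:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class SortBarrier {
    // Records the moments each element passes the points before and after sorted()
    static List<String> trace() {
        List<String> log = new ArrayList<>();
        List<Integer> result = Stream.of(3, 2, 1)
                .peek(x -> log.add("before " + x)) // upstream of the sorting sink
                .sorted()                          // buffers all elements, sorts them at the end
                .peek(x -> log.add("after " + x))  // downstream of the sorting sink
                .collect(Collectors.toList());
        log.add("result " + result);
        return log;
    }

    public static void main(String[] args) {
        trace().forEach(System.out::println);
    }
}
```

All three "before" entries appear before any "after" entry, which is what a stateful, fully buffering operation implies.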

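Note the Arrays.parallelSort(flattenedData, comparator); call. opEvaluateParallel is only used for parallel streams. For a sequential stream such as the one in the question, opWrapSink wraps the downstream sink in a SizedRefSortingSink (when the size is known, as it is for Stream.of) or a RefSortingSink; these buffer the elements and sort them with Arrays.sort and List.sort respectively, and with a non-null comparator both of those delegate to TimSort by default. The parallel path is where the more interesting work happens: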

@SuppressWarnings("unchecked")
public static <T> void parallelSort(T[] a, Comparator<? super T> cmp) {
    if (cmp == null)
        cmp = NaturalOrder.INSTANCE;
    int n = a.length, p, g;
    if (n <= MIN_ARRAY_SORT_GRAN ||
        (p = ForkJoinPool.getCommonPoolParallelism()) == 1)
        TimSort.sort(a, 0, n, cmp, null, 0, 0);
    else
        new ArraysParallelSortHelpers.FJObject.Sorter<>
            (null, a,
             (T[])Array.newInstance(a.getClass().getComponentType(), n),
             0, n, 0, ((g = n / (p << 2)) <= MIN_ARRAY_SORT_GRAN) ?
             MIN_ARRAY_SORT_GRAN : g, cmp).invoke();
}

And there we have the line TimSort.sort(a, 0, n, cmp, null, 0, 0);, showing that Timsort is used directly when the array has at most MIN_ARRAY_SORT_GRAN elements (1 << 13, i.e. 8192) or when the common pool’s parallelism is 1 (the if branch). For the else branch, let’s look at ArraysParallelSortHelpers.FJObject.Sorter:

static final class FJObject {
    static final class Sorter<T> extends CountedCompleter<Void> {
        static final long serialVersionUID = 2446542900576103244L;
        final T[] a, w;
        final int base, size, wbase, gran;
        Comparator<? super T> comparator;
        Sorter(CountedCompleter<?> par, T[] a, T[] w, int base, int size,
               int wbase, int gran,
               Comparator<? super T> comparator) {
            super(par);
            this.a = a; this.w = w; this.base = base; this.size = size;
            this.wbase = wbase; this.gran = gran;
            this.comparator = comparator;
        }
        public final void compute() {
            CountedCompleter<?> s = this;
            Comparator<? super T> c = this.comparator;
            T[] a = this.a, w = this.w; // localize all params
            int b = this.base, n = this.size, wb = this.wbase, g = this.gran;
            while (n > g) {
                int h = n >>> 1, q = h >>> 1, u = h + q; // quartiles
                Relay fc = new Relay(new Merger<>(s, w, a, wb, h,
                                                  wb+h, n-h, b, g, c));
                Relay rc = new Relay(new Merger<>(fc, a, w, b+h, q,
                                                  b+u, n-u, wb+h, g, c));
                new Sorter<>(rc, a, w, b+u, n-u, wb+u, g, c).fork();
                new Sorter<>(rc, a, w, b+h, q, wb+h, g, c).fork();;
                Relay bc = new Relay(new Merger<>(fc, a, w, b, q,
                                                  b+q, h-q, wb, g, c));
                new Sorter<>(bc, a, w, b+q, h-q, wb+q, g, c).fork();
                s = new EmptyCompleter(bc);
                n = q;
            }
            TimSort.sort(a, b, b + n, c, w, wb, n);
            s.tryComplete();
        }
    }

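And once again we find TimSort.sort(a, b, b + n, c, w, wb, n);: the array is split recursively until each piece is no larger than the granularity g, each piece is sorted with Timsort, and the Merger tasks merge the sorted pieces back together.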
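The split/sort/merge structure of Sorter can be illustrated with a much smaller fork/join sketch. This is not the JDK code: the JDK splits into quarters and merges in parallel through Merger and Relay tasks, while this hypothetical SortTask only splits in halves and merges sequentially. Leaves are sorted with Arrays.sort, which for object arrays with a Comparator uses TimSort:

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;

public class ParallelMergeSketch {
    // Hypothetical, simplified stand-in for ArraysParallelSortHelpers.FJObject.Sorter
    static final class SortTask<T> extends RecursiveAction {
        private final T[] a, tmp;
        private final int lo, hi, gran;
        private final Comparator<? super T> cmp;

        SortTask(T[] a, T[] tmp, int lo, int hi, int gran, Comparator<? super T> cmp) {
            this.a = a; this.tmp = tmp; this.lo = lo; this.hi = hi;
            this.gran = gran; this.cmp = cmp;
        }

        @Override
        protected void compute() {
            if (hi - lo <= gran) {
                // Leaf: sort this slice sequentially (TimSort for object arrays)
                Arrays.sort(a, lo, hi, cmp);
                return;
            }
            int mid = (lo + hi) >>> 1;
            // Sort both halves in parallel, then merge them
            invokeAll(new SortTask<>(a, tmp, lo, mid, gran, cmp),
                      new SortTask<>(a, tmp, mid, hi, gran, cmp));
            merge(mid);
        }

        // Sequential, stable merge of a[lo, mid) and a[mid, hi) through tmp
        private void merge(int mid) {
            System.arraycopy(a, lo, tmp, lo, hi - lo);
            int i = lo, j = mid, k = lo;
            while (i < mid && j < hi) {
                // "<= 0" takes from the left half on ties, keeping the merge stable
                a[k++] = cmp.compare(tmp[i], tmp[j]) <= 0 ? tmp[i++] : tmp[j++];
            }
            while (i < mid) a[k++] = tmp[i++];
            while (j < hi) a[k++] = tmp[j++];
        }
    }

    // gran plays the role of the granularity g; clamped to 1 to avoid endless splitting
    static <T> void parallelSort(T[] a, Comparator<? super T> cmp, int gran) {
        T[] tmp = Arrays.copyOf(a, a.length);
        ForkJoinPool.commonPool().invoke(new SortTask<>(a, tmp, 0, a.length, Math.max(1, gran), cmp));
    }

    public static void main(String[] args) {
        Integer[] data = {5, 3, 9, 1, 7, 2, 8, 6, 4, 0};
        parallelSort(data, Comparator.naturalOrder(), 2);
        System.out.println(Arrays.toString(data)); // [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
    }
}
```

Each subtask only touches its own [lo, hi) range of both arrays, so the shared tmp buffer needs no synchronization.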

Conclusion: the algorithm used is Timsort (for large arrays in parallel streams, Timsort on chunks combined with a parallel merge).

Are there more efficient ways to sort using Stream?

This depends very much on the distribution of the data. Without assuming anything about it, Timsort is an excellent stable algorithm with O(n log n) worst-case running time (and O(n) on input that is already sorted), and in practice it tends to beat other O(n log n) algorithms. Timsort is based on merge sort, but it takes advantage of runs that are already sorted in the input whenever possible, and it uses (binary) insertion sort on small pieces where that is faster than merging. It also includes several other optimizations, such as the galloping mode used while merging.
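The core ideas described above (detect natural runs, reverse strictly descending ones, extend short runs with binary insertion sort, then stably merge runs) can be shown in a simplified Timsort-style sketch. It is not the JDK’s TimSort: it assumes a fixed minimum run length of 32, merges runs pairwise instead of maintaining Timsort’s run-stack invariants, and has no galloping mode. The class and method names are hypothetical:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

public class MiniTimsort {
    // Assumption: fixed minimum run length (the real Timsort derives it from n)
    static final int MIN_RUN = 32;

    static <T> void sort(T[] a, Comparator<? super T> c) {
        int n = a.length;
        List<int[]> runs = new ArrayList<>(); // each entry is {start, length}
        int lo = 0;
        while (lo < n) {
            int runLen = countRunAndMakeAscending(a, lo, n, c);
            int force = Math.min(MIN_RUN, n - lo);
            if (runLen < force) {
                // Extend a short natural run with binary insertion sort
                binaryInsertionSort(a, lo, lo + force, lo + runLen, c);
                runLen = force;
            }
            runs.add(new int[] {lo, runLen});
            lo += runLen;
        }
        // Merge neighbouring runs pairwise until a single run remains
        T[] tmp = Arrays.copyOf(a, n);
        while (runs.size() > 1) {
            List<int[]> next = new ArrayList<>();
            for (int i = 0; i + 1 < runs.size(); i += 2) {
                int[] left = runs.get(i), right = runs.get(i + 1);
                merge(a, tmp, left[0], right[0], right[0] + right[1], c);
                next.add(new int[] {left[0], left[1] + right[1]});
            }
            if (runs.size() % 2 == 1) next.add(runs.get(runs.size() - 1));
            runs = next;
        }
    }

    // Length of the run starting at lo; a strictly descending run is reversed in place
    static <T> int countRunAndMakeAscending(T[] a, int lo, int hi, Comparator<? super T> c) {
        int runHi = lo + 1;
        if (runHi == hi) return 1;
        if (c.compare(a[runHi++], a[lo]) < 0) { // strictly descending
            while (runHi < hi && c.compare(a[runHi], a[runHi - 1]) < 0) runHi++;
            reverse(a, lo, runHi);
        } else {                                // non-descending
            while (runHi < hi && c.compare(a[runHi], a[runHi - 1]) >= 0) runHi++;
        }
        return runHi - lo;
    }

    static <T> void reverse(T[] a, int lo, int hi) {
        for (hi--; lo < hi; lo++, hi--) { T t = a[lo]; a[lo] = a[hi]; a[hi] = t; }
    }

    // Sorts a[lo, hi), assuming a[lo, start) is already sorted
    static <T> void binaryInsertionSort(T[] a, int lo, int hi, int start, Comparator<? super T> c) {
        for (; start < hi; start++) {
            T pivot = a[start];
            int left = lo, right = start;
            while (left < right) { // first position holding an element greater than pivot (stable)
                int mid = (left + right) >>> 1;
                if (c.compare(pivot, a[mid]) < 0) right = mid; else left = mid + 1;
            }
            System.arraycopy(a, left, a, left + 1, start - left);
            a[left] = pivot;
        }
    }

    // Stable merge of a[lo, mid) and a[mid, hi): take from the right only if strictly smaller
    static <T> void merge(T[] a, T[] tmp, int lo, int mid, int hi, Comparator<? super T> c) {
        System.arraycopy(a, lo, tmp, lo, hi - lo);
        int i = lo, j = mid, k = lo;
        while (i < mid && j < hi) a[k++] = c.compare(tmp[j], tmp[i]) < 0 ? tmp[j++] : tmp[i++];
        while (i < mid) a[k++] = tmp[i++];
        while (j < hi) a[k++] = tmp[j++];
    }

    public static void main(String[] args) {
        Integer[] data = {5, 4, 3, 2, 1, 10, 20, 30, 7, 7, 0};
        sort(data, Comparator.naturalOrder());
        System.out.println(Arrays.toString(data)); // [0, 1, 2, 3, 4, 5, 7, 7, 10, 20, 30]
    }
}
```

Reversing only strictly descending runs, and always preferring the left element on ties, is what keeps the whole sort stable.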

That said, for an arbitrary Stream it is difficult to find a more efficient way to sort than this one, because it is hard to find any sorting algorithm that beats Timsort in the general case. In addition, for parallel streams the implementation parallelizes the Timsort work when the array is large enough (this is what ArraysParallelSortHelpers.FJObject.Sorter does) to try to get even better performance.
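Switching a stream to parallel is enough to take the opEvaluateParallel path. A rough sketch comparing the two paths, with hypothetical class and method names; the timing printed by main is a naive wall-clock measurement, not a benchmark (something like JMH would be needed for real numbers), and whether the fork/join path is taken depends on the array size and the common pool’s parallelism:

```java
import java.util.List;
import java.util.Random;
import java.util.stream.Collectors;

public class SequentialVsParallel {
    // n pseudo-random Integers; a fixed seed keeps runs repeatable
    static List<Integer> randomData(int n) {
        return new Random(1).ints(n, 0, 1_000_000).boxed().collect(Collectors.toList());
    }

    // Sequential pipeline: sorted() is handled by a sorting sink (opWrapSink)
    static List<Integer> sortSequential(List<Integer> data) {
        return data.stream().sorted().collect(Collectors.toList());
    }

    // Parallel pipeline: sorted() goes through opEvaluateParallel and Arrays.parallelSort
    static List<Integer> sortParallel(List<Integer> data) {
        return data.parallelStream().sorted().collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Integer> data = randomData(1_000_000);
        long t0 = System.nanoTime();
        List<Integer> seq = sortSequential(data);
        long t1 = System.nanoTime();
        List<Integer> par = sortParallel(data);
        long t2 = System.nanoTime();
        System.out.println("same result: " + seq.equals(par));
        System.out.printf("sequential: %d ms, parallel: %d ms%n",
                (t1 - t0) / 1_000_000, (t2 - t1) / 1_000_000);
    }
}
```

Because the source list is ordered, both pipelines must return the same encounter order; only the speed can differ.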

However, if your Stream is not an arbitrary one, but something you know beforehand has a specific structure or ordering that you can exploit, then in those specific cases a better way of sorting may be possible. But each case has to be analyzed on its own.
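As one illustration of such a specific case: if you know in advance that every value is a small non-negative integer, a counting sort runs in O(n + k) time (k being the size of the value range) without any comparisons. A minimal sketch, assuming all values lie in [0, maxValue] and that the input IntStream is sequential (the shared counts array is not thread-safe); the names are hypothetical:

```java
import java.util.Arrays;
import java.util.stream.IntStream;

public class CountingSortExample {
    // Assumption: every value lies in [0, maxValue] and the stream is sequential
    static int[] countingSort(IntStream values, int maxValue) {
        int[] counts = new int[maxValue + 1];
        values.forEach(v -> counts[v]++);            // one O(n) counting pass
        return IntStream.rangeClosed(0, maxValue)    // one O(k) pass over the value range
                .flatMap(v -> IntStream.range(0, counts[v]).map(i -> v))
                .toArray();
    }

    public static void main(String[] args) {
        int[] sorted = countingSort(IntStream.of(3, 2, 1, 3, 0, 2), 3);
        System.out.println(Arrays.toString(sorted)); // [0, 1, 2, 2, 3, 3]
    }
}
```

This only pays off when k is small relative to n; for arbitrary data, sorted() remains the sensible default.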
