Given an array $ A = (a_1, a_2, \dots, a_n)$ , the number of inversions in $ A$ is the number of index pairs $ i,j$ ($ i < j$ ) such that $ a_i > a_j$ . We can find the number of inversions in an array by running it through a slightly modified mergesort, which runs in $ \Theta(n \log n)$ . Even better, we can deploy a natural mergesort which runs in $ \Theta(n \log m)$ time where $ m$ is the number of runs in the input array. As a reminder, a run is a contiguous subsequence that is ascending or strictly descending. Needless to say, $ m \leq \lceil n/2 \rceil$ . Without further ado, let’s proceed to code:
BruteForceInversionCounter.java
package net.coderodde.util; import java.util.Comparator; import java.util.Objects; import static net.coderodde.util.Utils.NATURAL_ORDER; import static net.coderodde.util.Utils.checkIndices; /** * This class implements a brute force inversion counting algorithm that runs in * quadratic time. * * @author Rodion "rodde" Efremov * @version 1.6 (Dec 30, 2017) */ public final class BruteForceInversionCounter { public static <T> int count(T[] array, int fromIndex, int toIndex, Comparator<? super T> comparator) { Objects.requireNonNull(array); Objects.requireNonNull(comparator); checkIndices(array.length, fromIndex, toIndex); int inversions = 0; for (int i = fromIndex; i < toIndex; ++i) { for (int j = i + 1; j < toIndex; ++j) { if (comparator.compare(array[i], array[j]) > 0) { inversions++; } } } return inversions; } public static <T> int count(T[] array, int fromIndex, int toIndex) { return count(array, fromIndex, toIndex, NATURAL_ORDER); } public static <T> int count(T[] array, Comparator<? super T> comparator) { Objects.requireNonNull(array); return count(array, 0, array.length, comparator); } public static <T> int count(T[] array) { return count(array, NATURAL_ORDER); } private BruteForceInversionCounter() {} }
MergesortInversionCounter.java
package net.coderodde.util; import java.util.Arrays; import java.util.Comparator; import java.util.Objects; import static net.coderodde.util.Utils.NATURAL_ORDER; import static net.coderodde.util.Utils.checkIndices; /** * This class implements a modification of merge sort that sorts an input array * range and returns the number of inversions in the input range. * * @author Rodion "rodde" Efremov * @version 1.6 (Dec 30, 2017) */ public final class MergesortInversionCounter { public static <T> int count(T[] array, int fromIndex, int toIndex, Comparator<? super T> comparator) { Objects.requireNonNull(array); checkIndices(array.length, fromIndex, toIndex); int rangeLength = toIndex - fromIndex; if (rangeLength < 2) { return 0; } T[] aux = Arrays.copyOfRange(array, fromIndex, toIndex); return count(aux, array, 0, fromIndex, rangeLength, comparator); } private static <T> int count(T[] sourceArray, T[] targetArray, int sourceOffset, int targetOffset, int rangeLength, Comparator<? super T> comparator) { if (rangeLength < 2) { return 0; } int halfRangeLength = rangeLength >>> 1; int inversions = count(targetArray, sourceArray, targetOffset, sourceOffset, halfRangeLength, comparator); inversions += count(targetArray, sourceArray, targetOffset + halfRangeLength, sourceOffset + halfRangeLength, rangeLength - halfRangeLength, comparator); return inversions + merge(sourceArray, targetArray, sourceOffset, targetOffset, halfRangeLength, rangeLength - halfRangeLength, comparator); } public static <T> int count(T[] array, int fromIndex, int toIndex) { return count(array, fromIndex, toIndex, NATURAL_ORDER); } public static <T> int count(T[] array, Comparator<? super T> comparator) { Objects.requireNonNull(array); return count(array, 0, array.length); } public static <T> int count(T[] array) { return count(array, NATURAL_ORDER); } private static <T> int merge(T[] sourceArray, T[] targetArray, int sourceOffset, int targetOffset, int leftRunLength, int rightRunLength, Comparator<? super T> comparator) { int inversions = 0; int leftRunIndex = sourceOffset; int leftRunEndIndex = sourceOffset + leftRunLength; int rightRunIndex = sourceOffset + leftRunLength; int rightRunEndIndex = rightRunIndex + rightRunLength; int targetIndex = targetOffset; while (leftRunIndex < leftRunEndIndex && rightRunIndex < rightRunEndIndex) { if (comparator.compare(sourceArray[rightRunIndex], sourceArray[leftRunIndex]) < 0) { inversions += leftRunEndIndex - leftRunIndex; targetArray[targetIndex++] = sourceArray[rightRunIndex++]; } else { targetArray[targetIndex++] = sourceArray[leftRunIndex++]; } } System.arraycopy(sourceArray, leftRunIndex, targetArray, targetIndex, leftRunEndIndex - leftRunIndex); System.arraycopy(sourceArray, rightRunIndex, targetArray, targetIndex, rightRunEndIndex - rightRunIndex); return inversions; } private MergesortInversionCounter() {} }
NaturalMergesortInversionCounter.java
package net.coderodde.util; import java.util.Arrays; import java.util.Comparator; import java.util.Objects; import static net.coderodde.util.Utils.NATURAL_ORDER; import static net.coderodde.util.Utils.checkIndices; /** * This class implements a modification of the natural mergesort that counts * inversion in the input array range. * * @author Rodion "rodde" Efremov * @version 1.6 (Dec 30, 2017) */ public final class NaturalMergesortInversionCounter { public static <T> int count(T[] array, int fromIndex, int toIndex) { return count(array, fromIndex, toIndex, NATURAL_ORDER); } public static <T> int count(T[] array) { Objects.requireNonNull(array); return count(array, 0, array.length); } public static <T> int count(T[] array, Comparator<? super T> comparator) { Objects.requireNonNull(array); return count(array, 0, array.length, comparator); } public static <T> int count(T[] array, int fromIndex, int toIndex, Comparator<? super T> comparator) { Objects.requireNonNull(array); Objects.requireNonNull(comparator); checkIndices(array.length, fromIndex, toIndex); int rangeLength = toIndex - fromIndex; if (rangeLength < 2) { return 0; } RunLengthQueueBuilder<T> runLengthQueueBuilder = new RunLengthQueueBuilder<>(array, fromIndex, toIndex, comparator); RunLengthQueue runLengthQueue = runLengthQueueBuilder.run(); T[] bufferArray = Arrays.copyOfRange(array, fromIndex, toIndex); T[] sourceArray; T[] targetArray; int sourceOffset; int targetOffset; int mergePasses = getNumberOfMergePasses(runLengthQueue.size()); if ((mergePasses & 1) == 1) { // Odd amount of merge passes over the entire input array range. // Set the buffer array as the source array so that the sorted // result ends in in the input array. sourceArray = bufferArray; targetArray = array; sourceOffset = 0; targetOffset = fromIndex; } else { sourceArray = array; targetArray = bufferArray; sourceOffset = fromIndex; targetOffset = 0; } int runsLeftInCurrentMergePass = runLengthQueue.size(); int offset = 0; int inversions = 0; // While there are runs to merge, iterate: while (runLengthQueue.size() > 1) { int leftRunLength = runLengthQueue.dequeue(); int rightRunLength = runLengthQueue.dequeue(); inversions += merge(sourceArray, targetArray, sourceOffset + offset, targetOffset + offset, leftRunLength, rightRunLength, comparator); runLengthQueue.enqueue(leftRunLength + rightRunLength); runsLeftInCurrentMergePass -= 2; offset += leftRunLength + rightRunLength; switch (runsLeftInCurrentMergePass) { case 1: int lastRunLength = runLengthQueue.dequeue(); // In the target array, this 'unmarried' run might be // in the form of two unmerged runs. System.arraycopy(sourceArray, sourceOffset + offset, targetArray, targetOffset + offset, lastRunLength); runLengthQueue.enqueue(lastRunLength); // FALL THROUGH! case 0: runsLeftInCurrentMergePass = runLengthQueue.size(); offset = 0; T[] tmpArray = sourceArray; sourceArray = targetArray; targetArray = tmpArray; int tmpOffset = sourceOffset; sourceOffset = targetOffset; targetOffset = tmpOffset; break; } } return inversions; } /** * This static inner class implements an algorithm for scanning the input * array range and constructing a run length queue. * * @param <T> the array component type. */ private static final class RunLengthQueueBuilder<T> { /** * The array holding the range to sort. */ private final T[] inputArray; /** * The starting inclusive index into the array range to scan for runs. */ private final int fromIndex; /** * The ending exclusive index into the array range to scan for runs. */ private final int toIndex; /** * The array component comparator. */ private final Comparator<? super T> comparator; /** * The index to the first array component belonging to the run currently * being scanned. */ private int head; /** * The smaller index into the array component pair currently scanned. */ private int left; /** * The larger index into the array component pair currently scanned. */ private int right; /** * The index (inclusive) of the very last array component in the input * array range. */ private final int last; /** * Indicates whether the previous run was descending. We need this since * after reversing a descending run, it may turn out that this run may * be simply extended by the following run. */ private boolean previousRunWasDescending = false; /** * The run length queue being built. */ private final RunLengthQueue runLengthQueue; RunLengthQueueBuilder(T[] inputArray, int fromIndex, int toIndex, Comparator<? super T> comparator) { this.inputArray = inputArray; this.fromIndex = fromIndex; this.toIndex = toIndex; this.comparator = comparator; this.left = fromIndex; this.right = fromIndex + 1; this.last = toIndex - 1; int rangeLength = toIndex - fromIndex; this.runLengthQueue = new RunLengthQueue((rangeLength >>> 1) + 1); } /** * Builds an entire run length queue over the input array range. * * @return a run length queue. */ RunLengthQueue run() { while (left < last) { head = left; if (comparator.compare(inputArray[left++], inputArray[right++]) <= 0) { // The next run is ascending: scanAscendingRun(); } else { // The next run is descending: scanDescendingRun(); } ++left; ++right; } handleLastElement(); return runLengthQueue; } // Adds a recently scanned run to the run queue. private void addRun() { if (previousRunWasDescending) { if (comparator.compare(inputArray[head - 1], inputArray[head]) <= 0) { runLengthQueue.extendLastRun(right - head); } else { runLengthQueue.enqueue(right - head); } } else { runLengthQueue.enqueue(right - head); } } // Scans an ascending run. private void scanAscendingRun() { while (left != last && comparator.compare(inputArray[left], inputArray[right]) <= 0) { ++left; ++right; } addRun(); previousRunWasDescending = false; } // Scans a strictly descendign run. We require strictness in order to // sort stably. If we were not, the reversal of a descending run would // reorder two possible adjacent array components. private void scanDescendingRun() { while (left != last && comparator.compare(inputArray[left], inputArray[right]) > 0) { ++left; ++right; } reverseRun(); addRun(); previousRunWasDescending = true; } /** * Reverses the recently scanned (descending) run. */ private void reverseRun() { for (int i = head, j = left; i < j; i++, j--) { T tmp = inputArray[i]; inputArray[i] = inputArray[j]; inputArray[j] = tmp; } } // Handles a possible leftover component. private void handleLastElement() { if (left == last) { // Once here, we have a leftover component. if (comparator.compare(inputArray[last - 1], inputArray[last]) <= 0) { runLengthQueue.extendLastRun(1); } else { runLengthQueue.enqueue(1); } } } } /** * This static inner class implements a simple queue of integers used to * represent the run sequence in the array to sort. */ private static final class RunLengthQueue { /** * The minimum capacity of the storage array. */ private static final int MINIMUM_CAPACITY = 256; /** * Stores the run lengths. */ private final int[] storage; /** * The index of the array component that will be dequeued next. */ private int head; /** * The index of the array component to which the next run length will * be set. */ private int tail; /** * The current number of run lengths stored in this queue. */ private int size; /** * A bit mask used for simpler modulo calculation (at least at the level * of hardware). */ private final int mask; /** * Creates a run length queue large enough to hold maximum of * {@code capacity} elements. * * @param capacity the requested capacity, may be increased in the * constructor. */ RunLengthQueue(int capacity) { capacity = ceilPowerOfTwo(capacity); this.mask = capacity - 1; this.storage = new int[capacity]; } @Override public String toString() { StringBuilder stringBuilder = new StringBuilder(); stringBuilder.append('['); String separator = ""; for (int i = 0; i < size; ++i) { stringBuilder.append(separator); separator = ", "; stringBuilder.append(storage[i]); } stringBuilder.append(']'); return stringBuilder.toString(); } /** * Enqueues a given run length to the tail of this queue. * * @param runLength the run length to enqueue. */ void enqueue(int runLength) { storage[tail] = runLength; tail = (tail + 1) & mask; size++; } /** * Dequeues the run length from the head of this queue. * * @return the run length stored in the head of this queue. */ int dequeue() { int ret = storage[head]; head = (head + 1) & mask; size--; return ret; } /** * Extends the last run length in the queue by {@code length} units. * * @param length the length of the extension. */ void extendLastRun(int length) { storage[(tail - 1) & mask] += length; } /** * Returns the number of run lengths stored in this queue. * * @return the number of run lengths. */ int size() { return size; } /** * Returns a smallest power of two no less than {@code number}. * * @param number the number to ceil. * @return a smallest power of two no less than {@code number}. */ private static int ceilPowerOfTwo(int number) { int ret = Integer.highestOneBit(number); return ret != number ? (ret << 1) : ret; } } /** * Computes the required number of merge passes needed to sort an input * array range containing {@code runs} runs. * * @param runs the number of runs in the input array range. * @return the number of required merge passes. */ private static int getNumberOfMergePasses(int runs) { return 32 - Integer.numberOfLeadingZeros(runs - 1); } private static <T> int merge(T[] sourceArray, T[] targetArray, int sourceOffset, int targetOffset, int leftRunLength, int rightRunLength, Comparator<? super T> comparator) { int leftRunIndex = sourceOffset; int rightRunIndex = leftRunIndex + leftRunLength; int leftRunEndIndex = rightRunIndex; int rightRunEndIndex = rightRunIndex + rightRunLength; int targetIndex = targetOffset; int inversions = 0; while (leftRunIndex != leftRunEndIndex && rightRunIndex != rightRunEndIndex) { if (comparator.compare(sourceArray[rightRunIndex], sourceArray[leftRunIndex]) <0) { inversions += leftRunEndIndex - leftRunIndex; targetArray[targetIndex++] = sourceArray[rightRunIndex++]; } else { targetArray[targetIndex++] = sourceArray[leftRunIndex++]; } } System.arraycopy(sourceArray, leftRunIndex, targetArray, targetIndex, leftRunEndIndex - leftRunIndex); System.arraycopy(sourceArray, rightRunIndex, targetArray, targetIndex, rightRunEndIndex - rightRunIndex); return inversions; } private NaturalMergesortInversionCounter() {} }
Utils.java
package net.coderodde.util; import java.util.Comparator; /** * This class contains generic facilities. * * @author Rodion "rodde" Efremov * @version 1.6 (Dec 30, 2017) */ public final class Utils { public static final Comparator NATURAL_ORDER = new Comparator() { @Override public int compare(Object o1, Object o2) { return ((Comparable) o1).compareTo(o2); } }; public static void checkIndices(int arrayLength, int fromIndex, int toIndex) { if (fromIndex < 0) { throw new IndexOutOfBoundsException( "fromIndex(" + fromIndex + ") < 0"); } if (toIndex > arrayLength) { throw new IndexOutOfBoundsException( "toIndex(" + toIndex + ") > " + "arrayLength(" + arrayLength + ")"); } if (fromIndex > toIndex) { throw new IndexOutOfBoundsException( "fromIndex(" + fromIndex + ") > toIndex(" + toIndex + ")"); } } }
AbstractInversionCounterTest.java
package net.coderodde.util; import java.util.Arrays; import java.util.Random; import org.junit.Test; import static org.junit.Assert.*; /** * This abstract test class implements all the actual unit tests. * * @author Rodion "rodde" Efremov * @version 1.6 (Dec 30, 2017) */ public abstract class AbstractInversionCounterTest { private static final int TEST_ITERATIONS = 100; private static final int MAXIMUM_ARRAY_LENGTH = 1000; protected final InversionCounter<Integer> inversionCounter; private final Random random = new Random(); public AbstractInversionCounterTest( InversionCounter<Integer> inversionCounter) { this.inversionCounter = inversionCounter; } @Test public void test() { for (int iteration = 0; iteration < TEST_ITERATIONS; iteration++) { int length = random.nextInt(MAXIMUM_ARRAY_LENGTH + 1); int fromIndex = random.nextInt(Math.max(1, length / 10)); int toIndex = length - random.nextInt( Math.max(1, (length - fromIndex) / 10)); Integer[] array1 = getRandomIntegerArray(length, -length / 2 - 10, +length / 2 + 10, random); Integer[] array2 = array1.clone(); assertEquals(BruteForceInversionCounter.count(array1, fromIndex, toIndex, Integer::compareTo), inversionCounter.count(array2, fromIndex, toIndex, Integer::compareTo)); Arrays.sort(array1, fromIndex, toIndex); assertTrue(Arrays.equals(array1, array2)); } } /** * Creates a random integer array. * * @param length the desired length of the array. * @param minValue the minimum integer value. * @param maxValue the maximum integer value. * @param random the random number generator. * @return a randomly generated integer array. */ private Integer[] getRandomIntegerArray(int length, int minValue, int maxValue, Random random) { Integer[] array = new Integer[length]; for (int i = 0; i < length; ++i) { array[i] = randomValue(minValue, maxValue, random); } return array; } /** * Returns a random integer value from the range {@code minValue, * minValue + 1, ..., maxValue - 1, maxValue}, according to the uniform * distribution. * * @param minValue the minimum integer value. * @param maxValue the maximum integer value. * @param random the random number generator. * @return a random integer value within the range. */ private Integer randomValue(int minValue, int maxValue, Random random) { return minValue + random.nextInt(maxValue - minValue + 1); } }
InversionCounter.java
package net.coderodde.util; import java.util.Comparator; /** * Defines the most specific API for inversion counting algorithms. * * @author Rodion "rodde" Efremov * @version 1.6 (Dec 30, 2017) * @param <T> the array component type. */ @FunctionalInterface public interface InversionCounter<T> { public int count(T[] array, int fromIndex, int toIndex, Comparator<? super T> comparator); }
MergesortInversionCounterTest.java
package net.coderodde.util; /** * This unit test tests the correctness of the mergesort-based inversion * counter. * * @author Rodion "rodde" Efremov * @version 1.6 (Dec 30, 2017) */ public class MergesortInversionCounterTest extends AbstractInversionCounterTest { public MergesortInversionCounterTest() { super(MergesortInversionCounter::count); } }
NaturalMergesortInversionCounterTest.java
package net.coderodde.util; /** * This unit test tests the correctness of the mergesort-based inversion * counter. * * @author Rodion "rodde" Efremov * @version 1.6 (Dec 30, 2017) */ public class NaturalMergesortInversionCounterTest extends AbstractInversionCounterTest { public NaturalMergesortInversionCounterTest() { super(MergesortInversionCounter::count); } }
MyBenchmark.java
package net.coderodde; import java.util.Arrays; import java.util.Random; import java.util.concurrent.TimeUnit; import net.coderodde.util.MergesortInversionCounter; import net.coderodde.util.NaturalMergesortInversionCounter; import org.openjdk.jmh.annotations.Benchmark; import org.openjdk.jmh.annotations.BenchmarkMode; import org.openjdk.jmh.annotations.Level; import org.openjdk.jmh.annotations.Mode; import org.openjdk.jmh.annotations.OutputTimeUnit; import org.openjdk.jmh.annotations.State; import org.openjdk.jmh.annotations.Scope; import org.openjdk.jmh.annotations.Setup; import org.openjdk.jmh.runner.Runner; import org.openjdk.jmh.runner.RunnerException; import org.openjdk.jmh.runner.options.Options; import org.openjdk.jmh.runner.options.OptionsBuilder; /** * This class implements benchmark for inversion counter algorithms. * * @author Rodion "rodde" Efremov * @version 1.6 (Dec 31, 2017) */ public class MyBenchmark { private static final int ARRAY_LENGTH = 1_000_000; private static final int MINIMUM_INTEGER_VALUE = -100_000; private static final int MAXIMUM_INTEGER_VALUE = +100_000; private static final int RUN_LENGTH_IN_PRESORTED_ARRAY = 2000; @State(Scope.Thread) public static class MyRandomState { private final Random random = new Random(); Integer[] array; @Setup(Level.Trial) public void createRandomArray() { array = createRandomIntegerArray(ARRAY_LENGTH, MINIMUM_INTEGER_VALUE, MAXIMUM_INTEGER_VALUE, random); } } @State(Scope.Thread) public static class MyPresortedState { private final Random random = new Random(); Integer[] array; @Setup(Level.Trial) public void createPresortedArray() { array = createPresortedIntegerArray(ARRAY_LENGTH, MINIMUM_INTEGER_VALUE, MAXIMUM_INTEGER_VALUE, random); } } @Benchmark @BenchmarkMode(Mode.AverageTime) @OutputTimeUnit(TimeUnit.MILLISECONDS) public void testMergesortOnRandomArray(MyRandomState state) { MergesortInversionCounter.count(state.array); } @Benchmark @BenchmarkMode(Mode.AverageTime) @OutputTimeUnit(TimeUnit.MILLISECONDS) public void testMergesortOnPresortedArray(MyPresortedState state) { MergesortInversionCounter.count(state.array); } @Benchmark @BenchmarkMode(Mode.AverageTime) @OutputTimeUnit(TimeUnit.MILLISECONDS) public void testNaturalMergesortOnRandomArray(MyRandomState state) { NaturalMergesortInversionCounter.count(state.array); } @Benchmark @BenchmarkMode(Mode.AverageTime) @OutputTimeUnit(TimeUnit.MILLISECONDS) public void testNaturalMergesortOnPresortedArray(MyPresortedState state) { NaturalMergesortInversionCounter.count(state.array); } private static Integer[] createRandomIntegerArray(int length, int minimumIntegerValue, int maximumIntegerValue, Random random) { Integer[] array = new Integer[length]; for (int i = 0; i < length; ++i) { array[i] = getRandomIntegerValue(minimumIntegerValue, maximumIntegerValue, random); } return array; } private static Integer[] createPresortedIntegerArray(int length, int minimumIntegerValue, int maximumIntegerValue, Random random) { Integer[] randomArray = createRandomIntegerArray(length, minimumIntegerValue, maximumIntegerValue, random); for (int i = 0; i < length; i += RUN_LENGTH_IN_PRESORTED_ARRAY) { Arrays.sort(randomArray, i, Math.min(length, i + RUN_LENGTH_IN_PRESORTED_ARRAY)); } return randomArray; } private static Integer getRandomIntegerValue(int minimumIntegerValue, int maximumIntegerValue, Random random) { return minimumIntegerValue + random.nextInt(maximumIntegerValue - minimumIntegerValue + 1); } public static void main(String[] args) throws RunnerException { Options options = new OptionsBuilder() .include(MyBenchmark.class.getSimpleName()) .warmupIterations(5) .measurementIterations(10) .forks(1) .build(); new Runner(options).run(); } }
Benchmark figures
Benchmark Mode Cnt Score Error Units MyBenchmark.testMergesortOnPresortedArray avgt 10 317.640 ± 187.535 ms/op MyBenchmark.testMergesortOnRandomArray avgt 10 168.408 ± 37.833 ms/op MyBenchmark.testNaturalMergesortOnPresortedArray avgt 10 39.191 ± 17.872 ms/op MyBenchmark.testNaturalMergesortOnRandomArray avgt 10 50.237 ± 11.120 ms/op
Info
The actual counters live here. The benchmark lives here.
Critique request
Please tell me anything that comes to mind, be it related to algorithms, unit testing or benchmarking.