Median of a stream of integers

Integers are read one at a time from a data stream. Find the median of the elements read so far in an efficient way.

For example, the median of the stream A = [1, 5, 3, 2, 6, 2, 3] is 3.

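Note that we need the running median at every point in the stream. That is, each time a new number arrives, we need to update the median. In the example below the median of an even-sized stream is computed with integer division, so (2+3)/2 is reported as 2 rather than 2.5: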

  
       A = [1], median = 1
       A = [1,5], median = (5+1)/2 = 3
       A = [1,5,3], median = 3
       A = [1,5,3,2], median = (2+3)/2 = 2
       A = [1,5,3,2,6], median = 3
       A = [1,5,3,2,6,2], median = (2+3)/2 = 2
       A = [1,5,3,2,6,2,3], median = 3

Observe that if the stream holds an odd number of elements at a given time, the median is the middle element in sorted order. That is, half of the remaining elements are less than or equal to the median and half are greater than or equal to it. If the stream holds an even number of elements, the median is the average of the two middle elements, so again half of the elements lie on each side of it.
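To make the definition concrete, here is a minimal baseline sketch that simply sorts the elements seen so far and reads off the middle. The class and method names (NaiveRunningMedian, medianOfPrefix, runningMedians) are my own placeholders, and the even case uses integer division to match the example above. It costs O(n lg n) per query, which is exactly the cost the approaches below try to avoid.

```java
import java.util.Arrays;

// Hypothetical helper class for illustration only.
class NaiveRunningMedian {
    // Median of the first `count` elements of `stream`, found by sorting a copy.
    static int medianOfPrefix(int[] stream, int count) {
        int[] prefix = Arrays.copyOf(stream, count);
        Arrays.sort(prefix);
        int mid = count / 2;
        if (count % 2 == 1) {
            return prefix[mid]; // odd count: the middle element
        }
        // even count: average of the two middle elements (integer division);
        // the sum is widened to long so it cannot overflow
        return (int) (((long) prefix[mid - 1] + prefix[mid]) / 2);
    }

    // Running median after each element of the stream arrives.
    static int[] runningMedians(int[] stream) {
        int[] medians = new int[stream.length];
        for (int i = 0; i < stream.length; i++) {
            medians[i] = medianOfPrefix(stream, i + 1);
        }
        return medians;
    }
}
```

Calling runningMedians on the example stream should reproduce the medians listed in the table above.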

So, the idea is to use a data structure that maintains two collections of elements: the first holds elements less than or equal to the current median, and the second holds elements greater than or equal to it. If both collections are the same size, the median is the average of their two top elements (the largest of the first and the smallest of the second). Otherwise, the median is the top of the bigger collection. The question is which data structure to use.

Using self-balancing BST
At every node of an AVL tree, maintain the number of elements in the subtree rooted at that node. We can use a node as the root of a simple binary tree whose left child is a self-balancing BST holding the elements less than the root and whose right child is a self-balancing BST holding the elements greater than the root. The root always holds the current median. I discussed the implementation of an AVL tree in my previous post.
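As a rough sketch of how the per-node subtree counts help, the class below keeps a size field in every node and uses it to select the k-th smallest key, from which the median follows. This is not the AVL code from the earlier post: for brevity it is a plain unbalanced BST (an assumption on my part), and the names SizedBst, insert, select and median are hypothetical. A real AVL version would add rotations that also refresh size, which keeps each operation at O(lg n).

```java
// Hypothetical sketch: an UNBALANCED BST whose nodes store subtree sizes.
// An AVL tree would add rotations that also update `size`.
class SizedBst {
    private static final class Node {
        final int key;
        int size = 1; // number of nodes in the subtree rooted here
        Node left, right;
        Node(int key) { this.key = key; }
    }

    private Node root;

    void insert(int key) { root = insert(root, key); }

    private static Node insert(Node node, int key) {
        if (node == null) return new Node(key);
        if (key < node.key) node.left = insert(node.left, key);
        else node.right = insert(node.right, key); // duplicates go right
        node.size++; // one more node below (or at) this subtree
        return node;
    }

    private static int size(Node n) { return n == null ? 0 : n.size; }

    // Returns the k-th smallest key (0-based) using the subtree sizes.
    private int select(int k) {
        Node node = root;
        while (node != null) {
            int leftSize = size(node.left);
            if (k < leftSize) {
                node = node.left;
            } else if (k == leftSize) {
                return node.key;
            } else {
                k -= leftSize + 1; // skip the left subtree and this node
                node = node.right;
            }
        }
        throw new IllegalArgumentException("rank out of range");
    }

    // Median of all inserted keys; even sizes use integer division.
    int median() {
        int n = size(root);
        if (n == 0) throw new IllegalStateException("empty stream");
        if (n % 2 == 1) return select(n / 2);
        return (int) (((long) select(n / 2 - 1) + select(n / 2)) / 2);
    }
}
```

Without balancing, a sorted input degrades this tree to a linked list, which is why the text above asks for a self-balancing tree.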

Using Heap
Similar to the balanced BST approach above, we can use a max heap on the left side to hold the elements that are less than or equal to the current median, and a min heap on the right side to hold the elements that are greater than or equal to it.

After processing an incoming element, the sizes of the two heaps differ by at most 1. When both heaps hold the same number of elements, we take the average of the two heap roots as the current median. When they do not, we take the current median from the root of the heap containing more elements.

The following implementation finds the running median of a stream using a max heap and a min heap, taking O(lg n) time per incoming element. Implementations of a generic max/min heap can be found in my previous post. However, I chose to use Java's PriorityQueue for both heaps. By default PriorityQueue behaves as a min heap, so we need to provide a reverse natural order comparator to construct the max heap.

import java.util.Collections;
import java.util.PriorityQueue;

// insert current element to the left or right heap and get median so far
public static int getMedian(final int current, final int med, final PriorityQueue<Integer> left, final PriorityQueue<Integer> right) {
    final int balance = left.size() - right.size();
    int median = med;

    // both heaps are of equal size.
    if (balance == 0) {
        // need to insert in left
        if (current < median) {
            left.offer(current);
            median = left.peek();
        }
        // need to insert in right
        else {
            right.offer(current);
            median = right.peek();
        }
    }
    // left heap is larger
    else if (balance > 0) {
        // need to insert in left
        if (current < median) {
            right.offer(left.poll());
            left.offer(current);
        }
        // need to insert in right
        else {
            right.offer(current);
        }

        // widen to long so the sum of the two roots cannot overflow int
        median = (int) (((long) left.peek() + right.peek()) / 2);
    }
    // right heap is larger
    else if (balance < 0) {
        // need to insert in left
        if (current < median) {
            left.offer(current);
        }
        // need to insert in right
        else {
            left.offer(right.poll());
            right.offer(current);
        }

        // widen to long so the sum of the two roots cannot overflow int
        median = (int) (((long) left.peek() + right.peek()) / 2);
    }

    return median;
}

public static int getStreamMedian(final int[] stream) {
    int median = 0;
    final PriorityQueue<Integer> left = new PriorityQueue<Integer>(16, Collections.reverseOrder());
    final PriorityQueue<Integer> right = new PriorityQueue<Integer>(16);

    for (int i = 0; i < stream.length; i++) {
        median = getMedian(stream[i], median, left, right);
    }
    return median;
}
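If the exact median is wanted for even-sized streams (2.5 rather than 2), one possible variant of the same two-heap idea is sketched below. The class name RunningMedian and its add/median methods are hypothetical, and the rebalancing differs from the code above: every element is pushed through the left heap into the right heap, and then one element is moved back if the right heap has grown larger.

```java
import java.util.Collections;
import java.util.PriorityQueue;

// Hypothetical variant of the two-heap approach that returns a double.
class RunningMedian {
    // max heap holding the lower half of the stream
    private final PriorityQueue<Integer> left = new PriorityQueue<>(Collections.reverseOrder());
    // min heap holding the upper half of the stream
    private final PriorityQueue<Integer> right = new PriorityQueue<>();

    void add(int value) {
        // Route the value through left so that every element of left
        // stays <= every element of right.
        left.offer(value);
        right.offer(left.poll());
        // Keep left the same size as right, or exactly one larger.
        if (right.size() > left.size()) {
            left.offer(right.poll());
        }
    }

    double median() {
        if (left.isEmpty()) throw new IllegalStateException("empty stream");
        if (left.size() > right.size()) {
            return left.peek(); // odd count: root of the larger heap
        }
        // even count: exact average of the two roots, computed in double
        return (left.peek() + (double) right.peek()) / 2.0;
    }
}
```

This version does up to three heap operations per element instead of one or two, but it needs no previous median as input and has no branching on the balance.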