Moving Average of Last N numbers in a Stream

Design a class to calculate moving average of last N numbers in a stream of real numbers

For example, if N=3 and the stream S=[2,3,4,1,2,-3,0,…] then moving averages at each each streamed number are = [2.0, 2.5, 3.0, 2.66, 2.33, 0, -0.33,…].

We need to design in a way that when a new element ins a stream comes then we can compute avg of last N elements in the stream in O(1) time. For example, if new streams 3 comes after the last stream 0, then moving avg would be (-3+0+3)/3 = 0.0.

Let’s first define a class for this problem. At any time we can add one number to the moving avg and any time we want to get the avg in o(1) time. So, we have two methods –

public interface MovingAverage{
    public void add(double num);
    public double getAvg();
}

First of all, let’s understand how we can compute moving average of all numbers we have seen so far in a stream.

Moving Average of All numbers in a stream

This is pure math. If we have seen total n numbers so far and current average is avg then when we add a new element the new average would be avg=(n*avg+new_element)/(n+1). Below is a pseudocode to find moving average of all numbers in a stream in O(1) time –

avg = 0;
n = 0;

def add(element):
  avg = (n*avg)/(n+1);
  n++;
end

def getAvg()
  return avg;
end

Moving Average of Last N Numbers
Now, how we can extend the solution to contains moving average of last N numbers? We can use the formula avg=(n*avg+new_element)/(n+1) upto N elements then what will happen? For example, if we need moving average of last N=3 elements from a stream = [2,3,4,5,6,…] then when we see 4 we have reached N=3 numbers and when we see next number 5 we need to compute average of last 3 i.e [3,4,5]. Similarly for next number 6 moving average will be avg of [4,5,6].

So, we can keep a list of N numbers and once we reach N numbers then every time we see a new element we can remove the first from the list and compute the average. But this is O(n) time. We need to do it O(1) time. We can use the same mathematical expression. If current moving average of last N element is A and the first of the elements is F then when we see latest new element L the moving average would be – A=(N*A-F+L)/N

Question, is how to keep tracking the first element? Certainly we can use a Queue of size N and dequeue the first element when the queue is full. Below is a pseudocode to do this –

Queue queue;
double avg;

def add(num) :
    double
    if(queue.size() == N) then
         avg = (N*avg-queue.dequeue()+num)/N
    else
         avg = (queue.size()*avg+num)/(queue.size()+1)
         queue.add(num);
    end
end

def getAvg() :
   return avg;
end

Can we do better without any special data structure?

Note that, this is a classical sliding window problem where we need to find sum of sliding window of size N And then average would be simple sliding_sum/N.

How we can implement sliding sum? We can use a circular buffer of size N to contains the sliding window. Each time we see an element we keep appending to the sliding window until window is full. At this point we need to slide the window head one position to right i.e. increment by one. How to keep track of sliding window head? As it is circular buffer we can increment head to head=(head+1)%N. Also note that at any position of head next when next element come the free slot for new element would be the the previous slot of head in the circular buffer i.e. empty_slot = (N+head-1)%N. See the below figures for understanding.

 
window of size 5 using circular buffer
free_slot  head
       |  |
       v  v
     |__|__|__|__|__|

       free_slot  head
               |  |
               v  v
     |__|__|__|__|__|

      head       free_slot
       |          |
       v          v 
     |__|__|__|__|__|

So, basically we can find moving average of last N elements by using a circular buffer of size N where we remove first element when buffer is full during an incoming new element. Below is O(1) time implementation of this above idea.

public static class MovingAvgLastN{
	int maxTotal;
	int total;
	double lastN[];
	double avg;
	int head;
	
	public MovingAvgLastN(int N){
		maxTotal = N;
		lastN = new double[N];
		avg = 0;
		head = 0;
		total = 0;
	}
	
	public void add(double num){
		double prevSum = total*avg;
		
		if(total == maxTotal){
			prevSum-=lastN[head];
			total--;
		}
		
		head = (head+1)%maxTotal;
		int emptyPos = (maxTotal+head-1)%maxTotal;
		lastN[emptyPos] = num;
		
		double newSum = prevSum+num;
		total++;
		avg = newSum/total;
	}
	
	public double getAvg(){
		return avg;
	}
}