Reservoir Sampling for Streaming Data
I have been interested in streaming data algorithms for some time. These algorithms assume that data arrive sequentially over time and/or that the data set is too large to fit into memory for random access. Perhaps the most widely-known streaming algorithm is HyperLogLog, which calculates the approximate number of distinct values in a stream, with fixed memory use. In this post, I will discuss a simple algorithm for randomly sampling from from a data stream.
Let the values of the stream be \(x_1, x_2, x_3, \ldots\); we do not need to assume that the stream ever terminates, although it may. The reservoir algorithm samples \(k\) random items from the stream (without replacement) in the sense that after seeing \(n\) data points, the probability that any individual data point is in the sample is \(\frac{k}{n}\). This algorithm only requires one pass through the stream, and uses storage proportional to \(k\) (not the total, possibly infinite, size of the stream).
The reservoir algorithm is so named because at each step it updates a “reservoir” of candidate samples. We will denote the reservoir of candidate samples by \(R\). We will use \(R_t\) to denote the state of \(R\) after observing the first \(t\) data points. We think of \(R\) as a vector of length \(k\), so \(R_t[0]\) is the first candidate sample after \(t\) data points have been seen, \(R_t[1]\) is the second, \(R_t[k - 1]\) is the last, etc. It is important that \(k\) is small enough that the reservoir vectors can be stored in memory (or at least accessed reasonably quickly on disk).
We initialize the first reservoir, \(R_k\) with the first \(k\) data points we see. At this point, we have a random sample (without replacement) of the first \(k\) data points from the stream.
Suppose now that we have seen \(t - 1\) elements and have a reservoir of sample candidates \(R_{t - 1}\). When we receive \(x_t\), we generate an integer \(i\) uniformly distributed in the interval \([1, t]\). If \(i \leq k\), we set \(R_t[i - 1] = x_t\); otherwise, we wait for the next data point.
Intuitively, this algorithm seems reasonable, because \(P(x_t \in R_t) = P(i \leq k) = \frac{k}{t}\), as we expect from a uniform random sample. What is less clear at this point is that for any \(s < t\), \(P(x_{s} \in R_t) = \frac{k}{t}\) as well. We will now prove this fact.
First, we calculate the probability that a candidate sample in the reservoir remains after another data point is received. We let \(x_s \in R_t\), and suppose we have observed \(x_{t + 1}\). The candidate sample \(x_s\) will be in \(R_{t + 1}\) if and only if the random integer \(i\) generated for \(x_{t + 1}\) is not the index of \(x_s\) in \(R_t\). Since \(i\) is uniformly distributed in the interval \([1, t + 1]\), we have that
\[P(x_s \in R_{t + 1}\ |\ x_s \in R_t) = \frac{t}{t + 1}.\]
Now, suppose that we have received \(n\) data points. First consider the case where \(k < s < n\). Then
\[P(x_s \in R_n) = P(x_s \in R_n\ |\ x_s \in R_s) \cdot P(x_s \in R_s).\]
The second term, \(P(x_s \in R_s)\), is the probability that \(x_s\) entered the reservoir when it was first observed, so
\[P(x_s \in R_s) = \frac{k}{s}.\]
To calculate the first term, \(P(x_s \in R_n\ |\ x_s \in R_s)\), we multiply the probability that \(x_s\) remains in the reservoir after each subsequent observation, yielding
\[ P(x_s \in R_n\ |\ x_s \in R_s) = \prod_{t = s}^{n - 1} P(x_s \in R_{t + 1}\ |\ x_s \in R_t) = \frac{s}{s + 1} \cdot \frac{s + 1}{s + 2} \cdot \cdots \cdot \frac{n - 1}{n} = \frac{s}{n}. \]
Therefore
\[P(x_s \in R_n) = \frac{s}{n} \cdot \frac{k}{s} = \frac{k}{n},\]
as desired.
Now consider the case where \(s \leq k\), so that \(P(x_s \in R_k) = 1\). In this case,
\[ P(x_s \in R_n) = P(x_s \in R_n\ |\ x_s \in R_k) = \prod_{t = k}^{n - 1} P(x_s \in R_{t + 1}\ |\ x_s \in R_t) = \frac{k}{k + 1} \cdot \frac{k + 1}{k + 2} \cdot \cdots \cdot \frac{n - 1}{n} = \frac{k}{n}, \]
as desired.
Below we give an implementation of this algorithm in Python.
import itertools as itl
import numpy as np
def sample_after(stream, k):
"""
Return a random sample ok k elements drawn without replacement from stream.
This function is designed to be used when the elements of stream cannot
fit into memory.
"""
= np.array(list(itl.islice(stream, k)))
r
for t, x in enumerate(stream, k + 1):
= np.random.randint(1, t + 1)
i
if i <= k:
- 1] = x
r[i
return r
xrange(1000000000), 10) sample(
> array([950266975, 378108182, 637777154, 915372867, 298742970, 629846773,
749074581, 893637541, 328486342, 685539979])
Generalizations
Vitter1 gives three generalizations of this simple reservoir sampling algorithm, all based on the following idea.
Instead of generating a random integer for each data point, we generate the number of data points to skip before the next candidate data point. Let \(S_t\) be the number of data points to advance from \(x_t\) before adding a candidate to the reservoir. For example, if \(S_t = 3\), we would ignore \(x_{t + 1}\) and \(x_{t + 2}\) and add \(x_{t + 3}\) to the reservoir.
The simple reservoir algorithm leads to the following distribution on \(S_t\):
\[P(S_t = 1) = \frac{k}{t + 1}\]
\[P(S_t = 2) = \left(1 - \frac{k}{t + 1}\right) \frac{k}{t + 2} = \frac{t - k + 1}{t + 1} \cdot \frac{k}{t + 2}\]
\[P(S_t = 3) = \left(1 - \frac{k}{t + 2}\right) \left(1 - \frac{k}{t + 1}\right) \frac{k}{t + 3} = \frac{t - k + 2}{t + 2} \cdot \frac{t - k + 1}{t + 1} \cdot \frac{k}{t + 3}\]
In general,
\[P(S_t = s) = k \cdot \frac{t! (t + s - (k + 1))!}{(t + s)! (t - k)!}.\]
Vitter gives three generalizations of the reservor algorithm, each based on different ways of sampling from the distribution of \(S_t\). These generalizations have the advantage of requiring the generation of fewer random integers than the simple reservoir algorithm given above.
This blog post is available as an IPython notebook here.
Vitter, Jeffrey S. “Random sampling with a reservoir.” ACM Transactions on Mathematical Software (TOMS) 11.1 (1985): 37-57.↩︎