Reservoir sampling(水塘抽样)

题目1:给出一个数据流,这个数据流的长度很大或者未知。并且对该数据流中数据只能访问一次。请写出一个随机选择算法,使得数据流中所有数据被选中的概率相等。

对于复杂问题一定要学会归纳总结,即从小例子入手,然后分析,得出结论,然后在证明。不然遇到一个抽象问题,不举例感觉这个问题,直接解还是比较难的。

对于此问题的难处就是数据流的长度未知,如果已知,so easy。现在进行归纳总结:

  1. 长度为1,只有一个数据,直接返回即可,此数据被返回的概率为1.
  2. 长度为2,当读取第一数据时,我们发现并不是最后一个数据,我们不能直接返回,因为数据流还没结束,继续读取,到第二数据的时候,发现已经结束。所以现在的问题就是等概率返回其中的一个,显然概率为0.5。所以此时我们可以生成一个0到1的随机数p,如果p小于0.5,返回第二个,如果大于0.5,返回第一个。显然此时两个数据被返回的概率是一样的。
  3. 长度为3,我们可以事先分析得到,为了满足题意,需要保证每个数据返回的概率都是1/3。接下来分析数据流,首先读取第一个数据,然后在读取第二个数据,此时可以按2)处理,保留一个数据,每个数据显然为1/2。此时读取第三个数据,发现到尾部了,为了满足题意,此时需要一1/3的概率决定是否取此数据。现在分析前两个数是否也是以1/3的概率返回,如果是则总体都满足。1或2留下的概率为1 - 1/3 = 2/3 ,而1与2用有的机会想等,则 2/3 * 1/2 = 1/3 所以,对长度为3的数据流,在读取第三个数据时,我们可以生成一个0到1的随机数p,如果p小于1/3,返回第三个数据,否则,返回前面两个pk留下的数据。
  4. 同理,4留下的概率为 1/4, 1,2,3留下概率为3/4, 1,2,3 机会均等,3/4 * 1/3 = 1/4

由上面的分析,我们可以得出结论,在取第n个数据的时候,我们生成一个0到1的随机数p,如果p小于1/n,保留第n个数。大于1/n,继续保留前面的数。直到数据流结束,返回此数。

下面用数学归纳法证明此结论。

  1. 当n=1时,第一个元素以1/1的概率返回,符合条件。
  2. 假设当n=k时成立,即每个元素都以1/k的概率返回,先证明n=k+1时,是否成立。
  3. 对于最后一个元素显然以1/k+1的概率返回,符合条件,对于前k个数据,被返回的概率为1/k * (1- 1/k+1)=1/k+1,满足题意。
  4. 综上所述,结论成立。

题目2: 对于题目1的要就变为,最后返回的结果长度为k,这就是水塘抽样

显然有了对题目1的理解,我们可以直接替换结论,只需把上面的1/n变为k/n即可。

在取第n个数据的时候,我们生成一个0到1的随机数p,如果p小于k/n,替换池中任意一个为第n个数。大于k/n,继续保留前面的数。直到数据流结束,返回此k个数。但是为了保证计算机计算分数额准确性,一般是生成一个0到n的随机数,跟k相比,道理是一样的。

可以以同样的方法证明之。

  1. 初始情况k<=n,出现在水库中的k个元素的出现概率都是一致的,都是1
  2. 第一步。第一步就是指,处理第k+1个元素的情况。分两种情况:元素全部都没有被替换;其中某个元素被第k+1个元素替换掉。
  3. 我们先看
    1. 情况2:第k+1个元素被选中的概率是k/(k+1)(根据公式k/i),所以这个新元素在水库中出现的概率就一定是k/(k+1)(不管它替换掉哪个元素,反正肯定它是以这个概率出现在水库中)。下面来看水库中剩余的元素出现的概率,也就是1-P(这个元素被替换掉的概率)。水库中任意一个元素被替换掉的概率是:(k/k+1)*(1/k)=1/(k+1),意即首先要第k+1个元素被选中,然后自己在集合的k个元素中被选中。那它出现的概率就是1-1/(k+1)=k/(k+1)。可以看出来,旧元素和新元素出现的概率是相等的。
    2. 情况1:当元素全部都没有替换掉的时候,每个元素的出现概率肯定是一样的,这很显然。但具体是多少呢?就是1-P(第k+1个元素被选中)=1-k/(k+1)=1/(k+1)。
  4. 归纳法:重复上面的过程,只要证明第i步到第i+1步,所有元素出现的概率是相等的即可。

spark 的工具类 SamplingUtils 水塘抽样源码

/**
* Reservoir sampling implementation that also returns the input size.
* 会返回样本总数
*
* @param input input size
* @param k reservoir size
* @param seed random seed
* @return (samples, input size)
*/
def reservoirSampleAndCount[T: ClassTag](
input: Iterator[T],
k: Int,
seed: Long = Random.nextLong())
: (Array[T], Int) = {
val reservoir = new Array[T](k)
// Put the first k elements in the reservoir.
var i = 0
while (i < k && input.hasNext) {
val item = input.next()
reservoir(i) = item
i += 1
}

// If we have consumed all the elements, return them. Otherwise do the replacement.
if (i < k) {
// If input size < k, trim the array to return only an array of input size.
val trimReservoir = new Array[T](i)
System.arraycopy(reservoir, 0, trimReservoir, 0, i)
(trimReservoir, i)
} else {
// If input size > k, continue the sampling process.
val rand = new XORShiftRandom(seed) // XORShiftRandom 比 jdk 的 Random 性能高 x3.5
while (input.hasNext) {
val item = input.next()
val replacementIndex = rand.nextInt(i)
if (replacementIndex < k) {
reservoir(replacementIndex) = item
}
i += 1
}
(reservoir, i)
}
}


- - - - - - - - End Thank For Your Reading - - - - - - - -