[博客翻译]纯函数滑动窗口聚合算法


原文地址:https://byorgey.github.io/blog/posts/2024/11/27/stacks-queues.html


在 Haskell 中的竞技编程:栈、队列和单调滑动窗口

假设我们有一个长度为 (n) 的项目列表,并且我们想要在列表内考虑宽度为 (w) 的“窗口”(即连续的子序列)。

图示
通过暴力方法,我们可以在 (O(nw)) 时间内计算每个窗口的总和:只需生成所有窗口并分别求和。但当然,我们可以做得更好:跟踪当前窗口的总和;每当我们将窗口向右滑动一个元素时,我们可以加上右边进入窗口的新元素并减去左边滑出窗口的元素。使用这种“滑动窗口”技术,我们可以仅用 (O(n)) 的时间来计算每个窗口的总和,而不是 (O(nw))。

但如果要找到每个窗口的最大值呢?当然,(O(nw)) 的暴力算法仍然可行,但要在 (O(n)) 内完成则明显更复杂!我们无法像处理和一样使用相同的技巧,因为当元素从左侧滑出时,我们没有方法“减去”该元素。这实际上归结为这样一个事实:加法构成一个“群”(即带有逆元的单态),而 max 操作并不形成群。因此,更普遍的问题是:如何以 (O(n)) 的时间计算每个窗口的单态汇总?

今天我想展示一种解决此问题的方法,这是我最喜欢的竞争编程技巧之一,并且它非常适合功能性的上下文。在此过程中,我们还将看到如何实现简单而高效的函数式队列。

在我们深入队列之前,需要绕道了解一下栈。在 Haskell 中,栈非常简单。我们可以只使用一个列表,将列表的前端视为栈的顶部。然而,为了让事情变得更有趣(并且因为它将在后面非常有用),我们将实现“带注释的栈”。栈中的每个元素都将有一个“度量”,该度量是一个来自某个单态 (m) 的值。然后我们希望能够在 (O(1)) 时间内查询任何栈上所有度量的总计。例如,我们可能总是希望找到栈上所有元素的总和或最大值。

如果我们想实现由“群”注释的栈,我们可以这样做:

data GroupStack g a = GroupStack (a -> g) !g [a]

也就是说,一个 GroupStack 存储一个度量函数,该函数为每个类型为 a 的元素分配一个类型为 g 的度量(期望是“群”);一个代表栈上所有元素度量之和(通过群运算)的类型为 g 的值;以及实际的栈本身。推入时,我们只需要计算新元素的度量并将其添加到缓存的 g 值中;弹出时,我们减去被弹出元素的度量,就像这样:

push :: a -> GroupStack g a -> GroupStack g a
push a (GroupStack f g as) = GroupStack f (f a <> g) (a:as)

pop :: GroupStack g a -> Maybe (a, GroupStack g a)
pop (GroupStack f g as) = case as of
 [] -> Nothing
 (a:as') -> Just (a, GroupStack f (inv (f a) <> g) as')

但这对于单态来说显然行不通,因为在弹出操作中我们无法简单地减去除去的元素的度量。相反,我们需要能够“恢复”以前堆栈的状态。这听起来很像...我们可能可以用...一个栈来做到这一点!我们可以简单地在元素栈旁边存储一个度量栈;更好的方式是存储一个对栈。也就是栈上的每个元素都会与一个表示其自身及其下方所有元素总和的注释配对。这就是我们的带注释栈的表示方式:

{-# LANGUAGE BangPatterns #-}
module Stack where
data Stack m a = Stack (a -> m) !Int [(m, a)]

一个 Stack m a 存储三件事:

  1. 一个类型为 a -> m 的度量函数。顺便说一句,如果我们要能够为每个元素指定任意的度量,甚至可以不同时给相同的元素不同度量,也很容易做到:只要使用 (m, a) 对作为元素,并使用 fst 作为度量函数。
  2. 一个整数,表示栈的大小。虽然严格来说这不是必需的,特别是因为可以通过单态注释来跟踪大小;但由于经常需要大小,因此方便起见直接包含它是明智的选择。
  3. 上述的 (注释, 元素) 对栈。

注意,我们不能为 Stack m 编写 Functor 实例,因为 a 出现在 (a -> m) 中是反变的。但这很有道理:如果改变所有 a 的值,则缓存的度量就不再有效。

创建一个新的空栈时,我们需要指定度量函数;为了得到栈的度量,我们只需查找顶部的度量,或者对于空栈返回 mempty

new :: (a -> m) -> Stack m a
new f = Stack f 0 []

size :: Stack m a -> Int
size (Stack _ n _) = n

measure :: Monoid m => Stack m a -> m
measure (Stack _ _ as) = case as of
 [] -> mempty
 (m, _) : _ -> m

现在让我们实现 pushpop,这两个都很直接。

push :: Monoid m => a -> Stack m a -> Stack m a
push a s@(Stack f n as) = Stack f (n + 1) ((f a <> measure s, a) : as)

pop :: Stack m a -> Maybe (a, Stack m a)
pop (Stack f n as) = case as of
 [] -> Nothing
 (_, a) : as' -> Just (a, Stack f (n - 1) as')

最后,我们需要一个用于“反转”栈的函数,这很容易:

reverse :: Monoid m => Stack m a -> Stack m a
reverse (Stack f _ as) = foldl' (flip push) (new f) (map snd as)

即要反转一个栈,我们提取元素并通过 foldl' 将元素一次推入到一个使用相同度量函数的新栈上。

队列

现在我们已经有了带注释的栈,让我们转向队列。这是我的最爱技巧的核心部分:我们可以使用两个栈来实现队列,使插入和删除的操作在均摊情况下为 (O(1)) 时间;而且如果我们使用了带注释的栈,我们会自动获得带注释的队列!

首先是一些导入:

{-# LANGUAGE ImportQualifiedPost #-}
module Queue where
import Data.Bifunctor (second)
import Stack (Stack)
import Stack qualified as Stack

一个 Queue m a 仅仅包含两个栈,一个是前面的栈,一个是后面的栈。要创建一个新的队列,我们只是创建两个新的栈;要获取队列的大小,我们只需添加栈的大小;要获取队列的度量,我们只需合并栈的度量。这很简单。

type CommutativeMonoid = Monoid
data Queue m a = Queue {getFront :: Stack m a, getBack :: Stack m a} deriving (Show, Eq)

new :: (a -> m) -> Queue m a
new f = Queue (Stack.new f) (Stack.new f)

size :: Queue m a -> Int
size (Queue front back) = Stack.size front + Stack.size back

measure :: Commutative