FP-Growth原理

0 / 152

基本概念

关联规则挖掘的一个典型例子是购物篮分析。关联规则研究有助于发现交易数据库中不同商品(项)之间的联系,找出顾客购买行为模式,如购买了某一商品对购买其他商品的影响,分析结果可以应用于商品货架布局、货存安排以及根据购买模式对用户进行分类。

关联规则的相关术语如下:

(1)项与项集

这是一个集合的概念,在一篮子商品中的一件消费品即为一项(Item),则若干项的集合为项集,如{啤酒,尿布}构成一个二元项集。

(2)关联规则

一般记为的形式,X 为先决条件,Y 为相应的关联结果,用于表示数据内隐含的关联性。如:表示购买了尿布的消费者往往也会购买啤酒。

关联性强度如何,由三个概念——支持度、置信度、提升度来控制和评价。

例:有 10000 个消费者购买了商品,其中购买尿布 1000 个,购买啤酒 2000 个,购买面包 500 个,同时购买尿布和面包 800 个,同时购买尿布和面包 100 个。

(3)支持度(Support)

支持度是指在所有项集中{X, Y}出现的可能性,即项集中同时含有 X 和 Y 的概率:

该指标作为建立强关联规则的第一个门槛,衡量了所考察关联规则在“量”上的多少。通过设定最小阈值(minsup),剔除“出镜率”较低的无意义规则,保留出现较为频繁的项集所隐含的规则。

设定最小阈值为 5%,由于{尿布,啤酒}的支持度为 800/10000=8%,满足基本输了要求,成为频繁项集,保留规则;而{尿布,面包}的支持度为 100/10000=1%,被剔除。

(4)置信度(Confidence)

置信度表示在先决条件 X 发生的条件下,关联结果 Y 发生的概率:

这是生成强关联规则的第二个门槛,衡量了所考察的关联规则在“质”上的可靠性。相似的,我们需要对置信度设定最小阈值(mincon)来实现进一步筛选。

具体的,当设定置信度的最小阈值为 70% 时,置信度为 800/1000=80%,而的置信度为 800/2000=40%,被剔除。

(5)提升度(lift)

提升度表示在含有 X 的条件下同时含有 Y 的可能性与没有 X 这个条件下项集中含有 Y 的可能性之比:公式为 confidence(artichok => cracker)/support(cracker) = 80%/50% = 1.6。该指标与置信度同样衡量规则的可靠性,可以看作是置信度的一种互补指标。

事务数据

假设我们的 Transaction 数据库有 5 条交易数据,如下表,其中 abcde 为 5 个商品。假设设定 minSupport = 0.4,即要求至少共同出现 2 次。

id 购买的商品
1 a b d
2 b c d
3 a b e
4 a b c
5 b c d
表 1:交易数据

频繁项挖掘的技术演进

1.1 暴力求解
5 个 sku,一共有 2 的 5 次方种购买可能,如下图所示。暴力求解循环每种可能,全量扫描交易数据库计算购买次数,看其是否超过设定的 minSupport,进而判断是否是频繁项。

1.jpg

1.2 Apriori 算法
上述的暴力求解方式对每种可能都会进行数据库的全表扫描,效率低下。因此有学者提出了 Apriori 算法。Apriori 中文含义是先验的,因为算法基于这么一个先验知识:当购买组合 A 不是频繁项时,那么包含 A 的任何超集也必然不是频繁项。

通过这个先验知识,可以避免大量的无效数据库扫描,提高效率,提高的程度取决于交易数据和设置的 minsupport。示意图如下所示:
2.jpg

FP-Growth 算法

FP-Growth 算法更进一步,通过将交易数据巧妙的构建出一颗 FP 树,然后在 FP 树中递归的对频繁项进行挖掘。

FP-Growth 算法仅仅需要两次扫描数据库,第一次是统计每个商品的频次,用于剔除不满足最低支持度的商品,然后排序得到 FreqItems。第二次,扫描数据库构建 FP 树。

还是以上面的交易数据作为例子,接下来一步步的详细分析 FP 树的构建,和频繁项的递归挖掘。

2.1 统计频次
第一步,扫描数据库,统计每个商品的频次,并进行排序,显然商品 e 仅仅出现了一次,不符合 minSupport,剔除。最终得到的结果如下表:

商品 频次
b 5
a 3
c 3
d 3

表 2: 商品的出现频次

2.2 构建 FP 树
第二步,扫描数据库,进行 FP 树的构建。FP 树以 root 节点为起始,节点包含自身的 item 和 count,以及父节点和子节点。

首先是第一条交易数据,a b d,结合第一步商品顺序,排序后为 b a d,依次在树中添加节点 b,父节点为 root,最新的的频次为 1,然后节点 a,父节点为 a,频次为 1,最后节点 d,父节点为 b,频次为 1。如下图所示:
3.png

然后是第二条交易数据,排序后为:b c d。依次添加 b,树中已经有节点 b,因此更新频次加 1,然后是节点 c,b 节点当前只有子节点 d,因此新建节点 c,父节点为 b,频次为 1,最后是 d,父节点为 c,频次为 1。
4.png

后面三条交易数据的处理和前两条一样,就不详细阐述了,直接画出每次处理完的 FP 树示意图。
5.png

6.png

7.png

2.3 频繁项的挖掘
2.3.1 商品 b 频繁项的挖掘

首先是商品 b,首先 b 节点本身的频次符合 minSupport,所以是一个频繁项(b : 5),然后 b 节点往上找 subTree,只有跟节点,所以解锁,b 为前缀的频繁项只有一个:(b : 5)。

2.3.2 商品 a 频繁项的挖掘

然后是 a,显然 a 本身是个频繁项(a : 3),然后递归的获取 a 的子树,进行挖掘。子树构建方式如下:新建一个新的 FP 树,然后遍历树中所有的 a 节点,往上找,直到 root 节点,然后把当前路径上的非根节点添加到 subTree 中,每个节点的频次为当前遍历节点的频次。

因为 a 只有一个节点(a, 3),所以往上遍历得到节点 b,因此把 b 加入 subTree 中,频次为节点(a, 3)的频次 3。得到如下 subTree,显然在这个 subTree 中只能挖掘出频繁项(b : 3),然后别忘了这是 a 递归得到的子树,得拼上前缀 a,所以得到频繁项为(ab : 3)
8.png

此时的 subTree 只有一个节点(b, 3),不用进一步递归,因此商品 a 的频繁项挖掘结束,有两个频繁项为:(a : 3), (ab : 3)。

2.3.3 商品 c 频繁项的挖掘

商品 c 在 FP 树中包含两个节点,分别为: (c, 1), (c, 2)。显然 c 自身是个频繁项(c : 3),然后进行递归。(c, 1)节点往上路径得到如下节点:(a, 1), (b, 1)。节点(c, 2)往上得到(b, 2),上述三个节点可以构造出如下的 subTree:
9.png

subTree 中的节点(b, 3)符合 minsupport,拼上前缀 c 得到频繁项(bc : 3)。节点(a, 1)不满足要求,丢弃。

因此,c 挖掘出的频繁项为:(c:3), (cb : 3)

2.3.4 商品 d 频繁项的挖掘

同理,(d : 3)是一个频繁项,d 的 subTree 为:
10.png

子树首先挖出(c : 2),(b : 3),拼上前缀 d 得到(dc : 2),(db : 3),然后 subTree 中的节点 c 的 subTree 仅仅有根节点和节点(b, 2),拼上两个前缀得到(dcb : 2)

2.3.5 最终结果和验证

通过上述的挖掘过程,我们依次挖出了如下 9 个频繁项:(b : 5), (a : 3), (ab : 3), (c:3), (cb : 3), (d : 3), (dc : 2),(db : 3), (dcb : 2)

我们通过 pyspark 进行下验证,得到的结果和我们一步步推算的结果丝毫不差。

#encoding:utf-8

from pyspark.sql import SparkSession

ss = SparkSession.builder 
.appName("test_fp") 
.config("spark.executor.memory", "32G") 
.config("spark.driver.memory", "32G") 
.config("spark.python.worker.memory", "32G") 
.config("spark.default.parallelism", "4") 
.config("spark.executor.cores", "8") 
.config("spark.sql.shuffle.partitions", "500") 
.config("spark.sql.crossJoin.enabled", "true")
.config("spark.sql.broadcastTimeout","36000") 
.enableHiveSupport() 
.getOrCreate()

data = [["a", "b", "d"], ["b", "c", "d"], ["a", "b", "e"], ["a", "b", "c"],["b", "c", "d"]]
rdd = ss.sparkContext.parallelize(data, 2)
from pyspark.mllib.fpm import FPGrowth
model = FPGrowth.train(rdd, 0.4, 2)
model.freqItemsets().collect()

# 结果如下:

#[FreqItemset(items=['a'], freq=3),
#FreqItemset(items=['a', 'b'], freq=3),
#FreqItemset(items=['b'], freq=5),
#FreqItemset(items=['d'], freq=3),
#FreqItemset(items=['d', 'b'], freq=3),
#FreqItemset(items=['c'], freq=3),
#FreqItemset(items=['c', 'd'], freq=2),
#FreqItemset(items=['c', 'd', 'b'], freq=2),
#FreqItemset(items=['c', 'b'], freq=3)]

Parallel FP-Growth

当交易的数据量级太大时,单机版本的 FP-Growth 内存将无法放下完整的 FP 树,并且单机效率较慢。如果能够实现并行化那就再好不过了,但是怎么将一颗完整的 FP 树拆分成多棵小树,分别在不同的机器节点上运行 FP-Growth,这些小树得出的结果汇总后不会丢失频繁项,这是关键所在。

在这样的背景下,Google 北京研究院相关人员提出了 PFP(Parallel FP-Growth),通过 3 次 Map-Reduce 操作对 FP-Growth 进行了并行化运行。目前各大开发包的实现也都是基于这一篇文章实现的,包括 spark 和 mahout。

通过前面一步步的构建 FP 树和频繁项挖掘过程,大家应该发现了,挖掘某一个商品的频繁项时,并不是所有的交易数据都是有用的,显然不包含此商品的交易数据肯定是冗余的。PFP 的关键就是将商品进行分堆到不同的机器节点上运行,每个节点获取和自己负责的节点相关的部分交易数据,然后后续的构建树和挖掘频繁项工作则是一样的。

第一个 map-reduce 用于统计每个 item 的频次,得到 frequentSets,功能类似于 wordCount。

第二个 map-reduce 实现分布式的 FP-Growth,是关键所在。mapper 负责生成 group-dependent transactions。首先将 frequentSets 进行分堆,假设分为两堆,得到如下表的 G_list。第一台机器负责 b 和 a 的频繁项挖掘,第二台机器负责 c 和 d 频繁项的挖掘。

商品 频次 group_id(后面简写为 gid)
b 5 1
a 3 1
c 3 2
d 3 2
表 3:G_list

然后开始划分交易数据,以第一条 a b d 为例,排序后为 b a d。从后往前遍历,首先是 d,gid 为 2,所以得到如下交易数据 <2, (b a d)>,第一项为 gid,第二个为对应的交易数据。第二个为 a,gid 为 1,得到 <1, (b a)>,因为 d 的频次小于 b 和 a,所以 d 的存在与否对以 b 和 a 为前缀的频繁项挖掘没影响,只对 d 为前缀的频繁项有作用。最后是 d,gid=2,因为已经有了 gid=2 的交易数据,因此不用处理。

通过上述步骤,可以生成各个 group_id 需要的交易数据,然后送个 reducer。Reducer 负责在这些交易数据上进行普通的 FP-Growth 操作,进行自己负责的 item 的频繁项的挖掘。

第三次 map-reduce 对上述不同分区产生的频繁项进行聚合得到最终结果。
总体示意图如下:
11.png

Spark mlib 实现核心方法解析

def run[Item: ClassTag](data: RDD[Array[Item]]): FPGrowthModel[Item] = {
    if (data.getStorageLevel == StorageLevel.NONE) {
      logWarning("Input data is not cached.")
    }
    val count = data.count()
    // 计算最小的频次
    val minCount = math.ceil(minSupport * count).toLong
    // 如果没设置分区数量,就使用训练数据的分区数
    val numParts = if (numPartitions > 0) numPartitions else data.partitions.length
    val partitioner = new HashPartitioner(numParts)
    // 类似wordCount 实现每个item 频次的统计
    val freqItems = genFreqItems(data, minCount, partitioner)
    // 频繁项的挖掘
    val freqItemsets = genFreqItemsets(data, minCount, freqItems, partitioner)
    new FPGrowthModel(freqItemsets)
  }

考虑篇幅有限,统计 item 的频次比较简单,就不阐述了。下面介绍生成频繁项的方法:

private def genFreqItemsets[Item: ClassTag](
      data: RDD[Array[Item]],
      minCount: Long,
      freqItems: Array[Item],
      partitioner: Partitioner): RDD[FreqItemset[Item]] = {
    val itemToRank = freqItems.zipWithIndex.toMap
    data.flatMap { transaction =>
      // 得到每个分区相关的交易数据
      genCondTransactions(transaction, itemToRank, partitioner)
      // 按照key进行聚合,key为group_id
    }.aggregateByKey(new FPTree[Int], partitioner.numPartitions)(
      // 用交易数据对树进行构建
      (tree, transaction) => tree.add(transaction, 1L),
      // 因为是key是group_id,所以不同item树可能不再一个分区中,
      // 所以不同分区,同一个group_id的树进行合并
      (tree1, tree2) => tree1.merge(tree2))
    .flatMap { case (part, tree) =>
      // 对不同gid下的树,进行频繁项的挖掘,第二个参数控制只挖自己gid负责的item
      tree.extract(minCount, x => partitioner.getPartition(x) == part)
    }.map { case (ranks, count) =>
      new FreqItemset(ranks.map(i => freqItems(i)).toArray, count)
    }
  }

然后是看怎么生成各个 group 相关的交易数据的。

private def genCondTransactions[Item: ClassTag](
      transaction: Array[Item],
      itemToRank: Map[Item, Int],
      partitioner: Partitioner): mutable.Map[Int, Array[Int]] = {
    val output = mutable.Map.empty[Int, Array[Int]]
    val filtered = transaction.flatMap(itemToRank.get)
    ju.Arrays.sort(filtered)
    val n = filtered.length
    var i = n - 1
    while (i >= 0) {
      val item = filtered(i)
      // 从后往前。得到item对应的group_id
      val part = partitioner.getPartition(item)
      if (!output.contains(part)) {
        // 如果输出没有这个group_id的数据,加进输出。key为group_id
        output(part) = filtered.slice(0, i + 1)
      }
      i -= 1
    }
    output
  }

FP 树的构建

def add(t: Iterable[T], count: Long = 1L): this.type = {
    require(count > 0)
    var curr = root
    curr.count += count
    t.foreach { item =>
      // summaries用于维护每个item的频次和关联的节点
      val summary = summaries.getOrElseUpdate(item, new Summary)
      summary.count += count
      // 如果当前curr指向节点的子节点没有item,则新建节点,父节点为curr,更新summary
      val child = curr.children.getOrElseUpdate(item, {
        val newNode = new Node(curr)
        newNode.item = item
        summary.nodes += newNode
        newNode
      })
      child.count += count
      // curr指向新加的节点
      curr = child
    }
    this
  }

频繁项的挖掘代码

def extract(
      minCount: Long,
      validateSuffix: T => Boolean = _ => true): Iterator[(List[T], Long)] = {
    summaries.iterator.flatMap { case (item, summary) =>
      // summaries维护了所有item的count和节点,拿到前缀符合,频次也符合的节点
      if (validateSuffix(item) && summary.count >= minCount) {
        // 首先单节点自身是个频繁项,然后拼接递归挖掘出的频繁项
        Iterator.single((item :: Nil, summary.count)) ++
          // 先生成item对应的子树,然后递归的进行挖掘
          project(item).extract(minCount).map { case (t, c) =>
             // 子树挖掘的频繁项记得拼上递归前的前缀
            (item :: t, c)
          }
      } else {
        Iterator.empty
      }
    }
  }

子树的生成

private def project(suffix: T): FPTree[T] = {
    // 新生成一棵树
    val tree = new FPTree[T]
    if (summaries.contains(suffix)) {
      // 得到item的所有节点
      val summary = summaries(suffix)
      summary.nodes.foreach { node =>
        var t = List.empty[T]
        var curr = node.parent
        // 往上获取到root路径上的所有节点
        while (!curr.isRoot) {
          t = curr.item :: t
          curr = curr.parent
        }
        // 节点的频次都设置为当前node的频次
        tree.add(t, node.count)
      }
    }
    tree
  }

总结

本文通过简单案例入手,一步步的分析 FP-Growth 的原理,包括 FP 树的构建,频繁项的挖掘。然后分析并行 FP-Growth 以及其在 spark 中的实现。