技术飞速发展,机器学习正在成为各个企业的核心竞争优势之一。除了在处于风口浪尖的计算机视觉方向的应用,可能更能产生直接的价值的一个方向是在智能推荐领域。比如广告推荐,如果我们有一个更有效的算法,更精准的向用户推荐了某个广告,用户的广告点击将为企业直接带来收益。
然而在推荐领域,我们面临的是与当前的深度学习颇有不同的问题。这些不同主要体现在:
- 超大的数据量
- 领域专家人工设计的特征
- 极致的在线服务性能需求
为了解决这个技术上非常有挑战的问题,一般情况下,我们要考虑的方案都是借助于大数据的工具。自Google的两篇经典论文发表以来,大数据相关生态发展至今已经十多年过去了,虽然一直都有新的思想的产生,但是很多经典的工具已趋于成熟。大数据的相关工具应对大数据的挑战应当是理所应当的选择。
近期,在一个客户的项目上,我们有机会参加到了一个类似场景下的机器学习应用中,帮助客户解决这一问题。我们选择的技术方案是基于Hadoop的Hive大规模分布式结构化数据存储系统,及高性能Spark分布式计算引擎。以它们为核心来处理数据和训练模型。对于模型的线上服务,我们选择了mleap,mleap是专门针对Spark机器学习库MLLib的高性能服务化而设计的。
在前期的工作中,我们用spark在一个小的数据集上面实现并验证过了所有的功能。但是由于我们的测试环境资源非常有限,无法针对超大规模数据进行测试。在将系统上线到一个类生产环境之后,我们终于面临了亿级数据处理的问题。虽然我们选择的技术方案理论上是可以直接支持这样级别的数据的,但是在实际运行之后,还是遇到了颇多问题,在这里总结并与大家分享一下。
构建快速反馈环
TDD教会我们快速反馈的重要性。一旦我们有了一个快速反馈的机制,我们就能快速修正问题,快速前进。但是在超大数据的场景下,问题的复杂性往往使得我们难以获取快速反馈。比如,一个spark的应用,可能需要运行数十分钟到数小时,而我们遇到的问题可能在数十分钟或数小时之后才会出现。
在这样的场景下,我们如何尽可能的构建快速反馈环就显得更为重要了。试想,如果我们修改一行代码,需要花两个小时才能部署到环境中,然后应用运行再需要两个小时才能重现问题,那么我们一次修改就需要花费4个小时的时间。一来一回,可能一周或者一个月过去了,我们也没有能解决所有问题。
这样的场景对于我们的技术经验及技术功底的要求无疑都非常高。然而在我看来,最关键的还是在于构建快速反馈环。
为了构建这个反馈环我们做了哪些事情呢?
打造环境和工具。在做大规模数据的测试之前,我们能预料到潜在的(几乎是必然的)会产生不少的代码修改。那么搭建一套专用的测试环境就显得非常重要。有了这套环境,在必要时,我们可以采用非常规手段尽快部署我们的修改。同时我们需要自动化一切可以自动化的事情,将各种部署的步骤都编写成可以一键执行的脚本,这就为高效工作奠定了基础。为了实现尽可能的自动化,我们编写了多个自动化的工具脚本。工欲善其事,必先利其器。这些准备工作为后续的性能优化奠定了基础。
打通Spark监控页面及日志。同时由于我们所使用的是一套类生产环境,出于数据保密的要求,我们无法通过网络直接访问到这些数据,而和这些数据部署在一起的大数据集群自然也不例外了。为了访问这些数据,我们需要首先通过某一专用vpn连接到一个内网,然后通过windows的远程桌面连接到某一台内网跳板机,最后我们从跳板机来发起访问。在这样的网络模式中,不仅由于网速慢而导致操作卡顿严重,而且复制粘贴操作也被严格的限制了。客户的安全限制无疑成为了一个阻碍我们快速诊断问题的障碍。在这样的情况下,我们果断的快速实现了一个代理机制,将yarn上的Spark监控页面及相关日志通过代理暴露到我们可访问的网络中来。这样一来,我们就可以快速查询到我们的spark应用的状态了。我们后续遇到的所有问题几乎都是靠Spark监控页面及日志辅助解决的。
启动Spark历史应用日志服务。Spark不仅可以在运行时提供一个内容丰富的监控页面,其实它也可以将运行时的监控数据保存下来供后续分析和查看(详情见这里)。我们只需要在运行应用时配置spark.eventLog.enabled
为true
,spark.eventLog.dir
配置为某一个hdfs
路径即可。有了这些日志之后,我们运行./sbin/start-history-server.sh
启动日志服务工具,就可以在浏览器web页面上面看到相应的历史日志了。当我们的应用失败的时候,Yarn的Spark监控页面(driver节点上面启动的一个web服务)也会相应退出。这给我们分析这些失败的应用带来了困难。有了Spark历史应用日志服务,我们也就不会再有这样的问题。
了解基本的Spark优化方案
在开始我们的具体问题之前,有必要先了解一下基本的Spark优化手段有哪些。
首先,Spark应用也是一个普通的Java应用,所以所有的java程序优化手段都是适用的,例如如何合理利用缓存,如何优化数据结构和内存等。
其次,我们需要了解基本的分布式程序运行原理,简单来说就是Map-Reduce
(后续简称MR)算法的基本原理,Spark应用最终都会以一系列MR任务的方式执行。
然后,我们需要了解一般情况下分布式程序工作的瓶颈所在。一般而言,运行某个分布式应用时,我们会拥有非常多的cpu并行执行代码,但是这些cpu分布在不同的物理机上,所以在这些cpu间共享数据和调度任务会成为一个问题。
这里的问题会大致表现为:
- 磁盘IO – 多机并行数据读写时是否充分利用了多个磁盘进行数据访问,是否读取了尽可能近的数据存储,是否避免了没有必要的写盘操作等;
- 网络IO – 在reduce操作时任务间交换的数据有多少,如何在不同的物理机上的进程共享一些大的对象等;
- 调度规模 – 是否因为任务调度慢而导致应用慢,比如任务数量过多。
以上述几点为基础可以扩展出非常广泛的内容,各种资料也非常多,在这里就不重复赘述了。在这次的Spark应用优化中,我参考过的比较成体系的优化资料有:
- 官方的Spark调优指南
- 来自美团的Spark优化指南 – 初级篇及高级篇
- Java应用GC调优指南
从这些资料中,我们可以将Spark应用的优化方式整理为如下几点:
- 算法性能优化
- 使用map-side的计算(在map过程进行预计算),如使用
reduceByKey
/aggregateByKey
替代groupByKey
- 使用带Partitions的API进行计算(一次函数调用处理一个Partition),如
mapPartitions
替代map
- 使用手动添加前缀的方式优化由于数据倾斜带来的性能问题
- 使用map-side的计算(在map过程进行预计算),如使用
- 并行度优化
- 调用
repartition
API设置分区数量 - 设置默认的
shuffle
操作之后的分区数量 - 数据量变化之后,调用
coalesce
重设分区数量
- 调用
- 缓存优化
- 调用
cache
persist
及unpersist
以便控制哪一个RDD
需要缓存 - 缓存
RDD
时考虑使用序列化缓存,进一步考虑压缩
- 调用
- 内存优化
- 使用更节约内存的数据结构:如避免使用java的包装类型(boxed),避免使用内置的
Map
List
等数据结构(会创建额外的Entry
对象)等 - 使用广播变量:对于某个只读的大对象,在一个
Executor
内部共享,而不是每个task
都复制一份 - 调整spark管理的内存大小:配置
spark.memory
相关参数 - 调整JVM的新生代和老生代内存比例
- gc优化:使用
G1
垃圾收集器
- 使用更节约内存的数据结构:如避免使用java的包装类型(boxed),避免使用内置的
- 其他有用的优化方式
- 资源:配置
executor
的数量,每个executor
的核数及内存,driver
的核数和内存 - 调度:配置是否重启一个较慢的任务,设置
spark.speculation
相关参数 - IO:使用节约空间的序列化方式,如配置
kryo
序列化,调整本地化程度等待时间spark.locality.wait
参数
- 资源:配置
后文中针对每个问题的定位和优化均会从以上几点来进行考虑。
解决大数据场景下的问题
为了大家能理解后续的问题,对于我们的目标Spark应用,我整理了一个简单的数据处理流程图如下。
整个系统基于Spark MLLib构建。得益于Spark MLLib
中的Pipeline
抽象,我们可以将通用的数据处理过程建模为一个一个算子。比如原始数据中有一些字符串类型的分类特征(如按照出生时间可以分为80后、90后、00后等),我们一般会先将其数值化。这通过一个字符串索引算子(StringIndexer)就可以实现。再比如,我们如果要将某一个特征进行正则化,我们可以通过正则算子(Normalizer)来实现。将这些算子从前到后依次串联就可以实现一系列的可复用的处理过程。
某些算子需要预先做一些统计工作,比如为了实现正则化,我们需要知道当前特征的最大值最小值。计算这样的统计类信息的过程称为算子的fit
过程,而真正执行计算的过程称为transform
。
以上简述了Spark MLLib
中最基本的抽象,想了解更多的同学们可以移步这里。
为了将数据处理为机器学习模型需要的数据格式,我们构建了一个类似下面的Pipeline
。将一系列的数据(特征)转换为一个稀疏向量(上千万维)。
StringIndexer(特征列a b c -> a_idx b_idx c_idx)
-> OneHot(特征列a_idx b_idx c_idx -> a_idx_oh b_idx_oh c_idx_oh)
-> MultiHot(特征列ma -> ma_mh)
-> MultiHot(特征列mb -> mb_mh)
-> MultiHot...
-> VectorAssembler(特征列a_idx_oh b_idx_oh c_idx_oh ma_mh mb_mh ... -> final_result)
(上述括号内的内容表示将某几个特征进行计算,输出对应箭头后的特征列,如StringIndexer
算子处理特征列a后,输出特征列a_idx。)
这里用到的几个算子的功能简述如下:
StringIndexer
算子:提取字符串数据表示的所有分类,然后对某一个数据进行数值化编码,如,当我们一共有分类a
b
c
,则某一个分类b
将编码为数值1
(从0开始编码)OneHot
算子:将一个数值数据进行OneHot编码,转换为一个稀疏向量,如,当我们一共有4个元素时,数值3将编码为[0, 0, 0, 1, 0]
MultiHot
算子:将一个多值字符串数据编码为一个稀疏向量,如,对于某一特征喜好
,所有的喜好
类型有a
b
c
d
,则数据b c
将编码为[0, 1, 1, 0]
VectorAssembler
算子:将多个向量合并为一个大的向量,如数据[0, 0, 1]
[1, 0]
将合并为向量[0, 0, 1, 1, 0]
在应对亿级数据时,这样的Spark应用遇到了哪些问题,而我们又是如何一步一步解决的呢?
计算特别慢,最后出现OOM问题
由于我们已经在一个小的数据集上面实现并验证过了所有的功能,一开始我们直接将这个Pipeline
应用于亿级数据。启动应用之后,我们发现计算特别慢,最后在某一个MultiHot
算子出现OOM的问题,导致整个应用失败。
资源优化
我们的Spark应用,处理的数据规模在一亿左右,特征列的数量在120左右,大部分特征以字符串的形式存储,分区数8000,整个数据量占用空间约200G。这并不是一个特别大的数据集,按道理以Spark的设计可以轻松应对才是,但是如果不经优化,在这样的数据集情况下,很多性能问题都会显现出来。
首先我们想到的是优化资源,这是最简单的方式,修改应用启动配置即可。于是我们分配了100个executor,每个executor分配64GB内存加8个计算核心,总分配内存6TB左右。但是结果却不尽如人意,程序执行依然很慢。全部依赖资源是不现实的,我们开始着手优化应用实现。
缓存优化
经过上述优化之后,我们发现虽然我们分配了足够多的资源,但是应用还是会在后面几个MultiHot
算子报错OOM退出。这是为什么呢?检查Spark监控页面下的Storage
页面,我们发现有多个缓存的RDD,其中好几个RDD占用了1TB的内存。这不OOM才怪呢。
(此处的截图仅为了演示创建,非实际的图,实际的图会显示缓存大小1TB左右,分区数8000)
检查代码发现,在流程图步骤5
保存样例数据时,我们添加了一个非预期的dataset.cache()
,这导致每一个算子执行完毕之后都会缓存起来。去掉这个cache之后,测试,OOM问题消失。虽然这个问题没有了,但是应用运行到VectorAssembler.fit
的时候会卡住,等待数小时依然无响应。
虽然没有了MultiHot
算子的OOM问题,但是我们发现算子执行速度特别慢。这也是一个亟待优化的问题。否则为了重现上面的应用无响应问题,我们需要等待超过1小时的时间。这对于我们而言,太慢了。于是我们开始分析为什么算子执行速度特别慢。
StringIndexer
优化
查看Spark监控页面,我们发现在StringIndexer
算子出现了特别多的名为countByValue
的Job
,每个Job
执行都比较慢。
(此处的截图仅为了演示创建,非实际的图,实际的action执行时间在数分钟)
我们知道每个Job
会对应一个action
操作。这就表示我们的StringIndexer
算子针对每一特征列触发了一个action
操作。由于我们在这个算子上面将会处理上百个特征列,所以就出现了特别多的这样的操作。这些操作具体是做什么呢?回顾上述的StringIndexer
的处理过程可知,我们需要先统计每一个特征列的所有分类(fit
过程),然后构造一个字典来实现数据转换(transform
过程)。这里的CountByValue
操作看来是用于统计每一个特征列的所有分类了。一查代码果然如此。
但是为何会这么慢呢?难道每次操作都会重新去hive表读取数据?检查代码发现,果然是没有对输入的数据集进行缓存,在Spark监控页面下的Storage
页面中也未发现任何缓存的数据。在流程图步骤1
读取输入数据集后,添加缓存代码dataset.cache()
,再进行测试。这次CountByValue
速度立即提升了数十倍,达到数秒到数十秒的级别。这时再观察Spark监控页面下的Storage
页面,将发现其缓存了一个HiveTableScan
的RDD
。
经过上述优化,整个StringIndexer
算子的fit
过程还是会花费5分钟左右。是否还可以从算法层面优化这个算子呢?实际上我们可以通过添加前缀的方式设计如下算法来解决这个问题:
dataset.select("a", "b", "c").rdd
.flatMap(row => row.toSeq.zipWithIndex.map { case (v, i) => s"${i}:${v}" }) // 拼接列索引前缀
.countByValue
.keys
.groupBy(key => key.substring(0, key.indexOf(":"))) // 按照列索引分组
.toSeq
.map {
case (colIdx, categories) =>
(Map(0-> "a", 1-> "b", 2-> "c").get(colIdx.toInt).get,
categories.map(cate => cate.substring(cate.indexOf(":") + 1)).toSeq)
}
// for testing
// 1. prepare data from beeline
// create table tt (a int, b varchar, c varchar, d double);
// insert into tt values(1, '1', '2', 3), (2, '2', '3', 4), (3, '3', '4', 5), (4, '4', '5', 6), (5, '5', 6', 7');
// 2. run the code above, got:
// ArrayBuffer((c,ArrayBuffer(4, 5, 6, 2, 3)), (b,ArrayBuffer(4, 5, 1, 2, 3)), (a,ArrayBuffer(4, 5, 1, 2, 3)))
通过添加前缀的方式,我们将多列合并为了一列,那么也就将针对每一特征列的计算,转换为了针对一个列的计算。完成计算之后,在driver端我们再根据之前的拼接规则按列进行拆分即可。
经过这样的算法优化之后,StringIndexer
算子的fit
过程优化到了1分半左右。这里如果我们发现得到的分类数太多,在最后一步map操作中,我们还可以考虑并行的对values
进行处理以提升性能。
MultiHot
优化
在之前的Pipeline
中,我们可以看到我们组合了多个MultiHot
算子,这在实际使用中,不仅带来了易用性的问题(常常有数十个列需要进行处理),而且性能也不高。如何优化这个算子呢?我们可以参考StringIndexer
类似的算法来进行处理。
主要算法代码如下:
val dict = dataframe.select("ma", "mb").rdd
.flatMap(row => row.toSeq.zipWithIndex.flatMap { case (v, i) => v.toString.split(" ").map(vj => s"${i}:${vj}") }) // 每一个值都拼接一个列索引前缀
.countByValue
.keys
.zipWithIndex
.toMap
// for testing
// 1. prepare data from beeline
// create table tt1 (ma string, mb string);
// insert into tt1 values("a b c", "1 2 3"), ("b c d", "2 3 4"), ("c d e", "3 4 5");
// 2. run the code above, got:
// scala.collection.immutable.Map[String,Int] = Map(0:b -> 0, 0:e -> 1, 1:4 -> 2, 1:3 -> 3, 0:a -> 4, 1:2 -> 5, 1:1 -> 6, 0:c -> 7, 0:d -> 8, 1:5 -> 9)
经过这里的处理之后,我们就得到一个所有列的所有可能值的一个大的字典。在进行数据转换时,我们首先需要同样的拼接一个列索引前缀,然后再按照字典查询并填充向量值。主要代码如下:
val dictSize = dict.size
dataframe.select("ma")
.rdd
.map(x => Vectors.sparse(dictSize, x.getString(0).split(" ").map(xi => (dict(s"0:${xi}"), 1.0)).toSeq)) // 拼接一个列索引前缀后再查询上述字典
// for testing
// run the code above, got:
// Array[org.apache.spark.ml.linalg.Vector] = Array((10,[0,4,7],[1.0,1.0,1.0]), (10,[0,7,8],[1.0,1.0,1.0]), (10,[1,7,8],[1.0,1.0,1.0]))
实现了这个优化之后,不仅使得整个fit
计算过程变快了数倍,而且由于我们的算子可以支持同时处理多个特征列,这带来了很大的易用性提升。
对于这里的实现,有经验的同学们可能已经发现另一个优化点,那就是这个dict
变量。这个变量可能非常大,在我们的场景中,它内部可能有上千万个元素。由于map函数中使用到了这个变量,如果不做任何处理,我们将会序列化一个非常大的task到其他executor中执行,这将是非常低效的操作。这里我们可以将这个dict转化为一个广播变量,然后在map函数中引用这个广播变量。
主要代码如下:
val dictBC = dataframe.