Pyspark Word2Vec + jieba 训练词向量流程

0 / 549

摘要:用商品描述为语料库训练商品词向量为例,分享一下用pyspark自带word2vec+jieba分词训练词向量的流程.

工具:python,pyspark,jieba,pandas,numpy

数据格式:自定义词典,语料库均为pyspark dataframe,停用辞典不大,直接使用txt.

1 create spark

我的pyspark参数设置如下:

def create_spark():
    sparkconf = SparkConf('jianwangzhilai') \
        .setAppName("jianwangzhilai") \
        .set("spark.sql.catalogImplementation","hive") \
        .set("spark.dynamicAllocation.enabled", "false") \
        .set("spark.shuffle.service.enabled", "false") \
        .setExecutorEnv("JAVA_HOME", os.environ["JAVA_HOME"]) \
        .setExecutorEnv("HADOOP_HDFS_HOME", os.environ["HADOOP_HOME"]) \
        .setExecutorEnv("LD_LIBRARY_PATH", os.environ["LD_LIBRARY_PATH"] ) \
        .setExecutorEnv("CLASSPATH", os.environ["CLASSPATH"])

    sparkconf.set("spark.executor.instances", '64')) \
      .set("spark.executor.cores", '8' \
      .set("spark.cores.max",'512') \
      .set("spark.executor.memory",'10g') \
      .set("spark.driver.maxResultSize", "4g")
    
    spark=SparkSession.builder.enableHiveSupport()\
            .config(conf=sparkconf).getOrCreate()
    
    spark.sparkContext.setLogLevel('WARN')
    print('spark created...')
    return spark

设置有点琐碎,但大同小异,唯一需要注意的是,spark.driver.maxResultSize这个参数最好设置大于1g.

2 自定义词典,udf

此处自定义词典需要注意,pyspark存在和jieba不兼容的情况,需要先做如下操作:

##user dict
spark.sparkContext.addFile('./dict/dict.txt')

从pyspark中import如下部分:

from pyspark.sql.types import StringType,ArrayType
from pyspark.sql.functions import udf

定义udf,把jieba分词包装起来,返回一个pyspark可识别的arraytype,array中的基元素是stringtype的,这里需要注意,udf函数中只要jieba.dt.initialized发现没有加载自定义词典,都要重新加载一次,否则不会报错但是会出现分词没有使用自定义词典的情况:

def seg(x):
    if not jieba.dt.initialized:
        jieba.load_userdict('dict.txt')
    s=jieba.lcut(x, cut_all=False,HMM=False)
    s=[x for x in s if len(x)>1]
    return s 

            
seg_udf = udf(seg, ArrayType(StringType()))

3 语料库分词

商品的语料同样通过spark.sql读取

corpus_goods = spark.sql("select * from corpus_goods_description ").cache()

同样,格式为spark dataframe,包含一个主键商品id和一个商品描述description.

利用之前打包的udf,对商品描述进行分词,生成一个新列seg:

corpus_goods = corpus_goods.withColumn('seg',seg_udf(corpus_goods['description']))

4 停用词

停用词因为较少,这里直接保存成了txt格式,读取成list:

stop_words=open('./stopwords.txt', 'r', encoding='utf_8').readlines()
stop_words = [line.strip() for line in stop_words]

停用词去除可以自己写好,一起打包到之前的udf中,只要在seg函数中稍作改变:

if  len(word)>1 and word not in stop_words:
            words.append(word)

也可以通过pyspark自带模块进行去除:

from pyspark.ml.feature import StopWordsRemover
remover = StopWordsRemover(inputCol="seg", outputCol="words", stopWords=stop_words)               
corpus_goods = remover.transform(corpus_goods)

这里推荐后一种方法.去除停用词后,基本可以进行训练了。

语料分词后,直接进行如下训练:

from pyspark.ml.feature import Word2Vec
w2v = Word2Vec(vectorSize=100, minCount=3,seed=123, numPartitions=64,inputCol="words", outputCol="result")
model = w2v.fit(corpus_goods)
model.getVectors().head(2)
model.getVectors().count()
##save
path = "./models/word2vec"
model.write().overwrite().save(path)

训练很简单,注意numPartitions参数,这个参数默认是1,如果使用默认参数,等于只有一个job进行fit,如果数据很大,这个过程将会非常漫长,这里我设置成和instances相同的大小,也可以设置成其他合适的大小,具体看机器配置.

minCount参数控制了词频,词频低于这个字段的将会被舍弃.vectorSize控制了向量的大小,一般超过50.

词向量训练完成后,得到了每个词的向量表示,此时需要把整个商品的描述也表示成向量,如果自己实现也可,但是pyspark直接一行搞定,速度飞快:

corpus_goods = model.transform(corpus_goods)

此时,corpus_goods数据框中,result字段就是商品描述的文本向量形式了,大工告成.之后可以进行相似度计算或者作为特征进入其他模型.

6 商品相似度计算

补充一下相似度计算的坑.

spark中暂时时发现通过笛卡尔积的形式计算所有商品的相似度.

df = corpus_goods .crossJoin(corpus_goods ).toDF('id1','result',\
                                                        'id2','result2')

之后通过定义udf,计算余弦夹角.

from scipy import spatial
@udf(returnType=FloatType())
def sim(x,y):
        return float(1-spatial.distance.cosine(x, y))

df = df.withColumn('sim',sim(df['result'],df['result2']))

这里直接引用了scipy的余弦距离,用1减去之后就是余弦相似度.

注意,必须用float函数包一下结果,因为如果直接返回计算出的余弦相似度,格式是numpy.float64的,udf中returnType=FloatType(),会引起不兼容,所以必须返回一个python的float格式.

最后相似度保存hive:

df.write.format('orc').mode('overwrite').saveAsTable(table_name)