The Kubeflow project makes deploying machine learning workflows on Kubernetes simple, portable, and scalable. Pipelines is one of Kubeflow's core modules: it lets you define the steps of a machine learning workflow and execute them according to the orchestration.
While learning Kubeflow, I found few introductions to Pipelines online; the official examples are rather obscure, and most assume a GCP deployment, so there is no concise tutorial for individual learners. After some research, I deployed a k8s cluster on my personal computer with kubeadm, ran Kubeflow on it, and, using the classic machine learning task of predicting Titanic survivors as an example, implemented a pipeline covering the whole process from feature transformation through model training to prediction.
The main steps are:
- Install the dedicated SDK
- Define the pipeline in Python
- Build the pipeline package with the SDK, then upload it through the UI
Environment Setup
Install the Pipelines SDK in your development environment:
pip3 install kfp --upgrade --user
echo "export PATH=$PATH:~/.local/bin" >> ~/.bashrc
source ~/.bashrc
which dsl-compile
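The dsl-compile tool is what later turns the Python pipeline definition into a package that can be uploaded through the UI. The invocation looks like this (pipeline.py and pipeline.yaml are placeholder names for illustration):
dsl-compile --py pipeline.py --output pipeline.yaml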
Data Preparation
First we need the Titanic dataset. After downloading it, I saved it to a local directory. To make the data accessible to the pipeline, I created a PV mounted to that local directory; the pipeline can then access the data by attaching the persistent volume to its components. Here is the YAML for creating the PV:
apiVersion: v1
kind: PersistentVolume
metadata:
  name: my-pv
spec:
  capacity:
    storage: 1Gi
  volumeMode: Filesystem
  accessModes:
  - ReadWriteOnce
  persistentVolumeReclaimPolicy: Delete
  storageClassName: local-storage
  local:
    path: /home/abc/data
  nodeAffinity:
    required:
      nodeSelectorTerms:
      - matchExpressions:
        - key: kubernetes.io/hostname
          operator: In
          values:
          - abc-desktop
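A PV by itself is not mounted by pipeline steps; the components reference a PersistentVolumeClaim bound to it. Below is a minimal sketch of attaching the volume to a component task, assuming a hypothetical claim named my-pvc (with storageClassName local-storage) bound to the PV above:

import kfp.dsl as dsl

# Assumption: a PersistentVolumeClaim named 'my-pvc' with storageClassName
# 'local-storage' has been created and is bound to the my-pv volume above.
data_volume = dsl.PipelineVolume(pvc='my-pvc')

# Inside a pipeline function, attach the volume to a component task so that
# the container sees the dataset under /data:
#   task = some_component_op().add_pvolumes({'/data': data_volume})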
Feature Transformation
The data exploration step is omitted here. Based on its results, we need to fill the missing values of the Age and Embarked features, drop the Cabin feature (which has too many missing values), extract a new Title feature from Name, and convert the Sex, Embarked, and Title features to category values. The Python code that generates the pipeline component is below; the script is named titantic_preprocess_component.py, and running it produces a YAML file:
import kfp.components as comp

def titantic_preprocess(
        train_dataset: comp.InputTextFile(str),
        test_dataset: comp.InputTextFile(str),
        processed_train_dataset: comp.OutputBinaryFile(str),
        processed_test_dataset: comp.OutputBinaryFile(str)):
    import pandas as pd
    import re
    from sklearn import preprocessing
    import pickle
    # Based on the EDA result, fill the missing ages by title
    age_title = {
        'Mr': 32.368090452261306,
        'Mrs': 35.898148148148145,
        'Miss': 21.773972602739725,
        'Master': 4.574166666666667,
        'Dr': 42.0,
        'Ms': 21.773972602739725
    }
    df_train = pd.read_csv(train_dataset, header=0)
    df_test = pd.read_csv(test_dataset, header=0)
    # Extract the title from the Name feature and generate a new Title feature
    title_regex = re.compile(r'.*, ([^\.]*).*')
    def getTitle(x):
        result = title_regex.search(x)
        if result:
            return result.group(1)
        else:
            return ''
    df_train['Title'] = df_train['Name'].map(getTitle)
    df_test['Title'] = df_test['Name'].map(getTitle)
    # Fill the null values of Age
    for t in age_title.keys():
        df_train.loc[df_train[(df_train['Title']==t)&(df_train['Age'].isnull())].index, 'Age'] = age_title[t]
        df_test.loc[df_test[(df_test['Title']==t)&(df_test['Age'].isnull())].index, 'Age'] = age_title[t]
    # Drop the Cabin feature
    df_train = df_train.drop(['Cabin'], axis=1)
    df_test = df_test.drop(['Cabin'], axis=1)
    # Two records of the Embarked feature are missing; fill with the most frequent value
    df_train.loc[df_train[df_train['Embarked'].isnull()].index, 'Embarked'] = 'S'
    # Drop the PassengerId, Name, Ticket features, as they are of no use for model training
    df_train = df_train.drop(['PassengerId', 'Name', 'Ticket'], axis=1)
    df_test = df_test.drop(['PassengerId', 'Name', 'Ticket'], axis=1)
    # Convert the Sex, Embarked, Title features to category values
    le_sex = preprocessing.LabelEncoder()
    le_sex.fit(df_train['Sex'])
    df_train['Sex_cat'] = le_sex.transform(df_train['Sex'])
    df_test['Sex_cat'] = le_sex.transform(df_test['Sex'])
    le_embarked = preprocessing.LabelEncoder()
    le_embarked.fit(df_train['Embarked'])
    df_train['Embarked_cat'] = le_embarked.transform(df_train['Embarked'])
    df_test['Embarked_cat'] = le_embarked.transform(df_test['Embarked'])
    le_title = preprocessing.LabelEncoder()
    le_title.fit(df_train['Title'])
    df_train['Title_cat'] = le_title.transform(df_train['Title'])
    # Map test-set titles unseen in the training set to the closest known title
    df_test.loc[df_test[df_test['Title']=='Ms'].index, 'Title'] = 'Miss'
    df_test.loc[df_test[df_test['Title']=='Dona'].index, 'Title'] = 'Mrs'
    df_test['Title_cat'] = le_title.transform(df_test['Title'])
    # Drop the original Sex, Embarked, Title features
    df_train = df_train.drop(['Sex', 'Embarked', 'Title'], axis=1)
    df_test = df_test.drop(['Sex', 'Embarked', 'Title'], axis=1)
    # Output the processed train and test datasets
    pickle.dump(df_train, processed_train_dataset)
    pickle.dump(df_test, processed_test_dataset)

comp.create_component_from_func(
    titantic_preprocess,
    base_image='gzroy/ml_baseimage',
    output_component_file='titantic_preprocess_component.yaml')
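Running the script produces titantic_preprocess_component.yaml. In the pipeline definition, the generated file can be loaded back as a reusable component; for example:

import kfp.components as comp

# Load the component definition generated above; calling the returned factory
# inside a pipeline function creates a step that runs titantic_preprocess.
titantic_preprocess_op = comp.load_component_from_file(
    'titantic_preprocess_component.yaml')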
Model Training
With feature transformation done, we can build and train a model. I use a random forest here. The script is named titantic_train_component.py; running it likewise generates a YAML file:
import kfp.components as comp

def titantic_train(
        dataset: comp.InputBinaryFile(str),
        model: comp.OutputBinaryFile(str),
        predict_result: comp.OutputTextFile(str)):
    import pandas as pd
    import pickle
    from sklearn.model_selection import train_test_split
    from sklearn.ensemble import RandomForestClassifier
    import numpy as np
    # Load the preprocessed data
    df_train = pickle.load(dataset)
    # Randomly split the data into train and test sets
    X_train, X_test, y_train, y_test = train_test_split(
        df_train[['Pclass', 'Sex_cat', 'Age', 'SibSp', 'Parch', 'Embarked_cat', 'Title_cat']],
        df_train['Survived'],
        test_size=0.2,
        random_state=123
    )
    # Use the random forest classifier to train the model
    rfc = RandomForestClassifier()
    rfc.fit(X_train, y_train)
    # Output the predictions on the test split
    y_pred = rfc.predict(X_test)
    test_result = np.concatenate([y_test.to_numpy().reshape([-1,1]), y_pred.reshape([-1,1])], axis=-1)
    test_result_df = pd.DataFrame(test_result, columns=['Label', 'Prediction'])
    test_result_df.to_csv(predict_result, header=True, index=False)
    # Output the model
    pickle.dump(rfc, model)

comp.create_component_from_func(
    titantic_train,
    base_image='gzroy/ml_baseimage',
    output_component_file='titantic_train_component.yaml')
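To preview how the two components fit together, here is a minimal sketch of a pipeline definition; it assumes the raw CSV contents are supplied as pipeline parameters, and the my-pvc claim from the data-preparation section remains an assumption if the files are instead read from the mounted volume:

import kfp.dsl as dsl
import kfp.components as comp

titantic_preprocess_op = comp.load_component_from_file('titantic_preprocess_component.yaml')
titantic_train_op = comp.load_component_from_file('titantic_train_component.yaml')

@dsl.pipeline(name='titantic-pipeline')
def titantic_pipeline(train_dataset, test_dataset):
    preprocess_task = titantic_preprocess_op(train_dataset, test_dataset)
    # The OutputBinaryFile artifact of the preprocess step feeds the
    # InputBinaryFile input of the train step.
    train_task = titantic_train_op(
        dataset=preprocess_task.outputs['processed_train_dataset'])

# Build the uploadable package from this definition, e.g.:
#   dsl-compile --py titantic_pipeline.py --output titantic_pipeline.yaml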
Performance Evaluation
After the model is trained we need to evaluate its performance, so a component is prepared here that evaluates the model based on the prediction results produced by the training step.
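A minimal sketch of what such a component could look like, computing accuracy from the Label/Prediction CSV emitted by the train step (the titantic_evaluate name and its output file are assumptions, not the article's actual code):

import kfp.components as comp

def titantic_evaluate(predict_result: comp.InputTextFile(str)):
    import pandas as pd
    # The CSV written by the train step has Label and Prediction columns
    df = pd.read_csv(predict_result, header=0)
    accuracy = (df['Label'] == df['Prediction']).mean()
    print('Accuracy on the held-out split: {:.4f}'.format(accuracy))

comp.create_component_from_func(
    titantic_evaluate,
    base_image='gzroy/ml_baseimage',
    output_component_file='titantic_evaluate_component.yaml')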