kubeflow系列教程1-使用Pipeline的全过程

0 / 1021

Kubeflow项目使得在Kubernetes上部署机器学习的工作流更加容易,可移植并且可扩展。其中Pipeline是Kubeflow的一个核心模块,可以定义机器学习各个步骤的工作流,按照编排来进行执行。

我在学习Kubeflow的过程中,发现网上关于Pipeline方面的介绍比较少,官网的例子则比较晦涩,而且大都是要结合GCP环境来部署的。对于我们个人学习来说,没有一个比较简明的教程。我研究了一下,实现了在个人电脑上用kubeadm部署了一个k8s集群,运行Kubeflow,并以预测泰坦尼克号生还者这一个经典的机器学习任务为例,实现了一个pipeline来完成从数据特征转换,模型训练到数据预测的全过程。

主要包括几个步骤:

  • 安装专门的 SDK
  • Python 定义好 Pipeline
  • SDK 构建 pipeline 的包,最后通过 UI 上传

环境准备

在自己在开发环境安装Pipelines SDK

pip3 install kfp --upgrade --user

echo "export PATH=$PATH:~/.local/bin" >> ~/.bashrc

source ~/.bashrc

which dsl-compile

数据准备

首先需要准备Titantic的数据集,在网上下载之后,我把数据集保存到了本地目录。要使pipeline能访问数据,我是通过创建一个Mount到本地目录的PV来实现的,之后就可以在pipeline里面通过给component attach pvolume的方式来访问数据了。以下是创建PV的yaml:

apiVersion: v1
kind: PersistentVolume
metadata:
  name: my-pv
spec:
  capacity:
    storage: 1Gi
  volumeMode: Filesystem
  accessModes:
  - ReadWriteOnce
  persistentVolumeReclaimPolicy: Delete
  storageClassName: local-storage
  local:
    path: /home/abc/data
  nodeAffinity:
    required:
      nodeSelectorTerms:
      - matchExpressions:
        - key: kubernetes.io/hostname
          operator: In
          values:
          - abc-desktop

数据特征转换

这里略去对数据集的探索的过程。根据数据探索的结果,我们需要填补Age, Embarked这2个特征的缺失值,删去缺失值过多的Cabin特征,从Name中提取生成新的特征Title,以及把Sex, Embarked, Title这3个特征转为类别值。具体的生成pipeline component的Python代码如下,程序名称为titantic_preprocess_component.py,运行后即可生成一个yaml文件:

import kfp.components as comp
 
def titantic_preprocess(
    train_dataset: comp.InputTextFile(str),
    test_dataset: comp.InputTextFile(str),
    processed_train_dataset: comp.OutputBinaryFile(str),
    processed_test_dataset: comp.OutputBinaryFile(str)):
 
    import pandas as pd
    import re
    from sklearn import preprocessing
    import pickle
 
    # Based on the EDA reulst to fill the missing age by title
    age_title = {
        'Mr': 32.368090452261306,
        'Mrs': 35.898148148148145,
        'Miss': 21.773972602739725,
        'Master': 4.574166666666667,
        'Dr': 42.0,
        'Ms': 21.773972602739725
    }
 
    df_train = pd.read_csv(train_dataset, header=0)
    df_test = pd.read_csv(test_dataset, header=0)
 
    # Get the title from the Name feature and generate a new Title feature
    title_regex = re.compile(r'.*, ([^\.]*).*')
    def getTitle(x):
        result = title_regex.search(x)
        if result:
            return result.group(1)
        else:
            return ''
    df_train['Title'] = df_train['Name'].map(getTitle)
    df_test['Title'] = df_test['Name'].map(getTitle)
 
    # Fill the null value of age
    for t in age_title.keys():
        df_train.loc[df_train[(df_train['Title']==t)&(df_train['Age'].isnull())].index, 'Age'] = age_title[t]
        df_test.loc[df_test[(df_test['Title']==t)&(df_test['Age'].isnull())].index, 'Age'] = age_title[t]
 
    # Drop the cabin feature
    df_train = df_train.drop(['Cabin'], axis=1)
    df_test = df_test.drop(['Cabin'], axis=1)
 
    # Two record of Embarked feature missing, fill with the most frequent value
    df_train.loc[df_train[df_train['Embarked'].isnull()].index, 'Embarked'] = 'S'
 
    # Drop the PassengerId, Name, Ticket features, as no use for model training
    df_train = df_train.drop(['PassengerId', 'Name', 'Ticket'], axis=1)
    df_test = df_test.drop(['PassengerId', 'Name', 'Ticket'], axis=1)
 
    # Conver the Sex, Embarked, Title feature to category type
    le_sex = preprocessing.LabelEncoder()
    le_sex.fit(df_train['Sex'])
    df_train['Sex_cat'] = le_sex.transform(df_train['Sex'])
    df_test['Sex_cat'] = le_sex.transform(df_test['Sex'])
    le_embarked = preprocessing.LabelEncoder()
    le_embarked.fit(df_train['Embarked'])
    df_train['Embarked_cat'] = le_embarked.transform(df_train['Embarked'])
    df_test['Embarked_cat'] = le_embarked.transform(df_test['Embarked'])
    le_title = preprocessing.LabelEncoder()
    le_title.fit(df_train['Title'])
    df_train['Title_cat'] = le_title.transform(df_train['Title'])
    df_test.loc[df_test[df_test['Title']=='Ms'].index, 'Title'] = 'Miss'
    df_test.loc[df_test[df_test['Title']=='Dona'].index, 'Title'] = 'Mrs'
    df_test['Title_cat'] = le_title.transform(df_test['Title'])
 
    # Drop the Sex, Embarked, Title features
    df_train = df_train.drop(['Sex', 'Embarked', 'Title'], axis=1)
    df_test = df_test.drop(['Sex', 'Embarked', 'Title'], axis=1)
 
    # Output the processed train and test dataset
    pickle.dump(df_train, processed_train_dataset)
    pickle.dump(df_test, processed_test_dataset)
 
comp.create_component_from_func(
    titantic_preprocess, 
    base_image='gzroy/ml_baseimage',
    output_component_file='titantic_preprocess_component.yaml')

模型训练

特征转换完成后,就可以建立模型来进行训练了。这里我用随机森林来进行训练。程序名称为titantic_train_component.py,运行后同样生成一个yaml文件:

import kfp.components as comp
 
def titantic_train(
    dataset: comp.InputBinaryFile(str),
    model: comp.OutputBinaryFile(str),
    predict_result: comp.OutputTextFile(str)):
 
    import pandas as pd
    import pickle
    from sklearn.model_selection import train_test_split
    from sklearn.ensemble import RandomForestClassifier
    import numpy as np
 
    # Load the preprocessed data
    df_train = pickle.load(dataset)
    
    # Random split the train and test data
    X_train, X_test, y_train, y_test = train_test_split(
        df_train[['Pclass', 'Sex_cat', 'Age', 'SibSp', 'Parch', 'Embarked_cat', 'Title_cat']], 
        df_train['Survived'], 
        test_size=0.2, 
        random_state=123
    )
 
    # Use the random forest classifer to train the model
    rfc = RandomForestClassifier()
    rfc.fit(X_train, y_train)
 
    # Output the prediction on test dataset
    y_pred = rfc.predict(X_test)
    test_result = np.concatenate([y_test.to_numpy().reshape([-1,1]), y_pred.reshape([-1,1])], axis=-1)
    test_result_df = pd.DataFrame(test_result, columns=['Label', 'Prediction'])
    test_result_df.to_csv(predict_result, header=True, index=False)
 
    # Output the model
    pickle.dump(rfc, model)
 
comp.create_component_from_func(
    titantic_train, 
    base_image='gzroy/ml_baseimage',
    output_component_file='titantic_train_component.yaml')

评估性能

模型训练完成之后,我们需要评估模型的性能,因此这里准备了一个component,根据模型训练