kubeflow系列教程1-使用Pipeline的全过程

0 / 928

Kubeflow项目使得在Kubernetes上部署机器学习的工作流更加容易,可移植并且可扩展。其中Pipeline是Kubeflow的一个核心模块,可以定义机器学习各个步骤的工作流,按照编排来进行执行。

我在学习Kubeflow的过程中,发现网上关于Pipeline方面的介绍比较少,官网的例子则比较晦涩,而且大都是要结合GCP环境来部署的。对于我们个人学习来说,没有一个比较简明的教程。我研究了一下,实现了在个人电脑上用kubeadm部署了一个k8s集群,运行Kubeflow,并以预测泰坦尼克号生还者这一个经典的机器学习任务为例,实现了一个pipeline来完成从数据特征转换,模型训练到数据预测的全过程。

主要包括几个步骤:

  • 安装专门的 SDK
  • Python 定义好 Pipeline
  • SDK 构建 pipeline 的包,最后通过 UI 上传

环境准备

在自己在开发环境安装Pipelines SDK

pip3 install kfp --upgrade --user

echo "export PATH=$PATH:~/.local/bin" >> ~/.bashrc

source ~/.bashrc

which dsl-compile

数据准备

首先需要准备Titantic的数据集,在网上下载之后,我把数据集保存到了本地目录。要使pipeline能访问数据,我是通过创建一个Mount到本地目录的PV来实现的,之后就可以在pipeline里面通过给component attach pvolume的方式来访问数据了。以下是创建PV的yaml:

apiVersion: v1
kind: PersistentVolume
metadata:
  name: my-pv
spec:
  capacity:
    storage: 1Gi
  volumeMode: Filesystem
  accessModes:
  - ReadWriteOnce
  persistentVolumeReclaimPolicy: Delete
  storageClassName: local-storage
  local:
    path: /home/abc/data
  nodeAffinity:
    required:
      nodeSelectorTerms:
      - matchExpressions:
        - key: kubernetes.io/hostname
          operator: In
          values:
          - abc-desktop

数据特征转换

这里略去对数据集的探索的过程。根据数据探索的结果,我们需要填补Age, Embarked这2个特征的缺失值,删去缺失值过多的Cabin特征,从Name中提取生成新的特征Title,以及把Sex, Embarked, Title这3个特征转为类别值。具体的生成pipeline component的Python代码如下,程序名称为titantic_preprocess_component.py,运行后即可生成一个yaml文件:

import kfp.components as comp
 
def titantic_preprocess(
    train_dataset: comp.InputTextFile(str),
    test_dataset: comp.InputTextFile(str),
    processed_train_dataset: comp.OutputBinaryFile(str),
    processed_test_dataset: comp.OutputBinaryFile(str)):
 
    import pandas as pd
    import re
    from sklearn import preprocessing
    import pickle
 
    # Based on the EDA reulst to fill the missing age by title
    age_title = {
        'Mr': 32.368090452261306,
        'Mrs': 35.898148148148145,
        'Miss': 21.773972602739725,
        'Master': 4.574166666666667,
        'Dr': 42.0,
        'Ms': 21.773972602739725
    }
 
    df_train = pd.read_csv(train_dataset, header=0)
    df_test = pd.read_csv(test_dataset, header=0)
 
    # Get the title from the Name feature and generate a new Title feature
    title_regex = re.compile(r'.*, ([^\.]*).*')
    def getTitle(x):
        result = title_regex.search(x)
        if result:
            return result.group(1)
        else:
            return ''
    df_train['Title'] = df_train['Name'].map(getTitle)
    df_test['Title'] = df_test['Name'].map(getTitle)
 
    # Fill the null value of age
    for t in age_title.keys():
        df_train.loc[df_train[(df_train['Title']==t)&(df_train['Age'].isnull())].index, 'Age'] = age_title[t]
        df_test.loc[df_test[(df_test['Title']==t)&(df_test['Age'].isnull())].index, 'Age'] = age_title[t]
 
    # Drop the cabin feature
    df_train = df_train.drop(['Cabin'], axis=1)
    df_test = df_test.drop(['Cabin'], axis=1)
 
    # Two record of Embarked feature missing, fill with the most frequent value
    df_train.loc[df_train[df_train['Embarked'].isnull()].index, 'Embarked'] = 'S'
 
    # Drop the PassengerId, Name, Ticket features, as no use for model training
    df_train = df_train.drop(['PassengerId', 'Name', 'Ticket'], axis=1)
    df_test = df_test.drop(['PassengerId', 'Name', 'Ticket'], axis=1)
 
    # Conver the Sex, Embarked, Title feature to category type
    le_sex = preprocessing.LabelEncoder()
    le_sex.fit(df_train['Sex'])
    df_train['Sex_cat'] = le_sex.transform(df_train['Sex'])
    df_test['Sex_cat'] = le_sex.transform(df_test['Sex'])
    le_embarked = preprocessing.LabelEncoder()
    le_embarked.fit(df_train['Embarked'])
    df_train['Embarked_cat'] = le_embarked.transform(df_train['Embarked'])
    df_test['Embarked_cat'] = le_embarked.transform(df_test['Embarked'])
    le_title = preprocessing.LabelEncoder()
    le_title.fit(df_train['Title'])
    df_train['Title_cat'] = le_title.transform(df_train['Title'])
    df_test.loc[df_test[df_test['Title']=='Ms'].index, 'Title'] = 'Miss'
    df_test.loc[df_test[df_test['Title']=='Dona'].index, 'Title'] = 'Mrs'
    df_test['Title_cat'] = le_title.transform(df_test['Title'])
 
    # Drop the Sex, Embarked, Title features
    df_train = df_train.drop(['Sex', 'Embarked', 'Title'], axis=1)
    df_test = df_test.drop(['Sex', 'Embarked', 'Title'], axis=1)
 
    # Output the processed train and test dataset
    pickle.dump(df_train, processed_train_dataset)
    pickle.dump(df_test, processed_test_dataset)
 
comp.create_component_from_func(
    titantic_preprocess, 
    base_image='gzroy/ml_baseimage',
    output_component_file='titantic_preprocess_component.yaml')

模型训练

特征转换完成后,就可以建立模型来进行训练了。这里我用随机森林来进行训练。程序名称为titantic_train_component.py,运行后同样生成一个yaml文件:

import kfp.components as comp
 
def titantic_train(
    dataset: comp.InputBinaryFile(str),
    model: comp.OutputBinaryFile(str),
    predict_result: comp.OutputTextFile(str)):
 
    import pandas as pd
    import pickle
    from sklearn.model_selection import train_test_split
    from sklearn.ensemble import RandomForestClassifier
    import numpy as np
 
    # Load the preprocessed data
    df_train = pickle.load(dataset)
    
    # Random split the train and test data
    X_train, X_test, y_train, y_test = train_test_split(
        df_train[['Pclass', 'Sex_cat', 'Age', 'SibSp', 'Parch', 'Embarked_cat', 'Title_cat']], 
        df_train['Survived'], 
        test_size=0.2, 
        random_state=123
    )
 
    # Use the random forest classifer to train the model
    rfc = RandomForestClassifier()
    rfc.fit(X_train, y_train)
 
    # Output the prediction on test dataset
    y_pred = rfc.predict(X_test)
    test_result = np.concatenate([y_test.to_numpy().reshape([-1,1]), y_pred.reshape([-1,1])], axis=-1)
    test_result_df = pd.DataFrame(test_result, columns=['Label', 'Prediction'])
    test_result_df.to_csv(predict_result, header=True, index=False)
 
    # Output the model
    pickle.dump(rfc, model)
 
comp.create_component_from_func(
    titantic_train, 
    base_image='gzroy/ml_baseimage',
    output_component_file='titantic_train_component.yaml')

评估性能

模型训练完成之后,我们需要评估模型的性能,因此这里准备了一个component,根据模型训练之后对验证集的预测值进行准确率的评分:

import kfp.components as comp
from typing import NamedTuple
 
def produce_metrics(
    predict_file: comp.InputTextFile(str),
    mlpipeline_metrics_path: comp.OutputPath('Metrics')
):
    import json
    import pandas as pd
    from sklearn.metrics import accuracy_score
 
    df = pd.read_csv(predict_file, header=0)
    accuracy = accuracy_score(df['Label'], df['Prediction'])
    metrics = {
        'metrics': [{
            'name': 'accuracy-score', # The name of the metric. Visualized as the column name in the runs table.
            'numberValue':  accuracy, # The value of the metric. Must be a numeric value.
            'format': "PERCENTAGE",   # The optional format of the metric. Supported values are "RAW" (displayed in raw format) and "PERCENTAGE" (displayed in percentage format).
        }]
    }
 
    with open(mlpipeline_metrics_path, 'w') as f:
        json.dump(metrics, f)
 
comp.create_component_from_func(
    produce_metrics, 
    base_image='gzroy/ml_baseimage',
    output_component_file='metrics_component.yaml')

测试集数据预测

模型训练后就可以对测试集的数据进行预测了,代码如下:

import kfp.components as comp
 
def titantic_predict(
    dataset: comp.InputBinaryFile(str),
    model: comp.InputBinaryFile(str),
    predict_result: comp.OutputTextFile(str)):
 
    import pandas as pd
    import pickle
 
    # Load the preprocessed data
    df_test = pickle.load(dataset)
    # Load the model
    rfc = pickle.load(model)
    # Predict
    predict = rfc.predict(df_test[['Pclass', 'Sex_cat', 'Age', 'SibSp', 'Parch', 'Embarked_cat', 'Title_cat']])
    df_test['Prediction'] = predict
    df_test.to_csv(predict_result, header=True, index=False)
 
comp.create_component_from_func(
    titantic_predict, 
    base_image='gzroy/ml_baseimage',
    output_component_file='titantic_predict_component.yaml')

生成模块的基础镜像

在以上模块中都需要基于基础镜像来运行,以下是构造镜像的Dockerfile:

FROM ubuntu:18.04
RUN apt update \
&& apt install python3.8 -y \
&& apt install python-pip3 -y \
&& rm /usr/bin/python3 \
&& ln -s /usr/bin/python3.8 /usr/bin/python3 \
&& pip3 install pandas -y \
&& pip3 install sklearn -y

生成Pipeline

现在可以定义一个Pipeline,加载之前创建的模块的yaml文件,按照模块的运行顺序进行编排。这里定义了一个produce_data_op,用于生成一个简单的模块,访问pipeline volume里面的数据集,并传给下一个模块:

import kfp
from kfp import dsl
 
@kfp.dsl.pipeline(
    name='Titantic training pipeline',
    description='My machine learning pipeline'
)
def titantic_pipeline():
    vop = dsl.VolumeOp(
        name="volume_creation",
        resource_name="titantic_pvc",
        storage_class="local-storage",
        modes=["ReadWriteOnce"],
        size="1Gi",
        volume_name="my-pv"
    )
 
    def produce_data_op(volume):
        return dsl.ContainerOp(
            name="Titantic-Data",
            image="ubuntu:18.04",
            file_outputs={
                'train_dataset': '/data/titantic/train.csv',
                'test_dataset': '/data/titantic/test.csv'
            },
            pvolumes={"/data": volume}
    )
 
    produce_data_task = produce_data_op(vop.volume)
 
    preprocess_op = kfp.components.load_component_from_file('titantic_preprocess_component.yaml')
    preprocess_task = preprocess_op(
        produce_data_task.outputs['train_dataset'], 
        produce_data_task.outputs['test_dataset'])
 
    train_op = kfp.components.load_component_from_file('titantic_train_component.yaml')
    train_task = train_op(preprocess_task.outputs['processed_train_dataset'])
 
    metrics_op = kfp.components.load_component_from_file('metrics_component.yaml')
    metrics_task = metrics_op(train_task.outputs['predict_result'])
 
    predict_op = kfp.components.load_component_from_file('titantic_predict_component.yaml')
    predict_task = predict_op(preprocess_task.outputs['processed_test_dataset'], train_task.outputs['model'])

之后运行以下命令来把pipeline.py编译为yaml文件并打包

python3 titantic_preprocess_component.py
python3 titantic_train_component.py
python3 metrics_component.py
python3 titantic_predict_component.py
dsl-compile --py pipeline.py --output pipeline.tar.gz

运行Pipeline
在Kubeflow的dashboard中创建一个Pipeline,上传之前打包的pipeline文件,然后创建一个运行即可。