kubeflow系列教程2-实现从本地机器挂载阿里云NAS的Pipelines工作流

0 / 804

可体验到

  • 在Kubernetes中创建共享存储
  • 基于Kubeflow Pipelines的机器学习工作流

前置知识

  • Kubernetes Volume
  • 创建NAS存储
  • 了解Python语言
  • 手写数字识别训练

创建外网NAS

  1. 首先你要有一个专有网络VPC

    通常只要你购置了一个ECS云服务器,便会有一个VPC,你的云服务器便是在对应得VPC网络里面(如果你之后购置得NAS创建得专有网络挂载点所用得VPC和你的云服务器在同一个区域,并且使用同一个专有网络,便可以直接挂载)

  2. 接着你需要购买一个NAS

    这边笔者在华北2区(北京)购买了一个NAS。(存储包是需要额外购置得,新买得NAS一般没有存储包,可以自行购买合适得存储包)
    点击管理进去创建挂载点,转下一步

  3. 创建专有网络挂载点

    这一步创建专有网络类型得挂载点,VPC选择之前第一步指定的VPC。

  4. 申请一个弹性公网IP

    申请完之后记得购买共享带宽

  5. 购买NAT网关

    购买网关的时候VPC ID 就使用第一步中指定的VPC。

  6. 配置NAT网关

  1. 测试

    上面的配置如果一切正常,就可以在其它云平台的服务器上用公网IP挂载这个NAS了

mount -t nfs 公网IP:/ /mnt

开发Pipeline

目前提交运行pipelines有2种方法,本质都是使用sdk编译pipelines组件:

  1. 在notebook中使用sdk提交pipelines至服务中心,直接可以在ui中查看pipelines实验运行进度。
  2. 将pipelines组件打成zip包通过ui上传至服务中心,同样可以在ui查看实验运行进度。

kubeflow虽然开源了一段时间,但是一些功能与google云平台耦合依然较重,官方提供的例子在本地k8s集群中使用有诸多坑,也不方便新手理解。下面我将以tensorflow经典的手写数字识别算法为例,在本地k8s集群实践pipelines。

划分业务流程制作业务镜像

为了方便理解pipelines流程组件,我在这里将实验流程分成了数据加载、模型训练、测试数据预测三个环节(实际工作中你可以依据自己的喜好划分流程)
各流程的组件结构如下图所示:
6.png

在构建pipelines前先简单介绍一下三个环节的业务代码:

数据加载环节(load_data.py):

from __future__ import absolute_import, division, print_function, \
    unicode_literals
 
import argparse
import numpy as np
 
 
# load data
def load_data(path):
    with np.load(path) as f:
        x_train, y_train = f['x_train'], f['y_train']
        x_test, y_test = f['x_test'], f['y_test']
 
    return (x_train, y_train), (x_test, y_test)
 
 
# do data transform
def transform(output_dir, file_name):
    x_train_name = 'x_train.npy'
    x_test_name = 'x_test.npy'
    y_train_name = 'y_train.npy'
    y_test_name = 'y_test.npy'
    (x_train, y_train), (x_test, y_test) = load_data(output_dir + file_name)
    print("### loading data done.")
 
    x_train, x_test = x_train / 255.0, x_test / 255.0
    np.save(output_dir + x_train_name, x_train)
    np.save(output_dir + x_test_name, x_test)
    np.save(output_dir + y_train_name, y_train)
    np.save(output_dir + y_test_name, y_test)
    print("### data transform done.")
 
    with open(output_dir + 'train_test_data.txt', 'w') as f:
        f.write(output_dir + x_train_name + ',')
        f.write(output_dir + x_test_name + ',')
        f.write(output_dir + y_train_name + ',')
        f.write(output_dir + y_test_name)
    print("### write train and test data name to: train_test_data.txt done.")
 
 
def parse_arguments():
    """Parse command line arguments."""
    parser = argparse.ArgumentParser(description='Kubeflow MNIST load data script')
    parser.add_argument('--data_dir', type=str, required=True, help='local file dir')
    parser.add_argument('--file_name', type=str, required=True, help='local file to be input')
    args = parser.parse_args()
    return args
 
 
def run():
    args = parse_arguments()
    transform(args.data_dir, args.file_name)
 
 
if __name__ == '__main__':
    run()

这个环节的业务代码主要实现了:
从本地读取源数据 -> 特征变换 -> 分割训练集 -> 存储数据集到本地

  • 代码中使用argparse,用户可以通过rest api的方式传入路径和数据参数运行代码。
  • 代码后续的运行将在容器中进行,在这里使用NFS挂载k8s集群路径作为文件存储的固定地址。
  • 代码中将分割数据集的绝对路径写入train_test_data.txt 为的是方便后续环节引用,同时便于与后续环节构成流程关系。

测试数据预测环节(predict.py):

from __future__ import absolute_import, division, print_function, \
    unicode_literals
 
import argparse
import numpy as np
import pandas as pd
import tensorflow as tf
 
 
def predict(output_dir, model_file, data_file):
    """
    all file use absolute dir
    :param output_dir:
    :param model_file: `model.txt` absolute dir
    :param data_file: `train_test_data.txt` absolute dir
    :return:
    """
    with open(model_file, 'r') as f:
        line = f.readline()
    model = tf.keras.models.load_model(line)
 
    with open(data_file, 'r') as f:
        line = f.readline()
        data_list = line.split(',')
    with open(data_list[1], 'rb') as f:
        x_test = np.load(f)
    with open(data_list[3], 'rb') as f:
        y_test = np.load(f)
 
    pre = model.predict(x_test)
    model.evaluate(x_test, y_test)
    df = pd.DataFrame(data=pre,
                      columns=["prob_0", "prob_1", "prob_2", "prob_3", "prob_4", "prob_5", "prob_6", "prob_7", "prob_8",
                               "prob_9"])
    y_real = pd.DataFrame(data=y_test, columns=["real_number"])
    result = pd.concat([df, y_real], axis=1)
    result.to_csv(output_dir + 'result.csv')
    print("### save predict result file: result.csv")
 
    with open(output_dir + 'result.txt', 'w') as f:
        f.write(output_dir + 'result.csv')
    print("### write result path and name to: result.txt done.")
 
 
def parse_arguments():
    """Parse command line arguments."""
    parser = argparse.ArgumentParser(description='Kubeflow MNIST predict model script')
    parser.add_argument('--data_dir', type=str, required=True, help='local file dir')
    parser.add_argument('--model_file', type=str, required=True, help='a file write trained model absolute dir')
    parser.add_argument('--data_file', type=str, required=True, help='la file write train and test data absolute dir')
    args = parser.parse_args()
    return args
 
 
def run():
    args = parse_arguments()
    predict(args.data_dir, args.model_file, args.data_file)
 
 
if __name__ == '__main__':
    run()

这个环节的业务代码主要实现了:
依据train环节生成的 model.txt文件获取模型文件地址 -> 测试数据集预测 -> 模型性能评估 -> 保留预测结果文件

  • 代码中使用argparse,用户可以通过rest api的方式传入路径和数据参数运行代码。
  • 使用模型预测测试数据集,预测结果存储到集群挂载卷

制作业务代码镜像

构建pipelines组件前需要将业务代码打包成docker镜像,在这里以 load_data 组件为例介绍一下打包业务代码镜像:
准备好dockerfile:包含业务代码、打包依赖的基础镜像、程序入口

FROM tensorflow/tensorflow:1.14.0-py3
ADD load_data.py .
ENTRYPOINT ["python", "load_data.py"]

使用dockerfile打包业务代码镜像(build_image.sh):

#!/usr/bin/env bash
 
docker build -t mnist-load_data:v0.0.1 .

tips:依赖的tensorflow基础镜像,大家去dockerhub不限版本随意拉取一个就好了

更多工程细节和流程结构参考本例项目地址:https://www.aiqianji.com/openoker/mnist_stage.git

A.在notebook中提交pipelines

在notebook中可以使用pipelines python SDK实现编译pipelines同时将pipelines提交运行,构建pipelines组件代码如下(client.py):

import kfp
from kfp import compiler
import kfp.dsl as dsl
import kfp.notebook
import kfp.gcp as gcp
 
client = kfp.Client()
from kubernetes import client as k8s_client
 
EXPERIMENT_NAME = 'mnist_op'
exp = client.create_experiment(name=EXPERIMENT_NAME)
 
 
class load_dataOp(dsl.ContainerOp):
    """load raw data from tensorflow, do data transform"""
 
    def __init__(self, data_dir, file_name):
        super(load_dataOp, self).__init__(
            name='load_data',
            image='mnist-load_data:v0.0.1',
            arguments=[
                '--file_name', file_name,
                '--data_dir', data_dir,
            ],
            file_outputs={
                'data_file': data_dir + 'train_test_data.txt'
            })
 
 
class trainOp(dsl.ContainerOp):
    """train keras model"""
 
    def __init__(self, data_dir, data_file):
        super(trainOp, self).__init__(
            name='train',
            image='mnist-train:v0.0.1',
            arguments=[
                '--data_dir', data_dir,
                '--data_file', data_file,
            ],
            file_outputs={
                'model_file': data_dir + 'model.txt'
            })
 
 
class predictOp(dsl.ContainerOp):
    """get predict by trained model"""
 
    def __init__(self, data_dir, model_file, data_file):
        super(predictOp, self).__init__(
            name='predict',
            image='mnist-predict:v0.0.1',
            arguments=[
                '--data_dir', data_dir,
                '--model_file', model_file,
                '--data_file', data_file
            ],
            file_outputs={
                'result_file': data_dir + 'result.txt'
            })
 
 
@dsl.pipeline(
    name='MnistStage',
    description='shows how to define dsl.Condition.'
)
def MnistTest():
    data_dir = '/DATA/nfs-data/kubeflow-pv1/'
    file_name = 'mnist.npz'
    load_data = load_dataOp(data_dir, file_name).add_volume(k8s_client.V1Volume(name='mnist-pv',
                                                                                nfs=k8s_client.V1NFSVolumeSource(
                                                                                    path='/DATA/nfs-data/kubeflow-pv1/',
                                                                                    server='10.5.188.249'))).add_volume_mount(
        k8s_client.V1VolumeMount(mount_path='/DATA/nfs-data/kubeflow-pv1/', name='mnist-pv'))
 
    train = trainOp(data_dir, load_data.outputs['data_file']).add_volume(k8s_client.V1Volume(name='mnist-pv',
                                                                                             nfs=k8s_client.V1NFSVolumeSource(
                                                                                                 path='/DATA/nfs-data/kubeflow-pv1/',
                                                                                                 server='10.5.188.249'))).add_volume_mount(
        k8s_client.V1VolumeMount(mount_path='/DATA/nfs-data/kubeflow-pv1/', name='mnist-pv'))
 
    predict = predictOp(data_dir, train.outputs['model_file'], load_data.outputs['data_file']).add_volume(
        k8s_client.V1Volume(name='mnist-pv',
                            nfs=k8s_client.V1NFSVolumeSource(
                                path='/DATA/nfs-data/kubeflow-pv1/',
                                server='10.5.188.249'))).add_volume_mount(
        k8s_client.V1VolumeMount(mount_path='/DATA/nfs-data/kubeflow-pv1/', name='mnist-pv'))
 
 
compiler.Compiler().compile(MnistTest, 'mnist.tar.gz')
run = client.run_pipeline(exp.id, 'wbliu4', 'mnist.tar.gz')

在pipelines代码中完成了load_data、train、predict三个环节实例化,以此实现pipelines组件的构造。
在组件中需引用各自业务代码的镜像、明确业务代码需要的rest api 传入参数、业务代码生成的文件和路径
最后在 MnistTest 显示实现pipelines各个组件的关联:
例如在本例中,train组件需要引用load_data组件生成的 train_test_data.txt文件获取训练、测试数据集地址;
predict组件需要引用load_data组件生成的 train_test_data.txt文件获取测试数据集地址、train组件生成的model.txt文件获取模型文件的地址;
notebook中运行完代码后会生成一个pipelines实验链接,通过链接可以访问到实验:
1.png

查看DAG结构:
2.png

pipelines各组件运行结果:
3.png

4.png

5.png

至此使用notebook提交pipelines实验成功!!!

B.将pipelines打包上传至UI

将业务代码打包成镜像
编译压缩工作流程代码为.tar.gz文件
将压缩文件上传至kubeflow ui后,k8s集群将解压缩该压缩文件中的yaml配置文件,然后起一个pipeline工作流任务。
在pipelines的ui界面可以看到pipeline信息和运行状态。