kubeflow/pipelines 是 Kubeflow 社区新近开源的端到端的 ML/DL 工作流系统。近些年来,随着深度学习带来的 AI 领域的繁荣,对 ML/DL 业务的端到端支持成为了工业界关注的一个热点。本文主要从使用与实现的角度,分析 kubeflow/pipelines 的功能,以及对其的看法。

核心概念

kubeflow/pipelines 实现了一个工作流模型。所谓工作流,或者称之为流水线(pipeline),可以将其当做一个有向无环图(DAG)。其中的每一个节点,在 kubeflow/pipelines 的语义下被称作组件(component)。组件在图中作为一个节点,其会处理真正的逻辑,比如预处理,数据清洗,模型训练等等。每一个组件负责的功能不同,但有一个共同点,即组件都是以 Docker 镜像的方式被打包,以容器的方式被运行的。这也是与 kubeflow 社区的 Run ML on Kubernetes 这一愿景相统一的。

流水线

实验(experiment)是一个工作空间,在其中可以针对流水线尝试不同的配置。运行(run)是流水线的一次执行,用户在执行的过程中可以看到每一步的输出文件,以及日志。步(step)是组件的一次运行,步与组件的关系就像是运行与流水线的关系一样。步输出工件(step output artifacts)是在组件的一次运行结束后输出的,能被系统的前端理解并渲染可视化的文件。

步输出工件的可视化

在介绍完核心的概念之后,kubeflow/pipelines 的功能便容易理解了,它就是一个支持自定义的流水线系统,用户可以通过它定义自己的机器学习流水线,并且在执行时输出一定的用于可视化过程的文件,系统的前端会将其可以理解的文件进行可视化,方便用户跟踪学习过程。

实现第一个流水线

这里有一个基于 jupyterhub 的例子:KubeFlow Pipeline Using TFX OSS Components,利用 TensorFlow 作为示例,实现了一个完整的 kubeflow 流水线。

流水线的定义可以分为两步,首先是定义组件:

def tf_train_op(transformed_data_dir, schema: 'GcsUri[text/json]', learning_rate: float, hidden_layer_size: int, steps: int, target: str, preprocess_module: 'GcsUri[text/code/python]', training_output: 'GcsUri[Directory]', step_name='training'):
    return dsl.ContainerOp(
        name = step_name,
        image = 'gcr.io/ml-pipeline/ml-pipeline-kubeflow-tf-trainer:0.0.42',
        arguments = [
            '--transformed-data-dir', transformed_data_dir,
            '--schema', schema,
            '--learning-rate', learning_rate,
            '--hidden-layer-size', hidden_layer_size,
            '--steps', steps,
            '--target', target,
            '--preprocessing-module', preprocess_module,
            '--job-dir', training_output,
        ],
        file_outputs = {'train': '/output.txt'}
    )

这里的例子定义了一个组件,其负责模型的训练。这一组件只是简单地定义了一个 Docker 容器,利用了镜像中已有的 TensorFlow 框架进行训练。在更加高级的用法中,组件可以从镜像开始完全自定义。有时用户需要自定义其使用的组件,这里介绍一下自定义的方式。首先需要打包一个 Docker 镜像,这个镜像是组件的依赖,每一个组件的运行,就是一个 Docker 容器。其次需要为其定义一个 python 函数,描述组件的输入输出等信息,这一定义是为了能够让流水线理解组件在流水线中的结构,有几个输入节点,几个输出节点,等等。接下来组件的使用就与普通的组件并无二致了。

实现流水线的第二步,就是根据定义好的组件组成流水线:

# The pipeline definition
@dsl.pipeline(
  name='TFX Taxi Cab Classification Pipeline Example',
  description='Example pipeline that does classification with model analysis based on a public BigQuery dataset.'
)
def taxi_cab_classification(
    output,
    project,
    column_names=dsl.PipelineParam(name='column-names', value='gs://ml-pipeline-playground/tfx/taxi-cab-classification/column-names.json'),
    key_columns=dsl.PipelineParam(name='key-columns', value='trip_start_timestamp'),
    train=dsl.PipelineParam(name='train', value='gs://ml-pipeline-playground/tfx/taxi-cab-classification/train.csv'),
    evaluation=dsl.PipelineParam(name='evaluation', value='gs://ml-pipeline-playground/tfx/taxi-cab-classification/eval.csv'),
    ...):
    ...
    preprocess = dataflow_tf_transform_op(train, evaluation, schema, project, preprocess_mode, preprocess_module, transform_output)
    training = tf_train_op(preprocess.output, schema, learning_rate, hidden_layer_size, steps, target, preprocess_module, training_output)
    ...

在流水线中,由输入输出关系会确定图上的边以及方向。在上例中,training 的输入是 preprocess 的输出,因此 preprocess 会有一条边指向 training,代表两者的拓扑顺序。在定义好流水线后,可以通过 python 中实现好的流水线客户端提交到系统中运行。

实现思路

虽然 kubeflow/pipelines 的使用略显复杂,但它的实现其实并不麻烦。整个的架构可以分为五个部分,分别是 ScheduledWorkflow CRD 以及其 operator,流水线前端,流水线后端, Python SDK 和 persistence agent。ScheduledWorkflow CRD 扩展了 argoproj/argo 的 Workflow 定义,其 API 在此处可见。这也是流水线项目中的核心部分,它负责真正地在 Kubernetes 上按照拓扑序创建出对应的容器完成流水线的逻辑。

Python SDK 负责构造出流水线,并且根据流水线构造出 ScheduledWorkflow 的 YAML 定义,随后将其作为参数传递给流水线系统的后端服务。后端服务依赖关系存储数据库(如 MySQL)和对象存储(如 Amazon S3),处理所有流水线中的 CRUD 请求。前端负责可视化整个流水线的过程,以及获取日志,发起新的运行等。Persistence agent 负责把数据从 Kubernetes Master 的 etcd 中 sync 到后端服务的关系型数据库中,其实现的方式与 CRD operator 类似,通过 informer 来监听 Kubernetes apiserver 对应资源实现。

结语

kubeflow/pipelines 的介绍基本到此为止,本身这是一个值得参考的流水线系统。其中最令我称道的地方是关于每一步输出工件的渲染支持上。这样的设定能够支持每一个组件都可以输出一定程度的可视化,非常有助于帮助工程师了解在运行中的状态。但是不得不说,这是一个非常复杂的系统,看上去更像是谷歌实现给自己的工程师用的工具,在其中能嗅到 ksonnet 的味道。至今笔者仍然不能理解自定义组件的应用场景,在有了 Docker 镜像的支持下,这一层抽象有点多余的感觉。

License

  • This article is licensed under CC BY-NC-SA 3.0.
  • Please contact me for commercial use.

评论