AWS re:Invent 2019 上,SageMaker 发布不少新功能,其中包括:Deep Graph Library 支持,机器学习训练流程管理能力(Experiment),自动机器学习工具 Autopilot,数据处理和模型评估的管理(Processing),模型自动监控工具 Monitor,模型训练过程的调试工具 Debugger,ML 场景下的 IDE Studio。这篇文章介绍并分析了这些新特性的功能,同时夹杂一点个人对这些特性的看法和观点。

Deep Graph Library 支持

Deep Graph Library(以下称作 DGL) 是一个用来构建 GNN 的 Python 库。DGL 目前支持 PyTorch 和 MXNet,它为 GNN 的构建提供了高层的 API。

import dgl
import torch as th

g = dgl.DGLGraph()
g.add_nodes(5)                          # add 5 nodes
g.add_edges([0, 0, 0, 0], [1, 2, 3, 4]) # add 4 edges 0->1, 0->2, 0->3, 0->4
g.ndata['h'] = th.randn(5, 3)           # assign one 3D vector to each node
g.edata['h'] = th.randn(4, 4)           # assign one 4D vector to each edge

上面的示例来自 DGL,其构建了一个简单的 GNN 网络,它对 GCN 等都有对应的支持。目前 SageMaker 通过其 Estimator 接口支持发起一次 DGL 训练。这一特性并无太多值得称道之处,算是一个常规的特性。

机器学习训练管理能力(Experiment)

众所周知,机器学习的模型训练是一个迭代的过程。对于一个模型的训练,往往有多次的调参-训练的迭代。Experiment 就是对这一迭代过程的抽象。在介绍训练之前,首先介绍一下 Trial。Trial 是一次训练的过程。其中包括数据预处理,模型训练,模型评估等。而 Experiment,是由多个 Trial 组成的一个集合,比如一组相关的训练任务等。因此 Experiment 跟 Google Vizier 中的 Study,Katib 中的 Experiment 概念类似。

利用 SageMaker 提供的 SDK,Experiment 可以发起多次 Trial,并且提供一些数据的比对等功能。

from smexperiments.experiment import Experiment

# Create experiment.
mnist_experiment = Experiment.create(
    experiment_name="mnist-hand-written-digits-classification", 
    description="Classification of mnist hand-written digits", 
    sagemaker_boto_client=sm)

# Create trials
for i, num_hidden_channel in enumerate([2, 5, 10, 20, 32]):
    trial_name = f"cnn-training-job-{num_hidden_channel}-hidden-channels-{int(time.time())}"
    cnn_trial = Trial.create(
        trial_name=trial_name, 
        experiment_name=mnist_experiment.experiment_name,
        sagemaker_boto_client=sm,
    )
    cnn_trial.add_trial_component(tracker.trial_component)

    cnn_training_job_name = "cnn-training-job-{}".format(int(time.time()))
    
    # Create one PyTorch or Tensorflow job.
    estimator = PyTorch(
        entry_point='mnist.py',
        role=role,
        sagemaker_session=sess,
        framework_version='1.1.0',
        train_instance_count=1,
        train_instance_type='ml.p3.2xlarge',
        hyperparameters={
            'hidden_channels': num_hidden_channels
        },
        metric_definitions=[
            {'Name':'train:loss', 'Regex':'Train Loss: (.*?);'},
            {'Name':'test:loss', 'Regex':'Test Average loss: (.*?),'},
            {'Name':'test:accuracy', 'Regex':'Test Accuracy: (.*?)%;'}
        ]
    )

    # Training.
    estimator.fit(
        inputs={'training': inputs}, 
        job_name=cnn_training_job_name,
        experiment_config={
            "ExperimentName": mnist_experiment.experiment_name, 
            "TrialName": cnn_trial.trial_name,
            "TrialComponentDisplayName": "Training",
        }
    )

上述代码是一次完整的训练过程。这次训练暴力搜索了 num_hidden_channel 的取值,并且所有的训练都归属于一个 Experiment。可以看到,其中的概念与 katib Experiment 非常类似。不过其采取的思路与 NNI 更相似,为用户提供了一个 SDK。这样的方式对用户的代码有一定的侵入性。

自动机器学习工具 Autopilot

SageMaker 之前已经有了一个做模型调优的功能 Model Tuning,它支持进行超参数优化。而新退出的这一功能 Autopilot,支持更多的特性:

  • 支持 tabular 格式的输入,可以自动地进行模型输入的预处理工作
  • 传统机器学习算法的自动模型选择(Automatic algorithm selection)
  • 自动超参数优化
  • 分布式训练
  • 自动的集群大小调整

通过下面的代码,就可以发起一次基于 Autopilot 的训练。看上去就跟 autosklearn 非常接近。

auto_ml_job_name = 'automl-dm-' + timestamp_suffix
print('AutoMLJobName: ' + auto_ml_job_name)

sm.create_auto_ml_job(AutoMLJobName=auto_ml_job_name,
                      InputDataConfig=input_data_config,
                      OutputDataConfig=output_data_config,
                      RoleArn=role)

candidates = sm.list_candidates_for_auto_ml_job(AutoMLJobName=auto_ml_job_name, SortBy='FinalObjectiveMetricValue')['Candidates']

model_arn = sm.create_model(Containers=best_candidate['InferenceContainers'],
                            ModelName=model_name,
                            ExecutionRoleArn=role)

ep_config = sm.create_endpoint_config(EndpointConfigName = epc_name,
                                      ProductionVariants=[{'InstanceType':'ml.m5.2xlarge',
                                                           'InitialInstanceCount':1,
                                                           'ModelName':model_name,
                                                           'VariantName':variant_name}])

create_endpoint_response = sm.create_endpoint(EndpointName=ep_name,
                                              EndpointConfigName=epc_name)

训练一共会被分为如下步骤:

  • 数据切分,划分为训练集和验证集
  • 数据分析,推荐合理的流水线(应该是算法的意思)
  • 特征工程,其中包括数据转换等
  • 选择合适的流水线,调整超参数

除此之外,Autopilot 会生成两个 Jupyter Notebook:

job = sm.describe_auto_ml_job(AutoMLJobName=auto_ml_job_name)
job_data_notebook = job['AutoMLJobArtifacts']['DataExplorationNotebookLocation']
job_candidate_notebook = job['AutoMLJobArtifacts']['CandidateDefinitionNotebookLocation']

print(job_data_notebook)
print(job_candidate_notebook)

s3://<PREFIX_REMOVED>/notebooks/SageMakerAutopilotCandidateDefinitionNotebook.ipynb
s3://<PREFIX_REMOVED>/notebooks/SageMakerAutopilotDataExplorationNotebook.ipynb

这两个 notebook 中的第一个记录了搜索到的候选模型的一些信息,而且所有的代码都是可用的。第二个记录了数据集相关的信息,不确定有没有特征工程有关的内容。

总体来说,是一个主打自动模型选择的功能,这一功能在传统机器学习算法领域会更有价值。

数据处理和模型评估的管理

在模型训练的流水线中,数据的准备工作是比较麻烦的。SageMaker 基于 sklearn 的 data transform 功能,提供了一种内置的数据处理功能的支持。

from sagemaker.sklearn.processing import SKLearnProcessor
sklearn_processor = SKLearnProcessor(framework_version='0.20.0',
                                     role=role,
                                     instance_count=1,
                                     instance_type='ml.m5.xlarge')

from sagemaker.processing import ProcessingInput, ProcessingOutput
sklearn_processor.run(
    code='preprocessing.py',
    # arguments = ['arg1', 'arg2'],
    inputs=[ProcessingInput(
        source='dataset.csv',
        destination='/opt/ml/processing/input')],
    outputs=[ProcessingOutput(source='/opt/ml/processing/output/train'),
        ProcessingOutput(source='/opt/ml/processing/output/validation'),
        ProcessingOutput(source='/opt/ml/processing/output/test')]
)

上述代码是其中的输入输出配置。本质上,它是利用容器来实现的,在 SDK 定义的函数中,用户需要指定运行的代码文件,输入和输出,就可以进行标准化的处理。其中 preprocessing.py 就是用户提供的预处理代码:

import pandas as pd
from sklearn.model_selection import train_test_split
# Read data locally 
df = pd.read_csv('/opt/ml/processing/input/dataset.csv')
# Preprocess the data set
downsampled = apply_mad_data_science_skills(df)
# Split data set into training, validation, and test
train, test = train_test_split(downsampled, test_size=0.2)
train, validation = train_test_split(train, test_size=0.2)
# Create local output directories
try:
    os.makedirs('/opt/ml/processing/output/train')
    os.makedirs('/opt/ml/processing/output/validation')
    os.makedirs('/opt/ml/processing/output/test')
except:
    pass
# Save data locally
train.to_csv("/opt/ml/processing/output/train/train.csv")
validation.to_csv("/opt/ml/processing/output/validation/validation.csv")
test.to_csv("/opt/ml/processing/output/test/test.csv")
print('Finished running processing job')

基本来说,是一个对容器的应用。值得一提的是,Amazon 对于资源采取了简化的处理,分成了大中小等不同资源容量的机器,这里用的就是 ml.m5.xlarge。这种思路值得参考,它可以简化用户申请资源的配置选择,但是也一定程度上限制了用户的自由。

既然是使用容器来实现的,这一功能同样支持自定义容器,其中差异就是需要提供一个镜像的路径。

from sagemaker.processing import ScriptProcessor
script_processor = ScriptProcessor(image_uri='123456789012.dkr.ecr.us-west-2.amazonaws.com/sagemaker-spacy-container:latest',
                role=role,
                instance_count=1,
                instance_type='ml.m5.xlarge')

模型服务自动监控工具

模型服务上线后,如何对它的效果和请求进行分析,是一个困难的问题。模型服务自动监控功能,就是通过对上线的服务进行监控和分析,来更好的了解模型服务情况。它的功能可以概括为:

  • 查看模型服务的历史输入输出
  • 查看模型服务的表现和统计数据

其中对数据的最简单的观察就是查看输入和输出,DataCaptureConfig 的配置支持这一特性:

data_capture_configuration = {
    "EnableCapture": True,
    "InitialSamplingPercentage": 100,
    "DestinationS3Uri": s3_capture_upload_path,
    "CaptureOptions": [
        { "CaptureMode": "Output" },
        { "CaptureMode": "Input" }
    ],
    "CaptureContentTypeHeader": {
       "CsvContentTypes": ["text/csv"],
       "JsonContentTypes": ["application/json"]
}
create_endpoint_config_response = sm_client.create_endpoint_config(
    EndpointConfigName = endpoint_config_name,
    ProductionVariants=[{
        'InstanceType':'ml.m5.xlarge',
        'InitialInstanceCount':1,
        'InitialVariantWeight':1,
        'ModelName':model_name,
        'VariantName':'AllTrafficVariant'
    }],
    DataCaptureConfig = data_capture_configuration)

在利用 DataCaptureConfig 创建好 Endpoint 后,可以查看其历史输入输出:

    "endpointInput":{
        "observedContentType":"text/csv",
        "mode":"INPUT",
        "data":"132,25,113.2,96,269.9,107,229.1,87,7.1,7,2,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1,0,1,0,0,1",
        "encoding":"CSV"
     },
     "endpointOutput":{
        "observedContentType":"text/csv; charset=utf-8",
        "mode":"OUTPUT",
        "data":"0.01076381653547287",
        "encoding":"CSV"}
     },
    "eventMetadata":{
        "eventId":"6ece5c74-7497-43f1-a263-4833557ffd63",
        "inferenceTime":"2019-11-22T08:24:40Z"},
        "eventVersion":"0"}

除了简单的输入和输出之外,可以通过 S3 上的三个对象 constraints.json,statistics.json 和 constraints_violations.json 查看更抽象的数据分析结果:

{
  "version" : 0.0,
  "features" : [ {
    "name" : "Churn",
    "inferred_type" : "Integral",
    "completeness" : 1.0
  }, {
    "name" : "Account Length",
    "inferred_type" : "Integral",
    "completeness" : 1.0
  }
  ]
}

比如 constraints.json 会显示推导的数据的类型等信息。而下列的 statistics.json 展示了数据的分布情况和统计信息。

{
    "name" : "Day Mins",
    "inferred_type" : "Fractional",
    "numerical_statistics" : {
      "common" : {
        "num_present" : 2333,
        "num_missing" : 0
      },
      "mean" : 180.22648949849963,
      "sum" : 420468.3999999996,
      "std_dev" : 53.987178959901556,
      "min" : 0.0,
      "max" : 350.8,
      "distribution" : {
        "kll" : {
          "buckets" : [ {
            "lower_bound" : 0.0,
            "upper_bound" : 35.08,
            "count" : 14.0
          }, {
            "lower_bound" : 35.08,
            "upper_bound" : 70.16,
            "count" : 48.0
          } ],
          "sketch" : {
            "parameters" : {
              "c" : 0.64,
              "k" : 2048.0
            },
            "data" : [ [ 178.1, 160.3 ...] ]
          }
        }
      }
    }
}

模型训练过程的调试工具

刚刚提到的模型服务的监控工具,而模型训练的过程调试也一直是个很困难的事情,传统软件开发有 gdb 等 debug 工具,而模型训练目前还没有完善的解决方案。SageMaker 提出了自己的尝试。Debugger 是一个 Python SDK。它提供了如下功能:

  • 提供 TensorFlow 的 Session Hook,用来收集数据和训练指标,如权重,梯度等
  • 提供 SaveConfig 配置,可以让用户指定收集频率
  • (可选)Optimizer 的封装,用来支持收集梯度
  • (可选)ReductionConfig,用来收集部分 Tensor 而不是全部

基本来说,就是围绕 SessionHook 的增强与优化,其示例如下。

reduc = smd.ReductionConfig(reductions=['mean'], abs_reductions=['max'], norms=['l1'])

hook = smd.SessionHook(out_dir=args.debug_path,
                       include_collections=['weights', 'gradients', 'losses'],
                       save_config=smd.SaveConfig(save_interval=args.debug_frequency),
                       reduction_config=reduc)

with tf.name_scope('initialize'):
    # 2-dimensional input sample
    x = tf.placeholder(shape=(None, 2), dtype=tf.float32)
    # Initial weights: [10, 10]
    w = tf.Variable(initial_value=[[10.], [10.]], name='weight1')
    # True weights, i.e. the ones we're trying to learn
    w0 = [[1], [1.]]
with tf.name_scope('multiply'):
    # Compute true label
    y = tf.matmul(x, w0)
    # Compute "predicted" label
    y_hat = tf.matmul(x, w)
with tf.name_scope('loss'):
    # Compute loss
    loss = tf.reduce_mean((y_hat - y) ** 2, name="loss")
    hook.add_to_collection('losses', loss)

optimizer = tf.train.AdamOptimizer(args.lr)
optimizer = hook.wrap_optimizer(optimizer)
optimizer_op = optimizer.minimize(loss)

hook.set_mode(smd.modes.TRAIN)

with tf.train.MonitoredSession(hooks=[hook]) as sess:
    for i in range(args.steps):
        x_ = np.random.random((10, 2)) * args.scale
        _loss, opt = sess.run([loss, optimizer_op], {x: x_})
        print (f'Step={i}, Loss={_loss}')

不过值得一提的是,在 SageMaker Debugger 的开源代码中,它给出了和博文不同的 API 使用方式:

import sagemaker as sm
from sagemaker.debugger import rule_configs, Rule, CollectionConfig

# Choose a built-in rule to monitor your training job
rule = Rule.sagemaker(
    rule_configs.exploding_tensor(),
    # configure your rule if applicable
    rule_parameters={"tensor_regex": ".*"},
    # specify collections to save for processing your rule
    collections_to_save=[
        CollectionConfig(name="weights"),
        CollectionConfig(name="losses"),
    ],
)

# Pass the rule to the estimator
sagemaker_simple_estimator = sm.tensorflow.TensorFlow(
    entry_point="script.py",
    role=sm.get_execution_role(),
    framework_version="1.15",
    py_version="py3",
    # argument for smdebug below
    rules=[rule],
)

sagemaker_simple_estimator.fit()
tensors_path = sagemaker_simple_estimator.latest_job_debugger_artifacts_path()

import smdebug as smd
trial = smd.trials.create_trial(out_dir=tensors_path)
print(f"Saved these tensors: {trial.tensor_names()}")
print(f"Loss values during evaluation were {trial.tensor('CrossEntropyLoss:0').values(mode=smd.modes.EVAL)}")

核心思路差不多,但是 API 变动挺大的,不知道是哪个版本更老一些。整体而言,是一个蛮有趣的功能,这种方式,通过对用户代码的侵入性修改,可以把训练过程的一些训练指标推送到远端。与其他如 MLFlow 等收集方式的实现相比,它直接提供了 SessionHook,稍微友好一些。

ML 场景下的 IDE

刚刚我们提到的 Experiment 特性,使得 SageMaker 有了自己独立的逻辑概念。而算法科学家喜爱的 Jupyter Notebook, 是不能原生地展示这些概念的,它并不能“理解”一个 Experiment 会与多个 Trials 相互关联。尽管 SageMaker 在文章里把 Motivation,但从我个人的角度,我认为最大的动机就是支持 Experiment 和 Trials 等新的特性。

所以,带着这样的思想,来看下它的 UI:

SageMaker Studio
SageMaker Studio

可以看到,基本都是围绕 Trial 和 Experiment 展开的。SageMaker 把 Jupyter 作为了本地模型开发的事实标准,对它进行了增强和修改,代表了业界的一种探索方向。不过我们也可以看到,有越来越多除了 Jupyter 之外的其他选择,所以这样的思路是否奏效,需要时间的检验了。

参考文档

License

  • This article is licensed under CC BY-NC-SA 3.0.
  • Please contact me for commercial use.

评论