模型训练的工作流程中,不同的环节所需要的资源类型和数量是不同的:数据处理使用CPU、模型训练需要GPU。调度系统需要给不同环节自动分配不同资源,分布式环境下的多个环节之间需要进行高效的数据流转。(流程编排) 基于大规模数据或大规模模型训练时,单台机器无法独立完成存储和计算任务,数据和模型需要被划分到多个节点上,节点之间要能够高效的传递信息,并保证算法的收敛性。(分布式机器学习理论) 同时调度多个分布式训练任务时,每个训练任务都由多个pod组成,k8s默认调度器是以pod为粒度调度,而不是以job为粒度调度,以pod为粒度的调度方式会在调度多任务时产生资源竞争,造成死锁。(gang-scheduling) 多团队共用同一个训练集群时,需要提供资源隔离机制;一个团队内多个成员共用,需要提供多优先级队列和抢占机制,并保证同一优先级下调度的公平性。(调度策略) 在公有云环境,当所有包年包月机器都被占用时,为训练任务自动开通按量付费实例;在训练过程中,为训练任务动态开通竞价实例(抢占实例)或使用潮汐资源,实现最大性价比。(elasticity) 在NUMA(非一致性内存访问)环境中,感知NUMA节点的拓扑结构,在调度时考虑亲和性。(topology awareness)
Pipeline Orchestrator(流程编排引擎):模型训练流程中的各个环节(component)可以被定义为一个DAG,DAG主要描述了每个component的输入输出和拓扑关系。Orchestrator会根据DAG定义的顺序依次执行各个component。在这个方向的开源工具有:Kubeflow Pipeline、Airflow、MLflow等。 Distributed Training Framework(分布式训练框架):Pipeline Orchestrator不会提供分布式训练功能,分布式训练需要在训练框架层面实现并行算法、提供通信机制。Tensorflow, PyTorch, Horovod框架基于不同并行化方法提供了各自的分布式训练API。 Training Job Operator:每个训练框架都有各自的operator,operator的主要作用是将一个包含多个worker的分布式训练作业拆分为不同的built-in workload,向pod中注入用来达成共识的参数,交给scheduler调度。Kubeflow社区提供了TFJob、PytorchJob、MPIJob等多种框架的operator。 Batch Job Scheduler:scheduler监听集群内新的pod,为pod找到合适的node调度上去。深度学习训练场景的调度器应该支持批量调度、公平调度等高级调度策略。但kubernetes一开始主要管理在线服务,原生的scheduler无法满足批处理任务的调度需求。目前在模型训练场景业界常用的开源调度器是Volcano。 Topology Manager:Topology Manager在v1.16作为alpha功能引入kubelet的组件,v1.18升级为beta版本。主要作用是在调度的最后阶段,当服务已经绑定node时,Topology Manager会从第三方Device Manager获取到亲和性调度建议,决定将服务绑定在哪个Numa节点上。
etcd:兼顾一致性和可用性的kv数据库,是k8s的数据库。 kube-apiserver:所有服务不会跟etcd直接建立连接,而是通过apiserver读写数据。 kube-controller-manager:负责容器编排。监听api server变更,发现待编排对象期望状态和实际状态的差异,将实际状态调整为期望状态。 kube-scheduler:负责调度。为新创建出来的pod寻找最合适的节点调度上去。
CNI(container networking interface):管理容器网络。 kubelet:接受scheduler调度过来的pod,保证pod中容器健康运行。 CRI(container runtime interface):kubelet通过cri跟容器运行时(比如docker、containerd)交互。 CSI(container storage interface):管理容器持久化存储。 Device Plugin:管理硬件设备,向kubernetes上报设备信息,在调度时给容器绑定为其分配的设备。
3.2.2 控制器模式
在机器人设计和自动化领域,控制器模式是非常常见的设计模式。控制器会不断检测期望状态(spec)和实际状态(status)的一致性,并控制系统达到期望状态。
定义CRD(Custom Resource Definition),下面以TFJob为例,可以看到CRD主要描述了资源的名称、分组及其yaml的字段。
实现自定义controller:自定义controller一般以deployment的形式启动。它会监听CRD类型的资源,运行你实现的control loop逻辑,将资源拆分成pod、service、configmap等细粒度资源,应用到k8s集群。下面是启动一个TFJob的yaml示例。
预选:找到能够被调度的node。node需要满足一些必要条件:剩余资源大于pod所申请的资源;污点;亲和性等策略。 优选:在满足要求的node里找到最适合的node。优选会考虑:选择调度后最空闲的节点;选择调度后资源使用最平均的节点;亲和性等策略。统一打分,选择分数最高的节点。
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: output-parameter-
spec:
entrypoint: output-parameter
templates:
# 定义2个component的关系
- name: output-parameter
steps:
- - name: generate-parameter
template: whalesay
- - name: consume-parameter
template: print-message
arguments:
parameters:
- name: message
# 使用第一个step的输出作为当前step的输入
value: "{{steps.generate-parameter.outputs.parameters.hello-param}}"
# 定义第一个component,用于生成字符串
- name: whalesay
container:
image: docker/whalesay:latest
command: [sh, -c]
# 打印字符串并写入文件
args: ["sleep 1; echo -n hello world > /tmp/hello_world.txt"]
outputs:
parameters:
# 将文件内作为当前步骤的参数输出
- name: hello-param
valueFrom:
default: "Foobar" # Default value to use if retrieving valueFrom fails. If not provided workflow will fail instead
path: /tmp/hello_world.txt
# 定义第二个component,用于打印字符串
- name: print-message
inputs:
parameters:
- name: message
container:
image: docker/whalesay:latest
# 接受一个参数并在控制台打印参数
command: [cowsay]
args: ["{{inputs.parameters.message}}"]
提供了一个sdk,支持通过python代码的方式定义DAG,而不是让算法工程师手撕yaml。 提供了step cache能力,当一个component第二次执行时,kfp的operator检测到镜像、输入、输出等参数都没有发生变化,不会再重新执行,而是直接使用上一次的结果。这个特性在模型训练场景非常有用,很多时候我们只是改了超参数重新训练,没必要重新执行数据处理的component。 提供了一个可视化页面,可以在训练过程中收集metrics、tensorboard,展示在页面上。
直接给python方法加装饰器:Building Python Function-based Components (https://www.kubeflow.org/docs/components/pipelines/sdk/python-function-components/) 通过kfp提供的格式定义一个component yaml:Building Components (https://www.kubeflow.org/docs/components/pipelines/sdk/component-development/) 直接使用k8s resource作为component:Manipulate Kubernetes Resources as Part of a Pipeline (https://www.kubeflow.org/docs/components/pipelines/sdk/manipulate-resources/)
之所以需要使用分布式机器学习,主要是为了解决数据量大和模型规模大两方面问题。对于数据太多的情况,需要将数据划分到多个节点上训练;对于模型规模大的情形,则需要将模型分配到不同节点训练。数据和模型的划分方法是我们关注的首要问题。 完成模型和数据划分后,每个工作节点需要根据分配给自己的局部训练数据和子模型进行训练。而局部训练产生的参数需要跟全局共享,才能达到并行训练的效果。因此参数共享时涉及到的通信拓扑结构、步调和频率是我们关注的第二个问题。 最后,无论使用哪种拓扑结构,都需要解决如何聚合来自不同工作节点的模型参数和梯度的问题。
随机采样:采用有放回的方式随机采样,为每个节点分配训练样本,保证局部数据和全局数据的独立同分布。缺点是全局采样代价较高。 置乱切分:先分好再训练,周期性打乱重新分一次。这种方法的缺点是,不能保证数据独立同分布,接近于有放回的随机采样,影响收敛效率。
逐层横向划分:也叫流水并行(Pipeline Parallelism)。优点是接口简单,缺点是处理不了单层参数单机放不下的情况;而且并行度较差。为了消除空闲时间(bubble),通常会并行处理多个microbatches,保证每个节点都在工作。
纵向划分:将单层参数划分到不同节点。这种方法的缺点是各个子模型之间依赖比较复杂,实现难度也更大。
常用的聚合方式主要是基于模型加和和投票的聚合方法,在异步通信的情况下,有一些部分聚合的优化方法。 另一种聚合方法是模型集成,为了避免模型集成带来的参数爆炸问题,在模型训练中也会利用知识蒸馏技术压缩模型。
def main(_):
ps_hosts = FLAGS.ps_hosts.split(",")
worker_hosts = FLAGS.worker_hosts.split(",")
# Create a cluster from the parameter server and worker hosts.
cluster = tf.train.ClusterSpec({"ps": ps_hosts, "worker": worker_hosts})
# Create and start a server for the local task.
server = tf.train.Server(cluster,
job_name=FLAGS.job_name,
task_index=FLAGS.task_index)
if FLAGS.job_name == "ps":
server.join()
elif FLAGS.job_name == "worker":
is_chief = (FLAGS.task_index == 0)
# Assigns ops to the local worker by default.
with tf.device(tf.train.replica_device_setter(
worker_device="/job:worker/task:%d" % FLAGS.task_index,
cluster=cluster)):
# Compute
# On ps0.example.com:
$ python trainer.py \
--ps_hosts=ps0.example.com:2222,ps1.example.com:2222 \
--worker_hosts=worker0.example.com:2222,worker1.example.com:2222 \
--job_name=ps --task_index=0
# On ps1.example.com:
$ python trainer.py \
--ps_hosts=ps0.example.com:2222,ps1.example.com:2222 \
--worker_hosts=worker0.example.com:2222,worker1.example.com:2222 \
--job_name=ps --task_index=1
# On worker0.example.com:
$ python trainer.py \
--ps_hosts=ps0.example.com:2222,ps1.example.com:2222 \
--worker_hosts=worker0.example.com:2222,worker1.example.com:2222 \
--job_name=worker --task_index=0
# On worker1.example.com:
$ python trainer.py \
--ps_hosts=ps0.example.com:2222,ps1.example.com:2222 \
--worker_hosts=worker0.example.com:2222,worker1.example.com:2222 \
--job_name=worker --task_index=1
import os
import sys
import tempfile
import torch
import torch.distributed as dist
import torch.nn as nn
import torch.optim as optim
import torch.multiprocessing as mp
from torch.nn.parallel import DistributedDataParallel as DDP
def setup(rank, world_size):
os.environ['MASTER_ADDR'] = 'localhost'
os.environ['MASTER_PORT'] = '12355'
# initialize the process group
dist.init_process_group("gloo", rank=rank, world_size=world_size)
def cleanup():
dist.destroy_process_group()
import tensorflow as tf
import horovod.tensorflow.keras as hvd
# Horovod: initialize Horovod.
# 初始化 Horovod,启动相关线程和MPI线程
hvd.init()
# Horovod: pin GPU to be used to process local rank (one GPU per process)
# 依据 local rank 为不同的进程分配不同的GPU
gpus = tf.config.experimental.list_physical_devices('GPU')
for gpu in gpus:
tf.config.experimental.set_memory_growth(gpu, True)
if gpus:
tf.config.experimental.set_visible_devices(gpus[hvd.local_rank()], 'GPU')
(mnist_images, mnist_labels), _ = \
tf.keras.datasets.mnist.load_data(path='mnist-%d.npz' % hvd.rank())
# 切分数据
dataset = tf.data.Dataset.from_tensor_slices(
(tf.cast(mnist_images[..., tf.newaxis] / 255.0, tf.float32),
tf.cast(mnist_labels, tf.int64))
)
dataset = dataset.repeat().shuffle(10000).batch(128)
mnist_model = tf.keras.Sequential([
tf.keras.layers.Conv2D(32, [3, 3], activation='relu'),
......
tf.keras.layers.Dense(10, activation='softmax')
])
# Horovod: adjust learning rate based on number of GPUs.
# 根据Worker的数量增加学习率的大小
scaled_lr = 0.001 * hvd.size()
opt = tf.optimizers.Adam(scaled_lr)
# Horovod: add Horovod DistributedOptimizer.
# 把常规TensorFlow Optimizer通过Horovod包装起来,进而使用 ring-allreduce 来得到平均梯度
opt = hvd.DistributedOptimizer(
opt, backward_passes_per_step=1, average_aggregated_gradients=True)
# Horovod: Specify `experimental_run_tf_function=False` to ensure TensorFlow
# uses hvd.DistributedOptimizer() to compute gradients.
mnist_model.compile(loss=tf.losses.SparseCategoricalCrossentropy(),
optimizer=opt, metrics=['accuracy'],
experimental_run_tf_function=False)
callbacks = [
# 广播初始化,将模型的参数从第一个设备传向其他设备,以保证初始化模型参数的一致性
hvd.callbacks.BroadcastGlobalVariablesCallback(0),
hvd.callbacks.MetricAverageCallback(),
hvd.callbacks.LearningRateWarmupCallback(initial_lr=scaled_lr, warmup_epochs=3, verbose=1),
]
# Horovod: save checkpoints only on worker 0 to prevent other workers from corrupting them.
# 只有设备0需要保存模型参数作为checkpoint
if hvd.rank() == 0:
callbacks.append(tf.keras.callbacks.ModelCheckpoint('./checkpoint-{epoch}.h5'))
# Horovod: write logs on worker 0.
verbose = 1 if hvd.rank() == 0 else 0
# Train the model.
# Horovod: adjust number of steps based on number of GPUs.
mnist_model.fit(dataset, steps_per_epoch=500 // hvd.size(), callbacks=callbacks, epochs=24, verbose=verbose)
# To run on 4 machines with 4 GPUs each:
$ horovodrun -np 16 -H server1:4,server2:4,server3:4,server4:4 python train.py
apiVersion: kubeflow.org/v1
kind: TFJob
metadata:
generateName: tfjob
namespace: your-user-namespace
spec:
tfReplicaSpecs:
PS:
replicas: 2
restartPolicy: OnFailure
template:
metadata:
annotations:
sidecar.istio.io/inject: "false"
spec:
containers:
- name: tensorflow
image: gcr.io/your-project/your-image
command:
- python
- -m
- trainer.task
- --batch_size=32
- --training_steps=1000
Worker:
replicas: 2
restartPolicy: OnFailure
template:
metadata:
annotations:
sidecar.istio.io/inject: "false"
spec:
containers:
- name: tensorflow
image: gcr.io/your-project/your-image
command:
- python
- -m
- trainer.task
- --batch_size=32
- --training_steps=1000
// TF_CONFIG
{
"cluster":{
"ps": [
"tfjob-ps-0.your-user-namespace.svc:2222",
"tfjob-ps-1.your-user-namespace.svc:2222"
],
"worker":[
"tfjob-worker-0.your-user-namespace.svc:2222",
"tfjob-worker-1.your-user-namespace.svc:2222"
]
},
"task":{
"type":"ps",
"index":0
}
}
死锁:下图展示了一个死锁的示例。集群中有2个job正在调度,已经分别成功调度了3个pod,还剩1个pod未能调度,此时集群已经没有剩余资源,两个job又都不愿意释放资源,导致两个job都无法成功启动。
资源碎片:下图展示了一个资源碎片的示例。集群内3个节点一共有6张卡,每个节点已经被调度上了1卡的pod,集群内剩余3张卡。任务队列里还有一个需要单机2卡的任务正在等待,尽管集群内共剩余3卡,但由于没有一个node能够满足pod申请的资源,导致pod无法被成功调度。
调度的公平性:在下面左图的示例中我们能看到,当2个job包含的pod数存在明显差异时,如果我们对所有用一视同仁的优先级调度,就会出现pod越多,占用资源也越多的问题。
亲和性与反亲和性:对于ps架构来说,我们希望调度可以将不同的ps pod调度到不同的node上,将ps和worker尽量调度到同一个node上。如果没有使用亲和性调度策略,会造成网络通信的压力,进而影响计算资源的利用率。
创建一个PodGroup类型的对象并应用到集群中。这个PodGroup CRD就是volcano调度的最小单位,它代表了一组强关联的pod集合。在PodGroup的yaml中,支持为其设置最小启动的pod数、最小资源申请量、关联使用的优先级队列。 将pod绑定podGroup、并将pod的schedulerName设置为volcano,保证volcano调度器的control loop可以发现并处理这个pod。
周期性的开启session,一个调度周期开始。 将没有被调度的Job发送到会话的待调度队列中。 遍历所有的待调度Job,按照定义的次序依次执行enqueue(入队)、allocate(分配)、preempt(抢占)、reclaim(回收)、backfill(预留)等动作,为每个Job找到一个最合适的节点。将该Job 绑定到这个节点。action中执行的具体算法逻辑取决于注册的plugin中各函数的实现。 关闭本次会话。
Gang Plugin:解决死锁问题。对PodGroup使用“All or nothing”策略。 Binpack Plugin:解决资源碎片问题。根据资源使用率计算节点权重,在优选阶段为pod选择打分最高的节点。 Priority、DRF Plugin:解决Job调度的公平性问题,支持以fair-share的形式共享资源。 Proportion Plugin:为不同团队划分资源使用比例。 Task-topology Plugin:提供亲和性和反亲和性配置策略。