逑识

吾生也有涯,而知也无涯,以无涯奉有涯,其易欤?

0%

TensorFlow 2.x 分布式训练概览

导语」 在进行极复杂模型训练以及使用大规模数据进行训练时,往往会受限于单个节点的内存和算力,无法高效快速地完成模型的优化,而分布式训练正是解决上述问题的一剂良药,它可以充分利用集群中各节点的内存和算力资源,加速模型的训练和调优过程。

分布式训练模式

根据模型在单个节点训练过程中遇到的不同问题,其分布式训练可以分为模型并行 (Model Parallelism) 和数据并行 (Data Parallelism) 两种模式。

模型并行

模型并行是指将模型切分为多个部分并将各个部分放置到不同的节点进行训练的分布式模式。因为当模型很复杂,参数很多时,由于内存的限制,单个节点无法将整个模型加载并进行训练,所以需要将模型切割为更小的部分,并将各个部分运行在不同的节点上以完成训练。

模型并行的优点是可以减少各个节点间的通信代价,因为每个节点只会负责自己的变量更新而无需与其它节点进行数据交换。

其缺点也很明显,由于被切分的模型的各个部分之间有相互依赖的关系,在前向传播和反向传播的过程中,每个节点都需要等待其依赖节点计算完成才能开始计算,并没有实现完全地并行化训练,因此整体的计算效率不高。

所以除非模型真的特别大或者模型存在可以并行执行的部分,一般情况下都不推荐使用模型并行的分布式训练模式。在 TensorFlow 中模型并行也被称为图内复制 (In-graph Replication)。

数据并行

数据并行是指将输入数据进行切分并使用不同节点的模型副本进行训练的分布式模式。因为当数据量很大时,单个节点需要很长时间才能完成 1 轮训练,这对于动辄十几轮或几十轮的训练来说是不可忍受的,所以可以将数据进行划分并分配给多个节点,每个节点处理自己的一小部分数据,从而加快整体的训练速度。在 TensorFlow 中数据并行也被称为图间复制 (Between-graph Replication)。

在数据并行模式下,因为每个节点都持有一份模型的副本,所以在训练过程中需要各节点相互通信来更新整体的变量,以使所有节点的变量值保持最新的状态。根据更新方式的不同,数据并行又可以分为同步训练 (Synchronous Training) 和异步训练 (Asynchronous Training) 两种方式。

  1. 同步训练是指集群中所有节点都完成 1batch 的训练后,先聚合所有节点的计算得到的模型变量的梯度值,然后累加求和或求均值,接着用这个值作为最终梯度去更新每个节点的模型变量。同步训练要求每个节点的算力以及网络的带宽要均衡,不然就会出现某个节点拖慢整体训练速度的情况,所以相对而言,同步训练的训练速度要稍慢。
  2. 异步训练是指每个节点在完成 1batch 的训练后,无需等待其它节点,直接去更新模型变量的值。异步训练的训练速度要更快一些,但是它有一个很严重的问题就是梯度失效 (Stale Gradients) ,因为后完成训练的节点会发现模型变量的值已经被其它节点更新过了,此时该节点计算的梯度就过期失效了。由于梯度失效问题,可能会使得模型最终陷入次优解 (Sub-Optimal Training)。

TensorFlow 目前仅支持数据并行模式,在未来可能会支持模型并行模式。

分布式训练架构

这里介绍的分布式训练架构,主要是指数据并行模式下的分布式架构。目前较为常用的分布式训练架构主要有两种,分别为 Parameter Server 架构和 AllReduce 架构。

PS 架构

Parameter Server(PS) 架构是指带有参数服务器 (ps) 的分布式集群架构。在该架构中,集群中的部分节点会作为参数服务器,用于存储和更新模型的变量值,其他节点则用于计算模型变量的梯度,并与参数服务器进行交互。

Parameter Server 架构既可以是同步训练模式,也可以是异步训练模式。

在同步训练模式中, ps 节点会等待所有 worker 节点的一个 batch 训练结束,然后聚合这些 worker 节点的模型变量梯度并更新 ps 中的模型变量值,最后再把更新的值广播给所有的 worker 节点以更新它们的模型变量。

在异步训练模式中,所有 worker 节点都会单独去更新 ps 节点中的变量,然后获取最新变量值继续训练。

不过无论是同步还是异步训练模式,该架构中 ps 节点的带宽会成为模型训练的瓶颈,因为当 worker 节点数增加时, ps 节点的带宽压力将会成倍增长。虽然可以通过调整 ps 节点的数量来缓解带宽压力,但又会引入 ps 节点间的通信代价。

AllReduce 架构

AllReduce 架构是指不带有参数服务器的分布式集群架构。在该架构中,集群中的所有节点都作为 worker 来执行计算操作,该架构会在每个 batch 训练完成后使用 AllReduce 算法在所有 worker 节点间进行模型变量的同步更新。

目前应用于深度学习的 AllReduce 算法有多种,如 Ring AllReduce 以及 NCCL 等,它们分别适用于不同的使用场景,可根据实际训练情况进行选择。

使用 AllReduce 架构进行分布式训练,可以充分利用每个 worker 节点的带宽资源,但是当模型的参数量比较大时,节点的带宽可能会成为模型训练的瓶颈,常用的千兆网卡可能已经无法突破这一束缚,此时可以尝试使用万兆网卡来缓解这一问题。

TensorFlowParameter Server 架构目前仅支持异步训练模式, AllReuce 架构则采用同步训练模式。

TF 分布式训练策略

TensorFlowtf.distribute 模块下包含有一系列分布式训练策略,它们都是基于数据并行模式实现的。有些策略目前还在 experimental 模块下,表示它们是实验性质的策略,未来可能会发生变动,不过基本上其功能都已经比较成熟且完备了,可以尝试使用。考虑到 TPUStrategy 以及 OneDeviceStrategy 的特殊使用场景,这里不做详细介绍。

Mirrored

MirroredStrategy 是一种单机的同步的分布式训练策略。它支持在一台机器的多个 GPU 之间进行分布式训练,它会在每个 GPU 上创建一个模型副本,模型中的每个变量 (Variables) 都会进行镜像复制并放置到相应的 GPU 上,这些变量被称作镜像变量 (MirroredVariable)。

MirroredStrategy 策略通过 AllReduce 算法使得所有镜像变量在每个 GPU 之间保持同步更新, AllReduce 算法默认使用英伟达的 NcclAllReduce ,也可以通过 cross_device_ops 参数修改为其他的 AllReduce 算法,如 HierarchicalCopyAllReduce

MirroredStrategy 策略会自动使用所有能被 TensorFlow 发现的 GPU 来做分布式训练,如果只想使用部分的 GPU 则可以通过 devices 参数来指定。 MirroredStrategy 实例的创建代码如下所示:

mirrored_strategy = tf.distribute.MirroredStrategy(
devices=["/gpu:0", "/gpu:1"],
cross_device_ops=tf.distribute.HierarchicalCopyAllReduce(),
)

如果 TensorFlow 没有发现 GPU 则默认会退化为使用 CPU 来进行训练。 MirroredStrategy 的典型使用场景为单机多 GPU

CentralStorage

CentralStorageStrategy 也是一种单机的同步的分布式训练策略。但与 MirroredStrategy 策略不同的是,它会将模型的所有变量保存在 CPU 内存上,而不是通过镜像复制的方式保存在每个 GPU 上,所有的计算操作则会在每个 GPU 上以同样的方式执行。

如果机器只有一个 GPU , 那么所有的变量和计算操作都会放在该 GPU 上。在对 CPU 上的变量进行更新前,该策略会先将所有 GPU 副本的上的变量梯度进行聚合,然后应用到 CPU 变量更新中。 CentralStorageStrategy 实例的创建代码如下所示:

central_storage_strategy = tf.distribute.experimental.CentralStorageStrategy()

CentralStorageStrategy 策略在 CPUGPU 通信代价远低于 GPUGPU 之间的通信代价时,较为适用。

MultiWorkerMirrored

MultiWorkerMirroredStrategy 策略与 MirroredStrategy 策略很相似,可以理解为是 MirroredStrategy 策略的多机的同步的分布式训练版本,它也会在每一台机器上创建所有变量的副本。

MultiWorkerMirroredStrategy 策略中运行的每一个节点称为一个 worker ,该 worker 节点上可以包含零或多个 GPU 。多个 worker 节点之间使用 AllReduce 算法来保持模型变量的同步更新, TensorFlow 里将这一操作称为 CollectiveOpsCollectiveOps 会在 TensorFlow 模型运行时自动根据硬件,网络拓扑以及张量的大小来自动选择合适的 AllReduce 算法来进行网络通信以完成变量更新。

MultiWorkerMirroredStrategy 策略还实现了一些额外性能优化,比如将多个小张量上的 AllReductions 转为在一个大张量上的 AllReductions ,以减少 worker 节点间的通信时间。

MultiWorkerMirroredStrategy 策略目前有两种可供选择的 CollectiveOps 。 一种为 CollectiveCommunication.RING ,它使用 gRPC 作为通信层实现了基于AllReduce 操作。 另一种为 CollectiveCommunication.NCCL, 它使用了英伟达的 NCCL 库来实现 AllReduce 操作。在实际使用中,可以基于自己的运行环境选择合适的 CollectiveOps,或者使用 CollectiveCommunication.AUTO 交由 TensorFlow 运行时自行选择。 MultiWorkerMirroredStrategy 实例的创建代码如下所示:

multiworker_strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy(
tf.distribute.experimental.CollectiveCommunication.RING)

MultiWorkerMirroredStrategy 策略因为要涉及到多个 worker 节点之间的通信交互,因此每个 worker 节点需要提前获知集群中各节点配置信息以便在变量更新时使用。

TensorFlow 中定义集群配置信息的标准方式是使用 TF_CONFIG 环境变量来实现的,该环境变量定义了集群中所有节点的配置信息,包括所有 worker 节点的网络地址,当前 worker 节点的索引 (index) 以及当前 worker 节点的角色 (type)。

如果所有 worker 节点都不包含 GPU ,则该策略会退化为使用 CPU 在多个 worker 节点间进行分布式训练。如果集群中的 worker 节点数量只有一个则该策略会退化为 MirroredStrategy 策略。

ParameterServer

ParameterServerStrategy 是一种多机的异步的分布式训练策略。所以它也需要提前指定 TF_CONFIG 环境变量信息,与 MultiWorkerMirroredStrategy 策略不同的是集群中的节点不全是 worker ,有一部分节点会被指定为 ps 用来存储变量信息。模型的每一个变量都会存储在一个 ps 节点上,所有的计算操作会在所有的 worker 节点上以同样的方式执行。 ParameterServerStrategy 实例的创建代码如下所示:

ps_strategy = tf.distribute.experimental.ParameterServerStrategy()

TF 分布式集群

集群定义

由多个安装有 TensorFlow 的节点即可组成一个 TensorFlow 分布式集群。集群节点的信息是基于 TF_CONFIG 环境变量进行配置的,这里的节点是以 ip:port 的形式定义的,因此它不独指一台物理节点,也就是说多个集群节点可以同时存在于同一物理节点,所以 TensorFlow 的分布式训练其实是弱化了集群的概念的。

不过在真实训练环境中,由于单机的算力毕竟有限,我们一般会使用多机组合的方式来进行分布式训练,从而形成了一般意义上所谓的分布式集群。在 TensorFlow 集群上执行分布式训练任务与在 Hadoop 集群上执行 MapReduce 任务比较相似。

一个典型的 TF_CONFIG 环境变量的值如下所示:

{
"cluster": {
"chief": ["host1:port"],
"worker": ["host2:port", "host3:port"],
"ps": ["host4:port"],
"evaluator": ["host5:port"]
},
"task": {
"type": "worker",
"index": 0
}
}

它是一个 json 字符串。 cluster 中的信息表示集群中有 1chief 节点,2worker 节点, 1ps 节点以及 1evaluator 节点, task 中的信息表示当前节点是第 1worker 节点。

chief 节点的作用和 worker 节点大致相同,不过它还会做一些额外的工作,比如保存检查点文件 (checkpoints) 以及为 Tensorboard 记录日志文件等,如果不指定 cheif 节点,则默认会以 worker 列表中的第一个节点作为 chief 节点; worker 节点用来执行训练操作; ps 节点用来存储变量,只有在使用 ParameterServerStrategy 训练策略时才需要指定; evaluator 用来执行交叉验证操作,一般也是在使用 ParameterServerStrategy 策略时才会指定。

注意所有节点的 TF_CONFIG 环境变量中的 cluster 信息都是相同的,不同的地方在于 task 部分,而且所有角色 (task type) 的 index 必须从 0 开始,因为 TensorFlow 会根据该 indexcluster 下相应角色的列表中读取节点信息。

TF_CONFIG 环境变量可以写入到系统的环境变量中,但前提是该物理节点上只会同时启动一个集群节点实例,但实际情况中我们可能会在同一物理节点使用不同的端口启动多个节点实例,固定 TF_CONFIG 的值会就会失去灵活性。因此在大多数情况下,我们会在 python 程序中通过 os.environ["TF_CONFIG"] 来指定集群的信息以实现按需创建,TensorFlow 运行时会自动解析其中的信息并启动训练任务。

上面介绍的 4 中分布式训练策略中可以使用集群进行多机训练的只有 MultiWorkerMirroredStrategyParameterServerStrategy 策略。

TF 集群分布式训练

TensorFlow 集群分布式训练的基本流程如下:

  1. 不同节点使用不同的 TF_CONFIG 环境变量启动该节点的训练任务。TensorFlow 会自动解析 TF_CONFIG 环境变量的值,然后使用 ip:port 启动当前节点的 gRPC 服务,并监听其他节点的 gRPC 服务。
  2. 当所有节点的 gRPC 服务都准备就绪后,各 worker 节点开始使用自己的数据进行训练,全局训练开始。
  3. 每一个 batch 的训练结束之后, TensorFlow 会根据不同的分布式策略去更新所有 worker 节点的变量,更新完成后进行下一个 batch 的训练。
  4. 执行完所有的训练和评估操作后,训练结束,所有节点的 gRPC 服务随之关闭。

从上面的执行流程我们发现,集群分布式训练的难点在于每个节点的 TF_CONFIG 环境变量的构建,因为我们不能在每次训练时都去手动指定 ip 和端口(还需确定该端口是否被占用),一两个节点还可以忍受,可如果同时运行多个训练任务,并且每个任务都会使用几十个集群节点,那么手动构造这个环境变量的工作量是巨大的。

因此我们需要找到一种自动构建 TF_CONFIG 环境变量的方法,一些分布式训练框架可以为我们排忧解难。比如 360 推出的 XLearning 平台,使用 Yarn 来管理和分配集群资源,每次启动训练任务时,它会自动分配指定数量的资源,并定义好构建 TF_CONFIG 环境变量所需的信息,我们只需解析这些信息即可开始训练,这样可以大大减少训练前期的工作量,从而可以专注于具体的训练流程。

XLearning 只是众多分布式训练框架的一种,还有一些像 GoogleKubeflow 平台等也可实现类似的功能。而且它们的作用并不仅限于 TF_CONFIG 环境变量的构建,还可以进行 CPU 核心分配、内存分配以及数据划分等操作。利用这些框架可以让我们在使用 TensorFlow 进行分布式训练时更加得心应手。

TF 分布式训练实现

TensorFlow 支持使用高阶 API 的方式实现分布式训练,也支持使用自定义训练流程的方式来实现分布式训练。

Keras API

KerasTensorFlow 构建模型的高阶 API ,它已经与 TensorFlow 中的分布式训练策略无缝集成,也是官方推荐的实现分布式训练的首选方式。

上面介绍的 4 种分布式策略中,支持使用 Keras API 进行分布式训练的策略有 MirroredStrategyCentralStorageStrategy 以及 MultiWorkerMirroredStrategy , 而 ParameterServerStrategy 策略预计会在后续版本中支持。所以目前在使用 Keras 进行多机的分布式训练时,只能使用 MultiWorkerMirroredStrategy 分布式策略来完成。

使用 Keras 构建的本地模型训练的代码可以很容易的转为对应的分布式训练代码。只需要将模型的构建和编译 (compile) 操作放置于分布式策略实例的 scope 中即可,它可以支持所有类型的 Keras 模型,包括 SequentialFunctional 以及 Subclassed 。使用 Keras 进行分布式训练示例代码段如下所示:

import tensorflow as tf
import numpy as np

strategy = tf.distribute.MirroredStrategy()
with strategy.scope():
model = tf.keras.Sequential([tf.keras.layers.Dense(1, input_shape=(1, ))])
model.compile(loss='mse', optimizer='sgd')

inputs, targets = np.ones((100, 1)), np.ones((100, 1))
model.fit(inputs, targets, epochs=2, batch_size=10)

其中 strategy 可以是支持 Keras 分布式训练的任一种策略实例。 strategy.scope() 表明了将要分布式运行的代码块,注意 fit 方法不能包含在内。 fit 操作可以与本地训练保持一致,无需进行其它的修改。

使用 Keras 进行分布式训练时,其训练和评估的分布式策略是一致的,也就是说模型的评估操作也是多 worker 节点共同参与执行的。对于 Keras 模型的输入数据, TensorFlow 会自动根据数据的来源(文本文件或数值)使用不同的策略来自动进行数据切分,使得每个 worker 节点训练整体数据的一部分,当然也可以通过 tf.data.Options 来控制输入数据不被切分。

Estimator API

EstimatorTensorFlow 构建模型的另一种高阶 API ,它原生支持异步的 Parameter Server 分布式训练策略。与 Keras 一样,它也与 TensorFlow 中的分布式训练策略进行了集成,不过只得到了有限的支持。使用 Estimator 构建的代码可以同时进行本地和分布式训练,无需进行额外的修改。在进行分布式训练时,它默认使用的分布式策略为 ParameterServerStrategy

Keras 中使用 strategy.scope 进行分布式训练不同, Estimator 通过将 strategy 实例传递给 RunConfig 来使用不同的分布式训练策略。 Estimator 模块中包含有一些预定义的模型,可以直接使用,比如线性回归模型等,如果要自定义模型,需要先定义好模型的构造函数 model_fn

这里以预定义的 Estimator 模型 LinearRegressor 为例,介绍它使用 MirroredStrategy 策略进行分布式训练的实现方式,示例代码如下所示:

import tensorflow as tf

strategy = tf.distribute.MirroredStrategy()
config = tf.estimator.RunConfig(
train_distribute=strategy,
eval_distribute=strategy,
)
regressor = tf.estimator.LinearRegressor(
feature_columns=[tf.feature_column.numeric_column('feats')],
optimizer='SGD',
config=config,
)

def input_fn():
dataset = tf.data.Dataset.from_tensors(({"feats": [1.]}, [1.]))
return dataset.repeat(1000).batch(10)
tf.estimator.train_and_evaluate(
regressor,
train_spec=tf.estimator.TrainSpec(input_fn=input_fn),
eval_spec=tf.estimator.EvalSpec(input_fn=input_fn),
)

注意与 Keras 不同, Estimator 可以通过 RunConfig 方法的 train_distributeeval_distribute 参数分别定义训练和评估的分布式策略,这种方式更加灵活。

而且在使用 Estimator 进行分布式训练时,模型的输入数据不会被自动切分,完全自主可控,我们可以自定义输入数据的处理方式,比如可以使用 dataset.shard 方法进行切分,或者对于从文件读取的数据,可以把提前把文件进行切分并分配给不同的 worker 节点。

需要注意的是,如果所有的 worker 节点都训练完全相同的数据,推荐使用不同的种子 (seed) 对输入数据的 dataset 进行 shuffle ,避免所有节点的梯度下降都向着同一方向,从而导致收敛的时间变长以及最终的训练结果变差。

目前 TensorFlow 2.x 的分布式训练策略对 Estimator 的支持都比较有限,基本的训练和评估可以正常执行,但一些高级的功能比如 scaffold 等都无法支持,而且还可能包含一些潜在 bug 。短期来看,官方团队不打算花费太多精力来更好地完善基于 Estimator 的分布式训练,他们会更专注于为 Keras 以及自定义的分布式训练方式提供技术支持,因此这里不推荐使用 Estimator 来实现分布式训练。

自定义分布式训练

在大多数情况下,使用高阶 API (KerasEstimator) 即可轻松地实现模型的分布式训练,而且仅需要改动少量的本地代码。但是也有一些情况下,你可能会希望对训练流程进行精细地掌控以实现更大的灵活性,此时就需要利用 tf.distribute.Strategy 自定义分布式训练流程了。比如在进行强化学习的模型训练时, TensorFlow 的高阶 API 可能会不太适用。

可以使用 tf.distribute.Strategy 类下的一系列核心方法来支持自定义的分布式训练流程。首先与 Keras 的分布式训练类似,在 strategy.scope() 下创建模型以及优化器 (optimizers);然后构建输入的数据,并使用 tf.distribute.Strategy.experimental_distribute_dataset 方法来分配数据;接着利用 tf.GradientTape() 定义训练的流程;最后遍历输入数据并开始训练。示例代码如下:

import tensorflow as tf

strategy = tf.distribute.MirroredStrategy()
global_batch_size = 128
with strategy.scope():
model = tf.keras.Sequential([tf.keras.layers.Dense(1, input_shape=(1, ))])
optimizer = tf.keras.optimizers.SGD()

dataset = tf.data.Dataset.from_tensors(
([1.], [1.])).repeat(1000).batch(global_batch_size)
dist_dataset = strategy.experimental_distribute_dataset(dataset)

@tf.function
def train_step(dist_inputs):
def step_fn(inputs):
features, labels = inputs

with tf.GradientTape() as tape:
logits = model(features, training=True)
cross_entropy = tf.nn.softmax_cross_entropy_with_logits(
logits=logits,
labels=labels,
)
loss = tf.reduce_sum(cross_entropy) * (1.0 / global_batch_size)

grads = tape.gradient(loss, model.trainable_variables)
optimizer.apply_gradients(list(zip(grads, model.trainable_variables)))
return cross_entropy

per_example_losses = strategy.run(step_fn, args=(dist_inputs, ))
mean_loss = strategy.reduce(
tf.distribute.ReduceOp.MEAN,
per_example_losses,
axis=0,
)
return mean_loss

with strategy.scope():
for inputs in dist_dataset:
print(train_step(inputs))

注意代码中使用了 strategy.run 来执行一个 batch 的训练,使用 strategy.reduce 来聚合各个节点训练的结果。当将 apply_gradients 方法放置于 strategy.scope() 下时,它的更新行为与本地训练相比发生了一些变化,它会先聚合所有 worker 节点的梯度并且求和,然后再使用该值去更新各节点的模型变量。

参考资料

  1. TensorFlow 分布式训练官方文档
  2. TensorFlow 分布式训练原理与实践
  3. 分布式 TensorFlow 入门教程
  4. AllReduce 算法的前世今生
  5. XLearning 项目地址

欢迎关注我的其它发布渠道