「导语」 在进行极复杂模型训练以及使用大规模数据进行训练时,往往会受限于单个节点的内存和算力,无法高效快速地完成模型的优化,而分布式训练正是解决上述问题的一剂良药,它可以充分利用集群中各节点的内存和算力资源,加速模型的训练和调优过程。
分布式训练模式
根据模型在单个节点训练过程中遇到的不同问题,其分布式训练可以分为模型并行 (Model Parallelism
) 和数据并行 (Data Parallelism
) 两种模式。
模型并行
模型并行是指将模型切分为多个部分并将各个部分放置到不同的节点进行训练的分布式模式
。因为当模型很复杂,参数很多时,由于内存的限制,单个节点无法将整个模型加载并进行训练,所以需要将模型切割为更小的部分,并将各个部分运行在不同的节点上以完成训练。
模型并行的优点是可以减少各个节点间的通信代价,因为每个节点只会负责自己的变量更新而无需与其它节点进行数据交换。
其缺点也很明显,由于被切分的模型的各个部分之间有相互依赖的关系,在前向传播和反向传播的过程中,每个节点都需要等待其依赖节点计算完成才能开始计算,并没有实现完全地并行化训练,因此整体的计算效率不高。
所以除非模型真的特别大或者模型存在可以并行执行的部分,一般情况下都不推荐使用模型并行的分布式训练模式。在 TensorFlow
中模型并行也被称为图内复制 (In-graph Replication
)。
数据并行
数据并行是指将输入数据进行切分并使用不同节点的模型副本进行训练的分布式模式
。因为当数据量很大时,单个节点需要很长时间才能完成 1
轮训练,这对于动辄十几轮或几十轮的训练来说是不可忍受的,所以可以将数据进行划分并分配给多个节点,每个节点处理自己的一小部分数据,从而加快整体的训练速度。在 TensorFlow
中数据并行也被称为图间复制 (Between-graph Replication
)。
在数据并行模式下,因为每个节点都持有一份模型的副本,所以在训练过程中需要各节点相互通信来更新整体的变量,以使所有节点的变量值保持最新的状态。根据更新方式的不同,数据并行又可以分为同步训练 (Synchronous Training
) 和异步训练 (Asynchronous Training
) 两种方式。
- 同步训练是指集群中所有节点都完成
1
个batch
的训练后,先聚合所有节点的计算得到的模型变量的梯度值,然后累加求和或求均值,接着用这个值作为最终梯度去更新每个节点的模型变量。同步训练要求每个节点的算力以及网络的带宽要均衡,不然就会出现某个节点拖慢整体训练速度的情况,所以相对而言,同步训练的训练速度要稍慢。 - 异步训练是指每个节点在完成
1
个batch
的训练后,无需等待其它节点,直接去更新模型变量的值。异步训练的训练速度要更快一些,但是它有一个很严重的问题就是梯度失效 (Stale Gradients
) ,因为后完成训练的节点会发现模型变量的值已经被其它节点更新过了,此时该节点计算的梯度就过期失效了。由于梯度失效问题,可能会使得模型最终陷入次优解 (Sub-Optimal Training
)。
TensorFlow
目前仅支持数据并行模式,在未来可能会支持模型并行模式。
分布式训练架构
这里介绍的分布式训练架构,主要是指数据并行
模式下的分布式架构。目前较为常用的分布式训练架构主要有两种,分别为 Parameter Server
架构和 AllReduce
架构。
PS 架构
Parameter Server(PS)
架构是指带有参数服务器 (ps
) 的分布式集群架构。在该架构中,集群中的部分节点会作为参数服务器,用于存储和更新模型的变量值,其他节点则用于计算模型变量的梯度,并与参数服务器进行交互。
Parameter Server
架构既可以是同步训练模式,也可以是异步训练模式。
在同步训练模式中, ps
节点会等待所有 worker
节点的一个 batch
训练结束,然后聚合这些 worker
节点的模型变量梯度并更新 ps
中的模型变量值,最后再把更新的值广播给所有的 worker
节点以更新它们的模型变量。
在异步训练模式中,所有 worker
节点都会单独去更新 ps
节点中的变量,然后获取最新变量值继续训练。
不过无论是同步还是异步训练模式,该架构中 ps
节点的带宽会成为模型训练的瓶颈,因为当 worker
节点数增加时, ps
节点的带宽压力将会成倍增长。虽然可以通过调整 ps
节点的数量来缓解带宽压力,但又会引入 ps
节点间的通信代价。
AllReduce 架构
AllReduce
架构是指不带有参数服务器的分布式集群架构。在该架构中,集群中的所有节点都作为 worker
来执行计算操作,该架构会在每个 batch
训练完成后使用 AllReduce
算法在所有 worker
节点间进行模型变量的同步更新。
目前应用于深度学习的 AllReduce
算法有多种,如 Ring AllReduce
以及 NCCL
等,它们分别适用于不同的使用场景,可根据实际训练情况进行选择。
使用 AllReduce
架构进行分布式训练,可以充分利用每个 worker
节点的带宽资源,但是当模型的参数量比较大时,节点的带宽可能会成为模型训练的瓶颈,常用的千兆网卡可能已经无法突破这一束缚,此时可以尝试使用万兆网卡来缓解这一问题。
TensorFlow
中 Parameter Server
架构目前仅支持异步训练模式, AllReuce
架构则采用同步训练模式。
TF 分布式训练策略
TensorFlow
的 tf.distribute
模块下包含有一系列分布式训练策略,它们都是基于数据并行
模式实现的。有些策略目前还在 experimental
模块下,表示它们是实验性质的策略,未来可能会发生变动,不过基本上其功能都已经比较成熟且完备了,可以尝试使用。考虑到 TPUStrategy
以及 OneDeviceStrategy
的特殊使用场景,这里不做详细介绍。
Mirrored
MirroredStrategy
是一种单机的同步的分布式训练策略。它支持在一台机器的多个 GPU
之间进行分布式训练,它会在每个 GPU
上创建一个模型副本,模型中的每个变量 (Variables
) 都会进行镜像复制并放置到相应的 GPU
上,这些变量被称作镜像变量 (MirroredVariable
)。
MirroredStrategy
策略通过 AllReduce
算法使得所有镜像变量在每个 GPU
之间保持同步更新, AllReduce
算法默认使用英伟达的 NcclAllReduce
,也可以通过 cross_device_ops
参数修改为其他的 AllReduce
算法,如 HierarchicalCopyAllReduce
。
MirroredStrategy
策略会自动使用所有能被 TensorFlow
发现的 GPU
来做分布式训练,如果只想使用部分的 GPU
则可以通过 devices
参数来指定。 MirroredStrategy
实例的创建代码如下所示:
mirrored_strategy = tf.distribute.MirroredStrategy( |
如果 TensorFlow
没有发现 GPU
则默认会退化为使用 CPU
来进行训练。 MirroredStrategy
的典型使用场景为单机多 GPU
。
CentralStorage
CentralStorageStrategy
也是一种单机的同步的分布式训练策略。但与 MirroredStrategy
策略不同的是,它会将模型的所有变量保存在 CPU
内存上,而不是通过镜像复制的方式保存在每个 GPU
上,所有的计算操作则会在每个 GPU
上以同样的方式执行。
如果机器只有一个 GPU
, 那么所有的变量和计算操作都会放在该 GPU
上。在对 CPU
上的变量进行更新前,该策略会先将所有 GPU
副本的上的变量梯度进行聚合,然后应用到 CPU
变量更新中。 CentralStorageStrategy
实例的创建代码如下所示:
central_storage_strategy = tf.distribute.experimental.CentralStorageStrategy() |
CentralStorageStrategy
策略在 CPU
与 GPU
通信代价远低于 GPU
与 GPU
之间的通信代价时,较为适用。
MultiWorkerMirrored
MultiWorkerMirroredStrategy
策略与 MirroredStrategy
策略很相似,可以理解为是 MirroredStrategy
策略的多机的
同步的分布式训练版本,它也会在每一台机器上创建所有变量的副本。
MultiWorkerMirroredStrategy
策略中运行的每一个节点称为一个 worker
,该 worker
节点上可以包含零或多个 GPU
。多个 worker
节点之间使用 AllReduce
算法来保持模型变量的同步更新, TensorFlow
里将这一操作称为 CollectiveOps
。 CollectiveOps
会在 TensorFlow
模型运行时自动根据硬件,网络拓扑以及张量的大小来自动选择合适的 AllReduce
算法来进行网络通信以完成变量更新。
MultiWorkerMirroredStrategy
策略还实现了一些额外性能优化,比如将多个小张量上的 AllReductions
转为在一个大张量上的 AllReductions
,以减少 worker
节点间的通信时间。
MultiWorkerMirroredStrategy
策略目前有两种可供选择的 CollectiveOps
。 一种为 CollectiveCommunication.RING
,它使用 gRPC
作为通信层实现了基于环
的 AllReduce
操作。 另一种为 CollectiveCommunication.NCCL
, 它使用了英伟达的 NCCL
库来实现 AllReduce
操作。在实际使用中,可以基于自己的运行环境选择合适的 CollectiveOps
,或者使用 CollectiveCommunication.AUTO
交由 TensorFlow
运行时自行选择。 MultiWorkerMirroredStrategy
实例的创建代码如下所示:
multiworker_strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy( |
MultiWorkerMirroredStrategy
策略因为要涉及到多个 worker
节点之间的通信交互,因此每个 worker
节点需要提前获知集群中各节点配置信息以便在变量更新时使用。
TensorFlow
中定义集群配置信息的标准方式是使用 TF_CONFIG
环境变量来实现的,该环境变量定义了集群中所有节点的配置信息,包括所有 worker
节点的网络地址,当前 worker
节点的索引 (index
) 以及当前 worker
节点的角色 (type
)。
如果所有 worker
节点都不包含 GPU
,则该策略会退化为使用 CPU
在多个 worker
节点间进行分布式训练。如果集群中的 worker
节点数量只有一个则该策略会退化为 MirroredStrategy
策略。
ParameterServer
ParameterServerStrategy
是一种多机的异步的分布式训练策略。所以它也需要提前指定 TF_CONFIG
环境变量信息,与 MultiWorkerMirroredStrategy
策略不同的是集群中的节点不全是 worker
,有一部分节点会被指定为 ps
用来存储变量信息。模型的每一个变量都会存储在一个 ps
节点上,所有的计算操作会在所有的 worker
节点上以同样的方式执行。 ParameterServerStrategy
实例的创建代码如下所示:
ps_strategy = tf.distribute.experimental.ParameterServerStrategy() |
TF 分布式集群
集群定义
由多个安装有 TensorFlow
的节点即可组成一个 TensorFlow
分布式集群。集群节点的信息是基于 TF_CONFIG
环境变量进行配置的,这里的节点是以 ip:port
的形式定义的,因此它不独指一台物理节点,也就是说多个集群节点可以同时存在于同一物理节点,所以 TensorFlow
的分布式训练其实是弱化了集群的概念的。
不过在真实训练环境中,由于单机的算力毕竟有限,我们一般会使用多机组合的方式来进行分布式训练,从而形成了一般意义上所谓的分布式集群。在 TensorFlow
集群上执行分布式训练任务与在 Hadoop
集群上执行 MapReduce
任务比较相似。
一个典型的 TF_CONFIG
环境变量的值如下所示:
{ |
它是一个 json
字符串。 cluster
中的信息表示集群中有 1
个 chief
节点,2
个 worker
节点, 1
个 ps
节点以及 1
个 evaluator
节点, task
中的信息表示当前节点是第 1
个 worker
节点。
chief
节点的作用和 worker
节点大致相同,不过它还会做一些额外的工作,比如保存检查点文件 (checkpoints
) 以及为 Tensorboard
记录日志文件等,如果不指定 cheif
节点,则默认会以 worker
列表中的第一个节点作为 chief
节点; worker
节点用来执行训练操作; ps
节点用来存储变量,只有在使用 ParameterServerStrategy
训练策略时才需要指定; evaluator
用来执行交叉验证操作,一般也是在使用 ParameterServerStrategy
策略时才会指定。
注意所有节点的 TF_CONFIG
环境变量中的 cluster
信息都是相同的,不同的地方在于 task
部分,而且所有角色 (task type
) 的 index
必须从 0
开始,因为 TensorFlow
会根据该 index
从 cluster
下相应角色的列表中读取节点信息。
TF_CONFIG
环境变量可以写入到系统的环境变量中,但前提是该物理节点上只会同时启动一个集群节点实例,但实际情况中我们可能会在同一物理节点使用不同的端口启动多个节点实例,固定 TF_CONFIG
的值会就会失去灵活性。因此在大多数情况下,我们会在 python
程序中通过 os.environ["TF_CONFIG"]
来指定集群的信息以实现按需创建,TensorFlow
运行时会自动解析其中的信息并启动训练任务。
上面介绍的 4
中分布式训练策略中可以使用集群进行多机训练的只有 MultiWorkerMirroredStrategy
和 ParameterServerStrategy
策略。
TF 集群分布式训练
TensorFlow
集群分布式训练的基本流程如下:
- 不同节点使用不同的
TF_CONFIG
环境变量启动该节点的训练任务。TensorFlow
会自动解析TF_CONFIG
环境变量的值,然后使用ip:port
启动当前节点的gRPC
服务,并监听其他节点的gRPC
服务。 - 当所有节点的
gRPC
服务都准备就绪后,各worker
节点开始使用自己的数据进行训练,全局训练开始。 - 每一个
batch
的训练结束之后,TensorFlow
会根据不同的分布式策略去更新所有worker
节点的变量,更新完成后进行下一个batch
的训练。 - 执行完所有的训练和评估操作后,训练结束,所有节点的
gRPC
服务随之关闭。
从上面的执行流程我们发现,集群分布式训练的难点在于每个节点的 TF_CONFIG
环境变量的构建,因为我们不能在每次训练时都去手动指定 ip
和端口(还需确定该端口是否被占用),一两个节点还可以忍受,可如果同时运行多个训练任务,并且每个任务都会使用几十个集群节点,那么手动构造这个环境变量的工作量是巨大的。
因此我们需要找到一种自动构建 TF_CONFIG
环境变量的方法,一些分布式训练框架可以为我们排忧解难。比如 360
推出的 XLearning
平台,使用 Yarn
来管理和分配集群资源,每次启动训练任务时,它会自动分配指定数量的资源,并定义好构建 TF_CONFIG
环境变量所需的信息,我们只需解析这些信息即可开始训练,这样可以大大减少训练前期的工作量,从而可以专注于具体的训练流程。
XLearning
只是众多分布式训练框架的一种,还有一些像 Google
的 Kubeflow
平台等也可实现类似的功能。而且它们的作用并不仅限于 TF_CONFIG
环境变量的构建,还可以进行 CPU
核心分配、内存分配以及数据划分等操作。利用这些框架可以让我们在使用 TensorFlow
进行分布式训练时更加得心应手。
TF 分布式训练实现
TensorFlow
支持使用高阶 API
的方式实现分布式训练,也支持使用自定义训练流程的方式来实现分布式训练。
Keras API
Keras
是 TensorFlow
构建模型的高阶 API
,它已经与 TensorFlow
中的分布式训练策略无缝集成,也是官方推荐的实现分布式训练的首选方式。
上面介绍的 4
种分布式策略中,支持使用 Keras API
进行分布式训练的策略有 MirroredStrategy
、 CentralStorageStrategy
以及 MultiWorkerMirroredStrategy
, 而 ParameterServerStrategy
策略预计会在后续版本中支持。所以目前在使用 Keras
进行多机的分布式训练时,只能使用 MultiWorkerMirroredStrategy
分布式策略来完成。
使用 Keras
构建的本地模型训练的代码可以很容易的转为对应的分布式训练代码。只需要将模型的构建和编译 (compile
) 操作放置于分布式策略实例的 scope
中即可,它可以支持所有类型的 Keras
模型,包括 Sequential
, Functional
以及 Subclassed
。使用 Keras
进行分布式训练示例代码段如下所示:
import tensorflow as tf |
其中 strategy
可以是支持 Keras
分布式训练的任一种策略实例。 strategy.scope()
表明了将要分布式运行的代码块,注意 fit
方法不能包含在内。 fit
操作可以与本地训练保持一致,无需进行其它的修改。
使用 Keras
进行分布式训练时,其训练和评估的分布式策略是一致的,也就是说模型的评估操作也是多 worker
节点共同参与执行的。对于 Keras
模型的输入数据, TensorFlow
会自动根据数据的来源(文本文件或数值)使用不同的策略来自动进行数据切分,使得每个 worker
节点训练整体数据的一部分,当然也可以通过 tf.data.Options
来控制输入数据不被切分。
Estimator API
Estimator
是 TensorFlow
构建模型的另一种高阶 API
,它原生支持异步的 Parameter Server
分布式训练策略。与 Keras
一样,它也与 TensorFlow
中的分布式训练策略进行了集成,不过只得到了有限的支持。使用 Estimator
构建的代码可以同时进行本地和分布式训练,无需进行额外的修改。在进行分布式训练时,它默认使用的分布式策略为 ParameterServerStrategy
。
与 Keras
中使用 strategy.scope
进行分布式训练不同, Estimator
通过将 strategy
实例传递给 RunConfig
来使用不同的分布式训练策略。 Estimator
模块中包含有一些预定义的模型,可以直接使用,比如线性回归模型等,如果要自定义模型,需要先定义好模型的构造函数 model_fn
。
这里以预定义的 Estimator
模型 LinearRegressor
为例,介绍它使用 MirroredStrategy
策略进行分布式训练的实现方式,示例代码如下所示:
import tensorflow as tf |
注意与 Keras
不同, Estimator
可以通过 RunConfig
方法的 train_distribute
和 eval_distribute
参数分别定义训练和评估的分布式策略,这种方式更加灵活。
而且在使用 Estimator
进行分布式训练时,模型的输入数据不会被自动切分,完全自主可控,我们可以自定义输入数据的处理方式,比如可以使用 dataset.shard
方法进行切分,或者对于从文件读取的数据,可以把提前把文件进行切分并分配给不同的 worker
节点。
需要注意的是,如果所有的 worker
节点都训练完全相同的数据,推荐使用不同的种子 (seed
) 对输入数据的 dataset
进行 shuffle
,避免所有节点的梯度下降都向着同一方向,从而导致收敛的时间变长以及最终的训练结果变差。
目前 TensorFlow 2.x
的分布式训练策略对 Estimator
的支持都比较有限,基本的训练和评估可以正常执行,但一些高级的功能比如 scaffold
等都无法支持,而且还可能包含一些潜在 bug
。短期来看,官方团队不打算花费太多精力来更好地完善基于 Estimator
的分布式训练,他们会更专注于为 Keras
以及自定义的分布式训练方式提供技术支持,因此这里不推荐使用 Estimator
来实现分布式训练。
自定义分布式训练
在大多数情况下,使用高阶 API
(Keras
和 Estimator
) 即可轻松地实现模型的分布式训练,而且仅需要改动少量的本地代码。但是也有一些情况下,你可能会希望对训练流程进行精细地掌控以实现更大的灵活性,此时就需要利用 tf.distribute.Strategy
自定义分布式训练流程了。比如在进行强化学习的模型训练时, TensorFlow
的高阶 API
可能会不太适用。
可以使用 tf.distribute.Strategy
类下的一系列核心方法来支持自定义的分布式训练流程。首先与 Keras
的分布式训练类似,在 strategy.scope()
下创建模型以及优化器 (optimizers
);然后构建输入的数据,并使用 tf.distribute.Strategy.experimental_distribute_dataset
方法来分配数据;接着利用 tf.GradientTape()
定义训练的流程;最后遍历输入数据并开始训练。示例代码如下:
import tensorflow as tf |
注意代码中使用了 strategy.run
来执行一个 batch
的训练,使用 strategy.reduce
来聚合各个节点训练的结果。当将 apply_gradients
方法放置于 strategy.scope()
下时,它的更新行为与本地训练相比发生了一些变化,它会先聚合所有 worker
节点的梯度并且求和,然后再使用该值去更新各节点的模型变量。
参考资料
- TensorFlow 分布式训练官方文档
- TensorFlow 分布式训练原理与实践
- 分布式 TensorFlow 入门教程
- AllReduce 算法的前世今生
- XLearning 项目地址