「导语」 在上一篇文章 《TensorFlow 2.x 分布式训练概览》中,我对 TensorFlow 所支持的分布式训练策略以及分布式训练的实现方式做了简要的介绍。本文将在上一篇文章的基础上,着重介绍基于 Keras 的多节点分布式训练的实现方式以及在分布式训练过程中需要注意的一些细节。
端到端示例
首先我们来实现一个基于 Keras
的多节点分布式训练的端到端示例,从宏观上对多节点分布式训练有一个大概的了解。这里使用了 MultiWorkerMirroredStrategy
分布式策略,其代码如下所示:
import os |
在分布式训练中,所有 worker
节点所执行的训练代码都是一致的,唯一不同的地方在于 TF_CONFIG
环境变量的值。示例中我们首先定义了 2
个 worker
节点,然后通过传递 index
参数来控制每个 worker
节点的索引值 (task index
),这样就可以使这两个 worker
节点在运行时具有不同的 TF_CONFIG
值,从而能够正确地进行分布式训练。
我在分布式训练概览
一文中介绍过多节点分布式训练的原理:首先,集群中的每个 worker
节点会根据 TF_CONFIG
中的 ip:port
信息去启动自己的 grpc
服务,接着它们会去监听其它 worker
节点的 grpc
服务并尝试建立通信,只有当所有 worker
节点都准备好之后,分布式训练才会正式开始。
基于上述原理,我们需要在两个终端窗口中分别执行如下语句方可启动本地的分布式训练,注意系统中要提前安装好 TensorFlow
,这里使用的 TensorFlow
版本为 2.2.0
(截至本文完稿时为最新稳定版)。
python worker.py --index=0 |
分布式训练完成后,即可进行模型分析以及模型 Serving
等后续操作了。下面我来就使用 Keras
进行多节点分布式训练的一些细节进行展开介绍。
训练前准备工作
在基于 Keras
进行多节点的分布式训练之前,有三项重要的准备工作需要完成,它们分别是模型构建, TF_CONFIG
环境变量定义以及数据准备。
模型构建
分布式训练中模型的构建方式与本地训练基本是一致的。在上面的示例中,我使用了 subclassed
子类化的方式构建了一个包含 3
个全连接层的 Keras
模型。当然也可以使用 Sequential
或者 Functional API
的方式来构建模型, MultiWorkerMirroredStrategy
策略都是可以支持的。模型构建的更多细节可以参考 TensorFlow 2.x 基于 Keras 的模型构建
一文,这里就不再赘述了。
在模型构建完成之后,我们应该先在本地训练该模型以验证其正确性,没有问题后再将其应用于分布式训练中。
TF_CONFIG 定义
因为 MultiWorkerMirroredStrategy
是一种数据并行的分布式训练策略,所以每个 worker
节点都会拥有模型的全部参数,在每个 batch
训练结束后, worker
节点之间都需要进行相互通信以更新模型的变量值,这就需要一种方法来定义各个节点的通信地址使得它们之间可以建立连接。
TF_CONFIG
环境变量就是为解决节点间通信问题而存在的, 它是一个 json
格式的字符串,共由两个部分组成,分别为 cluster
和 task
。 cluster
部分定义了不同角色的列表,列表中的值为各节点的 grpc
地址,task
部分则定义了当前节点的角色 (type
) 和索引 (index
) 信息。在 MultiWorkerMirroredStrategy
策略中仅包含 worker
角色,因此只需定义 worker
角色相关的信息即可。 TF_CONFIG
示例如下所示:
{ |
在代码中使用时,我们一般通过 os.environ["TF_CONFIG"] = json.dumps(tf_config)
来进行赋值操作,其中的 TF_CONFIG
信息可以被 TensorFlow
自动解析。
分布式训练中所有节点 TF_CONFIG
中的 cluster
部分均是一致的,而 task
部分则是不同的,这是因为每个节点的角色和索引是不同的。 TensorFlow
会根据索引去相应角色的列表中查询该节点的 grpc
地址并启动 grpc
服务,因此索引 index
的值应该在 [0, len(role)-1]
范围之间。
在 worker
角色列表中, index
为 0
的节点会被自动作为 chief
节点,它会负责一些除训练之外的其它工作,比如保存 Checkpoint
文件以及记录 Tensorboard
日志等。
在上面的示例中,我仅定义了两个本地的 worker
节点,这是为了便于本地的分布式训练测试。而在实际的分布式训练中,我们一般会使用不同的 ip
和 端口来创建多个节点,并且每个节点都应该设置合适的 TF_CONFIG
值。我们既可以手动设置每个节点的 TF_CONFIG
的值,也可以借助一些分布式训练框架如 XLearning
等来辅助我们完成这项工作。
注意 TF_CONFIG
环境变量在 MultiWorkerMirroredStrategy
实例创建时会自动被 TensorFlow
解析以启动其 grpc
服务,因此一定要在 tf.distribute.Strategy
实例创建之前就设置好 TF_CONFIG
的值,避免分布式训练启动失败。
数据准备
与本地训练一样,分布式训练的数据也可以是 numpy
类型的数据或者是 dataset
类型的数据。一般会在测试时使用 numpy
类型数据,而在真实的训练场景中,考虑到数据处理的效率,我们一般使用 dataset
类型的数据做为模型的输入。
在上面的例子中,我在数据生成函数 prepare_data()
中使用 np.random.seed(0)
方法指定了一个随机数种子,该种子的作用是使得每个 worker
节点在运行时生成的数据都相同,这样可以更好地模拟真实的训练场景。
一般情况下,dataset
类型的数据可以使用 from_tensor_slices()
方法从 numpy
数据中生成,而在实际训练中,我们更多地是从文件中读取,比如从 TFRecord
文件或 TextLine
文件中进行读取生成。 tf.data
模块下有两个函数可以分别用来处理上述两种类型的数据文件以生成 dataset
,它们分别是 tf.data.TFRecordDataset
和 tf.data.TextLineDataset
,其基本使用方法如下:
dataset = tf.data.TFRecordDataset(["file1.tfrecords", "file2.tfrecords"]) |
一般而言,训练用到的数据文件大多是经过 MapReduce
等分布式任务清洗处理之后生成的,文件的数量与 mapper
和 reducer
任务的个数有关(可能会有很多),最后它们都会落盘到 HDFS
等分布式文件系统上。
TensorFlow
的 Dataset API
是支持从 HDFS
路径直接读取数据的,但在使用时我们可能仅知道 HDFS
路径而无法获知该路径下所有数据文件的名称,而且即使知道,我们也不太可能将所有文件名都列出并作为参数传递给 TextLineDataset
函数。
为了解决上述问题,可以使用 Dataset
的 list_files
方法来帮助我们,该方法可以基于正则表达式获取指定路径下匹配指定模式的所有文件名称,并以 dataset
的形式返回,然后就可把该 dataset
作为 TextLineDataset
的输入参数来读取文件中数据了,其具体使用示例如下所示:
filenames = tf.data.Dataset.list_files(FLAGS.input + '/part-*') |
对于使用 TextLineDataset
读取的数据,每一行数据都可能包含有多个数据值表示不同的特征,那么一般还要经过分割处理,才能符合模型训练的输入需要,此时可以借助 dataset
的 map
方法来完成数据的转换。
map
方法可以对 dataset
中的每一行数据进行处理并转化为想要的数据格式。假设数据文件中的每行都包含 33
列,列之间使用 tab
进行分隔,其中前 32
列表示特征,最后 1
列表示标签,那么要将数据转换为可供上面示例使用的输入格式,可使用如下代码来完成操作:
def labeler(record): |
由于上面示例中的模型是单输入的,所以我在进行数据转换时将 32
列不同的特征统一起来作为一个单独的模型输入来进行处理。而对于多输入的模型,我们又需要将多个特征分开,使得数据的每列都有唯一的字段来标识,此时我们依然可以使用上述方法来分割每一个字段并返回一个 dataset
字典,但更为方便地方法是直接使用下面介绍的这种 Load CSV
的方法,相比较而言,后者会更为直接更容易理解。其示例代码如下所示(各项参数说明可查看官方文档):
column_names = [str(i) for i in range(33)] |
另外如果你想利用 MapReduce
任务来生成 TFRecord
文件,那么可以基于 TensorFlow
生态提供的 tensorflow-hadoop
函数库来完成这个任务。同样 TFRecordDataset
也可以使用 list_files()
方法来读取指定目录下的数据文件。
对于训练数据与验证数据的划分而言,如果我们在生成数据文件时就将二者分开,那么在数据加载时我们直接指定不同的输入路径即可,比如训练数据从 train
目录加载,而验证数据则从 validation
目录加载,这也是比较推荐的数据划分方式。
但是有时我们可能没有对训练和验证数据进行分离,则可以使用如下方法作为替代以达到相同的目的。以训练和验证数据划分比例为 80/20
为例,其代码如下所示:
def is_validation(x, y): |
这里借助了 Dataset
的 enumerate
和 filter
方法,其原理是先使用 enumerate
方法对 dataset
数据进行编号,然后使用 filter
方法来按指定规则筛选出训练和验证数据,最后使用 map
方法将数据恢复为原有格式。
但是需要注意,由于在训练时,每个 worker
节点都会遍历 dataset
的全部数据,会对训练的性能产生较大影响,所以在实际使用中并不推荐这种数据划分的方式。
开始训练
准备工作都完成之后,我们就可以开始进行分布式训练了。首先我们需要将模型的构建 (model
) 和编译 (model.compile()
) 流程放置于分布式策略的 strategy.scope()
下,该 scope
决定了模型变量的创建方式以及创建位置。接着我们可以调用模型的 fit
方法并指定训练的数据来开始训练。
每个 worker
节点的训练流程与本地的训练流程并无明显的不同,但由于整体的训练是分布式进行的,在一些细节上还是与本地训练有点差别,为了对分布式训练流程有更好地掌握,我们需要对这些细节有大致的了解。
数据切分策略
在多节点的分布式训练中,将输入数据按 worker
节点的数量进行切分是有必要的,它可以确保模型的收敛性以及训练的高效性,这也符合上一篇文章中提到的数据并行
的理念。
在上面的示例中,我们并没有手动对数据进行切分,而是直接将数据传递给 fit
方法进行训练,这是因为 TensorFlow
会自动将输入数据在多个 woker
节点间进行切分,无需我们手动干预。
TensorFlow
分布式训练中的数据切分的策略主要有两种,分别为 FILE
策略和 DATA
策略,前者是指基于文件的切分,后者则是指基于数据的切分。它们的使用方式如下所示:
options = tf.data.Options() |
在默认情况下,如果不进行显示地设置,那么该策略会被设置为 AutoShardPolicy.AUTO
,此时表示由 TensorFlow
运行时自动选择合适的数据切分策略。
在 AUTO
策略模式下, TensorFlow
会先尝试基于 FILE
策略进行切分,如果没有找到可以切分的文件,那么它会再基于 DATA
策略进行切分。如果 dataset
是使用 TFRecordDataset
或 TextLineDataset
等函数从文件中读取并生成的,那么它会基于 FILE
策略进行切分。如果是其它的生成方式比如 from_tensor_slices()
,那么它会选择使用 DATA
策略进行切分。
在使用 FILE
策略进行数据切分时,每个 worker
节点会均匀地获得全部数据文件中的一部分并进行处理,但如果某个 worker
节点没有被分配到至少一个数据文件,那么训练程序就会报错,整个训练流程也会随之终止。因此当使用文件作为数据的输入源并选择了 FILE
切分策略时(无论是手动指定还是 TensorFlow
自动选择),要确保每个 worker
节点至少能分配到一个数据文件。
在使用 DATA
策略进行数据切分时, TensorFlow
会对 dataset
进行 shard
操作,此时每个 worker
节点都会遍历全部的数据,然后丢弃不属于自己的数据,并找到自己负责的那部分数据进行训练。其中 shard
操作类似于上面介绍的 enumerate + filter
,对于数据集 dataset
, dataset.shard(n, i)
将会返回 dataset
中 index % n = i
的那部分数据,其中 n
表示 worker
节点的数量, i
表示 worker
节点的 index
。
可以看到,相比于 FILE
切分策略, DATA
切分策略中由于每个 worker
节点都做了冗余的数据遍历操作,对整体的训练性能会有很大的影响。因此我们最好提前对输入数据进行文件切分并使用基于 FILE
的数据切分策略进行分布式训练。
对于从文本文件生成的 dataset
类型数据,我们可以指定其数据切分策略为 DATA
(一般在不能确保每个 worker
节点会至少分配到一个文件时使用),但是对于非文本文件生成的 dataset
类型数据,我们却不能指定其数据切分策略为 FILE
, TensorFlow
运行时会由于找不到可切分的文件而报错。
如果你不想使用上述的数据切分策略,也可以通过将策略设置为 AutoShardPolicy.OFF
以禁用数据的自动切分。此时每个 worker
节点都会收到全部的数据,如何进行数据处理就完全取决你自己了。
另外对于每一个 worker
节点,如果该节点上有多于一个设备,比如有多个 GPU
,那么该节点的数据也会在这多个 GPU
之间进行自动切分,而且即使你已经将 worker
节点间的数据切分策略禁用,也不会影响同一节点多个设备间的数据切分。
注意,只可以对 dataset
类型的数据手动指定数据切分策略。 numpy
类型的数据会由 TensorFlow
自动选择切分策略,而且它只能使用 DATA
策略。
因为交叉验证操作也是在多个节点间分布式完成的,所以验证数据也会使用与训练数据相同的策略进行切分,当然也可以使用上面介绍的方式指定不同的切分策略。目前 model.predict()
操作不支持分布式运行,因此如果在分布式训练中调用了该方法, worker
程序会报错。
数据批处理
我们在 Keras 模型的本地训练与评估
一文中介绍过模型的本地训练对于 numpy
类型数据和 dataset
类型数据的处理方式,而在分布式训练中其处理方式稍有不同,这主要体现在与批处理相关的一些设置上。
上面的示例中使用的是 numpy
类型数据作为模型的输入,而在设置 BATCH_SIZE
时,我先设置了每个 worker
节点的批量大小 (BATCH_SIZE_PER_REPLICA
) ,然后乘以 worker
节点的数量得到了全局的批量大小 (BATCH_SIZE
) 。这样设置是因为在分布式训练中每个 worker
节点的有效批量大小为全局批量大小/节点个数
,也就是说每个 worker
节点在分布式训练时的实际批量大小即为 BATCH_SIZE_PER_REPLICA
。先为每个 worker
设置合适的 BATCH_SIZE_PER_REPLICA
,可以避免因全局 BATCH_SIZE
设置不当而使每个 worker
节点计算效率不高进而影响整体分布式训练性能的问题。
同样地,如果使用 dataset
类型的数据作为输入, BATCH_SIZE
也应该如此设置。
在使用 dataset
类型的数据作为模型的训练输入时,由于 MultiWorkerMirroredStrategy
策略暂不支持最后一个不完整 batch
的处理,因此必须要指定 steps_per_epoch
参数来限制每轮训练所执行的步数,如果不指定该参数,训练程序在执行时会报错,这点需要注意。
在指定 steps_per_epoch
参数后,每轮 (epoch
) 训练开始时, TensorFlow
会从上轮训练结束的 dataset
位置继续读取数据并进行训练直至训练流程结束。因此我们最好使用 .repeat()
方法来对训练数据进行复制,以保证有足够的数据来完成 N
轮的训练。
如果训练数据没有被复制,那么 steps_per_epoch
的大小应该根据每个 worker
的实际训练数据量以及 BATCH_SIZE_PER_REPLICA
的大小来进行设置,对于 worker
节点间数据切分不均衡的分布式训练,应该以最小数据量的 worker
节点为基准对 steps_per_epoch
参数进行设置,避免该 worker
因数据耗尽导致整体训练受到影响。
对于 dataset
类型的验证数据来说,也是同样的道理,必须指定 validation_steps
参数,其大小也需根据验证数据量和 BATCH_SIZE
来进行设置。与训练操作不同的是,验证操作在每轮执行时都会使用相同的验证数据,这样可以确保验证结果的公平性和有效性。虽然 dataset
类型的验证数据不必进行 repeat
操作,但官方还是推荐使用 repeat
方法,这样可以避免因 validation_steps
设置不当而引发的数据耗尽问题。
容错处理
在进行同步的分布式训练时,如果没有容错机制,那么当某一个 worker
节点宕掉或者被其它任务抢占资源时,整个分布式训练任务就会失败。幸运地是, TensorFlow
为基于 Keras
的分布式训练提供了容错的能力,通过将节点的训练状态保存在文件中,可以使整个集群在 worker
节点失败重启后恢复到之前的训练状态,从而确保训练可以继续进行。
若要使用 Keras
多节点分布式训练的容错机制,需要借助 ModelCheckpoint
回调函数来完成。该回调函数会将模型的训练状态和检查点文件保存到指定的文件目录下。具体使用方式如下:
checkpoint_callback = tf.keras.callbacks.ModelCheckpoint( |
因为是同步的训练方式,在失败节点重启的过程中,所有参与该分布式训练的节点都会停止训练直到失败节点重启完成。当失败节点重新加入到集群后,其它节点也会重启,然后所有节点都会同时从 checkpoint
文件中恢复之前的训练状态,从而使得节点间重新保持同步,训练可以照常进行。
在生成的 checkpoints
目录下,存在一些临时的 checkpoint
文件,这些文件会在节点恢复时使用,如果训练过程正常完成,那么在训练结束后,这些临时文件会被自动删除。
模型文件保存路径
在多节点的分布式训练中,当你使用 model.save()
或者 tf.saved_model.save()
方法进行模型保存时,你可能希望仅由 chief
节点来完成这项工作,但事实并非如此,所有参与到训练中的 worker
节点都会执行模型保存操作,如果每个 worker
节点的模型存储路径都相同,在保存模型时所有的 worker
节点都会尝试去往该位置写文件,从而产生竞态,会导致一些奇怪的错误产生。
正确的做法是,对于每一个 worker
节点,提供不同的存储路径,比如可以用 worker
的 index
作为每个节点模型存储路径的一部分。其实各个 worker
节点保存的模型文件都是相同的,不过通常我们都会选择使用 chief
节点保存的模型文件来重构模型以及进行模型 Serving
。我们还应该设置一些清理的逻辑,在训练完成后自动清理掉非 chief
节点保存的模型文件。
而对于 Checkpoint
和 Tensorboard
等回调函数的存储路径,我们可以只设置一个全局路径即可,因为 TensorFlow
会自动为我们处理文件路径问题。在实际的分布式训练中, TensorFlow
会在该全局目录下自动为每个 worker
节点创建一个临时的目录用来存储它们各自的信息,在训练完成后也会自动将临时的目录清理,最后只保留 chief
节点的相关文件信息(你可在训练中关注一下相关目录下的文件结构变化)。这么做也是因为如果有多个节点同时向一个目录进行写操作,会导致一些读写错误。
常见问题
在进行多节点分布式训练时,可能会遇到以下问题,这里提供了一些解决的思路:
问题:
RuntimeError: Collective ops must be configured at program startup.
解决方案: 该运行时错误说明在分布式策略实例(这里为
MultiWorkerMirroredStrategy
) 创建和初始化之前,有其它的TensorFlow
节点被创建,比如提前创建了dataset
类型的输入等。解决的办法是将其它TensorFlow
节点的创建操作移动到分布式策略初始化之后即可。问题:
ValueError: When dataset is sharded across workers, please specify a reasonable steps_per_epoch such that all workers will train the same number of steps and each step can get data from dataset without EOF. This is required for allreduce to succeed. We will handle the last partial batch in the future.
解决方案: 该问题我们前面介绍过,对于
dataset
类型的输入数据,需要指定fit
方法中训练数据的steps_per_epoch
参数以及验证数据的validation_steps
参数,否则训练会报错,steps_per_epoch
的值可以根据整体的数据量来进行粗略估计。问题:
WARNING: Your input ran out of data; interrupting training. Make sure that your dataset or generator can generate at least steps_per_epoch * epochs batches (in this case, 1000 batches). You may need to use the repeat() function when building your dataset.
UnboundLocalError: local variable 'logs' referenced before assignment.
解决方案: 该问题表明训练数据或者验证数据已经耗尽,导致训练无法继续进行了。此时可以对训练数据以及验证数据的
dataset
执行repeat
操作以解决该问题。问题: 为什么多个
worker
节点训练日志中打印的loss
值以及metrics
值都完全一致?解答: 在每个
batch
训练结束后,各worker
节点间会同步更新各自的模型变量值,而日志打印都是在变量更新完成之后,因此每个batch
打印的loss
值以及metrics
值都会相同。问题: 如何调节分布式训练的性能?
解答: 分布式训练的性能与节点间变量更新的速度有直接关系,可以先选择一个合适的通信策略如
Ring AllReduce
或NCCL
。如果节点间的网络带宽成为瓶颈,则可以尝试升级网卡为万兆网卡。另外官方推荐可以将模型中的变量都转为tf.float32
类型,可能会提升计算的效率。
参考资料
- Multi-worker training with Keras
- MapReduce TFRecords OutputFormat
- Build TensorFlow input pipelines
- tf.data.Dataset
- tf.data.TextLineDataset
- Load CSV data
- tf.data.experimental.make_csv_dataset
- tf.data.Options
- tf.data.experimental.DistributeOptions
- tf.data.experimental.AutoShardPolicy
- tf.distribute.Strategy