逑识

吾生也有涯,而知也无涯,以无涯奉有涯,其易欤?

0%

TensorFlow 数据输入格式之 TFRecord

导语」 TFRecord 是 TensorFlow 生态中的一个重要组件,它是一种二进制序列的存储格式,使用该格式可以使输入数据的读取和处理更为高效,从而提升整体训练流程的速度,另外,它还具有极高的灵活性,可以为复杂特征数据的构建与解析提供便利。本文将对 TFRecord 数据文件的生成与读取流程进行详细地介绍,并提供相应的示例代码作为参考。

TFRecord 格式简介

TFRecordTensorFlow 生态中一个重要的组件,其本质上是一种文件格式,用于存储二进制序列的内容。 TFRecord 文件由序列化后的 protobuf 数据构成,可以直接供 TensorFlow 程序读取并用于模型训练。

对于文本格式的数据文件而言,其存储和读取的开销都比较大,相比之下, TFRecord 格式的数据文件占用的磁盘空间会更少,而且可以更高效地进行数据读取,因此将数据使用 TFRecord 格式存储能够在一定程度上提升数据处理的效率。

另外,文本格式的数据更适合处理定长且维度单一的特征数据,对于变长以及多维度的特征数据的处理会比较麻烦,而 TFRecord 格式的数据则没有这一限制,这给数据读取与处理带来了极大的灵活性。

TFRecord ProtoBuf

在生成或者读取 TFRecord 文件之前,我们需要对与 TFRecord 格式相关的 protobuf 协议有一定的了解,它们是构成 TFRecord 文件的关键部分。

Example

Example 指一个数据输入样本,它由一系列的特征组成。其 protobuf 格式如下所示:

message Example {
Features features = 1;
}

上面的 Features 即指特征组合,其 protobuf 格式如下所示:

message Features {
// Map from feature name to feature.
map<string, Feature> feature = 1;
}

这里 Features 使用 map 结构来进行存储,其中 mapkey 表示特征的名称,为 string 类型, value 表示具体的特征值,为 Feature 类型。

另外,可以看到 Example 只是封装了一下 Features 的结构,其实质与 Features 是等同的。

Feature

Feature 指具体的特征值,其 protobuf 格式如下所示:

message Feature {
// Each feature can be exactly one kind.
oneof kind {
BytesList bytes_list = 1;
FloatList float_list = 2;
Int64List int64_list = 3;
}
}

Feature 的定义可以看出,它可以接收三种格式的数据,分别为:

  1. BytesList 格式,可以表示 stringbyte 类型的数据。

  2. FloatList 格式,可以表示 floatdouble 类型的数据。

  3. Int64List 格式,可以表示 bool 类型、 enum 类型、 int32 类型、 uint32 类型、 int64 类型以及 uint64 等多种类型的数据。

以上三种格式,基本囊括了所有常见的数据输入类型,在生成和解析 TFRecord 数据时,可以根据具体的数据类型来使用相应的 Feature 结构。

Python 实现

在将上述 protobuf 文件转为语言相关的数据结构后,即可使用这些结构来构建特征数据并进行序列化了。下面以 python 语言为例,来介绍这些数据结构的具体使用方法。(注:在 TensorFlowpython 安装包里已经包含了 Example 以及 Feature 等相关的数据结构,我们可以直接使用而无需再用 protoc 工具生成了。)

首先,我们需要构建 Feature 对象。因为 Feature 结构支持 3 种格式的数据,所以这里使用 3 个函数来分别生成不同类型的 Feature 对象。示例代码如下所示:

import tensorflow as tf
import numpy as np

def _bytes_feature(value):
"""Returns a bytes_list from a string / byte."""
if isinstance(value, type(tf.constant(0))):
# BytesList won't unpack a string from an EagerTensor.
value = value.numpy()
return tf.train.Feature(bytes_list=tf.train.BytesList(value=[value]))

def _float_feature(value):
"""Returns a float_list from a float / double."""
return tf.train.Feature(float_list=tf.train.FloatList(value=[value]))

def _int64_feature(value):
"""Returns an int64_list from a bool / enum / int / uint."""
return tf.train.Feature(int64_list=tf.train.Int64List(value=[value]))

# Print BytesList.
print(_bytes_feature(b'test_string'))
print(_bytes_feature(u'test_bytes'.encode('utf-8')))
# Print FloatList.
print(_float_feature(np.exp(1)))
# Print Int64List.
print(_int64_feature(True))
print(_int64_feature(1))
# Serialize and Deserialize Feature.
serialized_feature = _float_feature(np.exp(1)).SerializeToString()
print(serialized_feature)
feature_proto = tf.train.Feature.FromString(serialized_feature)
print(feature_proto)

其中 _bytes_feature_float_feature_int64_feature 函数分别用来生成 BytesListFloatListInt64List 格式的 Feature 对象,这里假设这 3 个函数的参数 value 均为单个值。

由于 tf.train.*List 函数接收的参数均为列表 (list) 或数组 (array),所以上面的代码中才会加上 [] 符号来表示列表,如果输入的参数 value 已经为列表或数组则无需此操作。另外还需注意传递给 3 个函数的参数 value 的基础类型要与具体的 *List 相匹配,否则会报错。

在生成了 Feature 对象后,可以调用其 SerializeToString 方法对其进行序列化,从而生成序列化后的字符串。另外还可以使用 tf.train.Feature.FromString 方法来将序列化后的数据还原为 Feature 对象。

接着,我们就可以构建 Example 对象并对其进行序列化了。假设我们有 4Feature ,它们分别为 boolean 类型的特征、 integer 类型的特征、 string 类型的特征和 float 类型的特征,我们首先将这 4 种特征通过上面 3 个函数编码后返回相应的 Feature 对象,然后构建一个 Feature Map 字典并生成 Features 对象,最后使用 Features 对象生成 Example 对象并进行序列化。将上述流程统一到一个函数 serialize_example 中,其示例代码如下所示:

def serialize_example(feature0, feature1, feature2, feature3):
"""
Creates a tf.train.Example message ready to be written to a file.
"""
# Create a dictionary mapping the feature name to the tf.train.Example-compatible
feature = {
'feature0': _int64_feature(feature0),
'feature1': _int64_feature(feature1),
'feature2': _bytes_feature(feature2),
'feature3': _float_feature(feature3),
}
# Create a Features message using tf.train.Example.
example_proto = tf.train.Example(features=tf.train.Features(
feature=feature))
return example_proto.SerializeToString()

# Serialize and Deserialize Example.
serialized_example = serialize_example(False, 4, b'goat', 0.9876)
print(serialized_example)
example_proto = tf.train.Example.FromString(serialized_example)
print(example_proto)

同样地,可以调用 Example 对象的 SerializeToString 来将其序列化为字符串,调用 tf.train.Example.FromString 方法来将序列化后的 Example 对象还原。

TFRecord 文件生成

假设我们有 4 种类型的 Feature ,如上一节所述,并假设它们的原始数据 (numpy) 生成方式如下述代码所示:

# The number of observations in the dataset.
n_observations = int(1e4)
# Boolean feature, encoded as False or True.
feature0 = np.random.choice([False, True], n_observations)
# Integer feature, random from 0 to 4.
feature1 = np.random.randint(0, 5, n_observations)
# String feature.
strings = np.array([b'cat', b'dog', b'chicken', b'horse', b'goat'])
feature2 = strings[feature1]
# Float feature, from a standard normal distribution.
feature3 = np.random.randn(n_observations)
print(feature0, feature1, feature2, feature3)

现在我们要使用这 4Feature 来生成一个包含 10,000 个数据样本的 TFRecord 文件,可以使用以下几种方式进行生成。

使用 tf.data 生成

首先使用 tf.data.Dataset.from_tensor_slices 函数来生成一个包含原始数据类型的 dateset ,代码如下所示:

features_dataset = tf.data.Dataset.from_tensor_slices(
(feature0, feature1, feature2, feature3))
# Print dataset.
print(features_dataset)
# Print one element in dataset.
for f0, f1, f2, f3 in features_dataset.take(1):
print(f0, f1, f2, f3)

接着我们使用上一节定义的 serialize_example 函数来生成一个包含序列化字符串类型的 dataset ,代码如下所示:

def generator():
for features in features_dataset:
yield serialize_example(*features)

serialized_features_dataset = tf.data.Dataset.from_generator(
generator,
output_types=tf.string,
output_shapes=(),
)
# Print serialized dataset.
print(serialized_features_dataset)
# Print one element in serialized_features_dataset.
for s in serialized_features_dataset.take(1):
print(s)

最后我们将序列化后的 dataset 写入 TFRecord 文件中,代码如下所示:

filename = 'train.tfrecord'
writer = tf.data.experimental.TFRecordWriter(filename)
writer.write(serialized_features_dataset)

注意这里的 writer 使用的是 tf.data.experimental.TFRecordWriter 对象,它专用于将序列化的 dataset 对象写入到 TFRecord 文件中,要与后面介绍的 tf.io.TFRecordWriter 对象区分开来。

使用 tf.io 生成

首先将每个数据样本都转为 tf.train.Example 对象并序列化,然后再将其写入 TFRecord 文件中,这里同样使用上面介绍过的 serialize_example 函数来进行序列化,代码如下所示:

# Write the `tf.train.Example` observations to the file.
with tf.io.TFRecordWriter(filename) as writer:
for i in range(n_observations):
example = serialize_example(
feature0[i],
feature1[i],
feature2[i],
feature3[i],
)
writer.write(example)

这里的 writer 使用的是 tf.io.TFRecordWriter 对象,它直接将序列化的字符串写入到 TFRecord 文件中。

一般情况下,这种生成 TFRecord 文件的方式在 python 中是最常使用的,在实际使用中可以根据具体情况进行选择。

使用 MapReduce 生成

在数据处理环节,我们可能会使用 MapReduce 进行一些预处理操作,同时我们也希望可以直接借助 MapReduce 任务来生成多个 TFRecord 数据文件以供分布式训练使用,为了满足这一需求, TensorFlow 生态提供了一个扩展库 tensorflow-hadoop ,它包含了 TFRecord 格式的 MapReduce InputFormatOutputFormat 实现。利用这个扩展库,我们就可以直接使用 MapReduce 任务来生成和读取 TFRecord 文件了。部分示例代码如下所示:

// Main function.
import org.tensorflow.hadoop.io.TFRecordFileOutputFormat;
Job job = Job.getInstance(config, "TFRecord");
job.setOutputFormatClass(TFRecordFileOutputFormat.class);

// Mapper or Reducer.
import java.util.Arrays;
import com.google.protobuf.ByteString;
import org.tensorflow.example.BytesList;
import org.tensorflow.example.Example;
import org.tensorflow.example.Feature;
import org.tensorflow.example.Features;
import org.tensorflow.example.FloatList;
import org.tensorflow.example.Int64List;
// map or reduce function
// *List value.
Int64List value0 = Int64List.newBuilder().addAllValue(Arrays.asList(0L)).build();
Int64List value1 = Int64List.newBuilder().addAllValue(Arrays.asList(4L)).build();
BytesList value2 = BytesList.newBuilder()
.addAllValue(Arrays.asList(ByteString.copyFrom("goat".getBytes()))).build();
FloatList value3 = FloatList.newBuilder().addAllValue(Arrays.asList(0.9876f)).build();
// All features.
Feature feature0 = Feature.newBuilder().setInt64List(value0).build();
Feature feature1 = Feature.newBuilder().setInt64List(value1).build();
Feature feature2 = Feature.newBuilder().setBytesList(value2).build();
Feature feature3 = Feature.newBuilder().setFloatList(value3).build();
// Feature map.
Features feature = Features.newBuilder().putFeature("feature0", feature0)
.putFeature("feature1", feature1).putFeature("feature2", feature2)
.putFeature("feature3", feature3).build();
// Example.
Example example = Example.newBuilder().setFeatures(feature).build();
// Write to TFRecord file.
context.write(new BytesWritable(example.toByteArray()), NullWritable.get());

需要注意的是,为了匹配正在使用的 hadoop 版本,你可能需要修改 tensorflow-hadoop 源码中的 pom.xml 文件,将 hadoop.version 设置为你正在使用的 hadoop 版本并使用 maven 工具重新编译该项目,然后将生成的 jar 包引入到 MapReduce 项目中,避免因版本不匹配而报错。

另外,为了使 MapReduce 项目能正常编译,你还需引入 org.tensorflow:proto 库以及 com.google.protobuf:protobuf-java 库,可以从 maven 官方仓库搜索这 2 个库的最新版本并加入到 gradlemaven 项目的配置文件中,然后再进行项目编译即可。

使用 TFRecorder 生成

由于在生成 TFRecord 文件时往往需要编写大量的复杂代码,为了优化代码的复杂度, TensorFlow 官方开源了 TensorFlow Recorder 项目(即 TFRecorder)来更为便捷地生成 TFRecord 文件。

TFRecorder 允许用户从 Pandas dataframeCSV 直接生成 TFRecords 文件,而无需编写任何复杂的代码。其对于图像数据的处理尤其方便,在 TFRecorder 之前,要大规模生成 TFRecord 格式的图像数据,必须编写一个数据流水线来从存储中加载图像并将结果序列化为 TFRecord 格式,而现在只需几行代码即可生成基于图像的 TFRecord 文件。示例代码如下所示:

import pandas as pd
import tfrecorder

# From Pandas DataFrame
df = pd.read_csv('/path/to/data.csv')
df.tensorflow.to_tfr(output_dir='/my/output/path')

# From CSV
tfrecorder.create_tfrecords(
source='/path/to/data.csv',
output_dir='/my/output/path',
)

# From an image directory
tfrecorder.create_tfrecords(
source='/path/to/image_dir',
output_dir='/my/output/path',
)

更多 TFRecorder 的用法请参考其官方文档。

TFRecord 文件读取

TensorFlow 提供了专用于读取 TFRecord 文件的 API 接口 tf.data.TFRecordDataset ,该接口可以将 TFRecord 文件中的内容读取到 dataset 中。代码如下所示:

# Read TFRecord file to dataset.
raw_dataset = tf.data.TFRecordDataset(filename)
print(raw_dataset)

此时 dataset 中存储的是序列化格式的字符串,如果要将其解析为真实的值,还需要进一步操作。

还原为 Example

我们可以将 raw_dataset 中的每个元素都还原为 tf.train.Example ,一般在小范围对 TFRecord 数据进行检查时使用,示例代码如下所示:

for raw_record in raw_dataset.take(1):
example = tf.train.Example()
example.ParseFromString(raw_record.numpy())
print(example)
# or
example_proto = tf.train.Example.FromString(raw_record.numpy())
print(example_proto)

模型训练使用

为了在模型训练时使用该 dataset ,我们需要将 raw_dataset 中的每个元素都解析为 FeatureMap ,以匹配 Keras 模型的输入与输出。代码如下所示:

def _parse_function(example_proto):
# Create a description of the features.
feature_description = {
'feature0': tf.io.FixedLenFeature([], tf.int64, default_value=0),
'feature1': tf.io.FixedLenFeature([], tf.int64, default_value=0),
'feature2': tf.io.FixedLenFeature([], tf.string, default_value=''),
'feature3': tf.io.FixedLenFeature([], tf.float32, default_value=0.0),
}
# Parse the single input `tf.train.Example` proto using the dictionary above.
# return tf.io.parse_single_example(example_proto, feature_description)
# Parse the batch input tf.train.Example protos using the dictionary above.
return tf.io.parse_example(example_proto, feature_description)

# Print parsed dataset.
parsed_dataset = raw_dataset.map(_parse_function)
print(parsed_dataset)
# Print one element in parsed_dataset.
for parsed_record in parsed_dataset.take(1):
print(parsed_record)

这里使用了 tf.io.parse_example 函数来将序列化后字符串解析为指定的数据类型,我们需要提前准备好 feature_description 字典, 该字典定义了 feature 的名称、长度(定长/变长)、数据类型以及默认值,以供在解析时使用。最终我们通过调用 raw_datasetmap 方法来将该解析函数应用到 dataset 中的每个元素。

另外,我们也可以使用 tf.io.parse_single_example 函数来进行解析,但要注意它与 tf.io.parse_example 的区别,前者适合解析单个序列化的元素,而后者适用于一个 batch 的解析。在TensorFlow 数据输入的最佳实践一文中曾介绍过 dataset 的向量化 map 操作,即对 dataset 先应用 batch 转换然后再应用 map 转换以提升效率,因此这里推荐使用后者作为序列化数据的解析函数。

最终我们可以将 parsed_dataset 应用于模型的训练中。示例代码如下所示:

model.fit(parsed_dataset)

注: label 数据需要与 features 数据放在同一个 dataset 中, TensorFlow 会根据模型输入和输出 Tensornamedataset 中获取相应的数据来进行训练。

参考资料

  1. TFRecord and tf.train.Example
  2. TFRecord 相关 ProtoBuf 文件地址
  3. 创建 TFRecords 的救星 — TensorFlow Recorder 现已开源!
  4. TFRecorder Github 地址
  5. Hadoop MapReduce InputFormat/OutputFormat for TFRecords

推荐阅读

欢迎关注我的其它发布渠道