Overcoming Data Preprocessing Bottlenecks with TensorFlow Data Service, NVIDIA DALI, and Other Methods | by Chaim Rand


Maximize Training Resource Utilization, Accelerate Learning, Save Money

Can you find the bottleneck? Photo by viswanath muddada on Unsplash

In a previous post, I spoke about the importance of profiling the runtime performance of your DNN training sessions as a means to making the most of your training resources, accelerating your training, and saving money. I described a typical training pipeline, (see the diagram below), reviewed some of the potential performance bottlenecks, and surveyed some of the tools available for identifying such bottlenecks. In this post I would like to expand on one of the more common performance bottlenecks, the CPU bottleneck, and some of the ways to overcome it. More specifically, we will discuss bottlenecks that occur in the data preprocessing pipeline, and ways to overcome them.

In the context of this post, we will assume that we are using TensorFlow, specifically TensorFlow 2.4, to train an image processing model on a GPU device, but the content is, mostly, just as relevant to other training frameworks, other types of models, and other training accelerators.

Sample Training Pipeline (by author)

A CPU bottleneck occurs when the GPU resource is under utilized as a result of one, or more of the CPUs, having reached maximum utilization. In this situation, the GPU will be partially idle while it waits for the CPU to pass in training data. This is an undesired state. Being that the GPU is, typically, the most expensive resource in the system, your goal should always be to maximize its utilization. Without getting into too many technical details, a CPU bottleneck generally occurs when the ratio between the “amount” of data pre-processing, which is performed on the CPU, and the “amount” of compute performed by the model on the GPU, is greater that the ratio between the overall CPU compute capacity and the overall GPU compute capacity. For example, if both your CPU cores and GPU are maximally utilized, and then you upgrade to a more powerful GPU, or downgrade to a system with fewer CPU cores, your training runtime performance will become CPU bound.

Naturally, your first instinct will be to simply switch over to a machine with a more appropriate CPU to GPU compute ratio. But, sadly, most of us don’t have that freedom. And while cloud services, such as Amazon SageMaker, offer a variety of training instance types, with different CPU-compute to GPU-compute ratios, you may find that none of them quite fit your specific needs.

Assuming that you are stuck with the system that you have, what steps can you take to address your performance bottleneck and speed up the training?

In the next sections we will propose four steps for addressing the preprocessing data bottleneck.

  1. Identify any operations that can be moved to the data preparation phase
  2. Optimize the data pre-processing pipeline
  3. Perform some of the pre-processing steps on the GPU
  4. Use the TensorFlow data service to offload some of the CPU compute to other machines

In order to facilitate our discussion, we will build a toy example based on Resnet50.

In the code block below, I have built a model using TensorFlow’s built in Resnet50 application. I have added a relatively heavy data pre-processing pipeline which includes dilation, blur filtering, and a number of TensorFlow pre-processing layers. (See the documentation for the advantages of using such layers.)

import tensorflow as tf
import
tensorflow_addons as tfa
from
tensorflow.keras.applications.resnet50 import ResNet50
from tensorflow.keras.layers.experimental import preprocessing
def get_dataset(batch_size):
# parse TFRecord
def parse_image_function(example_proto):
image_feature_description =
{'image': tf.io.FixedLenFeature([], tf.string),
'label': tf.io.FixedLenFeature([], tf.int64)}
features = tf.io.parse_single_example(
example_proto, image_feature_description)
image = tf.io.decode_raw(features['image'], tf.uint8)
image.set_shape([3 * 32 * 32])
image = tf.reshape(image, [32, 32, 3])
label = tf.cast(features['label'], tf.int32)
return image, label
# dilation filter
def dilate(image, label):
dilateFilter = tf.zeros([3, 3, 3], tf.uint8)
image = tf.expand_dims(image, 0)
image = tf.nn.dilation2d(
image, dilateFilter, strides=[1, 1, 1, 1],
dilations=[1, 1, 1, 1],
padding='SAME',
data_format='NHWC')
image = tf.squeeze(image)
return image, label
# blur filter
def blur(image, label):
image = tfa.image.gaussian_filter2d(image=image,
filter_shape=(11, 11), sigma=0.8)
return image, label
# rescale filter
def rescale(image, label):
image = preprocessing.Rescaling(1.0 / 255)(image)
return image, label
# augmentation filters
def augment(image, label):
data_augmentation = tf.keras.Sequential(
[preprocessing.RandomFlip("horizontal"),
preprocessing.RandomRotation(0.1),
preprocessing.RandomZoom(0.1)])
image = data_augmentation(image)
return image, label
autotune = tf.data.experimental.AUTOTUNE
options = tf.data.Options()
options.experimental_deterministic = False
records = tf.data.Dataset.list_files('data/*',
shuffle=True).with_options(options)
# load from TFRecord files
ds = tf.data.TFRecordDataset(records,
num_parallel_reads=autotune).repeat()
ds = ds.map(parse_image_function, num_parallel_calls=autotune)
ds = ds.map(dilate, num_parallel_calls=autotune)
ds = ds.map(blur, num_parallel_calls=autotune)
ds = ds.batch(batch_size)
ds = ds.map(rescale,num_parallel_calls=autotune)
ds = ds.map(augment, num_parallel_calls=autotune)
ds = ds.prefetch(autotune)
return ds
if __name__ == "__main__":
model = ResNet50(weights=None,
input_shape=(32, 32, 3),
classes=10)
model.compile(loss=tf.losses.SparseCategoricalCrossentropy(),
optimizer=tf.optimizers.Adam())
dataset = get_dataset(batch_size = 1024)
model.fit(dataset, steps_per_epoch=100, epochs=10))

The raw data input is stored in TFRecord files, which I created from the CIFAR-10 dataset, (using this script).

I have created this example so as to artificially create a performance bottleneck. I would not, under any circumstances, recommend using it for actual training.

All tests were run on an Amazon ec2 p2.xlarge instance type using an Amazon Deep Learning AMI.

There are a number of different tools and techniques for evaluating the runtime performance of a training session, and identifying and studying an input pipeline bottleneck. Let’s review just a few of them:

System Metrics

The first thing to check is the system resource utilization. There are a number of different ways to do this. The Linux top command shows the CPU utilization. To see how the utilization breaks down per CPU core, type ‘1’ while top is running. To measure the GPU utilization, you can use nvidia-smi. When training in Amazon EC2, you can use Amazon CloudWatch to monitor system metrics. While the GPU metrics are not included by default, you can add these by using the gpumon utility. Below is a sample graph of the CPU and GPU utilization captured across several different experiments.

Performance metrics in Amazon CloudWatch (by author)

In the use case we introduced above, the average reported GPU utilization caps out under 50% with long periods of idle time. At the same time, the CPU is highly utilized, with some of the cores reaching maximum utilization.

Performance Profilers

To dive into the next level of detail of how the training is performing, you can use a performance profiler.

TensorFlow Profiler: The built in TensorFlow profiler includes a wealth of performance analytics, and in particular tools for analyzing the performance of the input pipeline. You can view using TensorBoard by installing the TensorBoard profile plugin. One way to enable the profiler, is to program the training loop with the TensorBoad callback.

# program the callback to capture steps 20-24
cb = tf.keras.callbacks.TensorBoard(
log_dir='/tmp/profile', profile_batch='20,24',
histogram_freq=0, write_images=False)
model.fit(dataset, steps_per_epoch=100, epochs=10, callbacks=[cb])

Below is the profiling overview page for our use case example on which the data input bottleneck is glaringly apparent.

tf profiler — overview page (by author using TensorBoard)

The trace-viewer tool allows you to drill down into the details of the pipeline execution, and study the flow of data in between the CPU and GPU. In our example, you can clearly see long periods of GPU idle time, due to the data input bottleneck.

tf profiler — trace-viewer (by author using TensorBoard)

Amazon SageMaker Debugger: If you are training in the Amazon SageMaker environment, you can take advantage of the profiling features that are built into Amazon SageMaker Debugger. Here is an example of how a severe bottleneck in the input pipeline will appear in Amazon SageMaker Studio.

Resource utilization in Amazon SageMaker Studio (by author)

Linux Profilers: General purpose Linux performance profilers are also often helpful in analyzing training bottlenecks. For example, using the Linux perf utility we are able to see that our CPU spends a large chunk of its time on an internal linear algebra function:

Linux perf capture (by author)

Throughput Measurement

Being that the objective of our analysis is to accelerate the training runtime, it is only natural that we would use this metric as a measure of our performance.

In our example, we will use the average runtime of a single (100 step) epoch as our primary performance metric, and measure how different changes to the model affect this value. The average runtime of a single epoch of the model above, is 122 seconds.

A useful technique (described here) for measuring what the runtime would be, if it were not for the data input bottleneck, is by caching the first processed input batch and using the same cached batch for all subsequent steps. This essentially shuts off the preprocessing pipeline, and enables us to calculate the ideal epoch runtime.

To implement the technique, we simply tack on the following line of code at the end of our dataset creation:

ds = ds.take(1).cache().repeat()

By applying this technique to our example, we are able to reduce the runtime to 58 seconds. In other words, were it not for the data input bottleneck, we would be able to speed up training by more than a factor of 2.

In the next sections we will walk through a number of proposed steps for solving a bottleneck in the input pipeline. We will demonstrate some of the steps on our toy example, keeping in mind the target runtime we have just calculated, 58 seconds per epoch.

The first thing to do in order to address the data preprocessing bottleneck, is to identify any operations that can be preponed into the, raw, data record creation phase. The more operations we can move into the data creation phase, the more we can free up CPU cycles during training. Any operations that are run in the beginning of the pipeline, in a deterministic fashion (have no random component), that do not depend on a hyper-parameter, and do not excessively increase the size of the data, are good candidates for preponement. In our toy example, the dilation operation, (assuming it does not depend on a hyper parameter), fits this criteria. So the first thing we will do is knock off the dilation operation, and assume that the TFRecords contain the image data after it has already undergone appropriate dilation.

In our specific implementation, the blur filter might have also been a good candidate for preponement, but since, in most cases, blurring is applied randomly, we will leave it in.

By removing just the dilation operation, our runtime decreases to 115 seconds per epoch. This is less than our starting value of 122 seconds per epoch, but we still have a long way to go to get to our target of 58 seconds per epoch.

One thing to take note of, is that certain operations might change the size of your data records, and thus, might impact the overall size of your dataset, as well as the amount of network traffic during training (if the training set is stored remotely). If you choose to prepone operations that increase the size of the data, excessively, you might run the risk of replacing one bottleneck with another, i.e. a network IO or data-loading bottleneck.

Once we have moved as many operations as possible to the data creation phase, the second step is to identify ways in which to optimize the performance of the remaining pipeline.

Pipeline Tuning

Often times, some small tweaks to the input pipeline setup, could reduce the performance overhead. Here are just a few things you could try:

  1. If you have multiple dataset map functions that are relatively small, consider grouping them into a single map function.
  2. Conversely, if you have a dataset map function that is very large, consider breaking it up into two or more smaller functions in order to better utilize the built in parallel call support.
  3. Look for operations that could be applied post batching rather than per record. (In our example, the blur function could, theoretically, be applied on training batches, but since it is typically applied in a random fashion, we will leave it per record.)
  4. Use low precision types wherever possible. Postpone casting to higher precision to the end of the pipeline.
  5. If your pipeline includes tf.numpy_function or tf.py_function, consider using TensorFlow primitives instead.

CPU optimizations and extensions

Make sure that your TensorFlow binaries were configured (and compiled) to take full advantage of your CPU, and CPU extensions. For example, if you are using a modern x86 ISA CPU, (such as Intel or AMD), make sure to use TensorFlow binaries that are optimized to use the CPU’s advanced vector extensions (e.g. AVX2). Intel, in general, offers a wide variety of binaries that are specifically optimized to run on Intel CPUs, including intelpython, and TensorFlow-mkl.

Note that one of the advantages of using a cloud based solution for training, is that the cloud learning environment is, (presumably,) configured to take full advantage of the cloud system resources.

CPU Load Balancing

When you have a CPU bottleneck in a multi-CPU core system, you might find that, while one or more of the CPU cores are at full utilization, other are not. This is actually quite common. One thing that you could try, is to improve the load balancing between the CPUs so that the overall CPU utilization increases. You could try this by using the tf.config.set_logical_device_configuration API to separate the CPU compute into multiple logical devices, and the tf.device API to specify where each operation should be run. You can also try to improve the load balancing by playing around with different options for the num_parallel_calls argument of the tf.data.Dataset.mapfunction, (instead of relying on TensorFlow’s autotune feature). In any case, keep in mind that this is likely to be a tedious, pain-staking, effort, and that even the slightest change to your model, will, likely require recalculating the load balancing.

As in our example, you might find that even after you have exhausted all options for preponing operations to the data creation phase, and optimizing the CPU code, you continue to face a data preprocessing bottleneck. The next option to consider is to modify the load balancing between the CPU and the GPU, by moving some of the preprocessing operations onto the GPU. The downside to this approach is that we are almost certain to increase the runtime of a GPU step. Also, since we are increasing the size of the computation graph that is running on the GPU, we may need to free up some GPU memory by running with a smaller batch size. As a result, it is highly unlikely that we will be able to achieve the target throughput we calculated above. But if it reduces the overall train time, then it is totally worth it. Let’s explore a few ways to offload preprocessing operations onto the GPU.

Postpone Preprocessing Operations to GPU

In most cases, the best way to offload from the CPU, is by moving operations that are performed at the end of the preprocessing pipeline unto the GPU. By targeting these “tail” operations, rather than operations in the middle of the pipeline, we avoid the overhead of data transfers between the GPU and the CPU. If the “tail” operations are performed on the model input, we can place them at the head of the model. If they are performed on label data, we can modify our loss function to perform these operations before applying the actual loss.

In our example, we have removed the augmentations from our input pipeline, and instead applied them to the beginning of our GPU computation graph:

data_augmentation = tf.keras.Sequential(
[preprocessing.RandomFlip("horizontal"),
preprocessing.RandomRotation(0.1),
preprocessing.RandomZoom(0.1)])
inputs = tf.keras.Input(shape=(32, 32, 3))
# Augment images
x = data_augmentation(inputs)
# Add the rest of the model
outputs = tf.keras.applications.ResNet50(weights=None,
input_shape=(32, 32, 3), classes=10)(x)
model = tf.keras.Model(inputs, outputs)

By applying this change, we are able to reduce the runtime down to 108 seconds per epoch.

By wrapping operations with a tf.device(‘/device:GPU:0’) statement, we can force certain operations to run on the GPU. The downside to this method, is that it requires transferring data to and from the GPU device.

In our example, we chose to apply this technique to the blur function, by modifying it as follows:

def blur(image, label):
import tensorflow_addons as tfa
with tf.device('/device:GPU:0'):
image = tfa.image.gaussian_filter2d(image=image,
filter_shape=(11, 11), sigma=0.8)
return image, label

When running the blur function on the GPU in this manner, while leaving the augmentations on the CPU, we attain an epoch runtime of 97 seconds. When combining both techniques, the epoch runtime is 98 seconds.

Using the TensorFlow profiler trace-viewer, we are able to see how the tf.device technique increases the data traffic between the CPU and the GPU:

Offload to GPU with tf.device — tf profiler trace-viewer (by author using TensorBoard)

By comparing the highlighted streams in this experiment, to the same streams in the trace-viewer capture above, we see that there are significantly more memory copies to and from the GPU. We also see that the GPU is far more active.

Another way to verify that the blur function is indeed running on the CPU, is to set tf.debugging.set_log_device_placement(True). You can run the example, once with the blur function on the CPU, and once with the blur function on the GPU, and see how it impacts the output of the log device placement routine.

NVIDAI DALI

NVIDAI DALI is a framework for building highly optimized preprocessing pipelines. In particular, using NVIDIA DALI, you can program parts of your pipeline, or your entire pipeline, to run on the GPU. A DALI pipeline is built from DALI operations. DALI comes with a list of supported operations, as well as APIs for creating custom operations. Using the TensorFlow DALI plugin, DALI pipelines can be wrapped with the, tf.data.Dataset API compliant, DALIDataset, as shown here. In addition, DALI supports loading from TFRecord files as shown here. Unfortunately, as of the time of this writing, the documented support for DALI is limited to version 1 compatible TensorFlow. (Those of you who have read my previous blogs, should already know how I feel about using legacy code.) In addition, NVIDIA DALI was designed for NVIDIA GPUs. It will not run on other machine learning accelerators. Another consideration is distributed training. While DALI does support multi-gpu training, depending on how you implement distributed training, (e.g. with Horovod or a TensorFlow distribution strategy, with model.fit() or a custom training loop), integrating a DALI pipeline will vary between being slightly more difficult, and much more difficult. If you feel strongly about using the latest TensorFlow features, or if you want your code to be compliant with other accelerators, (AWS Tranium, Habana Gaudi, TPU, etc.), or if converting your pipeline to DALI operations would require a lot of work, or if you rely on the high level TensorFlow distributed training APIs, NVIDIA DALI might not be the right solution for you.

Using DALI requires use of the TensorFlow DALI plugin python package. See the documentation for installation steps. In the code block below, I show how to convert the pipeline from our use case to NVIDIA DALI. I have left out some of the random augmentations, as there were no built-in, corresponding, DALI operations.

from nvidia.dali.pipeline import Pipeline
import nvidia.dali.ops as ops
import nvidia.dali.types as types
import nvidia.dali.tfrecord as tfrec
import nvidia.dali.plugin.tf as dali_tf
import tensorflow.compat.v1 as tf
tf.disable_eager_execution()
class TFRecordPipeline(Pipeline):
def __init__(self, batch_size, num_threads,
device = 'cpu', device_id = 0):
super(TFRecordPipeline, self).__init__(batch_size,
num_threads,
device_id)
self.input = ops.TFRecordReader(
path = ['data/train0.tfrecords'],
index_path = ['index/train0'],
features = {
"image": tfrec.FixedLenFeature((), tfrec.string, ""),
"label": tfrec.FixedLenFeature([], tfrec.int64, -1)})
self.decode = ops.Cast(device=device,dtype=types.UINT8)
self.reshape = ops.Reshape(device=device,
shape=[32, 32, 3])
self.cast = ops.Cast(device=device,
dtype=types.DALIDataType.INT32)
self.blur = ops.GaussianBlur(device=device,
window_size=11,sigma=0.8)
self.iter = 0
def define_graph(self):
inputs = self.input()
images = self.decode(inputs["image"].gpu())
images = self.reshape(images)
images = self.blur(images)/255.
labels = self.cast(inputs["label"].gpu())
return (images, labels)
def iter_setup(self):
pass
if __name__ == "__main__":
batch_size = 1024
shapes = ((batch_size, 32, 32, 3),
(batch_size))
pipe = TFRecordPipeline(batch_size=batch_size,
num_threads=4,
device='gpu',
device_id=0)
with tf.device('/gpu:0'):
# Create dataset
ds = dali_tf.DALIDataset(
pipeline=pipe,
batch_size=batch_size,
output_shapes=shapes,
output_dtypes=(tf.float32, tf.int32),
device_id=0)
model = tf.keras.applications.resnet.ResNet50(
weights=None,
input_shape=(32, 32, 3),
classes=10)
model.compile(
loss=tf.keras.losses.SparseCategoricalCrossentropy(),
optimizer=tf.keras.optimizers.Adam())
model.fit(ds, steps_per_epoch=100, epochs=10)

I ran the script in TensorFlow 2.3 (as it would seem that, as of the time of this writing, DALI has not been updated to support TensorFlow 2.4). The resultant runtime of a 100 step epoch was 77 seconds. While this trial did not include the augmentations, it is clear that DALI offers potential for significant runtime improvement.

As I mentioned above, offloading operations to the GPU might require freeing up some memory be reducing the size of the training batch. It turns out that this was not required in our toy example. (This probably means that we could have started out with a larger batch size.) This finding will not necessarily carry over to other models, especially if you are making sure to maximize your GPU memory and batch size.

The final option we explore, is to offload some of the preprocessing activity to other machines. Rather than moving preprocessing computation onto the GPU, we will move it to CPU cores on auxiliary machines. We will explore this approach using the, relatively new, TensorFlow data service feature. Introduced in TensorFlow version 2.3, tf.data.experimental.service provides APIs for defining dedicated worker machines for performing data preprocessing. A dispatch server is responsible for distributing preprocessing tasks to one, or more, worker servers, each of which load the raw data directly from storage, and send the processed data to the GPU device. By applying tf.data.experimental.service.distribute to your dataset, you can program the dataset to run all preprocessing operations up to the point of application, on the dedicated workers. The number and types of worker services to use, and where in the pipeline to apply the service, should be determined by considerations, such as the severity of your bottleneck, the availability and cost of auxiliary machines, the manner in which the preprocessing operations impact the size of the data, and how this impacts the network traffic. For example, if you choose a remote worker machine with a low network bandwidth, and program a preprocessing operation that blows up the size of the data to run on the worker, you might not see any performance improvement.

Let’s demonstrate the use of this API on our toy example. For this demonstration, I have chosen a single, auxiliary Amazon EC2 c5.4xlarge instance with 16 CPU cores, and with the same Amazon Deep Learning AMI. The communication between p2.xlarge and the c5.4xlarge will use the grpc network protocol, so you need to make sure that both instances are in a security group which allows inbound traffic of the relevant protocol, one from the other.

On the worker device we run the following script, where “10.0.1.171” is the ip adress of the auxiliary device:

import tensorflow as tf
d_config = tf.data.experimental.service.DispatcherConfig(port=5000)
dispatcher = tf.data.experimental.service.DispatchServer(d_config)
w_config = tf.data.experimental.service.WorkerConfig(port=5001,
dispatcher_address=dispatcher.target.split("://")[1],
worker_address='10.0.1.171:5001')
worker = tf.data.experimental.service.WorkerServer(w_config)
dispatcher.join()

Note that we are running the dispatch server and worker server on the same machine. We also make sure that the TFRecord files are copied over to this machine, as the workers will load the raw data from those files. On the GPU machine we have modified the train script as follows:

autotune = tf.data.experimental.AUTOTUNE
options = tf.data.Options()
options.experimental_deterministic = False
records = tf.data.Dataset.list_files('data/*', shuffle=True).with_options(options)
ds = tf.data.TFRecordDataset(records, num_parallel_reads=autotune).repeat()
ds = ds.map(parse_image_function, num_parallel_calls=autotune)
ds = ds.map(blur, num_parallel_calls=autotune)
# use the TensorFlow Data Service
ds = ds.apply(tf.data.experimental.service.distribute(
processing_mode="parallel_epochs",
service='grpc://10.0.1.171:5000'))
ds = ds.batch(batch_size)
ds = ds.map(rescale,num_parallel_calls=autotune)
ds = ds.map(augment, num_parallel_calls=autotune)
ds = ds.prefetch(autotune)

Note, that we have programmed just the record parsing, and heavy blur function to run on the worker. The batching and augmentations remain on the primary device.

The results of running this setup could not be better! The runtime per epoch is 58 seconds, meeting the target we set for ourselves above. By using an auxiliary CPU device, and the TensorFlow data service to offload preprocessing computation, we have completely solved the CPU bottleneck! And indeed, we find that the average GPU utilization in this case is up around 97%.

In the table below we summarize our findings on our toy, resnet50 model:

In this post we have surveyed a number of ways to address a performance bottleneck in the data input pipeline. In particular, we have shown how the TensorFlow Data Service can be used to completely solve this bottleneck. This survey is not intended to be all comprehensive. There are likely to be additional tools and techniques available. While we have demonstrated how to apply these techniques to a toy Resnet50 model, their impact is certain to vary across models and datasets. Please don’t hesitate to share your own tools, techniques, and experiences.



Source link

Be the first to comment

Leave a Reply

Your email address will not be published.


*