TensorFlow is a deep learning framework developed by Google and released in 2015. It is actively maintained and continuously updated with results from recent deep learning research. As a result, TensorFlow supports a large variety of state-of-the-art neural network layers, activation functions, optimizers and tools for analyzing, profiling and debugging deep neural networks. To deliver good performance, the TensorFlow installation at NERSC utilizes the optimized MKL-DNN library from Intel. Explaining the full framework is beyond the scope of this website. For users who want to get started, we recommend reading the TensorFlow getting started page. The TensorFlow page also provides complete API documentation.
TensorFlow at NERSC¶
In order to use TensorFlow at NERSC, load the TensorFlow module via
module load tensorflow/intel-<version>
<version> should be replaced with the version string you are trying to load. To see which ones are available use
module avail tensorflow.
Running TensorFlow on a single node is the same as on a local machine: just invoke your script with the Python interpreter.
By default, TensorFlow supports gRPC for distributed training. However, this framework is tedious to use and very slow on tightly coupled HPC systems. Therefore, we recommend using Uber's Horovod and bundle it with the TensorFlow module we provide. The version of Horovod we provide is compiled against the optimized Cray MPI and thus integrates well with SLURM. We give a brief overview of how to make an existing TensorFlow code multi-node ready, but we recommend inspecting the examples on the Horovod page. We recommend using pure TensorFlow instead of Keras, as it shows better performance and the Horovod integration is smoother.
In order to use Horovod, one needs to import the horovod module by doing
import horovod.tensorflow as hvd
One of the first statements should then be
hvd.init()
which initializes the MPI runtime. Then, the user needs to wrap the optimizers for distributed training using
opt = hvd.DistributedOptimizer(opt)
To keep track of the global step, a global step object has to be created via
tf.train.get_or_create_global_step() and passed to the
minimize (or apply_gradients) member function of the optimizer instance.
Furthermore, to ensure model consistency on all nodes it is mandatory to register a broadcast hook via
bcast_hook = [hvd.BroadcastGlobalVariablesHook(0)]
and pass it along with other hooks to the
MonitoredTrainingSession object. It is also beneficial to register a stop hook via
stop_hook = [tf.train.StopAtStepHook(last_step=num_steps_total)]
For example, a training code like
import tensorflow as tf

# Build model...
loss = ...
opt = tf.train.AdagradOptimizer(0.01)

# Make training operation
train_op = opt.minimize(loss)

# A plain Session does not initialize variables on its own,
# so this has to be done explicitly before training.
with tf.Session() as sess:
    sess.run(tf.global_variables_initializer())
    for step in range(num_steps_total):
        # Perform one training step.
        sess.run(train_op)
should read for distributed training
import tensorflow as tf
import horovod.tensorflow as hvd

# Initialize Horovod
hvd.init()

# Build model...
loss = ...
opt = tf.train.AdagradOptimizer(0.01 * hvd.size())

# Add Horovod Distributed Optimizer
opt = hvd.DistributedOptimizer(opt)

# Add hook to broadcast variables from rank 0 to all other processes during
# initialization.
hooks = [hvd.BroadcastGlobalVariablesHook(0)]

# Add session stop hook
global_step = tf.train.get_or_create_global_step()
hooks.append(tf.train.StopAtStepHook(last_step=num_steps_total))

# Make training operation
train_op = opt.minimize(loss, global_step=global_step)

# Save checkpoints only on worker 0 to prevent other workers from corrupting them.
checkpoint_dir = '/tmp/train_logs' if hvd.rank() == 0 else None

# The MonitoredTrainingSession takes care of session initialization,
# restoring from a checkpoint, saving to a checkpoint, and closing when done
# or an error occurs.
with tf.train.MonitoredTrainingSession(checkpoint_dir=checkpoint_dir,
                                       hooks=hooks) as mon_sess:
    while not mon_sess.should_stop():
        # Perform synchronous training.
        mon_sess.run(train_op)
It is important to use
MonitoredTrainingSession instead of the regular
Session because it keeps track of the number of global steps and knows when to stop the training process when a corresponding hook is installed. For more fine-grained control over checkpointing, a
CheckpointSaverHook can be registered as well. Note that the graph has to be finalized before the monitored training session context is entered. Compared to the regular session object, this is a limitation and can cause some trouble with summary writers. Please see the distributed training recommendations for how to handle these cases.
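As a rough illustration of registering a CheckpointSaverHook, the sketch below builds the hook on rank 0 only and returns an empty list on all other ranks. The helper name make_checkpoint_hooks and the save_steps value are hypothetical and chosen for illustration. TensorFlow is imported lazily inside the function, assuming the TensorFlow 1.x API used elsewhere on this page.

```python
def make_checkpoint_hooks(checkpoint_dir, rank, save_steps=1000):
    """Return a list with a CheckpointSaverHook on rank 0, else an empty list.

    `save_steps` (hypothetical default) is the number of global steps
    between two checkpoints.
    """
    if rank != 0:
        # Only one rank writes checkpoints so that workers do not
        # overwrite each other's files.
        return []
    import tensorflow as tf  # TensorFlow 1.x API, imported lazily
    return [tf.train.CheckpointSaverHook(checkpoint_dir, save_steps=save_steps)]


# On a non-zero rank no hook is created (and TensorFlow is not imported).
hooks_on_worker = make_checkpoint_hooks("/tmp/train_logs", rank=1)
```

In a Horovod script the result would typically be appended to the existing hook list, for example hooks += make_checkpoint_hooks('/tmp/train_logs', hvd.rank()), before the session is created. Depending on the TensorFlow version, MonitoredTrainingSession may also install its own saver when checkpoint_dir is set, so it is worth checking that checkpoints are not written twice.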
Note that splitting the data among the nodes is up to the user and needs to be done in addition to the modifications described above. Utility functions can be used to determine the total number of ranks via
hvd.size() and the global rank id of the current process via
hvd.rank(). If multiple ranks are employed per node, hvd.local_rank() and
hvd.local_size() return the node-local rank id and the number of ranks on the node, respectively. If the dataset API is being used, we recommend using the
dataset.shard option to split the dataset. In other cases, the data sharding needs to be done manually and is application dependent.
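For the manual case, one simple scheme is a strided split, which assigns every size-th item to a given rank, similar in spirit to what dataset.shard does for records. The following is a minimal sketch in plain Python. The helper name shard_for_rank and the file names are hypothetical, and in a real job rank and size would come from hvd.rank() and hvd.size() after hvd.init().

```python
def shard_for_rank(items, rank, size):
    """Return the subset of `items` assigned to `rank` out of `size` ranks.

    Uses a strided split: rank r receives items r, r + size, r + 2 * size, ...
    """
    if not 0 <= rank < size:
        raise ValueError("rank must satisfy 0 <= rank < size")
    return items[rank::size]


# Hypothetical list of input files, split across 4 ranks.
files = ["train-%03d.tfrecord" % i for i in range(10)]
shards = [shard_for_rank(files, rank, 4) for rank in range(4)]
```

With 10 files and 4 ranks the shards differ in length by at most one file, so the ranks do not see exactly the same number of samples. Whether that matters depends on how the training loop decides when to stop.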
Frequently Asked Questions¶
I/O Performance and Data Feeding Pipeline¶
For performance reasons, we recommend storing the data on the scratch directory, accessible via the
SCRATCH environment variable. At high concurrency, i.e. when many nodes need to read the files, we recommend staging them into the burst buffer. For efficient data feeding, we recommend using the
TFRecord data format and using the
dataset API to feed data to the CPU. In particular, note that the
TFRecordDataset constructor takes the buffer_size and
num_parallel_reads options, which allow for prefetching and multi-threaded reads. These should be tuned for good performance, but note that a thread is dispatched for every independent read. Therefore, the number of inter-op threads needs to be adjusted accordingly (see below). The
buffer_size parameter is given in bytes and should be an integer multiple of the node-local batch size for optimal performance.
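The recommendations above could be combined roughly as follows. This is a sketch under several assumptions: the helper names, the bytes_per_record parameter and the default values are hypothetical, and the reading of "integer multiple of the node-local batch size" as a whole number of batches worth of bytes is an interpretation rather than something stated on this page. TensorFlow (1.x API) is imported lazily so that the buffer-size helper can be used on its own.

```python
def reader_buffer_bytes(batch_size, bytes_per_record, num_batches=4):
    """Read-buffer size in bytes covering a whole number of local batches.

    Assumption: records have a roughly fixed serialized size of
    `bytes_per_record` bytes (hypothetical parameter).
    """
    return num_batches * batch_size * bytes_per_record


def make_dataset(filenames, batch_size, bytes_per_record,
                 num_shards=1, shard_index=0, num_parallel_reads=4):
    """Build a sharded, batched TFRecord dataset (TensorFlow 1.x API)."""
    import tensorflow as tf  # imported lazily; needs the tensorflow module

    dataset = tf.data.TFRecordDataset(
        filenames,
        buffer_size=reader_buffer_bytes(batch_size, bytes_per_record),
        num_parallel_reads=num_parallel_reads)
    # Each rank keeps every num_shards-th record, starting at shard_index.
    dataset = dataset.shard(num_shards, shard_index)
    # Record parsing via dataset.map(...) is application dependent and omitted.
    dataset = dataset.batch(batch_size)
    # Prefetch one batch so reading overlaps with computation.
    return dataset.prefetch(1)


# Example: 32 records per batch, about 1 KiB per record.
example_buffer = reader_buffer_bytes(32, 1024)
```

In a Horovod job, num_shards and shard_index would typically be set to hvd.size() and hvd.rank(). The value of num_parallel_reads is only a starting point and should be tuned together with the thread settings.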
For best MKL-DNN performance, the module already sets a number of OpenMP environment variables, and we encourage users not to change them, in particular the
OMP_NUM_THREADS variable. Setting this variable incorrectly can cause a resource starvation error, which manifests as TensorFlow reporting that too many threads were spawned. If that happens, we recommend adjusting the inter- and intra-task parallelism by changing the NUM_INTER_THREADS and
NUM_INTRA_THREADS environment variables. These parameters can also be changed in the TensorFlow Python script by creating a session configuration object (a tf.ConfigProto, called sess_config here)
and passing it to the session manager
with tf.train.MonitoredTrainingSession(config=sess_config, hooks=hooks) as sess: ...
Please note that
num_total_threads, the number of hardware threads per node, is 64 on Haswell or 272 on KNL.
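One way to pick the two thread counts is to fix the number of inter-op threads and derive the intra-op count from the per-node total. The sketch below assumes that the product of the two should not exceed num_total_threads, which is a working assumption rather than a rule stated on this page. The helper names and the chosen inter-op counts are hypothetical. The tf.ConfigProto fields inter_op_parallelism_threads and intra_op_parallelism_threads belong to the TensorFlow 1.x API, and TensorFlow is imported lazily inside the function.

```python
def split_threads(num_total_threads, num_inter_threads):
    """Return (inter, intra) such that inter * intra <= num_total_threads."""
    if not 1 <= num_inter_threads <= num_total_threads:
        raise ValueError("need 1 <= num_inter_threads <= num_total_threads")
    return num_inter_threads, num_total_threads // num_inter_threads


def make_session_config(num_inter_threads, num_intra_threads):
    """Create a session configuration with explicit thread counts."""
    import tensorflow as tf  # TensorFlow 1.x API, imported lazily
    return tf.ConfigProto(
        inter_op_parallelism_threads=num_inter_threads,
        intra_op_parallelism_threads=num_intra_threads)


# Hypothetical choices of inter-op threads for the two node types.
haswell = split_threads(64, 2)   # 64 hardware threads per Haswell node
knl = split_threads(272, 4)      # 272 hardware threads per KNL node
```

The resulting pair would then be passed on as sess_config = make_session_config(*haswell) and handed to MonitoredTrainingSession through its config argument. Good values depend on the model and on how many parallel reads the input pipeline uses.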