Building Distributed TensorFlow Models
by Elliot Smith, October 18, 2018

Requirements

Optional

Creating a new Model

git clone https://github.com/maxwellmri/Distributed-Tensorflow-Template.git
cd Distributed-Tensorflow-Template
virtualenv -p python3.6 example_model_env
source example_model_env/bin/activate
pip install -r requirements.txt
pre-commit install
git remote set-url origin https://new.repo-url.here
git add -A
git commit -m "initialised new project from "
git push -u origin master
class VGGModel(BaseModel):
    def __init__(self, config: dict) -> None:
        """
        :param config: global configuration
        """
        super().__init__(config)
def _create_model(x: tf.Tensor, is_training: bool) -> tf.Tensor:
    """
    :param x: input data
    :param is_training: flag if currently training
    :return: completely constructed model
    """
    pass
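
For reference, here is a minimal sketch of what a filled-in _create_model could look like for the 28x28x1 inputs used later in this post. The layer sizes and the ten-class head are illustrative assumptions, not part of the template:

def _create_model(x: tf.Tensor, is_training: bool) -> tf.Tensor:
    """
    :param x: input data
    :param is_training: flag if currently training
    :return: completely constructed model
    """
    # two small VGG-style convolutional blocks (sizes are assumptions)
    net = tf.layers.conv2d(x, 32, 3, padding="same", activation=tf.nn.relu)
    net = tf.layers.max_pooling2d(net, 2, 2)
    net = tf.layers.conv2d(net, 64, 3, padding="same", activation=tf.nn.relu)
    net = tf.layers.max_pooling2d(net, 2, 2)
    # classifier head; dropout is only active while training
    net = tf.layers.flatten(net)
    net = tf.layers.dense(net, 128, activation=tf.nn.relu)
    net = tf.layers.dropout(net, rate=0.5, training=is_training)
    # return raw logits; softmax/argmax are applied in the model function
    return tf.layers.dense(net, 10)
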
# TODO: update model predictions
predictions = {
    "classes": tf.argmax(input=logits, axis=1),
    "probabilities": tf.nn.softmax(logits),
}

if mode == tf.estimator.ModeKeys.PREDICT:
    # TODO: update output during serving
    export_outputs = {
        "labels": tf.estimator.export.PredictOutput(
            {"label": predictions["classes"], "id": features["id"]}
        )
    }
    return tf.estimator.EstimatorSpec(
        mode, predictions=predictions, export_outputs=export_outputs
    )
# TODO: update summaries for tensorboard
tf.summary.scalar("loss", loss)
tf.summary.image("input", tf.reshape(x, [-1, 28, 28, 1]))# if mode is evaluation
if mode == tf.estimator.ModeKeys.EVAL:
    # TODO: update evaluation metrics
    summaries_dict = {
        "val_accuracy": tf.metrics.accuracy(
            labels, predictions=predictions["classes"]
        )
    }
    return tf.estimator.EstimatorSpec(
        mode=mode, loss=loss, eval_metric_ops=summaries_dict
    )
# TODO: update optimiser
optimizer = tf.train.AdamOptimizer(lr)
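
The snippets above cover the predict and eval branches; the training branch this optimiser feeds into looks roughly like the following sketch (assuming loss has already been built in the model function):

if mode == tf.estimator.ModeKeys.TRAIN:
    # tie the update to the global step so checkpoints and any
    # learning rate schedules stay in sync
    train_op = optimizer.minimize(loss, global_step=tf.train.get_global_step())
    return tf.estimator.EstimatorSpec(mode=mode, loss=loss, train_op=train_op)
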
class VGGTrainer(BaseTrain):
    def __init__(
        self,
        config: dict,
        model: VGGModel,
        train: TFRecordDataLoader,
        val: TFRecordDataLoader,
        pred: TFRecordDataLoader,
    ) -> None:
        """
        This function will generally remain unchanged; it is used to
        train and export the model. The only part which may change is
        the run configuration, and possibly which execution mode to use
        (training, eval etc.)
        :param config: global configuration
        :param model: input function used to initialise model
        :param train: the training dataset
        :param val: the evaluation dataset
        :param pred: the prediction dataset
        """
        super().__init__(config, model, train, val, pred)
def _export_model(
        self, estimator: tf.estimator.Estimator, save_location: str
    ) -> None:
        """
        Used to export your model in a format that can be used with
        Tf.Serving
        :param estimator: your estimator function
        """
        # this should match the input shape of your model
        # TODO: update this to your input used in prediction/serving
        x1 = tf.feature_column.numeric_column(
            "input", shape=[self.config["batch_size"], 28, 28, 1]
        )
        # create a list in case you have more than one input
        feature_columns = [x1]
        feature_spec = tf.feature_column.make_parse_example_spec(feature_columns)
        export_input_fn = tf.estimator.export.build_parsing_serving_input_receiver_fn(
            feature_spec
        )
        # export the saved model
        estimator.export_savedmodel(save_location, export_input_fn)
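
To sanity-check an export before pushing it to TF Serving, one option is tf.contrib.predictor, which loads the SavedModel directly. This is only a sketch: the export path is hypothetical, and the example must be adjusted to match whatever feature spec you actually declared above:

import tensorflow as tf
from tensorflow.contrib import predictor

# point this at the timestamped directory written by export_savedmodel
predict_fn = predictor.from_saved_model(
    "exports/1539828172",
    signature_def_key="labels",  # the signature named in export_outputs
)

# serialize a tf.Example for the "input" feature column; the flattened
# length must match the shape declared in the feature spec
example = tf.train.Example(features=tf.train.Features(feature={
    "input": tf.train.Feature(
        float_list=tf.train.FloatList(value=[0.0] * (28 * 28))
    )
}))
print(predict_fn({"examples": [example.SerializeToString()]}))
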
def _parse_example(
        self, example: tf.Tensor
    ) -> Tuple[Dict[str, tf.Tensor], tf.Tensor]:
        """
        Used to read in a single example from a tf record file and do any augmentations necessary
    :param example: the tfrecord to read the data from
        :return: a parsed input example and its respective label
        """
        # do parsing on the cpu
        with tf.device("/cpu:0"):
            # define input shapes
            # TODO: update this for your data set
            features = {
                "image": tf.FixedLenFeature(shape=[28, 28, 1], 
                                            dtype=tf.float32),
                "label": tf.FixedLenFeature(shape=[1], 
                                            dtype=tf.int64),
            }
            example = tf.parse_single_example(example, features=features)

            if self.mode == "train":
                input_data = self._augment(example["image"])
            else:
                input_data = example["image"]        return {"input": input_data}, example["label"]
def init() -> None:
    """
    The main function of the project, used to initialise all the
    required functions for training the model
    """
    # get input arguments
    args = get_args()
    # get static config information
    config = process_config()
    # combine both into dictionary
    config = {**config, **args}

    # initialise model
    model = VGGModel(config)
    # create your data generators for each mode
    train_data = TFRecordDataLoader(config, mode="train")    val_data = TFRecordDataLoader(config, mode="val")    test_data = TFRecordDataLoader(config, mode="test")    # initialise the estimator
    trainer = VGGTrainer(config, model, train_data, val_data,  
                         test_data)    # start training
    trainer.run()
def process_config() -> dict:
    """
    Add in any static configuration that is unlikely to change very         
    often
    :return: a dictionary of static configuration data
    """
    config = {"exp_name": "example_model_train"}    return config

Training

Example Training Job 
Learning Rate: 0.001
Epochs: 100
Batch Size (train/eval): 512 / 512
Hypothesis:
Model will converge quickly
Results:
Model diverged even quicker
##########################################################
# where to write tfevents
OUTPUT_DIR="gs://model-exports"
# experiment settings
TRAIN_BATCH=512
EVAL_BATCH=512
LR=0.001
EPOCHS=100
# create a job name for this run
prefix="example"
now=$(date +"%Y%m%d_%H_%M_%S")
JOB_NAME="$ENV_NAME"-"$prefix"_"$now"
# locations locally or on the cloud for your files
TRAIN_FILES="data/train.tfrecords"
EVAL_FILES="data/val.tfrecords"
TEST_FILES="data/test.tfrecords"
##########################################################

Training on CPU

Usage: ./train_local_cpu.sh [ENV_NAME]

Training on GPU

nvidia-smi
Usage: ./train_local_single.sh <GPU_ID> [ENV_NAME]

Distributed local training

config="
{
    \"master\": [\"localhost:27182\"],
    \"ps\": [\"localhost:27183\"],
    \"worker\": [
        \"localhost:27184\",
        \"localhost:27185\"
        ]
}, \"environment\": \"cloud\""
...
# ensure parameter server doesn't use any of the GPUs in this case
export CUDA_VISIBLE_DEVICES=""
# Parameter Server can be run on cpu
task="{\"type\": \"ps\", \"index\": 0}"
export TF_CONFIG="{\"cluster\":${config}, \"task\":${task}}"
run ps

# Master should be run on GPU as it runs the evaluation
export CUDA_VISIBLE_DEVICES="1"
task="{\"type\": \"master\", \"index\": 0}"
export TF_CONFIG="{\"cluster\":${config}, \"task\":${task}}"
run master
# Workers (number of GPUs - 1, since one GPU is used by the master)
for gpu in 0 1
do
    task="{\"type\": \"worker\", \"index\": $gpu}"
    export TF_CONFIG="{\"cluster\":${config}, \"task\":${task}}"
    export CUDA_VISIBLE_DEVICES="$gpu"    run "worker${gpu}"
done
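
After substitution, each process sees a complete cluster description. For example, worker 0's TF_CONFIG expands to the following (whitespace added for readability):

{
    "cluster": {
        "master": ["localhost:27182"],
        "ps": ["localhost:27183"],
        "worker": ["localhost:27184", "localhost:27185"]
    },
    "environment": "cloud",
    "task": {"type": "worker", "index": 0}
}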
Usage: ./train_local_dist.sh [ENV_NAME]

Distributed cloud training

Usage: ./train_cloud.sh

Viewing Training on TensorBoard

tensorboard --logdir path_to_your_checkpoints
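
For the example job above, assuming checkpoints were written under the job's output directory, the call would look like:

tensorboard --logdir "${OUTPUT_DIR}/${JOB_NAME}"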

Conclusion

This post originally appeared on medium.com by Lewis Smith