Maggy Distributed Training on MNIST with TensorFlow example

Maggy enables you to train with TensorFlow distributed optimizers. Using Maggy, you only have to make minimal changes to train your model in a distributed fashion.

0. Spark Session

Make sure you have a running Spark Session/Context available. On Hopsworks, just run your notebook to start the spark application.
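
Outside Hopsworks, a quick way to verify that a session is available is to grab it through PySpark (a minimal sketch; on Hopsworks the session is created for you when the notebook starts):

from pyspark.sql import SparkSession

# Get the active SparkSession (or create a local one if none exists) and print
# basic information about the underlying Spark context.
spark = SparkSession.builder.getOrCreate()
print(spark.sparkContext.applicationId, spark.sparkContext.master)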

from maggy import experiment

1. Model definition

Let’s define the model we want to train. The layers of the model have to be defined in the __init__ function.

Do not instantiate the class; otherwise you won't be able to use Maggy.

# Model definition.
import tensorflow as tf
from tensorflow import keras
    
class NeuralNetwork(tf.keras.Model):

    def __init__(self, nlayers):
        super().__init__()
        self.conv1 = keras.layers.Conv2D(28, 2, activation='relu')
        self.flatten = keras.layers.Flatten()
        self.d1 = keras.layers.Dense(32, activation='relu')
        self.d2 = keras.layers.Dense(10, activation='softmax')
        
    def call(self, x):
        x = self.conv1(x)
        x = self.flatten(x)
        x = self.d1(x)
        return self.d2(x)

model = NeuralNetwork  # pass the class itself, not an instance
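
If you want to sanity-check the architecture before handing it to Maggy, you can build a throwaway instance on a dummy batch (a minimal sketch; the model variable above stays a class, as Maggy requires):

# Optional local check: instantiate a temporary copy and run one forward pass
# on a dummy MNIST-shaped batch to build the layers, then print a summary.
check = NeuralNetwork(nlayers=2)
_ = check(tf.zeros((1, 28, 28, 1)))
check.summary()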

2. Dataset creation

You can create the dataset here and pass it to the TfDistributedConfig, or create it in the training function.

In this example, we download the dataset from TensorFlow.

import numpy as np

mnist = tf.keras.datasets.mnist

(x_train, y_train),(x_test, y_test) = mnist.load_data()

x_train, x_test = x_train / 255.0, x_test / 255.0
x_train = np.reshape(x_train, (60000, 28, 28, 1))
x_test = np.reshape(x_test, (10000, 28, 28, 1))
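
Alternatively, if you prefer to build the input pipeline inside the training function, a tf.data sketch could look like the following (the helper name and batch size are illustrative, not part of the Maggy API):

def make_datasets(x_train, y_train, x_test, y_test, batch_size=256):
    # Build shuffled, batched tf.data pipelines from the NumPy arrays above.
    train_ds = (tf.data.Dataset.from_tensor_slices((x_train, y_train))
                .shuffle(10000)
                .batch(batch_size))
    test_ds = tf.data.Dataset.from_tensor_slices((x_test, y_test)).batch(batch_size)
    return train_ds, test_ds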

3. Defining the training function

The programming model is that you wrap the code containing the model training inside a wrapper function. Inside that wrapper function, provide all the imports and parts that make up your experiment.

The function should return the metric that you want to optimize for. This should coincide with the metric being reported in the Keras callback (see next point). You can also return the metric list; in that case, only the loss element will be printed.

def training_function(model, train_set, test_set, hparams):

    from tensorflow import keras
    
    # Define training parameters
    num_epochs = 10
    batch_size = 256
    learning_rate = 0.1

    criterion = keras.losses.SparseCategoricalCrossentropy()
    optimizer = keras.optimizers.SGD(learning_rate=learning_rate, momentum=0.9, decay=1e-5)
    
    model = model(nlayers=2)
    
    model.compile(optimizer=optimizer, loss=criterion, metrics=["accuracy"])
    
    model.fit(train_set,
              batch_size=batch_size,
              epochs=num_epochs,
              )

    print("Testing")
    
    loss = model.evaluate(
        test_set,
        batch_size=32)
    
    return loss
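
Since the model is compiled with metrics=["accuracy"], model.evaluate returns the list [loss, accuracy]. If you prefer to return a single scalar, a small variation of the last lines of training_function could look like this (a sketch, not a requirement of the Maggy API):

    # Unpack the evaluation results and return only the metric to optimize for.
    test_loss, test_accuracy = model.evaluate(test_set, batch_size=32)
    return test_accuracy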

4. Configuring the experiment

In order to use Maggy distributed training, we have to configure the training setup and pass it to TfDistributedConfig. The model class has to be an implementation of tf.keras.Model. We can also define the train_set, the test_set and, optionally, the model_parameters. model_parameters is a dictionary containing the parameters to be used in the __init__ function of your model.

from maggy.experiment_config.tf_distributed import TfDistributedConfig

# Define the constructor parameters of your model.
model_parameters = {
    'train_batch_size': 30000,
    'test_batch_size': 5000,
    'nlayers': 2
}

# Pass the model class, the datasets and the model parameters to the experiment configuration.
config = TfDistributedConfig(name="tf_test", 
                             model=model, 
                             train_set=(x_train, y_train), 
                             test_set=(x_test, y_test),
                             hparams=model_parameters
                            )

5. Run distributed training

Finally, we are ready to launch the Maggy experiment. You just need to pass two parameters: the training function and the configuration variable we defined in the previous steps.

experiment.lagom(training_function, config)