Distributed Training on Iris Dataset
Maggy Distributed Training with TensorFlow on Iris
Maggy enables you to train with TensorFlow distributed optimizers. With Maggy, you only have to make minimal changes to train your model in a distributed fashion.
0. Spark Session
Make sure you have a running Spark Session/Context available.
On Hopsworks, just run your notebook to start the Spark application.
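If you want to verify the session first, a minimal check (assuming the spark object that Hopsworks injects into the notebook) looks like this:

# sanity check: Hopsworks notebooks expose the running session as `spark`
print(spark.version)                     # version of the running Spark session
print(spark.sparkContext.applicationId)  # YARN application id of this notebook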
1. Model definition
Let’s define the model we want to train. The layers of the model have to be defined in the __init__ function.
Do not instantiate the class, otherwise you won’t be able to use Maggy.
from tensorflow import keras
from tensorflow.keras.layers import Dense
from tensorflow.keras import Sequential
from tensorflow.keras.optimizers import Adam

# you can use keras.Sequential(); you just need to subclass it
# and define the layers in __init__()
class NeuralNetwork(Sequential):
    def __init__(self):
        super().__init__()
        self.add(Dense(10, input_shape=(4,), activation='tanh'))
        self.add(Dense(8, activation='tanh'))
        self.add(Dense(6, activation='tanh'))
        self.add(Dense(3, activation='softmax'))

# pass the class itself, not an instance (see the note above)
model = NeuralNetwork
Starting Spark application
ID | YARN Application ID | Kind | State | Spark UI | Driver log |
---|---|---|---|---|---|
60 | application_1619178784067_0005 | pyspark | idle | Link | Link |
SparkSession available as 'spark'.
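Note that model above holds the class itself, not an instance (see the warning in step 1); Maggy will instantiate it on each worker inside the training function. A quick check:

# `model` must be the class, not an instance
assert isinstance(model, type)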
2. Dataset creation
You can create the dataset here and pass it to the TfDistributedConfig, or create it in the training function.
Make sure the dataset paths below match your project.
train_set_path = "hdfs:///Projects/demo_ml_meb10000/TourData/iris/train.csv"
test_set_path = "hdfs:///Projects/demo_ml_meb10000/TourData/iris/test.csv"
def process_data(train_set, test_set):
    # imports live inside the function so it can run on the executors
    import pandas as pd

    train_set = pd.read_csv(train_set)
    test_set = pd.read_csv(test_set)

    # skip row 0 and column 0; columns 1-4 hold the features,
    # the remaining columns the one-hot encoded labels
    X_train = train_set.iloc[1:, 1:5].values
    y_train = train_set.iloc[1:, 5:].values
    X_test = test_set.iloc[1:, 1:5].values
    y_test = test_set.iloc[1:, 5:].values

    return (X_train, y_train), (X_test, y_test)
train_set_1, test_set_1 = process_data(train_set_path, test_set_path)
type(train_set_1[0])
<class 'numpy.ndarray'>
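As mentioned above, you can also build the datasets up front instead of passing the CSV paths. A minimal sketch using tf.data (the batch sizes are the illustrative values from step 4):

import tensorflow as tf

# wrap the processed (features, labels) tuples in tf.data datasets
train_ds = tf.data.Dataset.from_tensor_slices(train_set_1).batch(75)
test_ds = tf.data.Dataset.from_tensor_slices(test_set_1).batch(15)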
3. Defining the training function
The programming model is that you wrap the code containing the model training inside a wrapper function. Inside that wrapper function, provide all imports and parts that make up your experiment.
The function should return the metric that you want to optimize for. This should coincide with the metric being reported in the Keras callback (see next point). You can also return a list of metrics; in that case, only the loss element will be printed.
def training_function(model, train_set, test_set, hparams):
    # imports go inside the wrapper so they are available on the executors
    from tensorflow.keras.optimizers import Adam

    model = model()
    model.build()

    # fit the model and evaluate it on the test set
    model.compile(Adam(learning_rate=0.04), 'categorical_crossentropy', metrics=['accuracy'])
    model.fit(train_set, epochs=20)

    # evaluate() returns [loss, accuracy]; per the note above,
    # Maggy prints only the loss element
    accuracy = model.evaluate(test_set)
    return accuracy
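Note that the function above never reads hparams; as step 4 explains, the entries of that dictionary are meant for your model's __init__. A hypothetical sketch of forwarding one of them (NeuralNetwork as defined in step 1 takes no constructor arguments, so it would need a matching nlayers parameter first):

# hypothetical: forward an hparams entry to the model constructor
# model = model(nlayers=hparams['nlayers'])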
4. Configuring the experiment
In order to use Maggy distributed training, we have to configure the experiment through a TfDistributedConfig object. The model class has to be an implementation of tf.keras.Model. We can also define the train_set, the test_set and, optionally, hparams: a dictionary containing the parameters to be used in the __init__ function of your model.
from maggy.experiment_config.tf_distributed import TfDistributedConfig

# define the constructor parameters of your model
model_params = {
    # train dataset entries / num_workers
    'train_batch_size': 75,
    # test dataset entries / num_workers
    'test_batch_size': 15,
    'nlayers': 2
}

config = TfDistributedConfig(name="tf_test", model=model, train_set=train_set_path,
                             test_set=test_set_path, process_data=process_data,
                             hparams=model_params)
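As noted in step 2, you could instead pass the pre-built data directly, in which case no process_data function is needed. A hedged sketch, assuming TfDistributedConfig accepts the in-memory tuples from step 2:

# alternative sketch: pass the already-processed data instead of the paths
# config = TfDistributedConfig(name="tf_test", model=model,
#                              train_set=train_set_1, test_set=test_set_1,
#                              hparams=model_params)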
5. Run distributed training
Finally, we are ready to launch the Maggy experiment. You just need to pass two parameters: the training function and the configuration we defined in the previous steps.
from maggy import experiment
experiment.lagom(training_function, config)
1: All executors registered: True
0: All executors registered: True
1: Epoch 1/20
0: Epoch 1/20
1: Epoch 2/20
0: Epoch 2/20
...
1: Epoch 20/20
0: Epoch 20/20
You are running Maggy on Hopsworks.
Final average test loss: 0.176
Finished experiment. Total run time: 0 hours, 0 minutes, 40 seconds
{'test result': 0.1757449358701706}
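The dictionary printed above is the return value of experiment.lagom, so step 5 could equivalently capture it:

result = experiment.lagom(training_function, config)
print(result['test result'])  # the final average test loss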