Maggy Distributed Training with TensorFlow example

Maggy enables you to train with TensorFlow distributed optimizers. With Maggy, you only need to make minimal changes to train your model in a distributed fashion.

0. Spark Session

Make sure you have a running Spark Session/Context available.

On Hopsworks, just run your notebook to start the Spark application.


import pyspark

sp = pyspark.sql.SparkSession.builder \
    .master("local") \
    .appName("f-mnist-maggy") \
    .config("spark.dynamicAllocation.enabled", "true") \
    .config("spark.executor.cores", 4) \
    .config("spark.dynamicAllocation.minExecutors", "1") \
    .config("spark.dynamicAllocation.maxExecutors", "5") \
    .getOrCreate()

sp
1. Model definition

Let's define the model we want to train. The layers of the model have to be defined in the __init__ function.

Do not instantiate the class, otherwise you won't be able to use Maggy.
from tensorflow import keras
from tensorflow.keras.layers import Dense
from tensorflow.keras import Sequential
from tensorflow.keras.optimizers import Adam

# You can use keras.Sequential: just subclass it
# and define the layers in __init__().
class NeuralNetwork(Sequential):

    def __init__(self, nl=4):
        super().__init__()
        self.add(Dense(10, input_shape=(None, 4), activation='tanh'))
        # add nl-2 hidden layers (only when nl >= 4)
        if nl >= 4:
            for i in range(0, nl - 2):
                self.add(Dense(8, activation='tanh'))
        self.add(Dense(3, activation='softmax'))

# pass the class itself, not an instance
model = NeuralNetwork
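
Before handing the class to Maggy, you can instantiate it locally once as a quick sanity check (this throwaway instance is only for inspection; Maggy still receives the class itself):

# purely local sanity check, not part of the Maggy experiment
check_model = NeuralNetwork(nl=4)
check_model.summary()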

2. Dataset creation

You can create the dataset here and pass it to the TfDistributedConfig, or create it inside the training function.

Make sure the dataset paths below are correct for your environment.

train_set_path = "/Users/riccardo/Downloads/iris_train.csv"
test_set_path = "/Users/riccardo/Downloads/iris_test.csv"

train_set = sp.read.format("csv").option("header", "true") \
    .option("inferSchema", "true").load(train_set_path).drop('_c0')

test_set = sp.read.format("csv").option("header", "true") \
    .option("inferSchema", "true").load(test_set_path).drop('_c0')

raw_train_set = train_set.toPandas().values
raw_test_set = test_set.toPandas().values
def process_data(train_set, test_set):
    # split each raw numpy array into features (first 4 columns)
    # and labels (remaining columns)
    X_train = train_set[:, 0:4]
    y_train = train_set[:, 4:]
    X_test = test_set[:, 0:4]
    y_test = test_set[:, 4:]

    return (X_train, y_train), (X_test, y_test)

train_set, test_set = process_data(raw_train_set, raw_test_set)
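
If you would rather hand Maggy tf.data pipelines than numpy tuples, here is a minimal sketch (the batch sizes of 75 and 15 simply mirror the batch sizes used later in this example; tensorflow is assumed to be importable in your environment):

import tensorflow as tf

# hypothetical alternative: wrap the numpy splits in tf.data datasets
(X_tr, y_tr), (X_te, y_te) = process_data(raw_train_set, raw_test_set)
train_ds = tf.data.Dataset.from_tensor_slices((X_tr, y_tr)).shuffle(len(X_tr)).batch(75)
test_ds = tf.data.Dataset.from_tensor_slices((X_te, y_te)).batch(15)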

3. Defining the training function

The programming model is that you wrap the code containing the model training inside a wrapper function. Inside that wrapper function, provide all imports and parts that make up your experiment.

The function should return the metric that you want to optimize for. This should coincide with the metric being reported in the Keras callback (see the sketch below). You can also return the metric list; in that case, only the loss element will be printed.
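
As a sketch of what that reporting could look like, the callback below forwards the per-epoch accuracy to Maggy's reporter. This is a minimal sketch: reporter.broadcast(metric=...) is the reporting call used in Maggy's optimization examples, but verify the exact reporter API against your Maggy version.

from tensorflow.keras.callbacks import Callback

class ReporterCallback(Callback):

    def __init__(self, reporter):
        super().__init__()
        self.reporter = reporter

    def on_epoch_end(self, epoch, logs=None):
        # assumption: broadcast(metric=...) reports intermediate metrics,
        # which lets Maggy early-stop underperforming trials
        self.reporter.broadcast(metric=logs.get('accuracy'))

You would pass an instance in the training code, e.g. model.fit(..., callbacks=[ReporterCallback(reporter)]).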

def hpo_function(number_layers, reporter):

    model = NeuralNetwork(nl=number_layers)
    model.build()

    # compile the model, then fit and evaluate it
    model.compile(optimizer=Adam(learning_rate=0.04),
                  loss='categorical_crossentropy',
                  metrics=['accuracy'])

    train_input, test_input = process_data(raw_train_set, raw_test_set)

    train_batch_size = 75
    test_batch_size = 15
    epochs = 10

    model.fit(x=train_input[0], y=train_input[1],
              batch_size=train_batch_size,
              epochs=epochs,
              verbose=1)

    score = model.evaluate(x=test_input[0], y=test_input[1],
                           batch_size=test_batch_size, verbose=1)

    print(f'Test loss: {score[0]}')
    print(f'Test accuracy: {score[1]}')

    return score[1]
def training_function(model, train_set, test_set, hparams):

    model = model()
    model.build()

    # compile the model, then fit and evaluate it
    model.compile(optimizer=Adam(learning_rate=hparams['learning_rate']),
                  loss='categorical_crossentropy',
                  metrics=['accuracy'])

    model.fit(train_set, epochs=hparams['epochs'])

    accuracy = model.evaluate(test_set)

    return accuracy

4. Configuring the experiment

In order to use Maggy distributed training, we have to configure the training model and pass it to TfDistributedConfig. The model class has to be an implementation of tf.keras.Model. We can also define the train_set, the test_set and, optionally, the model_parameters. model_parameters is a dictionary containing the parameters to be used in the __init__ function of your model.

from maggy.experiment_config.tf_distributed import TfDistributedConfig

# define the constructor parameters of your model
model_params = {
    # train dataset entries / num_workers
    'train_batch_size': 75,
    # test dataset entries / num_workers
    'test_batch_size': 15,
    'learning_rate': 0.04,
    'epochs': 20,
}

training_config = TfDistributedConfig(name="tf_test", model=model,
                                      train_set=train_set, test_set=test_set,
                                      process_data=process_data,
                                      hparams=model_params)

from maggy.experiment_config import OptimizationConfig
from maggy import Searchspace

# The searchspace can be instantiated with parameters.
# Use a new name here so we don't shadow the SparkSession `sp`.
searchspace = Searchspace(number_layers=('INTEGER', [2, 8]))

hpo_config = OptimizationConfig(num_trials=4, optimizer="randomsearch",
                                searchspace=searchspace, direction="max",
                                es_interval=1, es_min=5, name="hp_tuning_test")

Hyperparameter added: number_layers
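
Maggy searchspaces are not limited to INTEGER parameters. A hedged sketch of a richer searchspace, assuming the same (type, feasible_region) tuple convention holds for the other Maggy hyperparameter types:

# hypothetical richer searchspace; DOUBLE follows the same tuple convention
richer_space = Searchspace(number_layers=('INTEGER', [2, 8]),
                           learning_rate=('DOUBLE', [0.01, 0.1]))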

5. Run distributed training

Finally, we are ready to launch the Maggy experiment. You just need to pass two parameters: the training function and the configuration variable we defined in the previous steps.

from maggy import experiment

result = experiment.lagom(train_fn=hpo_function, config=hpo_config)
result
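
Once the hyperparameter search has finished, we can launch the distributed training run, passing the training_function and the training_config defined above:
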
experiment.lagom(training_function, training_config)