Examples

The examples below walk through the scripts in the examples directory of the main project on GitHub. All of these examples can be run as Jupyter notebooks on Binder.

Making a Model

In this example, we will use least squares regression to fit a fourth-order polynomial (five coefficients) to data generated by a random polynomial. In the process, we will see how to pre-process data, interface with databases, do model selection, save checkpoints, and visualize the data using Fireworks. All of these techniques are directly applicable to training arbitrary models, such as neural networks, and they take advantage of PyTorch’s ability to train a model using GPUs.

Let’s begin with the code in examples/nonlinear_regression_utils.py. We have the model definition:

class NonlinearModel(PyTorch_Model):

    required_components = ['a','b', 'c', 'd', 'e']

    def init_default_components(self):

        for letter in ['a', 'b', 'c', 'd', 'e']:
            self.components[letter] = torch.nn.Parameter(torch.Tensor(np.random.normal(0,1,1)))

        self.in_column = 'x'
        self.out_column = 'y_pred'

    def forward(self, message):

        x = message[self.in_column]
        message[self.out_column] = (self.a + self.b*x + self.c*x**2 + self.d*x**3 + self.e*x**4)

        return message

The class PyTorch_Model is a subclass of torch.nn.Module and has all of the functionality of a PyTorch module. It has some additional functionality, such as the ability to produce a dictionary representation of its state, the ability to be part of a pipeline, and the ability to specify parameters that must be provided upon initialization or via init_default_components using the required_components list.
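For example, components can be overridden at construction, and a model’s state can be captured and restored. A minimal sketch, based on calls that appear later in this walkthrough:

model = NonlinearModel(components={'d': [0], 'e': [0]}) # Override the default initialization of 'd' and 'e'
state = model.get_state()  # A dict representation of the model's state
model.set_state(state)     # Restores that state, as in the animation example below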

Every Model must implement a method called forward() which performs an evaluation on input data. The model is evaluated by calling it directly on an input Message, as in the sketch below. This is the recommended way to invoke a Model, because the __call__ method is overridden to first call the Model’s input Pipe (if one exists) before calling forward().
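A minimal sketch (sample_input here is just a Message with an 'x' column filled with hypothetical values):

model = NonlinearModel()
sample_input = Message({'x': np.random.rand(10)}).to_tensors()
output = model(sample_input) # Calls the Model's input Pipe (if any) and then forward()
print(output['y_pred'])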

Additionally, there are some functions to generate the training data from a random polynomial. You can change the parameterization here to play with the model training in the next example. Notice that we inject some noise into the data to make the problem more difficult:

def generate_data(n=1000):

    a = randint(-10,10)
    b = randint(-10,10)
    c = randint(-10,10)
    errors = np.random.normal(0, .5, n)
    x = np.random.rand(n) * 100 - 50
    y = a + b*x + c*x**2 + errors

    return Message({'x': x, 'y': y, 'errors': errors}), {'a': a, 'b': b, 'c': c}

Next, we define a function to return the data after performing a train/test split:

def get_data(n=1000):

    data, params = generate_data(n)
    train, test = train_test_split(data, test=.25)

    shuffler = ShufflerPipe(train)
    minibatcher = BatchingPipe(shuffler, batch_size=25)
    train_set = TensorPipe(minibatcher, columns=['x','y']) # Only columns 'x' and 'y' will be tensorized

    test_set = TensorPipe(test, columns=['x','y'])

    return train_set, test_set, params

This function randomly splits the original data into two Pipes that produce tensorized minibatches, with the training set reshuffled on each pass. If you have a CUDA-enabled GPU available, these batches will also be moved to GPU memory.
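For example, iterating over the returned train_set yields tensorized minibatches. A quick sketch:

train_set, test_set, params = get_data(n=1000)

for batch in train_set:
    # Each batch is a Message whose 'x' and 'y' columns are torch.Tensors with 25 elements
    print(batch['x'].shape, batch['y'].shape)
    break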

Nonlinear Regression

Next, let’s look at the code in examples/nonlinear_regression.py. You can run this script to watch the model train and then see the final output, which plots a visualization of the training trajectory. You will also need examples/nonlinear_regression_utils.py to be importable for this to work.

Let’s walk through the code in detail:

description = "In this experiment, we are training a polynomial model using least squares regression to fit data generated by a random polynomial."
experiment = Experiment("nonlinear_regression", description=description)

train_set, test_set, params = get_data(n=1000)

model = NonlinearModel()

# Construct training closure and train using ignite
base_loss = torch.nn.MSELoss()
loss = lambda batch: base_loss(batch['y_pred'], batch['y'])
trainer = IgniteJunction(components={'model': model, 'dataset': train_set}, loss=loss, optimizer='Adam', lr=.1)

The Experiment class creates a directory for the current run and provides methods, such as open(), for saving files and metadata into it, so that snapshots, metrics, and descriptions are kept together for later reference. Next, we define a custom Ignite Metric that periodically saves the model’s state during training:

class ModelSaverMetric(Metric):

    def __init__(self, output_transform=lambda x:x, log_interval=100):
        self.model_state = Message()
        Metric.__init__(self, output_transform=output_transform)
        self.log_interval = log_interval

    def iteration_completed(self, engine):
        iter = (engine.state.iteration-1)
        if iter % self.log_interval == 0:
            current_state = Message.from_objects(deepcopy(engine.state.output['state']))
            current_state['iteration'] = [iter]
            self.model_state = self.model_state.append(current_state)

    def compute(self):
        # Return most recent model state
        l = len(self.model_state)
        return self.model_state[l-1]

This metric saves a dict representing the state of our model every log_interval (by default 100) training steps. The IgniteJunction returns the current model state by default in its output dict. See the Ignite documentation for more details about Metrics and Engines.

model_state_metric = ModelSaverMetric()
model_state_metric.attach(trainer, 'state')

x = Message({'x':np.arange(-10,10,.2)}).to_tensors()

# Run initial evaluation
y_initial = model(x)['y_pred'].detach().numpy()
initial_loss = loss(model(test_set[0:250]))
print("Initial loss on test set: {0}".format(initial_loss))

# Save initial state of model
file_path = experiment.open('initial_model', string_only=True)
initial_state = model.get_state()
Message.from_objects(initial_state).to('json', path=file_path)

trainer.train(max_epochs=30)

final_loss = loss(model(test_set[0:250]))
print("Final loss on test set:: {0}".format(final_loss))

Here, we initialize our Metric and attach it to the IgniteJunction trainer. We also perform an initial evaluation run of the model on the test set, save a snapshot of the initialized model, train the model, and print a final test set evaluation.

# Visualize functions
true_model = NonlinearModel(components={'a':[params['a']], 'b': [params['b']], 'c': [params['c']], 'd': [0], 'e': [0]})

y_true = true_model(x)['y_pred'].detach().numpy()
y_final = model(x)['y_pred'].detach().numpy()

# Save model states during training
file_path = experiment.open('model_states', string_only=True)
model_states = model_state_metric.model_state
model_states.to('json', path=file_path)

fig, ax = plt.subplots()

Now we visualize the results by plotting the learned polynomial over time as the model was trained. We can do this easily because we have the intermediate states in Message format: we can simply loop through this Message and call set_state on a NonlinearModel instance to get a snapshot of the model at a given iteration. The animate() function below uses these snapshots to plot each intermediate model against the true data.

def animate(frame):

    current_state = {'internal': frame['internal'][0], 'external': {}}
    model.set_state(current_state)

    y_predicted = model(x)['y_pred'].detach().numpy()
    xdata = list(x['x'].detach().numpy())
    ydata = list(y_predicted)
    ax.clear()
    ax.plot(xdata, list(y_true), 'r')
    ax.plot(xdata, ydata, 'g')
    title = "Iteration: {0}".format(frame['iteration'][0])

    ax.set_title(title)

ani = FuncAnimation(fig, animate, model_state_metric.model_state, interval=1000)
ani.save(experiment.open("models.mp4", string_only=True)) # This will only work if you have ffmpeg installed.
plt.show()

The results weren’t that great, partially because we are using a fourth-order polynomial to fit data from a second-order polynomial. What happens if we restrict our model? We can do this by freezing arbitrary parameters. Let’s uncomment these two lines in the script and rerun it. This initializes the model with ‘d’ and ‘e’ set to 0 and then freezes them so they will not update during training. The resulting graph should show a very close fit.

model = NonlinearModel(components={'d': [0], 'e':[0]})
model.freeze(['d','e'])

Model Selection

We were able to get a good fit because we already knew what the true model was. What if we wanted to algorithmically determine what the ‘best’ model is? This is where we can use model selection and hyper-parameter optimization to test out different variations of our training process and models in order to select an optimal one.

The make_model function takes a dictionary of parameters and produces a Model from those parameters:

def make_model(parameters):
    temp_parameters = deepcopy(parameters)
    exclude = [letter for letter in ['a','b','c','d','e'] if letter not in parameters]
    for letter in exclude:
        temp_parameters[letter] = [0]
    model = NonlinearModel(temp_parameters)
    for letter in exclude: # Prevent training from taking place for these parameters
        model.freeze(letter)
    return model

The next function returns a closure that takes a set of parameters, builds a model from them, trains it, and returns the trainer:

def get_trainer(train_set, loss, optimizer, **kwargs):

    def train_from_params(parameters):

        model = make_model(parameters)
        trainer = IgniteJunction(components={'model': model, 'dataset': train_set}, loss=loss, optimizer=optimizer, **kwargs)
        print("Now training model for parameters {0}".format(parameters))
        trainer.train(max_epochs=10)
        evaluator = IgniteJunction(components={'model': model, 'dataset': train_set}, loss=loss, optimizer=optimizer, update_function=default_evaluation_closure, **kwargs)
        print("Now evaluating trained model.")
        return trainer

    return train_from_params

The next object is a callable that takes two arguments: a Message containing all of the previously used parameters, and a dictionary of Messages containing all of the previously computed metrics. In general, such a function can use these arguments to implement any desired model selection or hyperparameter optimization algorithm. To keep things simple, this parameterizer just iterates through every possible class of fourth-order polynomial.

class Parameterizer:

    def __init__(self):
        possible_params = ['a','b','c','d','e']
        def generator():
            for i in reversed(range(5)):
                for combination in combinations(possible_params, i):
                    params = {param: [0] for param in combination}
                    yield params
        self.generator = generator()

    def __call__(self, past_params, metrics):
        try:
            params = next(self.generator)
            if params == {}:
                raise EndHyperparameterOptimization
            return params

        except StopIteration:
            raise EndHyperparameterOptimization
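To see what the parameterizer produces, you can call an instance directly. A quick sketch (the two arguments are ignored by this simple implementation):

p = Parameterizer()
print(p(None, None)) # {'a': [0], 'b': [0], 'c': [0], 'd': [0]}
print(p(None, None)) # {'a': [0], 'b': [0], 'c': [0], 'e': [0]}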

Lastly, we can provide as many Metrics as we want to be computed during this process. Here, we define an accuracy metric which simply evaluates the average L2-loss on the test set:

class AccuracyMetric(Metric):

    def __init__(self, output_transform = lambda x:x):
        Metric.__init__(self, output_transform=output_transform)
        self.reset()

    def reset(self):
        self.l2 = 0.
        self.num_examples = 0

    def update(self, output):
        self.l2 += output['loss']
        self.num_examples += len(output['output'])

    def compute(self):

        if self.num_examples == 0:
            raise NotComputableError(
                "Metric must have at least one example before it can be computed."
            )
        return Message({'average-loss': [self.l2 / self.num_examples]}).to_dataframe()

We put all of these components together via a LocalMemoryFactory and run the model selection process.

description = "In this experiment, we will compare the performance of different polynomial models when regressed against data generated from a random polynomial."
experiment = Experiment("model_selection", description=description)

factory = LocalMemoryFactory(components={
    'trainer': get_trainer(train_set, loss, optimizer='Adam', lr=.1),
    'eval_set': test_set,
    'parameterizer': Parameterizer(),
    'metrics': {'accuracy': AccuracyMetric(), 'model_state': ModelSaverMetric()}
    })

factory.run()

Now, we can read the metrics and models that were saved at each step and plot the results. If you look at the final metrics, you should observe that the model of the form ‘a + bx + cx^2’ had the lowest test set error. The next code block also saves the parameters and metrics into the experiment directory.

params, metrics = factory.read()
accuracy_file = experiment.open('accuracy.csv', string_only=True)
metrics['accuracy'].to('csv', path=accuracy_file)
model_state_file = experiment.open('model_states.csv', string_only=True)
metrics['model_state'].to('csv', path=model_state_file)
params_file = experiment.open('params.csv', string_only=True)
params.to('csv', path=params_file)

The resulting animation should loop through and plot each of the models that were trained against the true model.
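As a quick sanity check, you can also read the saved CSVs back and look up the best trial. A sketch, assuming pandas is available and that the rows of params.csv and accuracy.csv line up trial-by-trial:

import pandas as pd

accuracy = pd.read_csv(accuracy_file)
params = pd.read_csv(params_file)

best = accuracy['average-loss'].idxmin() # Index of the trial with the lowest average test loss
print(params.iloc[best])
print(accuracy.iloc[best])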

Using Databases

If your dataset is really big, it might make sense to store it in a database and stream in data from a database query at the start of the pipeline. Let’s look at the code in examples/database_example.py. First, we dump our training data into a sqlite table (this could be any database supported by SQLAlchemy). We define a list of columns for our table and use the Fireworks.extensions.database.create_table() function to create a simple SQLAlchemy table object:

columns = [
    Column('x', Float),
    Column('y', Float),
    Column('errors', Float),
]

table = create_table("nonlinear_regression", columns)

We then create a TablePipe, which has methods for inserting into and querying our table, and insert our data (in Message format) through it. We also write the true model parameters to a file for posterity.

def write_data(filename='example.sqlite', n=1000):
    try:
        os.remove(filename)
    except FileNotFoundError:
        pass

    engine = create_engine('sqlite:///{0}'.format(filename))
    db = TablePipe(table, engine)

    data, params = generate_data(n)
    db.insert(data)

    with open(filename+"_params", 'w') as f:
        f.write(json.dumps(params))

    db.commit()

Next, we write a function that produces a DBPipe which can iterate through this table. Notice that you can also construct a DBPipe from just the name of the table: given a string, DBPipe uses schema reflection to infer the appropriate schema by searching the database for a table with that name (the model selection example below does exactly this). We can also perform arbitrary SQL queries on this object if we want, but the default is a “SELECT * FROM table” query.

def load_data(filename='example.sqlite'):
    if not os.path.exists(filename):
        raise FileNotFoundError("File {0} does not exist.".format(filename))
    with open(filename+'_params') as f:
        params = json.load(f)

    engine = create_engine('sqlite:///{0}'.format(filename))
    db = DBPipe(table, engine) # Default query is SELECT * FROM table

    return db, params
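Once loaded, the DBPipe behaves like any other Pipe. A sketch (.all() is the same call used in the model selection example below):

db, params = load_data('example.sqlite')
print(db.all()) # Fetch the results of the default SELECT * query all at once
for row in db:  # Or stream rows one at a time, which is how LoopingPipe consumes it below
    print(row)
    break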

We combine all of this into a new get_data() function. In examples/nonlinear_regression.py, modify the import statements to import this get_data function instead of the one in examples/nonlinear_regression_utils.py (see the sketch after the code below). Now, when you run examples/nonlinear_regression.py, you will be training your model using data from a database query.

def get_data(filename='example.sqlite', n=1000):
    if not (os.path.exists(filename) and os.path.exists(filename + '_params')):
        write_data(filename, n)

    data, params = load_data(filename)
    looper = LoopingPipe(data)
    cache = CachingPipe(looper, cache_size=1000)
    train, test = train_test_split(cache, test=.25)

    shuffler = ShufflerPipe(train)
    minibatcher = BatchingPipe(shuffler, batch_size=25)
    train_set = TensorPipe(minibatcher, columns=['x','y'])

    test_set = TensorPipe(test, columns=['x','y'])

    return train_set, test_set, params
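The import change in examples/nonlinear_regression.py would look something like this (a sketch, assuming the script is run from the examples directory):

# from nonlinear_regression_utils import get_data # Original import
from database_example import get_data             # Database-backed version

train_set, test_set, params = get_data()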

Model Selection With Databases

We can also save our metrics from the model selection example to a database. To do so, we will use a SQLFactory instead of a LocalMemoryFactory. The only difference between the two is that SQLFactory takes additional arguments: table objects describing the schema to use for storing parameters and metrics, along with an engine to connect to the database. Run the code in examples/model_selection_database.py to get something similar to the original model_selection example, except there will be sqlite files in the experiment folder storing the historical parameters and metrics.

description = "Model selection for nonlinear regression. We are comparing the regression accuracy of different polynomial models fit to data generated by a random polynomial."
experiment = Experiment("model_selection_db", db_path=".", description=description)
# SQL factory
params_table = create_table('params', columns=[
    Column('a', Integer), Column('b', Integer), Column('c', Integer), Column('d', Integer), Column('e', Integer)
    ])
metrics_tables = {'accuracy': create_table('accuracy', columns=[Column('average-loss', Float)])}
# engine = create_engine('sqlite:///model_selection.sqlite')
engine = experiment.get_engine('factory.sqlite')
factory = SQLFactory(components={
    'trainer': get_trainer(train_set, loss, optimizer='Adam', lr=.1),
    'eval_set': test_set,
    'parameterizer': Parameterizer(),
    'metrics': {'accuracy': AccuracyMetric()},
    'engine': engine,
    'params_table': params_table,
    'metrics_tables': metrics_tables,
    })

factory.run()

params_table = DBPipe('params', factory.engine)
print(params_table.all())
accuracy_table = DBPipe('accuracy', factory.engine)
print(accuracy_table.all())

Neural Networks with MNIST

The examples above used the simple example of nonlinear regression with synthetic data in order to demonstrate features, but we can use these tools to make more complicated models as well. Let’s train a convolutional neural network to classify Fashion MNIST images. First, we will download the dataset from the torchvision library, set up an experiment, and convert it to Message format for input into our pipeline.

env_name = 'mnist_fashion'
# vis = visdom.Visdom(env=env_name) # If you have a running Visdom server, you can uncomment this to generate plots.
description = "Here, we will train a convolutional neural network on the Fashion MNIST dataset to demonstrate the usage of Fireworks."
experiment = Experiment(env_name, description=description)
mnist_dir = env.get('MNIST_DIR', './MNIST/')
print(mnist_dir)

# First, we download our dataset and plot one of its elements as an example.
mnist = FashionMNIST(mnist_dir, download=True)
dataset = Message({'examples': mnist.data, 'labels': mnist.targets})
example = dataset['examples'][0]
plt.imshow(example)
plt.show()

Next, we will perform a series of preprocessing steps before feeding the data to our model. Using the Pipes already built into Fireworks, we can split our dataset into train and test sets, shuffle it during each epoch, specify a minibatch size, and move the batches to GPU memory dynamically. However, there are some additional operations we need to perform which are specific to this dataset, and we will implement them as FunctionPipes:

# Now we construct our training and test sets as a pipeline.
train, test = train_test_split(dataset, test=.1)

# We can compose pipes to create an input pipeline that will shuffle the training set on each iteration and produce minibatches formatted for our image classifier.
shuffler = ShufflerPipe(train)
minibatcher = BatchingPipe(shuffler, batch_size=100)
to_cuda = TensorPipe(minibatcher, columns=['examples', 'labels']) # By default, all columns will be moved to CUDA if possible, but you can explicitly specify which ones as well

def tensor_to_float(batch, column='examples'):
    """ This converts the images from bytes to floats which is the data type that torch.nn.Conv2d expects. """
    batch[column] = batch[column].float()
    return batch

def reshape_batch(batch, column='examples'):
    """ This reshapes the batch to have an extra dimension corresponding to the input channels so we can apply the torch.nn.Conv2d operation in our model. """
    shape = batch[column].shape
    new_shape = torch.Size([shape[0], 1, shape[1], shape[2]])
    batch[column] = batch[column].reshape(new_shape)
    return batch

def normalize_batch(batch, column='examples'):
    """ Normalizes pixel intensities to fall between 0 and 1. """
    batch[column] /= 255.
    return batch

to_float = FunctionPipe(input=to_cuda, function=tensor_to_float)
normalized = FunctionPipe(input=to_float, function=normalize_batch)
training_set = FunctionPipe(input=normalized, function=reshape_batch)

The above code sets up a pipeline which normalizes the pixel intensities in the images (this improves learning efficiency) and performs some necessary type conversions. Now, if you iterate through the resulting training_set object, it will generate minibatches following the steps in this pipeline (see the sketch after the next code block). We can also construct a pipeline in a less verbose manner, as shown here for the test set:

test_set = \
    FunctionPipe(
        input=FunctionPipe(
            input=FunctionPipe(
                input=TensorPipe(
                    input=BatchingPipe(
                        input=test,
                        batch_size=100
                        ),
                    columns=['examples', 'labels']
                ),
                function=tensor_to_float
            ),
            function=normalize_batch
        ),
        function=reshape_batch
    )
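As a quick sanity check, either pipeline can be iterated directly. A sketch (the shapes assume the 100-image batches and 28x28 Fashion MNIST images described above):

for batch in training_set:
    print(batch['examples'].shape) # Expected: torch.Size([100, 1, 28, 28])
    print(batch['labels'].shape)   # Expected: torch.Size([100])
    break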

Now we can construct our model. We will make two models: a convolutional network which maps input images into a low-dimensional space, and a classifier network which predicts labels given those low-dimensional embeddings. The code here is similar to how you would implement this using torch.nn.Module, and PyTorch_Model is a subclass of torch.nn.Module, thus inheriting all of its methods and attributes. However, these models can also be chained together as Pipes:

class mnistModel(PyTorch_Model):
    """ Embeds each image into a 10-dimensional vector. """
    required_components = ['in_column', 'out_column', 'conv1', 'pool1', 'conv2', 'pool2']

    def init_default_components(self):

        self.components['in_column'] = 'examples'
        self.components['out_column'] = 'embeddings'
        self.components['conv1'] = torch.nn.Conv2d(1, 64, 2, padding=1)
        self.components['pool1'] = torch.nn.MaxPool2d(2)
        self.components['conv2'] = torch.nn.Conv2d(64, 32, 2)
        self.components['pool2'] = torch.nn.MaxPool2d(2)
        self.components['nonlinearity'] = torch.nn.ELU()

    def forward(self, batch):

        embedding = batch[self.in_column]
        embedding = self.nonlinearity(self.conv1(embedding))
        embedding = self.pool1(embedding)
        embedding = self.nonlinearity(self.conv2(embedding))
        embedding = self.pool2(embedding)
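        # After the conv/pool stack above, each image is 32 channels at 6x6 resolution: 32 * 6 * 6 = 1152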
        embedding = embedding.reshape(len(batch), 1152)
        batch[self.out_column] = embedding
        return batch

class Classifier(PyTorch_Model):
    """ Uses the input embedding to perform a classification. """
    required_components = ['in_column', 'out_column', 'linear1', 'linear2']

    def init_default_components(self):
        self.components['in_column'] = 'embeddings'
        self.components['out_column'] = 'predictions'
        self.components['linear1'] = torch.nn.Linear(1152, 256)
        self.components['linear2'] = torch.nn.Linear(256, 10)
        self.components['nonlinearity'] = torch.nn.ELU()
        self.components['softmax'] = torch.nn.Softmax(dim=1)

    def forward(self, batch):

        predictions = batch[self.in_column]
        predictions = self.nonlinearity(self.linear1(predictions))
        predictions = self.softmax(self.linear2(predictions))
        batch[self.out_column] = predictions
        return batch

# All function calls to the classifier will call the embedder first
# ie. classifier(x) is equivalent to classifier.forward(embedder.forward(x))
embedder = mnistModel()
classifier = Classifier(input=embedder)

if torch.cuda.is_available():
    embedder.cuda()
    classifier.cuda()

Now we set up a loss function and train our model using the IgniteJunction, which is a wrapper around the Engine class in the Ignite library:

ce_loss = torch.nn.CrossEntropyLoss()
loss = lambda batch: ce_loss(batch['predictions'], batch['labels'])

# By default, this Junction applies a standard training closure of evaluating the model,
# computing gradients of the loss, and backpropagating using the chosen optimizer.
trainer = IgniteJunction(
    components={
        'model': classifier,
        'dataset': training_set
        },
    loss=loss, optimizer='Adam',
    lr=.0001, weight_decay=.001,
    visdom=False, # If you have a running Visdom server, you can set this to true to get plots
    environment=env_name
    )

trainer.train(max_epochs=10) # This will take almost 15 minutes on CPU and less than 30 seconds on GPU

Once that is done, we can evaluate the trained model against the test set and compute metrics. There is a family of classification metrics based on the counts of true positives, false positives, true negatives, and false negatives that the model produces for each label. The class below implements a Pipe that computes those metrics.

classes = {i: class_name for i, class_name in enumerate(mnist.classes)}
class Metrics(HookedPassThroughPipe):

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.true_positives = {class_name: 0 for class_name in classes.values()}
        self.true_negatives = {class_name: 0 for class_name in classes.values()}
        self.false_positives = {class_name: 0 for class_name in classes.values()}
        self.false_negatives = {class_name: 0 for class_name in classes.values()}
        self.total_count = 0
        self.label_counts = {class_name: 0 for class_name in classes.values()}
        self.prediction_counts = {class_name: 0 for class_name in classes.values()}

    def _call_hook(self, batch):
        """
        This will get called every time the model is called. As a result, this pipe will continuously update
        itself as we iterate through the test set.
        """
        labels = batch['labels']
        predictions = torch.max(batch['predictions'],1)[1]
        correct_indices = (predictions == labels).nonzero().flatten().tolist()
        incorrect_indices = (predictions != labels).nonzero().flatten().tolist()
        for index, name in classes.items():
            self.label_counts[name] += int(sum(labels == index)) # How often the class showed up
            self.prediction_counts[name] += int(sum(predictions == index)) # How often the class was predicted
            self.true_positives[name] += int(sum(predictions[correct_indices] == index)) # How often the correct prediction was for this class
            self.true_negatives[name] += int(sum(predictions[correct_indices] != index)) # How often the correct prediction was not for the class; ie. how often the prediction was a true negative for this class
            self.false_positives[name] += int(sum(predictions[incorrect_indices] == index)) # How often a wrong prediction was for this class
            self.false_negatives[name] += int(sum(predictions[incorrect_indices] != index)) # How often a wrong prediction was for another class; ie. how often the prediction was a false negative for this class
        self.total_count += len(batch)
        return batch

    def compile_metrics(self):
        """
        After we have gone through the entire test set, we can call this method to compute the actual metrics.
        """
        class_names = classes.values()
        negative_counts = {name: sum(self.label_counts[other] for other in class_names if other != name) for name in class_names}
        self.sensitivity = {name: self.true_positives[name] / self.label_counts[name] for name in class_names}
        self.specificity = {name: self.true_negatives[name] / negative_counts[name] for name in class_names}
        negative_prediction_counts = {name: sum(self.prediction_counts[other] for other in class_names if other != name) for name in class_names}
        self.ppv = {name: self.true_positives[name] / self.prediction_counts[name] for name in class_names}
        self.npv = {name: self.true_negatives[name] / negative_prediction_counts[name] for name in class_names}
        self.f1 = {name: 2 / (1/self.ppv[name] + 1/self.sensitivity[name]) for name in class_names}
        self.accuracy = {name: (self.true_positives[name] + self.true_negatives[name]) / self.total_count for name in class_names}

    def get_metrics(self):
        """
        Lastly, we will use this method to return the computed metrics as a Pandas DataFrame.
        """
        columns = ['sensitivity', 'specificity', 'ppv', 'npv', 'f1', 'accuracy']
        df = pd.DataFrame(columns=columns, index=classes.values())
        for attribute in columns:
            value = getattr(self, attribute)
            df[attribute] = [value[key] for key in df.index]

        return df

# This class is implemented as a HookedPassThroughPipe, meaning that its _call_hook method will be applied every time
# the class is called like a function, and the call will pass through to its input.
metrics_computer = Metrics(input=classifier)

We now have two pipelines: the test set, and the model pipeline consisting of the embedder, the classifier, and the metrics pipe. Every time the model pipeline is evaluated on a batch from the test set, the embedder pipes its output to the classifier, which sends its predictions to the metrics pipe, which updates its internal state:

for batch in test_set:
    # We can simply call this object repeatedly on batches in the test set
    # This operation is equivalent to metrics_computer._call_hook(classifier(batch))
    metrics_computer(batch)

metrics_computer.compile_metrics()
df = metrics_computer.get_metrics()
print(df)
m = Message(df)
print(m)

Lastly, we can save our results in the Experiment object.

df.to_csv(experiment.open("metrics.csv", string_only=True))
# Since our models are still subclasses of torch.nn.Module, we can save them using the standard torch.save feature
# but if we want, we can also save their parameters in other formats such as JSON
state = embedder.get_state()
Message.from_objects(state).to('json', path=experiment.open("embedder.json",string_only=True))
state = classifier.get_state()
Message.from_objects(state).to('json', path=experiment.open("classifier.json",string_only=True))