Hamilton: Scaling to Match your Data!

Stefan Krawczyk and Elijah ben Izzy
- San Francisco, CA

A few months ago we released Hamilton, Stitch Fix’s open-source microframework[1] for managing dataflows. With feedback from the community, we’ve implemented a variety of new features that make it more general purpose for Data Science/Data Engineering teams. Importantly, Hamilton now operates over any Python object type, and can integrate with a variety of distributed compute platforms.

First, let’s take a step back and re-introduce ourselves to the concept of Hamilton…

Reintroducing Hamilton

Stitch Fix’s demand forecasting team manages an ever-expanding set of complex, highly-configurable forecasts. In the before-times, these pipelines took the form of an ancient codebase that continually manipulated the same dataframe to transform actuals data into predictions.

The systems that massaged features to produce the team’s forecasts were among the oldest at Stitch Fix. While they had been business-critical for years, the organic, sprawling nature of the code’s development meant that nobody fully understood the software in all its complexity.

Hamilton was born to enable this team to scale. At its core, it allows the manager of a pipeline, i.e. a dataflow in Hamilton parlance, to express every element as a simple Python function. Instead of writing code that looks like this:

df['c'] = df['a'] + df['b']

they would write the following:

def c(a: pd.Series, b: pd.Series) -> pd.Series:
    """C - the sum of a & b."""
    return a + b

The Hamilton framework compiles these functions to model a dataflow as nodes in a Directed Acyclic Graph (DAG). It uses the names of the parameters as references to upstream nodes, and the name of the function itself to specify the node. In this case, a and b are implied to be other nodes in the DAG (possibly defined by further upstream functions…).
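
To make that concrete, a and b could themselves be functions defined further upstream. The input names and logic below are made up purely for illustration:

import pandas as pd

def a(raw_spend: pd.Series) -> pd.Series:
    """'a' is a node too -- computed here from a hypothetical upstream input 'raw_spend'."""
    return raw_spend.fillna(0)

def b(raw_signups: pd.Series) -> pd.Series:
    """'b' is another node; 'c' above refers to it simply by naming it as a parameter."""
    return raw_signups.astype(float)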

This simple abstraction, powered by a variety of extensions in the framework, made it remarkably easy to work on these vast, complex feature pipelines without incurring additional cognitive burden. The newly migrated codebase became a self-documenting feature store, and the forecasting team was able to scale, focusing on what matters most: building better models for the business. Everyone was happy.

Scaling in Multiple Directions

When we talk about scale at Stitch Fix, we generally refer to one of the following:

  • scaling in complexity (additional model architectures, more parameterizable pipelines, additional business lines/regions), and…
  • scaling in data size & compute (larger data sets due to growing client base, bigger training data, more CPU cores to utilize)

While Hamilton was initially designed with the former in mind, we have now enabled it to solve the latter, without limiting the method of distributed computation! This is made possible by the notion of a Graph Adapter, an abstraction responsible for processing the Hamilton DAG and determining how individual nodes should execute. Loosely stated, this allows you to run your Hamilton DAG on whatever platform you want – enabling you to take advantage of the parallelism of ray, and the scalability of spark (using the pandas API, a.k.a. koalas) and dask. If you’re using pandas, good news: you now have three ways to scale your code!

This all sounds cool, perhaps too good to be true. There may be a caveat or two (read the docs to find out more!), but let’s dive into an example to convince you that you can both write your pipelines simply and let them scale. We’ll start with a simple pipeline, i.e. a Hamilton dataflow, that computes marketing spend:

import pandas as pd

def avg_3wk_spend(spend: pd.Series) -> pd.Series:
    """Rolling 3 week average spend."""
    return spend.rolling(3).mean()

def acquisition_cost(avg_3wk_spend: pd.Series, signups: pd.Series) -> pd.Series:
    """The cost per signup in relation to a rolling average of spend."""
    return avg_3wk_spend / signups

def spend_mean(spend: pd.Series) -> float:
    """Shows function creating a scalar. In this case it computes the mean of the entire column."""
    return spend.mean()

def spend_zero_mean(spend: pd.Series, spend_mean: float) -> pd.Series:
    """Shows function that takes a scalar. In this case to zero mean spend."""
    return spend - spend_mean

We can run it easily in pandas using the following “driver” code:

import pandas as pd
from hamilton import driver

import my_functions  # we import the module here!

initial_columns = {  # load from actuals or wherever -- this is one way to provide inputs to Hamilton
    'signups': pd.Series([1, 10, 50, 100, 200, 400]),
    'spend': pd.Series([10, 10, 20, 40, 40, 50]),
}
dr = driver.Driver(initial_columns, my_functions)  # creates the DAG
outputs = ['spend', 'signups', 'acquisition_cost', 'spend_zero_mean']
df = dr.execute(outputs)  # let's create the dataframe!
print(df)

By making the following small changes to the driver code (five in total), we can run it on dask! Note that the functions defining the business logic do not require any changes at all (thanks to dask implementing a good portion of the pandas API)!

import pandas as pd
from hamilton import base
from hamilton import driver
from hamilton.experimental import h_dask

import my_functions  # we import the module here!

from dask.distributed import Client, LocalCluster  # import dask components (1)
from dask import dataframe  # import dask components (1)

initial_columns = {  # need to adjust how we load data -- (2)
    'signups': dataframe.from_pandas(pd.Series([1, 10, 50, 100, 200, 400]), name='signups', npartitions=2),
    'spend': dataframe.from_pandas(pd.Series([10, 10, 20, 40, 40, 50]), name='spend', npartitions=2),
}
client = Client(LocalCluster())  # Setup connection to dask (3)
adapter = h_dask.DaskGraphAdapter(client, base.PandasDataFrameResult())  # Create dask adapter for Hamilton (4)
dr = driver.Driver(initial_columns, my_functions, adapter=adapter)  # and pass in the Adapter (4)
outputs = ['spend', 'signups', 'acquisition_cost', 'spend_zero_mean']
df = dr.execute(outputs)  # This will now execute using dask!
print(df)
client.shutdown()  # shut things down (5)

And it’s as easy as that.
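
Ray follows the same recipe. The sketch below assumes the Ray Graph Adapter simply takes a result builder (do check the docs and examples for the exact constructor); the inputs can stay as plain pandas Series, since Ray parallelizes execution of the nodes themselves:

import pandas as pd
import ray
from hamilton import base, driver
from hamilton.experimental import h_ray

import my_functions  # same business logic module as before

initial_columns = {  # plain pandas inputs -- Ray distributes node execution, not the data structures
    'signups': pd.Series([1, 10, 50, 100, 200, 400]),
    'spend': pd.Series([10, 10, 20, 40, 40, 50]),
}
ray.init()  # start (or connect to) Ray
adapter = h_ray.RayGraphAdapter(base.PandasDataFrameResult())  # assumed constructor -- see the Hamilton docs
dr = driver.Driver(initial_columns, my_functions, adapter=adapter)
df = dr.execute(['spend', 'signups', 'acquisition_cost', 'spend_zero_mean'])
print(df)
ray.shutdown()  # shut things down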

As a side note, we’re excited by all the work in the PyData community to ensure more interoperability between Python data libraries/frameworks[2]. Scaling on dask in this example just works because of all that effort!

Opening up Hamilton to be a general-purpose dataflow framework

The forecasting team’s pipelines all revolved around creating pandas dataframes, and Hamilton grew up with this assumption. The concept of modeling a dataflow through functions, however, is more general than that initial application.

Thus we introduced the Result Mixin, an abstraction that tells Hamilton how to turn the outputs of the DAG into a usable Python object. This enables Hamilton users to operate over much more than just Pandas DataFrames! It is used in conjunction with a Graph Adapter, and is simple to use.
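
If the built-in result builders don’t fit your needs, you can write your own: subclass the mixin and implement build_result, which receives each requested output by name. The toy builder below, which returns the requested outputs as a tuple, is only a sketch of the idea (consult the docs for the exact interface):

from typing import Any, Tuple

from hamilton import base

class TupleResult(base.ResultMixin):
    """Hypothetical result builder: returns the requested outputs as a tuple."""

    @staticmethod
    def build_result(**outputs: Any) -> Tuple[Any, ...]:
        # 'outputs' maps each requested node name to its computed value.
        return tuple(outputs.values())

You would then hand an instance of it to a Graph Adapter, in place of the built-in result builders used below.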

Here is an example Hamilton dataflow that fits a model and predicts with it.

import numpy as np
from sklearn import base, linear_model
from hamilton import function_modifiers

# This dataflow is abbreviated for this blog post -- see the full example in the examples folder.

@function_modifiers.config.when(clf='logistic')
def prefit_clf__logreg(penalty: str) -> base.ClassifierMixin:
    """Returns an unfitted Logistic Regression classifier object"""
    return linear_model.LogisticRegression(penalty=penalty, solver='liblinear')  # liblinear supports the 'l1' penalty used by the driver below

def fit_clf(prefit_clf: base.ClassifierMixin, X_train: np.ndarray, y_train: np.ndarray) -> base.ClassifierMixin:
    """Calls fit on the classifier object; it mutates it."""
    prefit_clf.fit(X_train, y_train)
    return prefit_clf

def predicted_output(fit_clf: base.ClassifierMixin, X_test: np.ndarray) -> np.ndarray:
    """Exercised the fit classifier to perform a prediction."""
    return fit_clf.predict(X_test)

def predicted_output_with_labels(predicted_output: np.ndarray, target_names: np.ndarray) -> np.ndarray:
    """Replaces the predictions with the desired labels."""
    return np.array([target_names[idx] for idx in predicted_output])

def y_test_with_labels(y_test: np.ndarray, target_names: np.ndarray) -> np.ndarray:
    """Adds labels to the target output."""
    return np.array([target_names[idx] for idx in y_test])

We can run it easily using the following driver code, which requests a numpy matrix with two columns: one containing the predicted labels and the other containing the ground-truth labels.

from hamilton import base, driver
import my_train_evaluate_logic

dag_config = {'clf': 'logistic'}
adapter = base.SimplePythonGraphAdapter(base.NumpyMatrixResult())
dr = driver.Driver(dag_config, my_train_evaluate_logic, adapter=adapter)
# the following is abbreviated code:
inputs = {'penalty': 'l1', 'X_train': ..., 'y_train': ..., 'X_test': ..., 'y_test': ..., 'target_names': ...}
result = dr.execute(['predicted_output_with_labels', 'y_test_with_labels'], inputs=inputs)
# result is a np.ndarray that represents a matrix with two columns.
# One could then go and compute performance metrics with this output...
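
For instance, one might score the predictions straight off that matrix. This is just a sketch, assuming the columns come back in the order they were requested and that scikit-learn is available:

from sklearn import metrics

predicted, actual = result[:, 0], result[:, 1]  # columns assumed to follow the requested output order
print(metrics.accuracy_score(actual, predicted))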

But what if we wanted to debug the outputs? We can easily modify our code to return a dictionary as a result and then inspect the stages of the Hamilton dataflow. We just need to update the Graph Adapter and specify which extra outputs we want to inspect.

from hamilton import base, driver
import my_train_evaluate_logic

dag_config = {'clf': 'logistic'}
adapter = base.SimplePythonGraphAdapter(base.DictResult())  # (1) change this
dr = driver.Driver(dag_config, my_train_evaluate_logic, adapter=adapter)
inputs = {'penalty': 'l1', 'X_train': ..., 'y_train': ..., 'X_test': ..., 'y_test': ..., 'target_names': ...}
result = dr.execute(['predicted_output_with_labels', 'y_test_with_labels', 'fit_clf'], inputs=inputs)  # (2) change this
# result is now a dictionary
# One could now debug/inspect the fit model and iterate/develop further...
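
Since the result is now just a dictionary keyed by node name, poking at an intermediate stage is a one-liner. For example (a sketch using scikit-learn's standard attribute for a fitted linear model):

fit_model = result['fit_clf']  # the fitted classifier node we requested above
print(fit_model.coef_)  # inspect the learned coefficients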

And we’re just getting started

We are really excited about the direction in which Hamilton is headed, and are actively working on extensions! As a teaser, we want to share our thoughts on how Hamilton could help surface issues in data quality. Could we add a simple decorator to run basic checks? Perhaps an API like this would be sufficient…

@check_output({'type': float, 'range': (0.0, 10000.0)})
def SOME_IMPORTANT_OUTPUT(input1: pd.Series, input2: pd.Series) -> pd.Series:
    """Does some complex logic"""
    ...
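
For the curious, here is a rough, purely illustrative sketch of what such a decorator could do in plain Python. This is not Hamilton’s implementation, just one way the checks above might be enforced at runtime:

import functools
from typing import Callable

import pandas as pd

def check_output(spec: dict) -> Callable:
    """Hypothetical decorator: validates the pandas Series a function returns.

    Supports two keys: 'type' (expected element dtype) and
    'range' (inclusive (min, max) bounds on the values).
    """
    def decorator(fn: Callable) -> Callable:
        @functools.wraps(fn)
        def wrapper(*args, **kwargs) -> pd.Series:
            result = fn(*args, **kwargs)
            expected = spec.get('type')
            if expected is not None and result.dtype != expected:
                raise TypeError(f"{fn.__name__}: expected dtype {expected}, got {result.dtype}")
            if 'range' in spec:
                low, high = spec['range']
                if not result.between(low, high).all():
                    raise ValueError(f"{fn.__name__}: values fall outside [{low}, {high}]")
            return result
        return wrapper
    return decorator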

What do you think? Leave us your thoughts in our GitHub issue.

If you’re interested in Hamilton, we’ve been developing a community on discord 📣. Please join if you have questions or want advice on how to best use Hamilton. You can also learn more by exploring the Hamilton repository (please ⭐️ it), or browsing our documentation here 📚.

Footnotes

[1]↩ For the original introductory post see Functions & DAGs: introducing Hamilton, a microframework for dataframe generation.

[2]↩ E.g. the work on defining a Python Array API standard.

