HandySpark: bringing pandas-like capabilities to Spark DataFrames

“Panda statues on gray concrete stairs during daytime” by chuttersnap on Unsplash

Originally posted on Towards Data Science.

TL;DR

HandySpark is a new Python package designed to improve PySpark user experience, especially when it comes to exploratory data analysis, including visualization capabilities.

Try it yourself using Google Colab:

Check the repository:

Introduction

Apache Spark is the most popular cluster computing framework. It is listed as a required skill by about 30% of job listings (link).

The majority of Data Scientists use Python and Pandas, the de facto standard for manipulating data. Therefore, it is only logical that they will want to use PySpark (Spark’s Python API) and, of course, Spark DataFrames.

But, the transition from Pandas to Spark DataFrames may not be as smooth as one could hope…

Motivation

I’ve been teaching Applied Machine Learning using Apache Spark at Data Science Retreat to more than 100 students over the course of 2 years.

My students were quite often puzzled by some of the quirks of PySpark and, at other times, baffled by the lack of features Data Scientists take for granted while using the traditional Pandas/Scikit-Learn combo.

I decided to address these problems by developing a Python package that would make exploratory data analysis much easier in PySpark.

Introducing HandySpark

HandySpark is really easy to install and to integrate into your PySpark workflow. It takes only 3 steps to make your DataFrame a HandyFrame:

    1. Install HandySpark using pip install handyspark
    2. Import HandySpark with from handyspark import *
    3. Make your DataFrame a HandyFrame with hdf = df.toHandy()

After importing HandySpark, the method toHandy is added to Spark’s DataFrame as an extension, so you’re able to call it straight away.

Let’s take a quick look at everything you can do with HandySpark 🙂

1. Fetching Data

No more cumbersome column selection, collection and manual extraction from Row objects!

Now you can fetch data just like you do in Pandas, using cols:

hdf.cols['Name'][:5]

0                              Braund, Mr. Owen Harris
1    Cumings, Mrs. John Bradley (Florence Briggs Th...
2                               Heikkinen, Miss. Laina
3         Futrelle, Mrs. Jacques Heath (Lily May Peel)
4                             Allen, Mr. William Henry
Name: Name, dtype: object

Much, much easier, right? The result is a Pandas Series!

Just keep in mind that, due to the distributed nature of data in Spark, it is only possible to fetch the top rows of any given HandyFrame — so, no, you still cannot do things like [3:5] or [-1] and so on… only [:N].

There are also other pandas-like methods available:

hdf.cols['Embarked'].value_counts(dropna=False)
S      644
C      168
Q       77
NaN      2
Name: Embarked, dtype: int64

If you haven’t guessed yet, the examples above (and all others in this post) are built using the famous Titanic dataset 🙂

2. Plotting Data

The lack of an easy way of visualizing data always puzzled my students. And, when one searches the web for examples of plotting data using PySpark, it is even worse: many, many tutorials simply convert the WHOLE dataset to Pandas and then plot it the traditional way.

Please, DON’T EVER DO IT! It will surely work with toy datasets, but it will fail miserably with a really big dataset (the kind you likely handle if you’re using Spark).

HandySpark addresses this problem by properly computing statistics using Spark’s distributed computing capabilities and only then turning the results into plots. It is as easy as this:

import matplotlib.pyplot as plt

fig, axs = plt.subplots(1, 4, figsize=(12, 4))
hdf.cols['Embarked'].hist(ax=axs[0])
hdf.cols['Age'].boxplot(ax=axs[1])
hdf.cols['Fare'].boxplot(ax=axs[2])
hdf.cols[['Fare', 'Age']].scatterplot(ax=axs[3])

Plotting with HandySpark!

Yes, there is even a scatterplot! How is that possible?! HandySpark splits both features into 30 bins each, computes frequencies for each and every one of the 900 combinations and plots circles which are sized accordingly.
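To make the binning idea concrete, here is a minimal sketch in plain Python (no Spark involved). It is not HandySpark's actual implementation: the helper names bin_index and binned_counts, the equal-width binning and the toy values are all assumptions made for illustration.

```python
from collections import Counter


def bin_index(value, lo, hi, n_bins):
    """Map a value in [lo, hi] to a bin index between 0 and n_bins - 1."""
    if hi == lo:
        return 0
    idx = int((value - lo) / (hi - lo) * n_bins)
    return min(idx, n_bins - 1)  # the maximum value belongs to the last bin


def binned_counts(xs, ys, n_bins=30):
    """Count how many points fall into each (x_bin, y_bin) cell of the grid."""
    x_lo, x_hi = min(xs), max(xs)
    y_lo, y_hi = min(ys), max(ys)
    return Counter(
        (bin_index(x, x_lo, x_hi, n_bins), bin_index(y, y_lo, y_hi, n_bins))
        for x, y in zip(xs, ys)
    )


# Made-up values for illustration only (hypothetical, not the real dataset)
fares = [7.25, 71.28, 7.92, 53.10, 8.05, 8.46, 51.86, 21.08]
ages = [22.0, 38.0, 26.0, 35.0, 35.0, 29.0, 54.0, 2.0]

counts = binned_counts(fares, ages)
# Each non-empty cell becomes one circle; its size grows with the count
circles = [(x_bin, y_bin, n) for (x_bin, y_bin), n in sorted(counts.items())]
```

The point of the design is that only the per-cell counts (at most 30 x 30 = 900 numbers) need to leave the cluster, no matter how many rows the dataset has.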

3. Stratify

What if you want to perform stratified operations, using a split-apply-combine approach? The first idea that may come to mind is to use a groupby operation… but groupby operations trigger the dreaded data shuffling in Spark, so they should be avoided.

HandySpark handles this issue by filtering rows accordingly, performing computations on each subset of the data and then combining the results. For instance:

hdf.stratify(['Pclass']).cols['Embarked'].value_counts()
Pclass  Embarked
1       C            85
        Q             2
        S           127
2       C            17
        Q             3
        S           164
3       C            66
        Q            72
        S           353
Name: value_counts, dtype: int64
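The filter, compute and combine steps described above can be sketched in plain Python. This is only an illustration of the idea under simple assumptions (rows as dictionaries, a single stratification column); the function stratified_value_counts and the toy rows are hypothetical and not part of HandySpark.

```python
from collections import Counter


def stratified_value_counts(rows, strat_col, target_col):
    """Filter rows per stratum, count target values in each subset, combine."""
    result = {}
    for level in sorted({row[strat_col] for row in rows}):
        # 1. Filter: keep only the rows that belong to this stratum
        subset = [row for row in rows if row[strat_col] == level]
        # 2. Compute: count the values of the target column in the subset
        counts = Counter(row[target_col] for row in subset)
        # 3. Combine: store the results under a (stratum, value) key
        for value, n in sorted(counts.items()):
            result[(level, value)] = n
    return result


# Made-up rows for illustration only
rows = [
    {"Pclass": 1, "Embarked": "C"},
    {"Pclass": 1, "Embarked": "S"},
    {"Pclass": 1, "Embarked": "S"},
    {"Pclass": 3, "Embarked": "Q"},
    {"Pclass": 3, "Embarked": "S"},
]

result = stratified_value_counts(rows, "Pclass", "Embarked")
```

The keys of result play the same role as the two-level index (Pclass, Embarked) in the output shown above.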

You can also stratify it with non-categorical columns by leveraging either Bucket or Quantile objects. And then use it in a stratified plot:

hdf.stratify(['Sex', Bucket('Age', 2)]).cols['Embarked'].hist()

Stratified histogram

4. Imputing Missing Values

“Thou shalt impute missing values”

First things first, though. How many missing values are there?

hdf.isnull(ratio=True)
PassengerId    0.000000
Survived       0.000000
Pclass         0.000000
Name           0.000000
Sex            0.000000
Age            0.198653
SibSp          0.000000
Parch          0.000000
Ticket         0.000000
Fare           0.000000
Cabin          0.771044
Embarked       0.002245
Name: missing(ratio), dtype: float64

OK, now we know there are 3 columns with missing values. Let’s drop Cabin (after all, 77% of its values are missing) and focus on the imputation of values for the other two columns: Age and Embarked.

The imputation of missing values could not be integrated into a Spark pipeline until version 2.2.0, when the Imputer transformer was released. But it still does not handle categorical variables (like Embarked), let alone stratified imputation…

Let’s see how HandySpark can help us with this task:

hdf_filled = hdf.fill(categorical=['Embarked'])
hdf_filled = (hdf_filled.stratify(['Pclass', 'Sex'])
              .fill(continuous=['Age'], strategy=['mean']))

First, it uses the most common value to fill missing values of our categorical column. Then, it stratifies the dataset according to Pclass and Sex to compute the mean value for Age, which is going to be used in the imputation.

Which values did it use for the imputation?

hdf_filled.statistics_
{'Embarked': 'S',
 'Pclass == "1" and Sex == "female"': {'Age': 34.61176470588235},
 'Pclass == "1" and Sex == "male"': {'Age': 41.28138613861386},
 'Pclass == "2" and Sex == "female"': {'Age': 28.722972972972972},
 'Pclass == "2" and Sex == "male"': {'Age': 30.74070707070707},
 'Pclass == "3" and Sex == "female"': {'Age': 21.75},
 'Pclass == "3" and Sex == "male"': {'Age': 26.507588932806325}}
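For readers who want to see the logic spelled out, here is a rough plain-Python sketch of the two imputation steps: most common value for a categorical column, then per-stratum mean for a continuous one. It is an illustration only, not HandySpark's code; the functions fill_mode and fill_stratified_mean and the toy rows are assumptions.

```python
from collections import Counter, defaultdict
from statistics import mean


def fill_mode(rows, col):
    """Replace missing (None) values in `col` with the most frequent value."""
    mode = Counter(r[col] for r in rows if r[col] is not None).most_common(1)[0][0]
    filled = [dict(r, **{col: mode if r[col] is None else r[col]}) for r in rows]
    return filled, mode


def fill_stratified_mean(rows, strat_cols, col):
    """Replace missing values in `col` with the mean of the row's stratum."""
    groups = defaultdict(list)
    for r in rows:
        if r[col] is not None:
            groups[tuple(r[c] for c in strat_cols)].append(r[col])
    means = {key: mean(values) for key, values in groups.items()}
    filled = []
    for r in rows:
        key = tuple(r[c] for c in strat_cols)
        # A stratum without any observed value keeps its missing entries
        value = means.get(key) if r[col] is None else r[col]
        filled.append(dict(r, **{col: value}))
    return filled, means


# Made-up rows for illustration only
rows = [
    {"Sex": "female", "Age": 30.0, "Embarked": "S"},
    {"Sex": "female", "Age": None, "Embarked": "C"},
    {"Sex": "female", "Age": 20.0, "Embarked": None},
    {"Sex": "male", "Age": 40.0, "Embarked": "S"},
    {"Sex": "male", "Age": None, "Embarked": "S"},
]

rows_filled, embarked_mode = fill_mode(rows, "Embarked")
rows_filled, age_means = fill_stratified_mean(rows_filled, ["Sex"], "Age")
```

Here embarked_mode and age_means correspond, conceptually, to the entries of the statistics dictionary shown above: one value for the categorical column and one mean per stratum.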

So far, so good! Time to integrate it into a Spark pipeline, generating a custom transformer with transformers:

imputer = hdf_filled.transformers.imputer()

The imputer object is now a full-fledged serializable PySpark transformer! What does that mean? You can use it in your pipeline and save / load at will 🙂

5. Detecting Outliers

“You shall not pass!”

How many outliers should we not allow to pass?

hdf_filled.outliers(method='tukey', k=3.)
PassengerId      0.0
Survived         0.0
Pclass           0.0
Age              1.0
SibSp           12.0
Parch          213.0
Fare            53.0
dtype: float64

Currently, only Tukey’s method is available (I am working on Mahalanobis distance!). This method takes an optional k argument, which you can set to larger values (like 3) to allow for a looser detection.
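As a reminder of how Tukey’s method works: values below Q1 - k * IQR or above Q3 + k * IQR are flagged as outliers, where Q1 and Q3 are the first and third quartiles and IQR = Q3 - Q1. The plain-Python sketch below illustrates this; it assumes exact quartiles from the standard library and assumes that fencing means clipping values to the fences. The function names and the toy values are hypothetical, and Spark's own quantile computation may return slightly different quartiles.

```python
from statistics import quantiles


def tukey_fences(values, k=1.5):
    """Return the (lower, upper) fences: Q1 - k * IQR and Q3 + k * IQR."""
    q1, _, q3 = quantiles(values, n=4, method="inclusive")
    iqr = q3 - q1
    return q1 - k * iqr, q3 + k * iqr


def count_outliers(values, k=1.5):
    """Count the values that fall outside the fences."""
    lower, upper = tukey_fences(values, k)
    return sum(1 for v in values if v < lower or v > upper)


def fence(values, k=1.5):
    """Clip every value to the [lower, upper] interval."""
    lower, upper = tukey_fences(values, k)
    return [min(max(v, lower), upper) for v in values]


# Made-up values for illustration only
values = [1, 2, 3, 4, 5, 6, 7, 8, 16]

strict = count_outliers(values, k=1.5)  # 16 lies above the upper fence
loose = count_outliers(values, k=3.0)   # wider fences, 16 is now inside
fenced = fence(values, k=1.5)
```

A larger k widens the interval between the fences, so fewer values are flagged.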

Take the Fare column, for instance. There are, according to Tukey’s method, 53 outliers. Let’s fence them!

hdf_fenced = hdf_filled.fence(['Fare'])

What are the lower and upper fence values?

hdf_fenced.fences_
{'Fare': [-26.7605, 65.6563]}

Remember that, if you want to, you can also perform a stratified fencing 🙂

As you’ve probably guessed already, you can also integrate this step into your pipeline, generating the corresponding transformer:

fencer = hdf_fenced.transformers.fencer()

6. Pandas Functions

In Spark 2.3, Pandas UDFs were released! This turned out to be a major improvement for us, PySpark users, as we could finally overcome the performance bottleneck imposed by traditional User Defined Functions (UDFs). Awesome!

HandySpark takes it one step further, by doing all the heavy lifting for you 🙂 You only need to use its pandas object and voilà — lots of functions from Pandas are immediately available!

For instance, let’s use isin as you’d use it with a regular Pandas Series:

some_ports = hdf_fenced.pandas['Embarked'].isin(values=['C', 'Q'])
some_ports
Column<b'udf(Embarked) AS `<lambda>(Embarked,)`'>

But, remember Spark has lazy evaluation, so the result is a column expression which leverages the power of Pandas UDFs. The only thing left to do is to actually assign the results to a new column, right?

hdf_fenced = hdf_fenced.assign(is_c_or_q=some_ports)
# What's in there?
hdf_fenced.cols['is_c_or_q'][:5]
0     True
1    False
2    False
3     True
4     True
Name: is_c_or_q, dtype: bool

You got that right! HandyFrame has a very convenient assign method, just like in Pandas!

And this is not all! Both specialized str and dt objects from Pandas are available as well! For instance, what if you want to find if a given string contains another substring?

col_mrs = hdf_fenced.pandas['Name'].str.find(sub='Mrs.')
# str.find returns -1 when the substring is absent and 0 for a match at the very start
hdf_fenced = hdf_fenced.assign(is_mrs=col_mrs >= 0)

For a complete list of all supported functions, please check the repository.

7. Your Own UDFs

The sky is the limit! You can create regular Python functions and use assign to create new columns 🙂 And they will be turned into Pandas UDFs for you!

The arguments of your function (or lambda) should have the names of the columns you want to use. For instance, to take the log of Fare:

import numpy as np
hdf_fenced = hdf_fenced.assign(logFare=lambda Fare: np.log(Fare + 1))

You can also use functions that take multiple columns as arguments. Keep in mind that the default return type, that is, the data type of the new column, will be the same as the first column used (Fare, in the example).

It is also possible to specify different return types — please check the repository for examples on that.
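The convention of matching argument names to column names can be illustrated with a small plain-Python sketch based on inspect.signature. This is not how HandySpark is implemented (it builds Pandas UDFs, which work on whole batches rather than row by row); the standalone assign function and the dictionary-of-lists "table" below are assumptions for illustration.

```python
import inspect
import math


def assign(columns, **new_columns):
    """Return a copy of `columns` (name -> list of values) with derived columns."""
    result = dict(columns)
    for new_name, func in new_columns.items():
        # The argument names of the function tell us which columns to feed it
        arg_names = list(inspect.signature(func).parameters)
        inputs = [result[name] for name in arg_names]
        result[new_name] = [func(*row) for row in zip(*inputs)]
    return result


# Made-up values for illustration only
data = {"Fare": [7.25, 71.2833, 0.0], "SibSp": [1, 1, 0], "Parch": [0, 0, 2]}

out = assign(
    data,
    logFare=lambda Fare: math.log(Fare + 1),
    family_size=lambda SibSp, Parch: SibSp + Parch + 1,
)
```

A function with two arguments, like the family_size lambda, simply receives two columns, looked up by name in the same way.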

8. Nicer Exceptions

Spark exceptions are loooong… whenever something breaks, the error bubbles up through a seemingly infinite number of layers!

I always advise my students to scroll all the way down and work their way up trying to figure out the source of the problem… but, not anymore!

HandySpark will parse the error and show you a nice and bold red summary at the very top 🙂 It may not be perfect, but it will surely help!

Handy Exception

9. Safety First

Some dataframe operations, like collect or toPandas, will trigger the retrieval of ALL rows of the dataframe!

To prevent the undesirable side effects of these actions, HandySpark implements a safety mechanism! It will automatically limit the output to 1,000 rows:

Safety mechanism in action!

Of course, you can specify a different limit using set_safety_limit or throw caution to the wind and tell your HandyFrame to ignore the safety using safety_off. Note that turning the safety mechanism off lasts for a single action only: it kicks back in after returning the requested unlimited result.
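The behavior described above (a default limit, a configurable limit, and a switch that only holds for one action) can be sketched with a small plain-Python class. The class SafeRows is hypothetical and only mimics the idea; the method names set_safety_limit, safety_off and collect are borrowed from the text, but their real signatures in HandySpark may differ.

```python
class SafeRows:
    """Toy container that limits how many rows a single action returns."""

    def __init__(self, rows, limit=1000):
        self._rows = list(rows)
        self._limit = limit
        self._safety = True

    def set_safety_limit(self, limit):
        """Change the maximum number of rows returned while safety is on."""
        self._limit = limit

    def safety_off(self):
        """Disable the limit for the next action only."""
        self._safety = False
        return self

    def collect(self):
        if self._safety:
            return self._rows[: self._limit]
        # Safety kicks back in right after one unlimited result
        self._safety = True
        return list(self._rows)


data = SafeRows(range(5000))

limited = data.collect()                 # capped at the default limit
everything = data.safety_off().collect()  # one unlimited action
limited_again = data.collect()           # the limit applies again

data.set_safety_limit(10)
small = data.collect()
```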

Final Thoughts

My goal is to improve PySpark user experience and allow for a smoother transition from Pandas to Spark DataFrames, making it easier to perform exploratory data analysis and visualize the data. Needless to say, this is a work in progress, and I have many more improvements already planned.

If you are a Data Scientist using PySpark, I hope you give HandySpark a try and let me know your thoughts on it 🙂

If you have any thoughts, comments or questions, please leave a comment below or contact me on Twitter.