Benchmarking Python DataFrame Libraries on CASFS

July 1st, 2021


Abstract


Several Python DataFrame libraries are benchmarked on CASFS on AWS. Runtime, memory usage, and cost are compared across varying dataset sizes to help users determine which library is the best fit for their specific use case.

Introduction


When most aspiring data professionals begin learning data analysis in Python, the first libraries they typically encounter are NumPy and Pandas, which together form the foundation of modern data processing. While Pandas is, and will remain, the standard for processing tabular data in Python, its biggest drawback is that processing time grows linearly with dataset size. While other libraries make use of multithreading or multi-machine clusters, Pandas is designed to use only one core, so increasing the number of cores does not improve its performance.

In this paper, we compare alternatives to Pandas and make recommendations on how to improve code performance on CASFS as data size increases. The libraries we compare are Pandas, Polars, Dask, Ray, and PySpark. For libraries that can deploy code to a cluster, we compare running the code on a single machine against running it on a cluster.


Library Description


  • Pandas - Data processing library written in Python. Individual columns are built on NumPy arrays, which are implemented in C.
  • Polars - Data processing library similar to Pandas, written in Python and Rust. Uses multithreading by default and supports lazy evaluation.
  • Dask - Data processing library similar to Pandas and parallel scheduling framework written in Python. Has DataFrame, array, and scheduling APIs. Able to process DataFrames either by multithreading on one machine or deploying the code to a cluster.
  • Ray - Parallel scheduling framework written in Python that lets the user manually parallelize their Pandas code. Dask DataFrames and PySpark determine where to split the data for parallel processing; with Ray, you choose where to make that split yourself. Ray can distribute your process either via multithreading on one machine or by deploying the code to a cluster.
  • PySpark - Big data processing library written in Scala. Excels at processing large datasets in memory. Provides a DataFrame API and allows users to query datasets with SQL as if they were in a relational database. Schedules code using multithreading on a single machine in standalone mode, or can be deployed using cluster mode.

Data Description


The libraries were tested using daily financial datasets. The dataset sizes are as follows:

  • 1 file - 2,524,365 rows x 20 columns
  • 10 files - 23,746,635 rows x 20 columns
  • 100 files - 241,313,625 rows x 20 columns


Process Description


We processed the data on an r5.24xlarge machine with 96 cores and 768 GB of RAM. The cost for this machine averages $1.00 per hour. For the code processed on a cluster, we used 10 r5.2xlarge machines with a total of 80 cores and 640 GB of RAM. The cost for this cluster averages $0.90 per hour, the rate used in the table below. We deliberately gave the cluster fewer total resources to demonstrate that some of these libraries actually perform better on a cluster than on a single machine, even when the single machine has more cores. Because of this, it may be more cost-effective to use a cluster rather than a single machine.

To process the data we used the following generalized algorithm (a minimal Pandas sketch follows the list):

  1. Read in the data from csv files
  2. Query the data
  3. Group by three columns and aggregate two separate columns
  4. Calculate a new column using a vectorized operation
  5. Write processed data to parquet
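
As a concrete reference, here is a minimal Pandas sketch of these five steps. The file paths, column names, and aggregation choices are hypothetical, since the exact schema of the financial data is not shown here:

    import glob

    import pandas as pd

    # 1. Read in the data from csv files (hypothetical paths)
    df = pd.concat(pd.read_csv(path) for path in glob.glob("data/*.csv"))

    # 2. Query the data (hypothetical filter column)
    df = df[df["price"] > 0]

    # 3. Group by three columns and aggregate two separate columns
    agg = df.groupby(["date", "ticker", "exchange"], as_index=False).agg(
        total_volume=("volume", "sum"),
        mean_price=("price", "mean"),
    )

    # 4. Calculate a new column using a vectorized operation
    agg["notional"] = agg["total_volume"] * agg["mean_price"]

    # 5. Write processed data to parquet
    agg.to_parquet("processed.parquet")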

For each library, we tried to keep the code as similar as possible to Pandas. Polars and Dask have a syntax similar to Pandas and required few code changes. PySpark's API differs from Pandas, so most of the PySpark code was rewritten, though it still implements the same algorithm. To implement Ray properly, we didn't have to change the syntax of the Pandas code, but we did have to determine the optimal place to parallelize the algorithm. In the version used by Pandas, Polars, Dask, and PySpark, we read in all the data at once and let the library decide how best to use resources. In the Ray version, we determined the best split was to read in and process the files individually in parallel, then concatenate the results before writing to parquet. Sketches of the Ray and PySpark versions follow this paragraph.
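
A hedged sketch of the Ray version, assuming the same hypothetical schema as above: each file is read and reduced inside a remote task, and the partial results are concatenated at the end.

    import glob

    import pandas as pd
    import ray

    ray.init()  # starts Ray locally; connects to a cluster if one is configured

    @ray.remote
    def process_file(path):
        # Read and reduce a single file; tasks run in parallel across cores/nodes.
        # Grouping per file is valid here on the assumption that the files are
        # daily and the date is one of the group keys.
        df = pd.read_csv(path)
        df = df[df["price"] > 0]
        return df.groupby(["date", "ticker", "exchange"], as_index=False).agg(
            total_volume=("volume", "sum"),
            mean_price=("price", "mean"),
        )

    futures = [process_file.remote(p) for p in glob.glob("data/*.csv")]
    result = pd.concat(ray.get(futures))
    result["notional"] = result["total_volume"] * result["mean_price"]
    result.to_parquet("processed.parquet")

The PySpark version uses Spark's own DataFrame API but follows the same five steps (again with hypothetical paths and columns):

    from pyspark.sql import SparkSession, functions as F

    spark = SparkSession.builder.appName("benchmark").getOrCreate()

    df = spark.read.csv("data/*.csv", header=True, inferSchema=True)
    df = df.filter(F.col("price") > 0)
    agg = (
        df.groupBy("date", "ticker", "exchange")
          .agg(F.sum("volume").alias("total_volume"),
               F.avg("price").alias("mean_price"))
          .withColumn("notional", F.col("total_volume") * F.col("mean_price"))
    )
    agg.write.mode("overwrite").parquet("processed_parquet/")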

Results


  • Pandas - Performance is comparable with the other libraries when the dataset is small, but processing time scales linearly as dataset size increases. Pandas does not benefit from larger machines or more cores since it utilizes only one core.
  • Polars - Improves on Pandas when reading CSVs and performing group by operations, and performs similarly to Pandas elsewhere. Its processing time also scales linearly with dataset size. Polars uses multithreading by default, so performance improves as machine size and core count increase. On average, Polars used 25%-50% of the 96 cores available on the single machine.
  • Dask - Improves performance over Pandas by scheduling DataFrame operations across multiple cores. Dask scales linearly until there are more tasks than available cores: it scaled linearly between 1 and 10 files, but not between 10 and 100 files, since only 96 cores were available on the single machine and 80 on the cluster. On average, Dask utilized 25%-50% of the 96 cores available on the single machine. It performs better on a cluster than on the standalone machine because its scheduler utilizes resources more effectively there.
  • Ray - Using Ray with Pandas can improve performance, depending on the overall algorithm and on where the user chooses to parallelize the code. Like Dask, Ray scales linearly until there are more tasks than available cores: it scaled linearly between 1 and 10 files, but not between 10 and 100 files. On average, Ray utilized close to 100% of the cores available to it, so it performed best on the single machine, which had more cores. Overall, Ray performed best as data size increased because the algorithm had a logical place to parallelize.
  • PySpark - Since Spark is designed for big data processing, it performs worse than the other DataFrame libraries when the dataset is relatively small, but its relative performance improves as dataset size increases. It scales linearly on a single machine but not on a cluster: in cluster mode, Spark's scheduler builds a directed acyclic graph (DAG) for the process, which takes a fixed amount of time independent of the dataset size, so this overhead dominates the runtime at small sizes. On average, PySpark utilized 75%-100% of the 96 cores available on the single machine. Like Dask, it performs better on a cluster than on the standalone machine because its scheduler utilizes resources more effectively there. Unlike Ray, it performed best as dataset size increased without requiring any change to the overall algorithm.

Our testing results are presented in the table and figures below. The x and y axes are plotted on a log10 scale, so libraries whose processing time scales linearly with dataset size appear as straight lines. Memory usage was not recorded for cluster processing because it was difficult to accurately aggregate total memory usage across the machines of the cluster. Overall, PySpark and Ray were the most performant and memory-efficient as data size increased.
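
For reference, the Total Cost column in the table is consistent with simply prorating the hourly rate by the runtime:

    Total Cost ($) = Time (seconds) / 3600 × Cost Per Hour ($/hour)

For example, Pandas on 1 file: 7.89 / 3600 × 1.00 ≈ 0.0022 dollars.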



Table


Library          | N Files | Time (seconds) | Memory (MiB) | Cost Per Hour ($) | Total Cost ($)
-----------------|---------|----------------|--------------|-------------------|---------------
pandas           | 1       | 7.89           | 1255.12      | 1.00              | 0.0022
pandas           | 10      | 76.00          | 11452.78     | 1.00              | 0.0211
pandas           | 100     | 797.00         | 130078.20    | 1.00              | 0.2214
polars           | 1       | 3.09           | 2712.89      | 1.00              | 0.0009
polars           | 10      | 30.30          | 15161.97     | 1.00              | 0.0084
polars           | 100     | 318.00         | 106192.75    | 1.00              | 0.0883
dask standalone  | 1       | 7.09           | 1642.15      | 1.00              | 0.0020
dask standalone  | 10      | 34.40          | 11501.16     | 1.00              | 0.0096
dask standalone  | 100     | 370.00         | 75321.80     | 1.00              | 0.1028
spark standalone | 1       | 19.50          | 62.62        | 1.00              | 0.0054
spark standalone | 10      | 37.50          | 63.91        | 1.00              | 0.0104
spark standalone | 100     | 86.00          | 63.93        | 1.00              | 0.0239
ray standalone   | 1       | 6.73           | 40.05        | 1.00              | 0.0019
ray standalone   | 10      | 7.53           | 144.20       | 1.00              | 0.0021
ray standalone   | 100     | 26.30          | 917.56       | 1.00              | 0.0073
dask cluster     | 1       | 8.74           | —            | 0.90              | 0.0022
dask cluster     | 10      | 26.88          | —            | 0.90              | 0.0067
dask cluster     | 100     | 241.50         | —            | 0.90              | 0.0604
spark cluster    | 1       | 22.78          | —            | 0.90              | 0.0057
spark cluster    | 10      | 22.84          | —            | 0.90              | 0.0057
spark cluster    | 100     | 53.65          | —            | 0.90              | 0.0134
ray cluster      | 1       | 8.10           | —            | 0.90              | 0.0020
ray cluster      | 10      | 21.32          | —            | 0.90              | 0.0053
ray cluster      | 100     | 159.81         | —            | 0.90              | 0.0400

(— = memory usage not recorded for cluster runs.)



[Figure: Processing Time (Standalone)]

[Figure: Processing Time (Cluster)]

[Figure: Processing Time (Total)]

[Figure: Memory Usage]

[Figure: Total Cost]

Analysis


After completing this research, we provide the following recommendations.

Dataset size:

  • 0 - 9,999,999 rows
    • Standalone
      1. CSV: Polars + Pandas
      2. Other file formats: Dask
    • Cluster
      1. Dask
  • 10,000,000 - 99,999,999 rows
    • Standalone
      1. CSV: Polars + Pandas
      2. Other file formats: Dask
    • Cluster:
      1. Spark
  • 100,000,000+ rows
    • Standalone
      1. Spark
    • Cluster:
      1. Spark
  • If the code can be parallelized, using Ray can maximize performance at all dataset sizes.

General code improvements:


  1. Improve existing Pandas processes by replacing pandas.read_csv() and Pandas' DataFrame.groupby() with polars.read_csv() and Polars' DataFrame.groupby(). You can easily convert between Polars and Pandas DataFrames with polars.from_pandas() and the DataFrame.to_pandas() method (see the sketches after this list, which cover recommendations 1 and 2).
  2. Improve existing Pandas processes by replacing Pandas DataFrames with Dask DataFrames. Dask will then automatically schedule the process to use multiple cores with minimal syntax changes.
  3. Refactor existing Pandas processes to utilize Ray if the code can be parallelized.
  4. Refactor big data processes to utilize PySpark.
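
As a hedged sketch of recommendation 1 (the file path and column names are hypothetical, and a reasonably recent Polars version is assumed):

    import polars as pl

    # Let Polars do the multithreaded csv read, then hand off to Pandas
    pdf = pl.read_csv("data/file_1.csv").to_pandas()

    # ... existing Pandas logic on pdf ...

    # Round-trip through Polars for a fast group by, then back to Pandas
    agg = (
        pl.from_pandas(pdf)
        .groupby(["date", "ticker", "exchange"])
        .agg([pl.col("volume").sum(), pl.col("price").mean()])
        .to_pandas()
    )

Similarly, for recommendation 2, Dask's DataFrame API mirrors Pandas closely, so the swap requires few syntax changes:

    import dask.dataframe as dd

    # Dask partitions the data and schedules work across all available cores
    ddf = dd.read_csv("data/*.csv")
    ddf = ddf[ddf["price"] > 0]  # hypothetical filter column
    result = ddf.groupby(["date", "ticker", "exchange"]).agg(
        {"volume": "sum", "price": "mean"}
    )
    result.to_parquet("processed_parquet/")  # triggers the lazy computation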

Conclusion


The performance of Pandas processes in Python can be improved by utilizing other DataFrame libraries, such as Polars, Dask, Ray, and PySpark. Dask, Ray, and PySpark can be used either on a single machine or on a cluster. On CASFS, when users add worker nodes to their Analytics cluster, CASFS automatically adds the worker nodes to a Dask and a Ray cluster. Currently, we provide a script to launch Spark clusters on Analytics, but in the future, Analytics clusters will automatically connect worker nodes to a Spark cluster as well.