Process 2.4TB in Parquet Files in 76s

With <30 lines of Python.

In this example we:

  • Generate 1,000 billion-row Parquet files (2.4TB) and store them in Google Cloud Storage.

  • Run a DuckDB query on each file in parallel using a cluster with 10,000 CPUs.

  • Combine resulting data locally.

Demo Video:

What is the Trillion row Challenge?

An extension of the billion row challenge, the goal of the trillion row challenge is to compute the min, max, and mean temperature per weather station, for 413 unique stations, from data stored as a collection of parquet files in blob storage. Data looks like this (but with 1,000,000,000,000 rows):

Cluster settings:

For this challenge we use used a 125 node cluster with 80 cpus and 320G ram per node. Underneath these are N4-standard-80 machines. The cluster settings look like this:

Once the settings look good we just hit "⏻ Start" on the front page. This cluster took 1min 47s to boot.

Generating the dataset:

We chose to split the 1T row dataset into 1,000 parquet files. With 10k cpus, this means we only need to download one file per 10-cpu worker.

Once generated, parquet files are written to the `./shared` folder. Anything placed in this folder is synchronized with a Google Cloud Storage bucket using GCSFuse. The dashboard has a built in file manager where you can monitor, upload, and download files from the cluster (screenshot below).

Below, each call to generate_parquet is done in a python:3.12 docker container (see settings ↑), this can be any docker container as long as your VM's service account is authorized to pull it. Python packages your code uses that aren't in the container are detected and installed quickly at runtime. The code below took 3s to install pandas, numpy, and pyarrow in every container.

import pyarrow
import numpy as np
import pandas as pd
from burla import remote_parallel_map

TOTAL_ROWS = 1_000_000_000_000
ROWS_PER_FILE = 1_000_000_000

lookup_df = pd.read_csv("avg_temp_per_station.csv")
file_nums = range(TOTAL_ROWS // ROWS_PER_FILE)

def generate_parquet(file_num: int):
    rng = np.random.default_rng(seed=file_num)

    # array of station names
    station_indices = rng.integers(0, len(lookup_df) - 1, ROWS_PER_FILE)
    station_names = lookup_df.station.to_numpy()[station_indices]

    # array of station temps, aligned by index with `station_names`
    station_temps = rng.normal(0, 10, ROWS_PER_FILE)
    station_temps += lookup_df.mean_temp.to_numpy()[station_indices]
    station_temps = station_temps.round(1)

    # save to parquet in cluster filesystem
    df = pd.DataFrame({"station": station_names, "temp": station_temps})
    df.to_parquet(f"shared/1TRC/{file_num}.parquet", engine="pyarrow")

remote_parallel_map(generate_parquet, file_nums, func_ram=64)

This code completed in 5m 48s! The final dataset is 2.4TB. After running, files appear under the "Filesystem" tab (underneath this is a GCS Bucket).

Running the challenge!

This code runs a simple DuckDB query on all 1,000 Parquet files at the same time. Each query returns a pandas dataframe with the min/mean/max per station within that file. DataFrames are concatenated, and aggregated locally to produce a final min/mean/max temperature per weather station.

Like the data generation code above, every call to station_stats runs in a python:3.12 docker container, and any missing python packages are quickly installed at runtime. Parquet files pulled from the `./shared` folder are actually being downloaded from Google Cloud Storage using GCSFuse.

To avoid using any cached files/data the cluster was rebooted before running the code below.

import duckdb
import pandas as pd
from time import time
from burla import remote_parallel_map

TOTAL_ROWS = 1_000_000_000_000
ROWS_PER_FILE = 1_000_000_000
file_nums = range(TOTAL_ROWS // ROWS_PER_FILE)

def station_stats(file_num: int):
    query = """
    SELECT
        station,
        MIN(temp) AS min_temp,
        AVG(temp) AS mean_temp,
        MAX(temp) AS max_temp
    FROM read_parquet(?)
    GROUP BY station
    ORDER BY station
    """
    con = duckdb.connect(database=":memory:")
    con.execute("PRAGMA threads=8")
    return con.execute(query, (f"shared/1TRC/{file_num}.parquet",)).df()

start = time()
dataframes = remote_parallel_map(station_stats, file_nums, func_cpu=10)
df = pd.concat(dataframes).groupby("station").agg(
    min_value=("min_temp", "min"),
    mean_value=("mean_temp", "mean"),
    max_value=("max_temp", "max"),
)
print(df)
print(f"Done after {start-time()}s!")

This only took 76s to finish, which I believe is technically a new record! (on the full 2.4TB dataset). However, as I talk more about below, I don't actually think pure runtime is what really matters most. I also think, with the right optimization, this could be done in <5s! If anyone is brave enough :)

Output:

✔ Done! Ran 1000 inputs through `station_stats` (1000/1000 completed)         
              name  min_value  mean_value  max_value
0             Abha      -46.9   18.000151       84.2
1          Abidjan      -34.1   26.000040       89.9
2           Abéché      -30.9   29.400059       91.5
3            Accra      -36.1   26.399808       88.7
4      Addis Ababa      -46.5   15.999770       81.7
..             ...        ...         ...        ...
407       Yinchuan      -54.2    9.000354       75.5
408         Zagreb      -52.6   10.700003       75.1
409  Zanzibar City      -36.0   25.999645       91.1
410         Ürümqi      -53.8    7.399437       69.3
411          İzmir      -40.8   17.900016       80.9

[412 rows x 4 columns]

How expensive was this?

For this demo we used spot instances. These are extra machines rented at a discount that might be deleted ("preempted") at any time when Google needs them. This isn't an issue for us because Burla jobs continue working even if some nodes are preempted while in the middle of a job.

Our cluster used 125 N4-standard-80 machines, took 1m47s to boot, and was shut down 1.5min later. To be safe let's assume the billable runtime per node was 3.5min, with 125 nodes this is 7.3 hours.

At spot pricing N4-standard-80 machines cost $1.22/hour meaning this job cost about $8.91.

What really matters.

76s is a respectable time, but the real question I'm trying to answer is: If I were in the office on a busy day, and needed to process a bunch of stuff, what would I do? How long would it take? and how expensive would it be?

Let's be honest, this isn't the most perfectly efficient solution in the world. Most of the time is spent downloading data, and while GCSFuse is convenient, it probably isn't maxing out the VM's network capacity.

But, if I were in the office, and you asked me to get you the min/mean/max per station, assuming I'd never heard of this challenge before, I'd have an answer for you around 5 minutes later, and for less than $10. In my opinion this is the real result, and I think it's an impressive one!

Not to mention, I'd do it all using an interface a beginner can understand!


Bonus: I think a time of <5s is possible 👀

As I mentioned earlier, I think the real result that matters is how quickly you could do this in a real world setting, without hyper-optimizing, including the time you spent writing code.

But hyper-optimizing is fun! So how fast could it be? Well, Databricks was able to achieve a time of 64s using better compression that shrunk the dataset to 1.2TB. I think this is totally fair game given that's just how their system decided to store the data.

What if we used the same compression format they did? AND 10,000 CPUs? Well, we tested this, and it took just 39s to complete! (code coming soon, keep an eye on GitHub).

Unfortunately, 1T rows in 39s is SLOW, how can we hit single digits?

Faster downloads:

N4-standard-80 machines have a max download speed of about 50Gbps from cloud storage in the same region, and each machine (with improved compression) needs to download eight 1.17G files, or 9.36G of total data.

In practice, it can be hard to hit the max download speed of 50Gbps (I think?) so let's assume that, using the right parallel connection logic (not GCSFuse), you can consistently achieve 40Gbps. This would mean you could get the entire compressed dataset into memory in just 1.9 seconds.

Parallelize the 1-Billion challenge winning code?

The best solution to the original 1-billion row challenge finishes in 1.5s using 8cpus and 32G of ram. Could we just run the 1BRC winning code on 1,000 machines in parallel? Then aggregate results? Definitely! Stuff like this is exactly what Burla is designed to accomplish.

The only issue is we have a compressed parquet file in memory, and the 1-billion row challenge code expects a CSV file on disk. If somebody modified the 1BRC winning code to operate on a compressed parquet file instead of a CSV file. Then deployed 1,000 in parallel, I think it's likely a <5s time is possible.

If anyone decides to give this a try, or has a good reason they don't think this would work, let me know! My email is [email protected]