More minor fixes
szarnyasg committed Jun 6, 2024
1 parent 52003e1 commit 3b288a1
Showing 22 changed files with 207 additions and 212 deletions.
22 changes: 11 additions & 11 deletions _posts/2021-05-14-sql-on-pandas.md
@@ -22,7 +22,7 @@ After your data has been converted into a Pandas DataFrame often additional data

As a short teaser, here is a code snippet that allows you to do exactly that: run arbitrary SQL queries directly on Pandas DataFrames using DuckDB.

```py
```python
# to install: pip install duckdb
import pandas as pd
import duckdb
@@ -42,7 +42,7 @@ DuckDB can also write query results directly to any of these formats. You can us
When you run a query in SQL, DuckDB will look for Python variables whose names match the table names in your query and automatically read the corresponding Pandas DataFrames. Looking back at the previous example, we can see this in action:


```py
```python
import pandas as pd
import duckdb

@@ -74,7 +74,7 @@ pip install duckdb

To set up the dataset for processing we download two parquet files using `wget`. After that, we load the data into a Pandas DataFrame using the built-in Parquet reader of DuckDB. The system automatically infers that we are reading a parquet file by looking at the `.parquet` extension of the file.

```py
```python
lineitem = duckdb.query(
"SELECT * FROM 'lineitemsf1.snappy.parquet'"
).to_df()
@@ -97,7 +97,7 @@ FROM lineitem;

The Pandas code looks similar:

```py
```python
lineitem.agg(
Sum=('l_extendedprice', 'sum'),
Min=('l_extendedprice', 'min'),
@@ -135,7 +135,7 @@ GROUP BY

In Pandas, we use the groupby function before we perform the aggregation.

```py
```python
lineitem.groupby(
['l_returnflag', 'l_linestatus']
).agg(
@@ -175,7 +175,7 @@ GROUP BY l_returnflag,

In Pandas, we can create a filtered variant of the DataFrame by using the selection brackets.

```py
```python
# filter out the rows
filtered_df = lineitem[
lineitem['l_shipdate'] < "1998-09-02"]
@@ -194,7 +194,7 @@ In DuckDB, the query optimizer will combine the filter and aggregation into a si

We can manually perform this optimization ("projection pushdown" in database literature). To do this, we first need to select only the columns that are relevant to our query and then subset the lineitem dataframe. We will end up with the following code snippet:

```py
```python
# projection pushdown
pushed_down_df = lineitem[
['l_shipdate',
@@ -250,7 +250,7 @@ GROUP BY l_returnflag,

For Pandas, we have to add a `merge` step. In a basic approach, we merge lineitem and orders together, then apply the filters, and finally apply the grouping and aggregation. This will give us the following code snippet:

```py
```python
# perform the join
merged = lineitem.merge(
orders,
@@ -279,7 +279,7 @@ Now we have missed two performance opportunities:

Applying these two optimizations manually results in the following code snippet:

```py
```python
# projection & filter on lineitem table
lineitem_projected = lineitem[
['l_shipdate',
@@ -394,7 +394,7 @@ SELECT sum(l_extendedprice), min(l_extendedprice), max(l_extendedprice), avg(l_e

For Pandas, we will first need to run `read_parquet` to load the data into Pandas. To do this, we use the Parquet reader powered by Apache Arrow. After that, we can run the query as we did before.

```py
```python
lineitem_pandas_parquet = pd.read_parquet('lineitemsf1.snappy.parquet')
result = lineitem_pandas_parquet.agg(Sum=('l_extendedprice', 'sum'), Min=('l_extendedprice', 'min'), Max=('l_extendedprice', 'max'), Avg=('l_extendedprice', 'mean'))
```
@@ -403,7 +403,7 @@ However, we now again run into the problem where Pandas will read the Parquet fi

The optimizer in DuckDB will figure this out by itself by looking at the query you are executing.

```py
```python
lineitem_pandas_parquet = pd.read_parquet('lineitemsf1.snappy.parquet', columns=['l_extendedprice'])
result = lineitem_pandas_parquet.agg(Sum=('l_extendedprice', 'sum'), Min=('l_extendedprice', 'min'), Max=('l_extendedprice', 'max'), Avg=('l_extendedprice', 'mean'))
```
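For reference, here is a minimal sketch of what the DuckDB side of this comparison could look like, reusing the `duckdb.query(...).to_df()` pattern and the Parquet file name from earlier in the post; the query text follows the aggregation shown above.

```python
import duckdb

# The optimizer derives from the SQL text that only l_extendedprice is needed,
# so only that column is read from the Parquet file.
result = duckdb.query("""
    SELECT sum(l_extendedprice), min(l_extendedprice),
           max(l_extendedprice), avg(l_extendedprice)
    FROM 'lineitemsf1.snappy.parquet'
""").to_df()
```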
88 changes: 43 additions & 45 deletions _posts/2021-06-25-querying-parquet.md
@@ -9,8 +9,7 @@ Apache Parquet is the most common "Big Data" storage format for analytics. In Pa

<!--more-->

<img src="/images/blog/parquet.svg" alt="Example parquet file shown visually. The parquet file (taxi.parquet) is divided into row-groups that each have two columns (pickup_at and dropoff_at)"
title="Taxi Parquet File" style="max-width:30%"/>
<img src="/images/blog/parquet.svg" alt="Example parquet file shown visually. The parquet file (taxi.parquet) is divided into row-groups that each have two columns (pickup_at and dropoff_at)" title="Taxi Parquet File" style="max-width:30%"/>

The Parquet format has a number of properties that make it suitable for analytical use cases:

@@ -66,8 +65,7 @@ WHERE pickup_at BETWEEN '2019-04-15' AND '2019-04-20';

In this query, we read a single column from our Parquet file (`pickup_at`). Any other columns stored in the Parquet file can be entirely skipped, as we do not need them to answer our query.

<img src="/images/blog/parquet-filter-svg.svg" alt="Projection & filter pushdown into parquet file example."
title="Filter Pushdown" style="max-width:30%"/>
<img src="/images/blog/parquet-filter-svg.svg" alt="Projection & filter pushdown into parquet file example." title="Filter Pushdown" style="max-width:30%"/>

In addition, only rows that have a `pickup_at` between the 15th and the 20th of April 2019 influence the result of the query. Any rows that do not satisfy this predicate can be skipped.
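To make the row-group skipping concrete, here is a small `pyarrow` sketch (not part of the original post) that prints the min/max statistics Parquet stores per row group for the `pickup_at` column; the file name is illustrative.

```python
import pyarrow.parquet as pq

# Each row group stores per-column min/max statistics in the file footer.
# A reader can skip any row group whose range cannot satisfy the filter.
metadata = pq.ParquetFile('taxi.parquet').metadata  # illustrative file name
for rg in range(metadata.num_row_groups):
    for col in range(metadata.num_columns):
        chunk = metadata.row_group(rg).column(col)
        stats = chunk.statistics
        if chunk.path_in_schema == 'pickup_at' and stats is not None and stats.has_min_max:
            print(rg, stats.min, stats.max)
```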

@@ -85,7 +83,7 @@ The examples are available [here as an interactive notebook over at Google Colab

First we look at some rows in the dataset. There are three Parquet files in the `taxi/` folder. [DuckDB supports the globbing syntax](https://duckdb.org/docs/data/parquet), which allows it to query all three files simultaneously.

```py
```python
con.execute("""
SELECT *
FROM 'taxi/*.parquet'
@@ -104,30 +102,30 @@ Despite the query selecting all columns from three (rather large) Parquet files,

If we try to do the same in Pandas, we realize it is not so straightforward, as Pandas cannot read multiple Parquet files in one call. We will first have to use `pandas.concat` to concatenate the three Parquet files together:

```py
```python
import pandas
import glob
df = pandas.concat(
[pandas.read_parquet(file)
for file
in glob.glob('taxi/*.parquet')])
[pandas.read_parquet(file)
for file
in glob.glob('taxi/*.parquet')])
print(df.head(5))
```

Below are the timings for both of these queries.

| System | Time (s) |
|:--------|---------:|
|:-------|---------:|
| DuckDB | 0.015 |
| Pandas | 12.300 |
| Pandas | 12.300 |

Pandas takes significantly longer to complete this query. That is because Pandas not only needs to read each of the three Parquet files in its entirety, but also has to concatenate the three resulting DataFrames together.

#### Concatenate Into a Single File

We can address the concatenation issue by creating a single big Parquet file from the three smaller parts. We can use the `pyarrow` library for this, which has support for reading multiple Parquet files and streaming them into a single large file. Note that the `pyarrow` parquet reader is the very same parquet reader that is used by Pandas internally.

```py
```python
import pyarrow.parquet as pq

# concatenate all three parquet files
@@ -140,7 +138,7 @@ Note that [DuckDB also has support for writing Parquet files](https://duckdb.org
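The concatenation snippet above is cut short by the diff; a minimal sketch of how it could be completed with `pyarrow` (the row-group size is an arbitrary choice, and the output file name matches the one used later in the post) could look like this:

```python
import pyarrow.parquet as pq

# Read the three files in taxi/ into a single pyarrow Table and
# write them back out as one Parquet file.
table = pq.ParquetDataset('taxi/').read()
pq.write_table(table, 'alltaxi.parquet', row_group_size=100_000)
```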

Now let us repeat the previous experiment, but using the single file instead.

```py
```python
# DuckDB
con.execute("""
SELECT *
@@ -153,8 +151,8 @@ pandas.read_parquet('alltaxi.parquet')
```

| System | Time (s) |
|:--------|---------:|
| DuckDB | 0.02 |
|:-------|---------:|
| DuckDB | 0.02 |
| Pandas | 7.50 |

We can see that Pandas performs better than before, as the concatenation is avoided. However, the entire file still needs to be read into memory, which takes both a significant amount of time and memory.
@@ -165,7 +163,7 @@ For DuckDB it does not really matter how many Parquet files need to be read in a

Now suppose we want to figure out how many rows are in our data set. We can do that using the following code:

```py
```python
# DuckDB
con.execute("""
SELECT count(*)
@@ -177,31 +175,31 @@ len(pandas.read_parquet('alltaxi.parquet'))
```

| System | Time (s) |
|:--------|---------:|
| DuckDB | 0.015 |
| Pandas | 7.500 |
|:-------|---------:|
| DuckDB | 0.015 |
| Pandas | 7.500 |

DuckDB completes the query very quickly, as it automatically recognizes what needs to be read from the Parquet file and minimizes the required reads. Pandas has to read the entire file again, which causes it to take the same amount of time as the previous query.

For this query, we can improve Pandas' time through manual optimization. In order to get a count, we only need a single column from the file. By manually specifying a single column to be read in the `read_parquet` command, we can get the same result but much faster.

```py
```python
len(pandas.read_parquet('alltaxi.parquet', columns=['vendor_id']))
```

| System | Time (s) |
|:--------------------|---------:|
| DuckDB | 0.015 |
| Pandas | 7.500 |
| Pandas (optimized) | 1.200 |
|:-------------------|---------:|
| DuckDB | 0.015 |
| Pandas | 7.500 |
| Pandas (optimized) | 1.200 |

While this is much faster, it still takes more than a second, as the entire `vendor_id` column has to be read into memory as a Pandas column just to count the number of rows.
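As an aside, a plain row count can in principle be answered from the Parquet footer alone, without materializing any column; a hedged `pyarrow` sketch:

```python
import pyarrow.parquet as pq

# The total row count is stored in the file metadata, so no column data is read.
print(pq.ParquetFile('alltaxi.parquet').metadata.num_rows)
```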

#### Filtering Rows

It is common to use some sort of filtering predicate to only look at the interesting parts of a data set. For example, imagine we want to know how many taxi rides occur after the 30th of June 2019. We can do that using the following query in DuckDB:

```py
```python
con.execute("""
SELECT count(*)
FROM 'alltaxi.parquet'
@@ -212,43 +210,43 @@ con.execute("""
The query completes in `45ms` and yields the following result:

| count |
|--------|
|-------:|
| 167022 |

In Pandas, we can perform the same operation using a naive approach.

```py
```python
# pandas naive
len(pandas.read_parquet('alltaxi.parquet')
.query("pickup_at > '2019-06-30'"))
```

However, this again reads the entire file into memory, causing the query to take `7.5s`. With manual projection pushdown we can bring this down to `0.9s`, which is still significantly slower than DuckDB.

```py
```python
# pandas projection pushdown
len(pandas.read_parquet('alltaxi.parquet', columns=['pickup_at'])
.query("pickup_at > '2019-06-30'"))
```

However, the `pyarrow` Parquet reader also allows us to push the filter down into the scan. Once we add this, the query completes in a much more competitive `70ms`.

```py
```python
len(pandas.read_parquet('alltaxi.parquet', columns=['pickup_at'], filters=[('pickup_at', '>', '2019-06-30')]))
```

| System | Time (s) |
|:---------------------------------------|---------:|
| DuckDB | 0.05 |
|:--------------------------------------|---------:|
| DuckDB | 0.05 |
| Pandas | 7.50 |
| Pandas (projection pushdown) | 0.90 |
| Pandas (projection & filter pushdown) | 0.07 |
| Pandas (projection & filter pushdown) | 0.07 |

This shows that the results here are not due to DuckDB's parquet reader being faster than the `pyarrow` Parquet reader. The reason that DuckDB performs better on these queries is because its optimizers automatically extract all required columns and filters from the SQL query, which then get automatically utilized in the Parquet reader with no manual effort required.
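One way to see this pushdown for yourself is to ask DuckDB for the query plan; this is a sketch, and the exact plan text varies between DuckDB versions.

```python
import duckdb

con = duckdb.connect()  # the post reuses an existing connection named con

# The plan should show a Parquet scan that projects only pickup_at and
# evaluates the filter inside the scan.
print(con.execute("""
    EXPLAIN
    SELECT count(*)
    FROM 'alltaxi.parquet'
    WHERE pickup_at > '2019-06-30'
""").fetchall())
```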

Interestingly, both the `pyarrow` Parquet reader and DuckDB are significantly faster than performing this operation natively in Pandas on a materialized DataFrame.

```py
```python
# read the entire parquet file into Pandas
df = pandas.read_parquet('alltaxi.parquet')
# run the query natively in Pandas
@@ -257,18 +255,18 @@ print(len(df[['pickup_at']].query("pickup_at > '2019-06-30'")))
```

| System | Time (s) |
|:---------------------------------------|---------:|
| DuckDB | 0.05 |
|:--------------------------------------|---------:|
| DuckDB | 0.05 |
| Pandas | 7.50 |
| Pandas (projection pushdown) | 0.90 |
| Pandas (projection & filter pushdown) | 0.07 |
| Pandas (native) | 0.26 |
| Pandas (projection & filter pushdown) | 0.07 |
| Pandas (native) | 0.26 |

#### Aggregates

Finally, let's look at a more complex aggregation. Say we want to compute the number of rides per passenger count. With DuckDB and SQL, it looks like this:

```py
```python
con.execute("""
SELECT passenger_count, count(*)
FROM 'alltaxi.parquet'
@@ -278,7 +276,7 @@ The query completes in `220ms` and yields the following result:
The query completes in `220ms` and yields the following result:

| passenger_count | count |
|:-----------------|----------:|
|---------------:|----------:|
| 0 | 408742 |
| 1 | 15356631 |
| 2 | 3332927 |
@@ -292,15 +290,15 @@ The query completes in `220ms` and yields the following result:

For the SQL-averse, and as a teaser for a future blog post, DuckDB also has a "Relational API" that allows for a more Python-esque declaration of queries. Here's the equivalent of the above SQL query, which provides the exact same result and performance:

```py
```python
# Parentheses let the chained relational calls parse as a single expression.
(con.from_parquet('alltaxi.parquet')
    .aggregate('passenger_count, count(*)')
    .df())
```

Now as a comparison, let's run the same query in Pandas in the same way we did previously.

```py
```python
# naive
pandas.read_parquet('alltaxi.parquet')
.groupby('passenger_count')
@@ -317,11 +315,11 @@ df.groupby('passenger_count')
```

| System | Time (s) |
|:------------------------------|---------:|
| DuckDB | 0.22 |
|:-----------------------------|---------:|
| DuckDB | 0.22 |
| Pandas | 7.50 |
| Pandas (projection pushdown) | 0.58 |
| Pandas (native) | 0.51 |
| Pandas (projection pushdown) | 0.58 |
| Pandas (native) | 0.51 |

We can see that DuckDB is faster than Pandas in all three scenarios, without needing to perform any manual optimizations and without needing to load the Parquet file into memory in its entirety.

2 changes: 1 addition & 1 deletion _posts/2021-11-12-moving-holistic.md
@@ -71,7 +71,7 @@ for the following string data, using a frame that includes one element from each

For this example we are using strings so we don't have to worry about interpolating values.

```py
```python
data = ('a', 'b', 'c', 'd', 'c', 'b',)
w = len(data)
for row in range(w):
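    # --- Illustrative continuation, not part of the original diff ---
    # The frame spans one element on each side of the current row,
    # clamped to the bounds of the data.
    frame = data[max(row - 1, 0):min(row + 2, w)]
    print(row, frame)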
6 changes: 3 additions & 3 deletions _posts/2021-12-03-duck-arrow.md
@@ -105,7 +105,7 @@ There are two ways in Python of querying data from Arrow:

1. Through the Relational API

```py
```python
# Reads Parquet File to an Arrow Table
arrow_table = pq.read_table('integers.parquet')
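# --- Illustrative sketch, not part of the original diff ---
# One way to build a DuckDB relation from the Arrow table; assumes the
# duckdb module is imported in the elided part of the snippet.
rel_from_arrow = duckdb.arrow(arrow_table)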

@@ -121,7 +121,7 @@ arrow_table_from_duckdb = rel_from_arrow.arrow()

2. By using replacement scans and querying the object directly with SQL:

```py
```python
# Reads Parquet File to an Arrow Table
arrow_table = pq.read_table('integers.parquet')
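# --- Illustrative sketch, not part of the original diff ---
# A replacement scan lets the local variable name be used directly as a
# table name in SQL; the selected columns here are an assumption.
print(duckdb.query("SELECT * FROM arrow_table LIMIT 5").df())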

@@ -172,7 +172,7 @@ arrow::copy_files("s3://ursa-labs-taxi-data", "nyc-taxi")

#### Python

```py
```python
# Reads dataset partitioning it in year/month folder
nyc_dataset = ds.dataset('nyc-taxi/', partitioning=["year", "month"])
