Comparing transformation methods: BigQuery (with dbt), Spark and Dask

Overview

The aim of this project is to compare the time it takes to extract, transform and load large datasets using three different methods: BigQuery (with dbt), Spark and Dask. Please read the Medium post for more information.

Data

The data used in this project is the publicly available Stack Exchange Data Dump. Users.xml and Posts.xml were converted to users.csv.gz and posts.csv.gz and used as the source files for this project.

For the ISO 3166-1 country codes, the CSV from DataHub was used (saved as country_codes.csv).

Prerequisites

For Google Cloud:

  • Create a project
  • Create a Cloud Storage bucket and upload the posts.csv.gz, users.csv.gz and country_codes.csv files (example commands below)
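
A minimal sketch of these two steps, assuming ${PROJECT_ID}, ${REGION} and ${BUCKET_NAME} are placeholders for your own project ID, region and bucket name:

gcloud config set project ${PROJECT_ID}                                       # point the gcloud CLI at your project
gsutil mb -l ${REGION} gs://${BUCKET_NAME}                                    # create the bucket in your chosen region
gsutil cp posts.csv.gz users.csv.gz country_codes.csv gs://${BUCKET_NAME}/    # upload the source files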

For Spark:

  • Install sbt
  • Install SDKMAN!
  • In the spark folder, use SDKMAN! to install a JDK (8 or 11 currently supported) and set the project's JDK version using sdk env init (see the example below)
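
For example, assuming JDK 11 (the exact version identifier below is hypothetical; pick one from sdk list java):

cd spark
sdk install java 11.0.21-tem   # hypothetical identifier taken from `sdk list java`
sdk env init                   # writes .sdkmanrc pinning the project's JDK
sdk env                        # switch this shell to the pinned JDK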

For Dask:

  • Python 3.x installed
  • Install packages from requirements.txt -> pip install -r ./dask/requirements.txt (for running locally)

For dbt:

  • Python 3.x installed
  • Install packages from requirements.txt -> pip install -r ./bigquery_dbt/requirements.txt
  • Copy the ISO 3166-1 country codes CSV into ./bigquery_dbt/seeds/country_codes.csv
  • Set up a dbt profile called bigquery_dbt for BigQuery in ~/.dbt/profiles.yml (see the example below)
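
A minimal sketch of that profile, written via a heredoc and assuming OAuth (gcloud application-default) authentication; the project, dataset and location values are placeholders:

mkdir -p ~/.dbt
cat >> ~/.dbt/profiles.yml <<'EOF'
bigquery_dbt:
  target: dev
  outputs:
    dev:
      type: bigquery
      method: oauth              # assumes `gcloud auth application-default login` has been run
      project: your-gcp-project-id
      dataset: your_dataset
      location: US
      threads: 4
EOF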

Running

BigQuery (with dbt)

  1. Make a BigQuery dataset: bq mk --dataset ${PROJECT_ID}:${DATASET}

  2. Load files into BigQuery as tables (can be done concurrently)

bq load \
    --autodetect \
    --source_format=CSV \
    ${DATASET}.posts \
    gs://${BUCKET_NAME}/posts.csv.gz

bq load \
    --autodetect \
    --source_format=CSV \
    ${DATASET}.users \
    gs://${BUCKET_NAME}/users.csv.gz
  3. Ensure the Google Cloud project ID is specified in the database field in schema.yml

  4. Run dbt

cd ./bigquery_dbt
dbt build # Load CSV as reference table (via seeds), run tests etc.
dbt run
  5. Export the created table to GCS (a verification example follows these steps):
bq extract \
--destination_format CSV \
--compression GZIP \
--field_delimiter ',' \
${PROJECT_ID}:${DATASET}.aggregated_users \
gs://${BUCKET_NAME}/dbt_bigquery/agg_users.csv.gz
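
To sanity-check the result before comparing timings, the aggregated table and the exported file can be inspected; aggregated_users and the GCS path come from the steps above:

bq head -n 5 ${PROJECT_ID}:${DATASET}.aggregated_users   # preview the aggregated table
gsutil ls -l gs://${BUCKET_NAME}/dbt_bigquery/           # confirm agg_users.csv.gz was written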

Spark

  1. Ensure that you change the gcsBucket value in aggregate-users.scala
  2. Run the following (in the spark folder) to compile and package the project into a .jar for Dataproc:
sbt

Then within the sbt console:

package
  3. Copy the jar from local to GCS (optional):
gsutil cp spark/target/scala-2.12/${JAR_FILENAME}.jar gs://${BUCKET_NAME}/spark/aggregateusers.jar
  4. Create Dataproc cluster:
 gcloud dataproc clusters create ${SPARK_CLUSTER_NAME} \
    --project=${PROJECT_ID} \
    --region=${REGION} \
    --image-version=2.0 \
    --master-machine-type n1-standard-8 \
    --worker-machine-type n1-standard-8 \
    --num-workers 6
  5. Submit Spark job on Dataproc cluster:
gcloud dataproc jobs submit spark \
    --cluster=${SPARK_CLUSTER_NAME} \
    --class=stackoverflow.AggregateUsers \
    --jars=gs://${BUCKET_NAME}/spark/aggregateusers.jar \
    --region=${REGION}
  6. Delete the cluster when finished (see the command below)
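
For example, assuming the same variables as above:

gcloud dataproc clusters delete ${SPARK_CLUSTER_NAME} \
    --project=${PROJECT_ID} \
    --region=${REGION} \
    --quiet   # skip the confirmation prompt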

Dask

  1. Copy the initialisation actions to your bucket (optional):
gsutil cp gs://goog-dataproc-initialization-actions-${REGION}/dask/dask.sh gs://${BUCKET_NAME}/dask/
  2. Create cluster:
 gcloud dataproc clusters create ${DASK_CLUSTER_NAME} \
    --project=${PROJECT_ID} \
    --region=${REGION} \
    --master-machine-type n1-standard-8 \
    --worker-machine-type n1-standard-8 \
    --num-workers 6 \
    --image-version preview-ubuntu \
    --initialization-actions gs://${BUCKET_NAME}/dask/dask.sh \
    --metadata dask-runtime=yarn \
    --enable-component-gateway
  3. Copy the files to the master node:
gcloud compute scp \
    --project=${PROJECT_ID} \
    --zone=${ZONE} \
    --recurse ./dask/ ${DASK_CLUSTER_NAME}-m:~/
  4. Install package requirements & run:
gcloud compute ssh ${DASK_CLUSTER_NAME}-m --zone ${ZONE}
/opt/conda/default/bin/python -m pip install python-dotenv
/opt/conda/default/bin/python ./dask/transform.py
  5. Delete the cluster when finished (see the command below)
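
As with the Spark cluster, assuming the same variables as above:

gcloud dataproc clusters delete ${DASK_CLUSTER_NAME} \
    --project=${PROJECT_ID} \
    --region=${REGION} \
    --quiet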
