Trouble Scaling XGBoost beyond in-memory training on databricks #10853

Closed
gdubs89 opened this issue Sep 27, 2024 · 12 comments

@gdubs89

gdubs89 commented Sep 27, 2024

I'm currently training a binary classifier using a tiny sample of a dataset. The dataset is approx 50 billion rows per day, and we persist the data for ~60 days, so in theory I could be training on up to ~3 trillion rows of data. Of course that's probably a little excessive, but currently I'm training on a 0.1% sample of a day's data, i.e. approx 50 million rows.

I do this by doing df = spark.read.parquet('s3://bucketname/data.pq').sample(fraction=0.001).toPandas()

I can play with this fraction a little; I've pushed it as far as 100 million rows and might be able to go a bit further. Fundamentally, though, the approach of pulling everything onto a massive driver node and training in memory is not scalable, and it's never going to allow me to train on 1 billion rows, 10 billion rows, or more.

To that end, I've been looking for the canonical way to scale xgboost, i.e. do distributed training on databricks. I'm open to doing GPU training, but my strong suspicion is that I'm far more memory-limited than compute-limited (when training on 50 million rows on a single EC2 machine, once the data has been read in and converted to DMatrices, the actual training is a breeze and takes 10-15 minutes), so my instinct is to try distributed CPU training.

Also, I'm using the following bells & whistles, which any distributed training will need to support:

  1. Early stopping
  2. Monotonicity constraints on some input features
  3. Native handling of categoricals (i.e. I'm not going to one-hot encode the data. Either xgboost needs to handle the categoricals internally, or needs to be able to interact with a sparse feature representation)

For the sake of benchmarking, I've prepared the following 4 datasets:

  1. ~600 million rows (being able to train on this would constitute success, I think; it's significantly more than I'm ever going to be able to handle on a single big EC2 instance)
  2. ~50 million rows (this is the benchmark, I can train on this relatively comfortably on a single EC2 instance)
  3. ~50 million rows but with about half the number of columns
  4. ~5 million rows (for quick prototyping/testing of syntax)
    (In each case the sizes above refer to the train set; there is also a corresponding eval set approx 20% of its size.)

I first tried to do this using xgboost-dask. This is the solution I landed on:

import dask.distributed
import dask.dataframe as dd
from xgboost import dask as dxgb
from xgboost import DMatrix as xgb_DMatrix

cluster = dask.distributed.LocalCluster(n_workers=8, threads_per_worker=16, memory_limit='91 GiB')
#was using a cluster of 8 i3.4xlarge, driver is also i3.4xlarge
client = dask.distributed.Client(cluster)

train_ddf = dd.read_parquet("s3://bucketname/train.pq", storage_options={...})
eval_ddf = dd.read_parquet("s3://bucketname/eval.pq", storage_options={...})

categorical_columns = ["X", "Y", "Z"]
features = ["A", "B", "C", "X", "Y", "Z"] # ABC are dense/numerical columns, XYZ are all integer-valued columns, but they should be interpreted as categorical

train_ddf[categorical_columns] = train_ddf[categorical_columns].astype('category').categorize()
eval_ddf[categorical_columns] = eval_ddf[categorical_columns].astype('category').categorize()

dtrain = dxgb.DaskDMatrix(
    client=client,
    data=train_ddf[features],
    label=train_ddf['label'],
    enable_categorical=True
)

dvalid = dxgb.DaskDMatrix(
    client=client,
    data=eval_ddf[features],
    label=eval_ddf['label'],
    enable_categorical=True
)

params = {
    "objective": "binary:logistic",
    "max_depth": 8,
    "learning_rate": 0.1,
    "monotone_constraints": {"B": 1},
    "eval_metric": "logloss",
    "tree_method": "hist",
}

model = dxgb.train(
    client=client,
    params=params,
    dtrain=dtrain,
    num_boost_round=2000,
    early_stopping_rounds=10,
    evals=[(dvalid, 'eval')],
    verbose_eval=1
)

This "worked" when I used dataset 3 described above, but failed when I used dataset 2. I.e. 50 million rows and about ~20 columns worked but 50 million rows and ~50 columns was too much. I was also a little suspicious that dask wasn't utilising the worker nodes. I can't connect to the dask dashboard, I think it's something I'd need to talk to our databricks admin about (I tried to SSH into the driver but my connection timed out, to my best understanding, we'd need to unblock some port), but the databricks cluster dashboard only ever showed the driver node being engaged (in retrospect, it could also possibly have been just one worker being engaged, if this is deemed relevant I can re-run and check). Note that when I do print(client), it's telling me I have 128 threads (8*16, i.e. the number of worker cores) and ~500gb of RAM, but they don't seem to be being engaged by the training process.

If only one machine is being engaged, each of these machines has significantly less memory than the machine I used to train on the 50 million row dataset in memory, so it's not entirely surprising that this fell over at the point where it did. I tested this by firing up a "wonky" cluster, comprised of two r5d.16xlarge workers and a driver of the same type. This worked, but again only one machine was being engaged, so we've not gained anything over just training on a single large machine.

My suspicion here is that raw dask doesn't play very well with databricks/spark, so I decided to try dask-databricks instead. Basically, in the above code, replace

import dask.distributed
cluster = dask.distributed.LocalCluster(n_workers=8, threads_per_worker=16, memory_limit='91 GiB')
client = dask.distributed.Client(cluster)

with

import dask_databricks
client = dask_databricks.get_client()

Same deal: when I print(client), I see the number of threads/amount of memory I expect. However, when running on a cluster of 8 i3.4xlarge workers, I have the same scaling issues as previously; I can run on the 50 million row dataset with ~20 columns, but when I try the set with ~50 columns, it falls over.

I'm now running a cluster of 12 r5d.8xlarge machines (I should have used r5d.16xlarge like I did before, for reproducibility), and the training run for the 50 million row dataset with 50 columns hasn't technically crashed, but it's been running for 50 minutes now (which, given how big this cluster is compared to the single machine that can train this in memory in ~10-15 minutes, is bad). When using dask-databricks, I can access the dask dashboard, and while I'm no expert on how to read it, it looks like all workers are being used, but only about 1.5 of 32 cores per worker. This is in line with what the databricks cluster's dashboard is telling me.

I also get a warning

/databricks/python/lib/python3.11/site-packages/distributed/client.py:3361: UserWarning: Sending large graph of size 41.96 MiB.
This may cause some slowdown.
Consider loading the data with Dask directly
 or using futures or delayed objects to embed the data into the graph without repetition.
See also https://docs.dask.org/en/stable/best-practices.html#load-data-with-dask for more information.

which I don't fully know what to do with.

The cluster I'm currently using has at least 3x more RAM and 4x more cores than the largest single EC2 machine, the one I've been using to train on 50 million rows/50 columns (and that I've shown can be pushed a little further, at least to 100 million rows, maybe to 150m, probably not as far as 200m). I would also have hoped that when doing distributed training in dask, you'd get much more memory-efficient handling of the data than when pulling everything into pandas. And yet I'm not even getting close to replicating the performance I get with a single EC2 instance, which does not seem to bode well for scaling up to 500 million rows and beyond.

Help with this, or with other ways to scale XGBoost beyond in-memory training, would be greatly appreciated. I was hoping there would be an accepted way to do distributed xgboost training, but alas there doesn't seem to be any accepted wisdom on how to do this.

Other notes:

  • I'm using the most recent Databricks runtime, 15.4 LTS
  • When I ran this in vanilla dask, I got verbose training output. When I used dask-databricks, I lost the verbose output
@trivialfis
Member

Thank you for sharing your experience. Based on your description, you are trying to find some "best practices" for distributed training. I will try to do some more experiments and come up with something more comprehensive like a blog post. But for now, let's start with some checks:

  • Is the data reasonably balanced across workers? The overhead of waiting for an imbalanced worker is quite high. XGBoost relies on collective operations like allreduce during tree build, so if every synchronization requires waiting, the total training time can be extremely slow. I recently observed on a 48-GPU cluster that a 5-second iteration can stall to over 20 minutes due to imbalanced data (2TB data, BTW). An imbalanced dataset also causes OOM errors. (A quick way to check balance is sketched after this list.)
  • The dask dashboard is really helpful; please try to get it configured before doing any serious work. Feel free to explore the dashboard further, as most performance issues require it to debug.
  • Yes, you will hit a scaling/performance limit eventually, as with all collective-based distributed training.
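
As a rough sketch of the first check, reusing the client and train_ddf names from the snippet above (the exact calls are illustrative, not prescriptive):

train_ddf = train_ddf.persist()                                # pin the partitions so placement is visible
print(train_ddf.map_partitions(len).compute())                 # rows per partition; a large spread means imbalanced work
print(client.run(lambda dask_worker: len(dask_worker.data)))   # how many pieces of data each worker holds; zero means a starving worker
client.rebalance()                                             # optionally even out memory across workers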

@gdubs89
Author

gdubs89 commented Sep 27, 2024

On your first point, how would you suggest I dig into this a bit more? I've looked at three things:

  1. Databricks cluster dashboard
  2. Dask dashboard (note, when using dask-databricks, this works no problem)
  3. Running htop on a web terminal (presumably this only tells me about the driver node)

Some screenshots from the dask dashboard (I'm new to this so not entirely sure what I'm looking for)
[screenshots]

Databricks cluster dashboard
[screenshot]
(they're all hovering around 1.8%, which is less than 1/32)

htop generally shows one CPU near 100% on the driver node; it occasionally bounces between different CPUs, but never more than one is engaged.

Also note, from my previous point: when I used a cluster of 8 i3.4xlarge machines, which should collectively have easily enough memory to handle this data, it crashed. So I have some concern that a lot or all of the data is being pulled onto the driver or a single worker node, but I'm not sure how to debug this further.

And yes, to your last point, I'm not expecting to be able to scale this infinitely; training on 3 trillion rows of data is clearly a pipe dream. But given that I can train on ~100 million rows on a single machine, I'd be very surprised if it weren't feasible to scale up by one, if not two, orders of magnitude by going distributed.

@trivialfis
Member

Maybe start by observing the CPU/memory usage across workers; there's a "Workers" tab in the dask dashboard. I can't provide a definite answer on why a specific run is slow without running it myself, but in general it's a data-balance issue. On GPU, sometimes I just repartition the data and the problem is mitigated.

The XGBoost train function in the dashboard's task view is actually a lambda function, one per worker. Do you see dask waiting for them to finish?
If you are using the default tree method (hist), consider using the QuantileDMatrix instead of the DMatrix for the training dataset; the former is much more memory-efficient.
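As a rough sketch with the dask interface, reusing the names from the earlier snippet (not a drop-in from this thread):

dtrain = dxgb.DaskQuantileDMatrix(
    client,
    train_ddf[features],
    train_ddf['label'],
    enable_categorical=True
)
# The eval set can reference the training matrix so it reuses the same histogram bins.
dvalid = dxgb.DaskQuantileDMatrix(
    client,
    eval_ddf[features],
    eval_ddf['label'],
    enable_categorical=True,
    ref=dtrain
)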

I'm also working on improving the scaling at the moment and will add some logging facilities to XGBoost to help debug.

@gdubs89
Author

gdubs89 commented Sep 29, 2024

OK, I've got the vanilla dask dashboard working in the meantime (previously I only had it working when using dask-databricks), so I will now provide a detailed comparison of the dashboards in a variety of cases:

Scenario 1: ~50 million rows, 19 columns

First hardware setup: a "small, fat" setup, i.e. a small fleet of large machines, driver + 3 workers all of type r5d.16xlarge (total 256 cores, ~2TB memory)

dask-databricks: The whole process of reading in the data, categorizing, creating DMatrices and training for 50 epochs takes about 1 minute, of which ~45 seconds are spent training (and 15 seconds on data prep). When using dask-databricks I don't get any verbose training output, but based on adding some print statements to track progress, here's a screenshot of the dask dashboard at a point when I'm fairly sure training is going on
[screenshot]
So we're getting decent CPU utilisation, but still nowhere near full utilisation (which would be 6400%). Also, this amount of data is using a tiny fraction of the memory, making me think that with this cluster I should easily be able to scale up by an order of magnitude, and would only need to scale the cluster a bit to get to two orders of magnitude.

vanilla dask:
[screenshot]
Pretty similar CPU and memory utilisation, and I also get verbose training output. Data prep takes circa 15 seconds, training about 60 seconds.

For a benchmark, this dataset also easily fits in memory on a 6d1d.36xlarge (128 cores, ~1 TB RAM). In-memory training for 50 epochs (same process of creating DMatrices and fitting via the learning API) takes about 1 minute.

Scenario 2: ~40 million rows, 49 columns

dask-databricks:
Casting to categoricals and categorizing starts to take significant time here (100s). CPU utilisation is very poor
[screenshot]
Creating DMatrices takes almost 3 minutes; CPU utilisation is also poor. We do see all workers engaged and more than one CPU, but nowhere near full CPU engagement
[screenshot]
Training starts to get quite squiffy; a screenshot doesn't really capture it well. At any one point you tend to see that one worker might have quite high CPU utilisation (20+ CPUs) while another 1 or 2 workers are close to 0%
[screenshot]
Memory utilisation is also not very symmetric, in that 3 machines are much more engaged than the 4th. It's also quite surprising how much more memory this dataset takes up than the previous one. All of the extra columns are integer rather than dense/float, but perhaps the categorical type is more memory-hungry.

vanilla-dask:
Worker utilisation while categorizing:
[screenshot]
Takes about 80s

Then we get into DMatrix creation
[screenshot]
DMatrices take almost 4 minutes to create.

Things get a bit squiffy when we get to training. The CPU utilisation fluctuates a lot, so a screenshot isn't that helpful, but in one I did take here, we see one of the workers close to maxed out (29/32 CPUs fully engaged) while two of them are doing almost nothing. So the hypothesis of imbalanced worker utilisation seems fair
[screenshot]
taken at another point in training
[screenshot]

Training, once it finally started, took about 6 minutes to complete

In-memory training goes off without a hitch; the actual training stage takes 90s. Categorizing + DMatrix creation takes about 40s, and obviously the reading from spark into pandas takes a bit longer (around 3 minutes).

I also tried using QuantileDMatrices for the larger dataset, just in the dask-databricks setup. The QuantileDMatrices again took about 4 minutes to create.

Here's a training screenshot where it does look like 2 machines are being well-utilised and one is doing basically nothing, but here's another where it looks quite poor
[screenshots]
If you watch the dashboard for a while, it's a little hard to discern what's going on. Basically it fluctuates between fairly high utilisation across the board, times when there's low utilisation across the board, and times when some workers are highly utilised and some are not. Training took 8 minutes.

I should also add that in the larger-dataset case, both on vanilla dask and dask-databricks, the whole thing was pretty shonky and inconsistent. It probably fails around 2/3 of the time, for various reasons. When it fails, I just detach and re-attach the spark cluster, start a fresh dask cluster, and run the code again with nothing else changed; sometimes it works, sometimes it doesn't. One failure mode (easier to ascertain in vanilla dask than dask-databricks, as you get more output) seems to be that DMatrix creation causes a worker to be terminated and restarted, which in turn means that training has the wrong IP addresses for the workers and so fails. But I've also had more esoteric messages, and one case where nothing went wrong but the training losses started to blow up, which is very odd indeed.

Finally, in the vanilla dask setup, I tried scaling this up to the ~600 million row dataset (full 49 columns). This didn't even make it past the read-in and partition-the-data stage; it's been 25 minutes with almost no CPU utilisation, so I think I'm going to shut it off
[screenshot]

In conclusion, for this task, which can still be done pretty comfortably in memory on a single big EC2 machine, when using dask I find:

  • The process is inconsistent, frequently fails, sometimes works, little rhyme or reason as to why
  • DMatrix creation seems to be a heavy process that isn't making much use of parallelisation
  • When it comes to training, CPU utilisation certainly is far from 100% across the board
  • Memory utilisation seems low, so I'd expect to be able to scale up to much larger datasets, with the worst case being that it just takes forever. But that does not appear to be the case
  • Despite my clusters having significantly more firepower than the single EC2 machine I use for in-memory training, distributed training on this dataset is significantly slower than doing it in memory (not to mention much less reliable)
  • When I try to scale up to an even larger dataset, it's really not clear what's going on. A simple dd.read_parquet(..).repartition() doesn't seem to work (a quick partition-count check is sketched below), which admittedly isn't an xgboost problem per se, but it perhaps plays into my original post of "still looking for the canonical way to scale xgboost beyond in-memory training"
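
A quick, illustrative way to sanity-check what dask is being handed (the path is a placeholder, not my exact code):

ddf = dd.read_parquet("s3://bucketname/train_600m.pq")
print(ddf.npartitions)                           # a huge count of tiny partitions makes the graph itself the bottleneck
ddf = ddf.repartition(partition_size="256MB")    # illustrative target size, not a recommendation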

I will also try to do all of the above with differently shaped clusters that use more, smaller workers; I'll post updates tomorrow.

@gdubs89
Author

gdubs89 commented Sep 30, 2024

Same analysis using 12 r4.4xlarge workers (+ same driver). This gives us about 200 cores and 1.5TB of RAM, but with a much "more distributed" setup

Vanilla Dask

Process takes 5 minutes end to end.
[screenshot]
CPU utilisation is extremely low, as is memory utilisation (again making it rather implausible that I'm gonna get any OOM error when scaling up to more columns or even scaling up the number of rows by an order of magnitude)

On the larger dataset (i.e. same number of rows but more columns, where all the additional columns are of categorical type), the first time it failed at the DMatrix creation stage. The second time, it managed to create the DMatrices, but I'm still getting some weird output that suggests it failed to distribute them quite as intended
[screenshot]
As before, when it gets to training, we see a highly heterogeneous distribution of CPU utilisation
[screenshot]

The training then results in the loss starting to explode before it fails
[screenshots]
(I've partially redacted that output as it contained some information about the training data)

Dask Databricks
Noticeably better performance on the smaller dataset. End to end, the read-in + train process took 35s, and CPU utilisation, while far from full, was much better
[screenshot]

When we get to the bigger dataset, again things are just far more variable than with the smaller one
[screenshot]
and a screenshot doesn't fully capture it, but what is clear is that

  • Overall CPU utilisation is lower
  • It fluctuates a lot more

However, end to end, the process completed in 4 minutes, and the actual training only took approx 1 of those minutes.
The memory utilisation is again so low that I figured: why not try this on a dataset that's 10x larger?

As before, when I try this, it seems to just get stuck at the data read-in/repartition stage, with almost zero CPU utilisation
[screenshot]

So in summary:

  • When I go to this more horizontally scaled dask cluster, things really stop working in vanilla dask.
  • dask-databricks still does OK, potentially outperforms in-memory training by a little bit. But CPU utilisation is still pretty asymmetric
  • despite memory utilisation being so low on even the larger dataset, when I try to scale to a dataset that's 10x the size, I can't get past the data read-in/repartition stage

@trivialfis
Member

trivialfis commented Sep 30, 2024

This is a combination of dask data balancing issues, dask memory usage and data spilling issues (the read/partition), XGBoost training performance issues, and optimization in Databricks. Let's get some of the easy issues resolved first.

  • If the output is not printed as expected, it might be caused by buffered stderr/stdout. I recently ran into an issue where a worker didn't flush its output until it was killed, accumulating ~600MB of logging! There's no workaround in XGBoost at the moment, but you can define your own printing callback that flushes the output (a sketch follows this list).
  • The QDM is primarily used to reduce memory usage during DMatrix construction, and performance is more or less a bonus. From the screenshot, I would say it's doing what it's expected to do (reducing memory).
  • The DMatrix construction for the dask interface makes sure all prior lazy computations in dask are materialized; as a result, you sometimes get completely unrelated errors in the DMatrix constructor. A way to separate the two is to use client.persist and distributed.wait before constructing the DMatrix (also sketched after this list).
  • PySpark takes (significantly) longer to construct the DMatrix than dask as it has this weird design that needs to construct and iterate through n_samples numpy arrays instead of a few partitions. On the other hand, the data distribution for PySpark is more stable and well-balanced.
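
Rough sketches of the first and third bullets, reusing names from the earlier snippets (the class name and exact calls are illustrative, not from this thread):

import distributed
import xgboost as xgb

# (1) A printing callback that flushes every round, so buffered worker output still shows up.
class FlushingLogger(xgb.callback.TrainingCallback):
    def after_iteration(self, model, epoch, evals_log):
        # evals_log is {dataset_name: {metric_name: [values per round]}}
        latest = {name: {metric: values[-1] for metric, values in metrics.items()}
                  for name, metrics in evals_log.items()}
        print(f"[{epoch}] {latest}", flush=True)
        return False  # returning True would stop training
# passed via dxgb.train(..., callbacks=[FlushingLogger()])

# (2) Persist and wait before constructing the DMatrix, so unrelated dask errors surface here first.
train_ddf = client.persist(train_ddf)
eval_ddf = client.persist(eval_ddf)
distributed.wait([train_ddf, eval_ddf])   # any read/categorize failure shows up at this point
dtrain = dxgb.DaskDMatrix(client, train_ddf[features], train_ddf['label'], enable_categorical=True)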

I would suggest that the first thing to do is ensure the data is well balanced, based on these hints from the screenshots:

  • The pipeline ran into empty workers, meaning these workers were starving with no data on them.
  • The CPU usage is flaky, which is a sign of workers waiting for each other.
  • The OOM errors are flaky, which indicates that from time to time the data is concentrated on a single worker.

cc @fjetter @mrocklin for awareness.

@gdubs89
Author

gdubs89 commented Sep 30, 2024

Wow OK, I think I've mostly solved the issue of it just "not doing anything" when I try to scale to data sizes which cannot be trained in memory.

The problem was that the raw dataset is ~50 billion rows and is spread over ~120k partitions. I am doing a downsampling in pyspark (before starting the dask cluster) and writing a downsampled ~500 million row dataset to disk. Little did I know that spark was retaining the original partitioning and thus writing this 500 million row dataset to disk over 120k partitions, which is A) a lot of partitions and B) extremely fragmented. Either way, it seems this just totally overwhelmed dask.

If I do a repartition in spark before writing to disk, dask can handle it, and I now seem to be successfully training on 500 million rows of data, only using ~15% of the RAM on the aforementioned cluster, so I'm gonna scale this up to ~2 billion rows and see how it goes. So I've successfully moved beyond a data regime that could be trained in memory 🚀
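
Roughly, the spark-side fix looks like this (paths, sample fraction and partition count are illustrative, not my exact values):

sample = spark.read.parquet("s3://bucketname/data.pq").sample(fraction=0.01)   # ~500 million rows in this example
sample.repartition(512).write.mode("overwrite").parquet("s3://bucketname/train_sample.pq")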

CPU utilisation is still a bit heterogeneous at times, but you do get moments of beauty like this one
[screenshot]
if you watch the dashboard for long enough👌

Will create some somewhat larger datasets and see how far I can scale this paradigm and report back.

In the meantime, one question around partitioning of the train and eval sets. Is it recommended/necessary for the train and eval sets to have the same number of partitions? I'm a little unclear on what's done with each partition under the hood. My concern would be that if you don't have the same number of partitions (i.e. use fewer for the eval set because it's smaller), one or more of the workers might not get an eval set. When I originally had the eval set partitioned with fewer partitions than the train set (in proportion to their sizes), I got a rather difficult-to-parse error about empty DMatrices.

@trivialfis
Member

trivialfis commented Sep 30, 2024

Excellent progress! No, it's not required to have the same number of partitions. Preferably, both of them have partitions on all workers (so no worker is starved of either dataset).

XGBoost takes what it's given; it doesn't move data around. Internally, it just iterates over the partitions of each dataset independently. As long as the partitions within each dataset are aligned (they come from the same dataframe, for instance), it's fine.
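
If you want to be sure neither dataset starves a worker, one illustrative option (not a requirement) is to give the smaller eval set at least one partition per worker before building its DMatrix:

n_workers = len(client.scheduler_info()["workers"])
eval_ddf = eval_ddf.repartition(npartitions=max(n_workers, eval_ddf.npartitions))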

@gdubs89
Author

gdubs89 commented Oct 1, 2024

So I have managed to scale to 2 billion rows, but this does seem to be the point where it started to struggle. Dask started to complain about the size of the graph
[screenshot]
What's interesting is that the graph seems to be getting larger. The origin of this is probably that I'm doing some sequential training where I increase the learning rate (to avoid the situation where you train for 800 rounds before early stopping, but the last 500 rounds only give very marginal improvements). The warning in fact links to here. While this isn't what I'm doing, I wonder whether the fact that I have a for loop over learning rates, and within the loop I do model = dxgb.train(..., xgb_model=model['booster']), is causing a highly nested dask graph to be built, which in turn is probably making the later training rounds less efficient.

Any ideas how to mitigate this? [Edit: one idea I had was to write the model to disk and then load it back in, as that might break the graph. And it did, in the sense that the warning now said my graph was only 13 MiB compared to 54 MiB previously (and it only kicked in after more loops of the training procedure), but it hasn't fully solved the problem, as I'm still getting warnings about large dask graphs.]
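
Roughly what I mean by writing the model to disk and loading it back (an illustrative sketch, not my exact code):

import xgboost as xgb

model['booster'].save_model("checkpoint.json")   # write the current booster to disk
booster = xgb.Booster()
booster.load_model("checkpoint.json")            # reload it, detached from the previous dask graph
model = dxgb.train(client=client, params=params, dtrain=dtrain,
                   num_boost_round=100, early_stopping_rounds=10,
                   evals=[(dvalid, 'eval')], xgb_model=booster)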

Second edit: Interestingly, increasing the max_depth seems to significantly increase the size of the graph in the warning

@trivialfis
Member

Any ideas how to mitigate this

My first guess is the booster object is too large, and dask complains about it. If that's the case, the warning is harmless, and there's no need to work around it. We have to transfer the booster somehow. Splitting it up can disable the warning but creates no benefit. If that's not the cause, please share your code. Or at least a gist of the code and the hyper-parameters you use.

@gdubs89
Author

gdubs89 commented Oct 3, 2024

The code is unchanged from the original post, other than that I've created a loop to increase the learning rate:

lr_rounds = {0.1: 100, 0.2: 100, 0.4: 100, 0.8: 100_000}  # when it gets to the final learning rate, we want the number of boosting rounds to be functionally infinite and let early stopping determine how long we train for

model = None  # no booster yet; the first stage trains from scratch
for LR in sorted(lr_rounds.keys()):
    params['learning_rate'] = LR

    if model is None:
        model = dxgb.train(
            client=client,
            params=params,
            dtrain=dtrain,
            num_boost_round=lr_rounds[LR],
            early_stopping_rounds=10,
            evals=[(dvalid, 'eval')]
        )
    else:
        model = dxgb.train(
            client=client,
            params=params,
            dtrain=dtrain,
            num_boost_round=lr_rounds[LR],
            early_stopping_rounds=10,
            evals=[(dvalid, 'eval')],
            xgb_model=model['booster']  # dxgb.train returns a dict, so continue from its booster
        )

(I added a few more bells and whistles to make sure that if it early-stops at one of the learning rates before 0.8, the next stage starts from the optimal model rather than the final one, but I don't think that should affect anything; one way to do this is sketched below.)
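
One illustrative way to do that (not my exact code) is to slice the returned booster at its best iteration before passing it on:

booster = model['booster']
best_it = getattr(booster, "best_iteration", None)
if best_it is not None:
    booster = booster[: best_it + 1]   # keep only the trees up to the best round
# then pass xgb_model=booster to the next dxgb.train call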

Broadly speaking, this won't train for more than 400 rounds on the data I have, and I'm exploring max_depth values from ~8 to 16. So 400 trees of depth 16 are not trivial in terms of memory consumption, but they're still a fraction of the data volume being handled on each worker.

I'm happy to accept this as harmless if you don't think this is a problem. I haven't had any more problems with training failing or being erratic.

@trivialfis
Member

Thank you for sharing! The code looks fine.

but also still a fraction of the data volume being handled on each worker

Dask doesn't usually send large objects across workers, since that can hurt performance due to network constraints. But gathering a single booster to the client process should be fine.

Feel free to close the issue if you have no further questions. ;-)

gdubs89 closed this as completed Oct 3, 2024