Trouble scaling XGBoost beyond in-memory training on Databricks #10853
Thank you for sharing your experience. Based on your description, you are trying to find some "best practices" for distributed training. I will try to do some more experiments and come up with something more comprehensive like a blog post. But for now, let's start with some checks:
Maybe start by observing the CPU/memory usage across workers; there's a "Workers" tab in the Dask dashboard. I can't give a definite answer on why a specific run is slow without running it myself, but in general it's a data-balance issue. On GPU, sometimes I just repartition the data and the problem is mitigated. The XGBoost train function in the dashboard's task view is actually a lambda function, one per worker; do you see Dask waiting for them to finish? I'm also working on improving the scaling at the moment and will add some logging facilities to XGBoost to help with debugging.
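For illustration, a minimal sketch of that repartitioning idea, assuming `df` is a Dask DataFrame and `client` a connected `distributed.Client` (both names are placeholders, not from this thread):

```python
from distributed import wait

# Spread the data evenly: a few partitions per worker is a reasonable default.
n_workers = len(client.scheduler_info()["workers"])
df = df.repartition(npartitions=n_workers * 4).persist()
wait(df)  # block until the rebalanced partitions are materialised on the workers
```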
This is a combination of Dask data-balancing issues, Dask memory-usage and data-spilling issues (the read/partition), XGBoost training-performance issues, and optimization in Databricks. Let's get some of the easy issues resolved first.
I would suggest that the first thing to do is to ensure the data is well balanced, based on these hints from the screenshots:
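As a hypothetical starting point (again, `df` and `client` are placeholder names), one way to check the balance from the client side:

```python
from distributed import futures_of, wait

# Row count per partition; a large spread between min and max suggests imbalance.
sizes = df.map_partitions(len).compute()
print(sizes.min(), sizes.max())

# After persisting, see which worker actually holds each piece of the data.
df = df.persist()
wait(df)
print(client.who_has(futures_of(df)))
```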
Excellent progress! No, it's not required to have the same number of partitions. Preferably both of them have partitions on all workers (no worker is starved for either dataset). XGBoost takes what it's given; it doesn't move data around. Internally, it just iterates over the partitions of each dataset independently. As long as the partitions within each dataset are aligned (they come from the same dataframe, for instance), it's fine.
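A sketch of one way to keep the two datasets aligned, assuming both come from a single Dask DataFrame (`df`, `features`, and the `"label"` column are placeholder names):

```python
import xgboost as xgb

# Splitting one dataframe keeps the two datasets' partitioning consistent.
train_df, eval_df = df.random_split([0.8, 0.2], random_state=0)

dtrain = xgb.dask.DaskDMatrix(client, train_df[features], train_df["label"])
deval = xgb.dask.DaskDMatrix(client, eval_df[features], eval_df["label"])
```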
So I have managed to scale to 2 billion rows, but this does seem to be the point where it starts to struggle: Dask has started to complain about the size of the graph. Any ideas how to mitigate this?

[Edit: one idea I had to mitigate this was to write the model to disk and then load it back in, as that might break the graph. And it did, in the sense that the warning now says my graph is only 13 MB compared to 54 MB previously (and it kicks in after more loops of the training procedure), but it doesn't seem to have solved the problem, as in I'm still getting a warning about large Dask graphs.]

Second edit: interestingly, increasing max_depth seems to significantly increase the size of the graph in the warning.
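For context, a rough sketch of the save-and-reload idea between training rounds (the filename and variable names are illustrative):

```python
import xgboost as xgb

# Checkpoint the booster to disk so the next round doesn't drag the old task graph along.
output["booster"].save_model("checkpoint.json")

booster = xgb.Booster()
booster.load_model("checkpoint.json")
# ...pass `booster` as xgb_model= in the next xgb.dask.train call.
```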
My first guess is the …
The code is unchanged from the original post, other than that I've created a loop to increase the learning rate:
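The snippet itself isn't shown above; roughly, the loop has this shape, where `dtrain` and `deval` stand in for the `DaskDMatrix` objects from the original post and the schedule values are illustrative:

```python
import xgboost as xgb

booster = None
for lr in (0.05, 0.1, 0.2, 0.4, 0.8):  # illustrative schedule ending at 0.8
    output = xgb.dask.train(
        client,
        {"objective": "binary:logistic", "eta": lr, "max_depth": 12},
        dtrain,
        num_boost_round=400,
        evals=[(deval, "eval")],
        early_stopping_rounds=10,
        xgb_model=booster,  # continue boosting from the previous model
    )
    booster = output["booster"]
```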
(I added a few more bells and whistles to make sure that if it stops early for one of the learning rates before 0.8, the next boosting round starts from the optimal model rather than the final one, but I don't think that should affect anything.) Broadly speaking, this won't train for more than 400 rounds on the data I have, and I'm exploring max_depth values from ~8 to 16. So 400 trees of depth 16 are not trivial in terms of memory consumption, but still a fraction of the data volume being handled on each worker. I'm happy to accept this as harmless if you don't think it's a problem. I haven't had any more problems with training failing or being erratic.
Thank you for sharing! The code looks fine.
Dask doesn't usually send large objects across workers, since that can hurt performance due to network constraints, but gathering a single booster to the client process should be fine. Feel free to close the issue if you have no further questions. ;-)
I'm currently training a binary classifier using a tiny sample of a dataset. The dataset is approximately 50 billion rows per day, and we persist the data for ~60 days, so in theory I could be training on up to ~3 trillion rows. Of course, that's probably a little excessive, but currently I'm training on a 0.1% sample of one day's data, i.e. approximately 50 million rows.
I do this by doing:

```python
# Read the day's data, take a 0.1% sample, and pull it onto the driver as pandas.
df = spark.read.parquet('s3://bucketname/data.pq').sample(fraction=0.001).toPandas()
```
I can play with this fraction a little; I've pushed it as far as 100 million rows and might be able to push it a bit further, but fundamentally the approach of pulling everything into a massive driver node and training in memory is not scalable, and it's never going to allow me to train on 1 billion rows, or 10 billion rows, or more.
To that end, I've been looking for the canonical way to scale XGBoost, i.e. to do distributed training on Databricks. I'm open to GPU training, but my strong suspicion is that I'm far more memory-limited than compute-limited (when training on 50 million rows on a single EC2 machine, once the data has been read in and converted to DMatrices, the actual training is a breeze and takes 10-15 minutes), so my instinct is to try distributed CPU training.
Also, I'm using the following bells & whistles, which I'll need any distributed training setup to support:
For the sake of benchmarking, I've prepared the following 4 datasets:
(In each case there's a train set, whose size is what's given above, and a corresponding eval set approximately 20% of that size.)
I first tried to do this using xgboost-dask. This is the solution I landed on:
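For reference, a minimal xgboost-dask setup of the kind described (paths and column names here are placeholders, not the exact code from the post):

```python
import dask.dataframe as dd
import xgboost as xgb
from distributed import Client

client = Client()  # or a client pointed at the cluster's scheduler

df = dd.read_parquet("s3://bucketname/data.pq")
features = [c for c in df.columns if c != "label"]

dtrain = xgb.dask.DaskDMatrix(client, df[features], df["label"])
output = xgb.dask.train(
    client,
    {"objective": "binary:logistic", "tree_method": "hist"},
    dtrain,
    num_boost_round=400,
)
booster = output["booster"]
```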
This "worked" when I used dataset 3 described above, but failed when I used dataset 2. I.e. 50 million rows and about ~20 columns worked but 50 million rows and ~50 columns was too much. I was also a little suspicious that dask wasn't utilising the worker nodes. I can't connect to the dask dashboard, I think it's something I'd need to talk to our databricks admin about (I tried to SSH into the driver but my connection timed out, to my best understanding, we'd need to unblock some port), but the databricks cluster dashboard only ever showed the driver node being engaged (in retrospect, it could also possibly have been just one worker being engaged, if this is deemed relevant I can re-run and check). Note that when I do
print(client)
, it's telling me I have 128 threads (8*16, i.e. the number of worker cores) and ~500gb of RAM, but they don't seem to be being engaged by the training process.If only one machine is being engaged, each of these machines has significantly less memory than the machine I used to train on the 50 million row dataset in memory, so it's not entirely surprising that this fell over at the point where it did. I tested this by firing up a "wonky" cluster, comprised of two
rd5.16xlarge
workers and a driver of the same type. This worked, but again only one machine was being engaged, so we've not gained anything over just training on a single large machine.So my suspicion here is that raw dask doesn't play very well with databricks/spark, so instead I decided to try
dask-databricks
. So basically in the above code, replacewith
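A sketch of that swap, following the dask-databricks README (this assumes the dask-databricks init script is installed on the cluster):

```python
import dask_databricks

# Connect to the Dask scheduler that dask-databricks starts alongside
# the Databricks cluster; the rest of the training code is unchanged.
client = dask_databricks.get_client()
```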
Same deal: when I `print(client)`, I see the number of threads and amount of memory I expect. However, when running on a cluster of 8 i3.4xlarge workers, I have the same scaling issues as previously: I can run on the 50 million row dataset with ~20 columns, but when I try the set with ~50 columns, it falls over.

I'm now running a cluster of 12 r5d.8xlarge machines (I should have used r5d.16xlarge like I did before, for reproducibility), and the training run for the 50 million row dataset with 50 columns hasn't technically crashed, but it's been running for 50 minutes now (which, given how big this cluster is compared to the single machine I can train this on in memory in ~10-15 minutes, is bad). When using dask-databricks I can access the Dask dashboard, and while I'm no expert on how to read it, it looks like all the workers are busy, but only about 1.5 of the 32 cores per worker are being used. This is in line with what the Databricks cluster's dashboard is telling me.

I also get a warning which I don't fully know what to do with.
The cluster I'm currently using has at least 3x more RAM and 4x more cores than the largest single EC2 machine, the one I've been using to train on 50 million rows / 50 columns (and which I've shown can be pushed a little further, at least to 100 million rows, maybe to 150 million, probably not as far as 200 million). I would also have hoped that distributed training in Dask would handle the data much more memory-efficiently than pulling it all into pandas. And yet I'm not even getting close to replicating the performance I get with a single EC2 instance, which does not bode well for scaling up to 500 million rows and beyond.
Help with this, or with other ways to scale XGBoost beyond in-memory training, would be greatly appreciated. I was hoping there would be an accepted way to do distributed XGBoost training, but alas, there doesn't seem to be any accepted wisdom on how to do this.
Other notes: