add distributed mode with dask
Dmitry Razdoburdin committed Sep 9, 2024
1 parent bba6aa7 commit 51e2725
Showing 6 changed files with 151 additions and 6 deletions.
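At a glance, this commit lets the SYCL plugin be driven through the dask interface. A minimal sketch of the intended usage, modelled on the test added below (tests/python-sycl/test_sycl_simple_dask.py) and assuming a local cluster whose worker can see a SYCL device — illustrative only, not part of the commit:

import dask.array as da
import dask.distributed
from xgboost import dask as dxgb

# Single local worker; the SYCL device must be visible to it.
with dask.distributed.LocalCluster(n_workers=1) as cluster:
    with dask.distributed.Client(cluster) as client:
        X = da.random.random(size=(10000, 20), chunks=(1000, 20))
        y = da.random.random(size=(10000, 1), chunks=(1000, 1))
        dtrain = dxgb.DaskDMatrix(client, X, y)
        params = {"tree_method": "hist", "device": "sycl", "objective": "reg:squarederror"}
        result = dxgb.train(client, params, dtrain, 10, evals=[(dtrain, "train")])
        print(result["history"]["train"]["rmse"])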
36 changes: 36 additions & 0 deletions plugin/sycl/tree/hist_row_adder.h
@@ -39,6 +39,42 @@ class BatchHistRowsAdder: public HistRowsAdder<GradientSumT> {
}
};


// Adds histogram rows for the newly created nodes in distributed mode. Left
// children are registered first and their ids are collected into sync_ids, so
// that only their histograms need to be reduced across workers; the remaining
// nodes are filled later by the subtraction trick.
template <typename GradientSumT>
class DistributedHistRowsAdder: public HistRowsAdder<GradientSumT> {
 public:
  void AddHistRows(HistUpdater<GradientSumT>* builder,
                   std::vector<int>* sync_ids, RegTree *p_tree) override {
    builder->builder_monitor_.Start("AddHistRows");
    const size_t explicit_size = builder->nodes_for_explicit_hist_build_.size();
    const size_t subtraction_size = builder->nodes_for_subtraction_trick_.size();
    std::vector<int> merged_node_ids(explicit_size + subtraction_size);
    for (size_t i = 0; i < explicit_size; ++i) {
      merged_node_ids[i] = builder->nodes_for_explicit_hist_build_[i].nid;
    }
    for (size_t i = 0; i < subtraction_size; ++i) {
      merged_node_ids[explicit_size + i] =
        builder->nodes_for_subtraction_trick_[i].nid;
    }
    std::sort(merged_node_ids.begin(), merged_node_ids.end());
    sync_ids->clear();
    // Left children are synchronized across workers.
    for (auto const& nid : merged_node_ids) {
      if ((*p_tree)[nid].IsLeftChild()) {
        builder->hist_.AddHistRow(nid);
        builder->hist_local_worker_.AddHistRow(nid);
        sync_ids->push_back(nid);
      }
    }
    // The remaining nodes are built locally via the subtraction trick.
    for (auto const& nid : merged_node_ids) {
      if (!((*p_tree)[nid].IsLeftChild())) {
        builder->hist_.AddHistRow(nid);
        builder->hist_local_worker_.AddHistRow(nid);
      }
    }
    builder->builder_monitor_.Stop("AddHistRows");
  }
};

} // namespace tree
} // namespace sycl
} // namespace xgboost
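For orientation only (not part of the commit), the ordering above can be summarized in a few lines of Python-style pseudocode: the left children end up in sync_ids and their histograms are reduced across workers, while the remaining nodes are only allocated here and reconstructed later by subtraction. Names below are ad hoc.

def order_new_nodes(merged_node_ids, is_left_child):
    """Return (sync_ids, local_only): node ids whose histograms are reduced
    across workers vs. node ids rebuilt locally by the subtraction trick."""
    ids = sorted(merged_node_ids)
    sync_ids = [nid for nid in ids if is_left_child(nid)]
    local_only = [nid for nid in ids if not is_left_child(nid)]
    return sync_ids, local_only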
62 changes: 62 additions & 0 deletions plugin/sycl/tree/hist_synchronizer.h
@@ -61,6 +61,68 @@ class BatchHistSynchronizer: public HistSynchronizer<GradientSumT> {
std::vector<::sycl::event> hist_sync_events_;
};

// Synchronizes histograms in distributed mode: the per-worker histograms of
// the nodes in sync_ids are reduced across workers, after which the sibling
// histograms are reconstructed locally as parent minus left child.
template <typename GradientSumT>
class DistributedHistSynchronizer: public HistSynchronizer<GradientSumT> {
 public:
  void SyncHistograms(HistUpdater<GradientSumT>* builder,
                      const std::vector<int>& sync_ids,
                      RegTree *p_tree) override {
    builder->builder_monitor_.Start("SyncHistograms");
    const size_t nbins = builder->hist_builder_.GetNumBins();
    for (size_t node = 0; node < builder->nodes_for_explicit_hist_build_.size(); node++) {
      const auto entry = builder->nodes_for_explicit_hist_build_[node];
      auto& this_hist = builder->hist_[entry.nid];
      // Keep a local copy: this node may later be needed as a parent.
      auto& this_local = builder->hist_local_worker_[entry.nid];
      common::CopyHist(builder->qu_, &this_local, this_hist, nbins);

      if (!(*p_tree)[entry.nid].IsRoot()) {
        const size_t parent_id = (*p_tree)[entry.nid].Parent();
        auto sibling_nid = entry.GetSiblingId(p_tree, parent_id);
        auto& parent_hist = builder->hist_local_worker_[parent_id];

        auto& sibling_hist = builder->hist_[sibling_nid];
        common::SubtractionHist(builder->qu_, &sibling_hist, parent_hist,
                                this_hist, nbins, ::sycl::event());
        builder->qu_.wait_and_throw();
        // Keep a local copy of the sibling as a possible future parent.
        auto& sibling_local = builder->hist_local_worker_[sibling_nid];
        common::CopyHist(builder->qu_, &sibling_local, sibling_hist, nbins);
      }
    }
    builder->ReduceHists(sync_ids, nbins);

    ParallelSubtractionHist(builder, builder->nodes_for_explicit_hist_build_, p_tree);
    ParallelSubtractionHist(builder, builder->nodes_for_subtraction_trick_, p_tree);

    builder->builder_monitor_.Stop("SyncHistograms");
  }

  // Rebuilds the histograms of right children from the reduced parent and
  // sibling histograms.
  void ParallelSubtractionHist(HistUpdater<GradientSumT>* builder,
                               const std::vector<ExpandEntry>& nodes,
                               const RegTree* p_tree) {
    const size_t nbins = builder->hist_builder_.GetNumBins();
    for (size_t node = 0; node < nodes.size(); node++) {
      const auto entry = nodes[node];
      if (!((*p_tree)[entry.nid].IsLeftChild())) {
        auto& this_hist = builder->hist_[entry.nid];

        if (!(*p_tree)[entry.nid].IsRoot()) {
          const size_t parent_id = (*p_tree)[entry.nid].Parent();
          auto& parent_hist = builder->hist_[parent_id];
          auto& sibling_hist = builder->hist_[entry.GetSiblingId(p_tree, parent_id)];
          common::SubtractionHist(builder->qu_, &this_hist, parent_hist,
                                  sibling_hist, nbins, ::sycl::event());
          builder->qu_.wait_and_throw();
        }
      }
    }
  }

 private:
  std::vector<::sycl::event> hist_sync_events_;
};

} // namespace tree
} // namespace sycl
} // namespace xgboost
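The synchronizer relies on the standard histogram subtraction trick: only the left child's histogram has to be reduced across workers, and the right child's histogram is then recovered bin by bin from the parent. A tiny numerical illustration (not from the commit), with per-bin gradient sums as plain arrays:

import numpy as np

parent = np.array([10.0, 4.0, 6.0])   # parent histogram (per-bin gradient sums)
left = np.array([7.0, 1.0, 2.0])      # left child, reduced across workers
right = parent - left                 # right child, recovered locally
assert np.allclose(right, [3.0, 3.0, 4.0])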
3 changes: 3 additions & 0 deletions plugin/sycl/tree/hist_updater.h
@@ -87,7 +87,10 @@ class HistUpdater {

protected:
friend class BatchHistSynchronizer<GradientSumT>;
friend class DistributedHistSynchronizer<GradientSumT>;

friend class BatchHistRowsAdder<GradientSumT>;
friend class DistributedHistRowsAdder<GradientSumT>;

struct SplitQuery {
bst_node_t nid;
3 changes: 2 additions & 1 deletion plugin/sycl/tree/updater_quantile_hist.cc
@@ -51,7 +51,8 @@ void QuantileHistMaker::SetPimpl(std::unique_ptr<HistUpdater<GradientSumT>>* pim
param_,
int_constraint_, dmat));
if (collective::IsDistributed()) {
LOG(FATAL) << "Distributed mode is not yet upstreamed for sycl";
(*pimpl)->SetHistSynchronizer(new DistributedHistSynchronizer<GradientSumT>());
(*pimpl)->SetHistRowsAdder(new DistributedHistRowsAdder<GradientSumT>());
} else {
(*pimpl)->SetHistSynchronizer(new BatchHistSynchronizer<GradientSumT>());
(*pimpl)->SetHistRowsAdder(new BatchHistRowsAdder<GradientSumT>());
11 changes: 6 additions & 5 deletions python-package/xgboost/core.py
@@ -306,11 +306,12 @@ def _check_distributed_params(kwargs: Dict[str, Any]) -> None:
raise TypeError(msg)

if device and device.find(":") != -1:
-        raise ValueError(
-            "Distributed training doesn't support selecting device ordinal as GPUs are"
-            " managed by the distributed frameworks. use `device=cuda` or `device=gpu`"
-            " instead."
-        )
+        if device != "sycl:gpu":
+            raise ValueError(
+                "Distributed training doesn't support selecting device ordinal as GPUs are"
+                " managed by the distributed frameworks. use `device=cuda` or `device=gpu`"
+                " instead."
+            )

if kwargs.get("booster", None) == "gblinear":
raise NotImplementedError(
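Illustratively (a sketch, assuming the private helper is imported directly and no other distributed parameters are set), the relaxed check now accepts an explicit SYCL GPU selection while still rejecting other device ordinals:

from xgboost.core import _check_distributed_params

_check_distributed_params({"device": "sycl:gpu"})    # accepted after this change
try:
    _check_distributed_params({"device": "cuda:0"})  # ordinal selection still rejected
except ValueError:
    pass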
42 changes: 42 additions & 0 deletions tests/python-sycl/test_sycl_simple_dask.py
@@ -0,0 +1,42 @@
from xgboost import dask as dxgb
from xgboost import testing as tm

import dask.array as da
import dask.distributed


def train_result(client, param, dtrain, num_rounds):
    result = dxgb.train(
        client,
        param,
        dtrain,
        num_rounds,
        verbose_eval=False,
        evals=[(dtrain, "train")],
    )
    return result


class TestSYCLDask:
    # The simplest test: verifies training on a single node only.
    def test_simple(self):
        cluster = dask.distributed.LocalCluster(n_workers=1)
        client = dask.distributed.Client(cluster)

        param = {}
        param["tree_method"] = "hist"
        param["device"] = "sycl"
        param["verbosity"] = 0
        param["objective"] = "reg:squarederror"

        # X and y must be Dask dataframes or arrays
        num_obs = 10000
        num_features = 20
        X = da.random.random(size=(num_obs, num_features), chunks=(1000, num_features))
        y = da.random.random(size=(num_obs, 1), chunks=(1000, 1))
        dtrain = dxgb.DaskDMatrix(client, X, y)

        result = train_result(client, param, dtrain, 10)
        assert tm.non_increasing(result["history"]["train"]["rmse"])
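A hedged remark, not part of the commit: with a single worker, collective::IsDistributed() is presumably false, so this test mainly covers the dask plumbing. The new DistributedHistSynchronizer and DistributedHistRowsAdder paths would only be taken once several workers participate, for example (assuming the SYCL device is visible to every worker):

cluster = dask.distributed.LocalCluster(n_workers=2)
client = dask.distributed.Client(cluster)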
