Skip to content
This repository has been archived by the owner on Oct 12, 2023. It is now read-only.

Commit

Permalink
AI: GraphSAGE remove the need of the original dataset for hemogeneous…
Browse files Browse the repository at this point in the history
… distributed (#1613)
  • Loading branch information
jerrychenhf authored Jul 1, 2023
1 parent 54f7b8c commit d6d0001
Show file tree
Hide file tree
Showing 8 changed files with 52 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

from contextlib import contextmanager

import numpy as np
import torch
import dgl

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
from sklearn.metrics import roc_auc_score, accuracy_score, average_precision_score

from cloudtik.runtime.ai.modeling.graph_modeling.graph_sage.modeling.model. \
homogeneous.distributed.utils import get_eids_from_mask, save_node_embeddings
homogeneous.distributed.utils import get_eids_from_mask, save_node_embeddings, get_eids_mask
from cloudtik.runtime.ai.modeling.graph_modeling.graph_sage.modeling.model.\
homogeneous.utils import get_reverse_eids, get_eids_mask_full_padded
from cloudtik.runtime.ai.modeling.graph_modeling.graph_sage.modeling.model.utils import parse_reverse_edges
Expand Down Expand Up @@ -67,24 +67,24 @@ def train(self, graph):

# because edge_split and shuffle mapping needs the total number of edges
# we need to padding the mask to the total number of edges
train_mask_padded = get_eids_mask_full_padded(
graph, "train_mask", reverse_etypes)
train_mask = get_eids_mask(
graph, "train_mask", emap, reverse_etypes)
shuffled_val_eids = get_eids_from_mask(
graph, "val_mask", emap, reverse_etypes)
shuffled_test_eids = get_eids_from_mask(
graph, "test_mask", emap, reverse_etypes)

shuffled_reverse_eids = None
if args.exclude_reverse_edges and reverse_etypes:
# The i-th element indicates the ID of the i-th edges reverse edge.
# The i-th element indicates the ID of the i-th edge's reverse edge.
reverse_eids = get_reverse_eids(graph, reverse_etypes)
# Mapping to shuffled id
shuffled_reverse_eids = torch.zeros_like(reverse_eids)
shuffled_reverse_eids[emap] = reverse_eids

# The ids here are original id
train_eids = dgl.distributed.edge_split(
train_mask_padded,
train_mask,
g.get_partition_book(),
force_even=True,
)
Expand Down Expand Up @@ -116,7 +116,7 @@ def _create_data_loaders(
test_sampler = dgl.dataloading.MultiLayerFullNeighborSampler(1)
# Create dataloader
# "reverse_id" exclude not only edges in minibatch but their reverse edges according to reverse_eids mapping
# reverse_eids - The i-th element indicates the ID of the i-th edges reverse edge.
# reverse_eids - The i-th element indicates the ID of the i-th edge's reverse edge.
exclude = "reverse_id" if reverse_eids is not None else None
train_dataloader = dgl.dataloading.DistEdgeDataLoader(
g,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,16 @@
get_eids_mask_full_padded


def get_eids_from_mask(g, mask_name, mapping, reverse_etypes=None):
def get_eids_mask(g, mask_name, mapping, reverse_etypes=None):
num_edges = g.num_edges()
mask_padded = get_eids_mask_full_padded(g, mask_name, reverse_etypes)
shuffled_mask = torch.zeros((num_edges,), dtype=torch.bool)
shuffled_mask[mapping] = mask_padded
return shuffled_mask


def get_eids_from_mask(g, mask_name, mapping, reverse_etypes=None):
shuffled_mask = get_eids_mask(g, mask_name, mapping, reverse_etypes)
return torch.nonzero(shuffled_mask, as_tuple=False).squeeze()


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
homogeneous.transductive.model import TransductiveGraphSAGEModel
from cloudtik.runtime.ai.modeling.graph_modeling.graph_sage.modeling.model.\
homogeneous.inductive.model import InductiveGraphSAGEModel
from cloudtik.runtime.ai.modeling.graph_modeling.graph_sage.modeling.model.utils import get_common_node_features, \
get_common_edge_features


def predict(dataset_dir, model_file,
Expand All @@ -39,9 +41,11 @@ def predict(dataset_dir, model_file,
dataset = dgl.data.CSVDataset(dataset_dir, force_reload=False)
graph = dataset[0] # only one graph

# Shall we force convert or upon the user to decide?
ndata = [node_feature] if inductive and node_feature else None
g = dgl.to_homogeneous(graph, ndata=ndata)
# Shall we force convert or upon the user to decide
# include the features that all the nodes have and all the edge have
ndata = get_common_node_features(graph)
edata = get_common_edge_features(graph)
g = dgl.to_homogeneous(graph, ndata=ndata, edata=edata)

# create model
if inductive:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
from cloudtik.runtime.ai.modeling.graph_modeling.graph_sage.modeling.model. \
homogeneous.utils import get_eids_from_mask, _create_edge_prediction_sampler, get_reverse_eids
from cloudtik.runtime.ai.modeling.graph_modeling.graph_sage.modeling.model.utils import \
parse_reverse_edges
parse_reverse_edges, get_common_node_features, get_common_edge_features


class Trainer:
Expand Down Expand Up @@ -61,8 +61,9 @@ def train(self, graph, device):
# The i-th element indicates the ID of the i-th edge’s reverse edge.
reverse_eids = get_reverse_eids(graph, reverse_etypes)

ndata = [args.node_feature] if args.inductive and args.node_feature else None
g = dgl.to_homogeneous(graph, ndata=ndata)
ndata = get_common_node_features(graph)
edata = get_common_edge_features(graph)
g = dgl.to_homogeneous(graph, ndata=ndata, edata=edata)
g = g.to("cuda" if args.mode == "gpu" else "cpu")
self.graph = g

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,3 +41,24 @@ def exclude_reverse_edge_types(etypes, reverse_etypes):
if reverse_edge_type:
exclude.add(reverse_edge_type)
return valid


def get_common_node_features(g):
return get_common_features(g, True)


def get_common_edge_features(g):
return get_common_features(g, False)


def get_common_features(g, node_or_edge):
types = g.ntypes() if node_or_edge else g.etypes()
y = None
for t in types:
feats = g.nodes[t].data.keys() if node_or_edge else g.edges[t].data.keys()
if y is None:
y = set(feats)
else:
# intersect two sets
y = y.intersection(set(feats))
return list(y) if y else None
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,14 @@
import torch
import dgl

from cloudtik.runtime.ai.modeling.graph_modeling.graph_sage.modeling.model.utils import get_common_node_features, \
get_common_edge_features


def partition_graph(
dataset_dir, output_dir, graph_name,
num_parts, num_hops,
heterogeneous=False,
inductive=False,
node_feature=None):
heterogeneous=False):
print("Random seed used in partitioning")
dgl.random.seed(1)

Expand All @@ -45,8 +46,9 @@ def partition_graph(

# convert graph to homogeneous if needed
if not heterogeneous:
ndata = [node_feature] if inductive and node_feature else None
g = dgl.to_homogeneous(graph, ndata=ndata)
ndata = get_common_node_features(graph)
edata = get_common_edge_features(graph)
g = dgl.to_homogeneous(graph, ndata=ndata, edata=edata)
print(g)
else:
g = graph
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -205,9 +205,7 @@ def _partition_graph(args):
graph_name=args.graph_name,
num_parts=args.num_parts,
num_hops=args.num_hops,
heterogeneous=args.heterogeneous,
inductive=args.inductive,
node_feature=args.node_feature
heterogeneous=args.heterogeneous
)


Expand Down

0 comments on commit d6d0001

Please sign in to comment.