[feat]: add eges model #129

Open · wants to merge 10 commits into base: master
5 changes: 0 additions & 5 deletions .github/workflows/ci.yml
@@ -21,11 +21,6 @@ jobs:
           TEST_DEVICES: ""
         run: |
           source activate /home/admin/tf12_py2/
-          if [ ! -e "/tmp/easyrec_data_20220113.tar.gz" ]
-          then
-            wget https://easyrec.oss-cn-beijing.aliyuncs.com/data/easyrec_data_20220113.tar.gz -O /tmp/easyrec_data_20220113.tar.gz
-          fi
-          tar -zvxf /tmp/easyrec_data_20220113.tar.gz
           source scripts/ci_test.sh
       - name: LabelAndComment
         env:
Binary file added docs/images/models/eges_1.png
Binary file added docs/images/models/eges_2.png
170 changes: 170 additions & 0 deletions docs/source/models/eges.md
@@ -0,0 +1,170 @@
# EGES

### Overview

EGES (Enhanced Graph Embedding with Side information) is a graph-based i2i recall model: random walks over the item graph generate paths, and a skip-gram model is then trained on those paths.
![eges_1](../../images/models/eges_1.png)
![eges_2](../../images/models/eges_2.png)

### Configuration
#### Input configuration
```protobuf
graph_train_input {
  node_inputs: 'data/test/graph/taobao_data/ad_feature_5k.csv'
  edge_inputs: 'data/test/graph/taobao_data/graph_edges.txt'
}

graph_eval_input {
  node_inputs: 'data/test/graph/taobao_data/ad_feature_5k.csv'
  edge_inputs: 'data/test/graph/taobao_data/graph_edges.txt'
}
```
- node_inputs: the nodes of the graph are items; this input provides each item's node id, weight (used during sampling) and feature (side info)
  - example input:
    ```
    id:int64 weight:float feature:string
    521512 1 521512,4282,173332,237,NULL,298.0
    476210 1 476210,4292,418411,515,377957,249.0
    646682 1 646682,7205,365036,676,321803,9.9
    ...
    ```
- edge_inputs: the edges of the graph describe how often two items co-occur in the same session (one way to build this table is sketched after this list)
  - example input:
    ```
    src_id:int64 dst_id:int64 weight:float
    565248 565248 100
    565248 786433 2
    565248 638980 20
    ...
    ```
- the MaxCompute inputs for node_inputs and edge_inputs look similar, with each field stored in its own column:
  - the node table has 3 columns: id, weight, feature
  - the edge table has 3 columns: src_id, dst_id, weight
  - int64 corresponds to bigint
  - float corresponds to double
  - string corresponds to string
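
The edge table can be derived from session logs by counting how often two items are clicked next to each other within one session. A minimal plain-Python sketch (not part of EasyRec; `build_edge_table` and its inputs are illustrative names):

```python
from collections import Counter


def build_edge_table(sessions):
  """Count how often two items appear as neighbors within a session.

  `sessions` is an iterable of item-id lists, one list per user session.
  Returns (src_id, dst_id, weight) rows in the edge_inputs format above.
  """
  counts = Counter()
  for items in sessions:
    for src, dst in zip(items, items[1:]):  # adjacent clicks in a session
      counts[(src, dst)] += 1
  return [(src, dst, float(w)) for (src, dst), w in counts.items()]


# Two toy sessions yield weighted directed edges.
for row in build_edge_table([[565248, 786433, 638980], [565248, 786433]]):
  print('%d\t%d\t%.1f' % row)
```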

#### Data configuration
```protobuf
data_config {
  input_fields {
    input_name: 'adgroup_id'
    input_type: STRING
  }
  input_fields {
    input_name: 'cate_id'
    input_type: STRING
  }
  input_fields {
    input_name: 'campaign_id'
    input_type: STRING
  }
  input_fields {
    input_name: 'customer'
    input_type: STRING
  }
  input_fields {
    input_name: 'brand'
    input_type: STRING
  }
  input_fields {
    input_name: 'price'
    input_type: DOUBLE
  }

  graph_config {
    random_walk_len: 10
    window_size: 5
    negative_num: 10
    directed: true
  }

  batch_size: 64
  num_epochs: 2
  prefetch_size: 32
  input_type: GraphInput
}
```

- input_fields:
  - input_name: the input feature name, matching the odps table column name or the csv header name (if there is no header, fields are matched by position)
  - input_type: the data type, one of STRING, DOUBLE, INT32, INT64; defaults to STRING if unset
- graph_config: parameters of the random walks on the graph (a minimal sketch follows this list)
  - random_walk_len: the length of each random walk
  - window_size: the skip-gram window size
  - negative_num: the number of negative samples per positive sample
  - directed: whether the graph is directed; defaults to false
- batch_size: the number of random-walk start nodes per batch
- num_epochs: the number of passes over the data
- prefetch_size: the number of batches to prefetch
- input_type: the input data format; for graph algorithms this defaults to GraphInput
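
To make these parameters concrete, here is a minimal pure-Python sketch of how `random_walk_len` and `window_size` interact; the real sampling runs inside GraphLearn, so this is illustrative only:

```python
import random


def random_walk(adj, start, random_walk_len):
  """Weighted random walk over adj = {src: [(dst, edge_weight), ...]}."""
  path = [start]
  for _ in range(random_walk_len - 1):
    neighbors = adj.get(path[-1])
    if not neighbors:
      break
    dsts, weights = zip(*neighbors)
    path.append(random.choices(dsts, weights=weights)[0])
  return path


def skip_gram_pairs(path, window_size):
  """All (center, context) pairs at distance <= window_size."""
  pairs = []
  for i, center in enumerate(path):
    for j in range(max(0, i - window_size), min(len(path), i + window_size + 1)):
      if j != i:
        pairs.append((center, path[j]))
  return pairs


# Each positive pair is then matched with negative_num sampled negatives
# and trained with sampled softmax cross entropy.
```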

#### Feature configuration
```protobuf
feature_config: {
  features: {
    input_names: 'adgroup_id'
    feature_type: IdFeature
    embedding_dim: 16
    hash_bucket_size: 100000
  }
  features: {
    input_names: 'cate_id'
    feature_type: IdFeature
    embedding_dim: 16
    hash_bucket_size: 10000
  }
  ...
  features: {
    input_names: 'brand'
    feature_type: IdFeature
    embedding_dim: 16
    hash_bucket_size: 100000
  }
  features: {
    input_names: 'price'
    feature_type: RawFeature
  }
}
```
- features.input_names: the feature's input, matching data_config.input_fields.input_name

#### Model configuration
```protobuf
model_config: {
  model_class: "EGES"

  feature_groups: {
    group_name: "item"
    feature_names: 'adgroup_id'
    feature_names: 'cate_id'
    feature_names: 'campaign_id'
    feature_names: 'customer'
    feature_names: 'brand'
    feature_names: 'price'
    wide_deep: DEEP
  }
  eges {
    dnn {
      hidden_units: [256, 128, 64, 32]
    }
    l2_regularization: 1e-6
  }
  loss_type: SOFTMAX_CROSS_ENTROPY
  embedding_regularization: 0.0

  group_as_scope: true
}
```
- model_class: defaults to EGES
- feature_groups: exactly one feature group must be configured, and its group_name must be 'item' (do not change it)
  - feature_names: matches feature_config.features.input_names[0] (or feature_name, if set)
- eges: dnn is the MLP used for feature transformation (the side-info fusion at the core of EGES is sketched below)
- loss_type: SOFTMAX_CROSS_ENTROPY, since negative sampling is used
- group_as_scope: use group_name as the scope_name of the embedding and other variables; recommended to be set to true
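
For reference, the fusion step that gives EGES its name: in the paper, the item-id embedding and the side-info embeddings are aggregated with learned per-item attention weights (this implementation additionally applies the configured dnn). A numpy sketch, assuming all embeddings share one dimension:

```python
import numpy as np


def eges_aggregate(side_info_embs, attn_logits):
  """Fuse per-feature embeddings into one item embedding.

  side_info_embs: [num_features, emb_dim] embeddings of the item id plus
    its side info (cate_id, campaign_id, customer, brand, ...).
  attn_logits: [num_features] learnable per-item weights a_v^j.
  Returns H_v = sum_j softmax(a_v)_j * W_v^j, as in the EGES paper.
  """
  w = np.exp(attn_logits - attn_logits.max())  # numerically stable softmax
  w = w / w.sum()
  return (w[:, None] * side_info_embs).sum(axis=0)


# With zero logits every feature gets equal weight, i.e. a plain average.
embs = np.random.randn(6, 16)
print(eges_aggregate(embs, np.zeros(6)).shape)  # (16,)
```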

### Example config
[EGES_demo.config](https://easyrec.oss-cn-beijing.aliyuncs.com/config/eges_on_taobao.config)

### Reference paper
[EGES.pdf](https://arxiv.org/pdf/1803.02349.pdf)
1 change: 1 addition & 0 deletions docs/source/models/recall.rst
@@ -8,6 +8,7 @@
    dssm_neg_sampler
    mind
    co_metric_learning_i2i
+   eges

 Cold-start recall models
 ========
3 changes: 2 additions & 1 deletion easy_rec/python/compat/early_stopping.py
@@ -299,7 +299,8 @@ def custom_early_stop_hook(estimator,
   if eval_dir is None:
     eval_dir = estimator.eval_dir()

-  if isinstance(custom_stop_func, str) or isinstance(custom_stop_func, unicode):
+  if isinstance(custom_stop_func, str) or isinstance(custom_stop_func,
+                                                     type(u'')):
     custom_stop_func = load_by_path(custom_stop_func)

   def _custom_stop_fn():
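
The `type(u'')` idiom above matches unicode strings without referencing the Python-2-only `unicode` builtin: on Python 2, `type(u'')` is `unicode`; on Python 3, it is `str`. A quick illustration (a sketch, not EasyRec code):

```python
def is_text(x):
  # True for str on both interpreters, plus unicode on Python 2.
  return isinstance(x, (str, type(u'')))

assert is_text('path.to.stop_func') and is_text(u'path.to.stop_func')
```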
13 changes: 10 additions & 3 deletions easy_rec/python/compat/feature_column/feature_column.py
@@ -228,8 +228,12 @@ def _get_logits():  # pylint: disable=missing-docstring
   if from_template:
     return _get_logits()
   else:
+    reuse = None if scope is None else variable_scope.AUTO_REUSE
     with variable_scope.variable_scope(
-        scope, default_name='input_layer', values=features.values()):
+        scope,
+        default_name='input_layer',
+        values=features.values(),
+        reuse=reuse):
       return _get_logits()


@@ -239,7 +243,8 @@ def input_layer(features,
                 trainable=True,
                 cols_to_vars=None,
                 cols_to_output_tensors=None,
-                feature_name_to_output_tensors=None):
+                feature_name_to_output_tensors=None,
+                scope=None):
   """Returns a dense `Tensor` as input layer based on given `feature_columns`.

   Generally a single example in training data is described with FeatureColumns.

@@ -287,6 +292,7 @@
     cols_to_output_tensors: If not `None`, must be a dictionary that will be
       filled with a mapping from '_FeatureColumn' to the associated
       output `Tensor`s.
+    scope: variable scope.

   Returns:
     A `Tensor` which represents input layer of a model. Its shape

@@ -303,7 +309,8 @@
       trainable=trainable,
       cols_to_vars=cols_to_vars,
       cols_to_output_tensors=cols_to_output_tensors,
-      feature_name_to_output_tensors=feature_name_to_output_tensors)
+      feature_name_to_output_tensors=feature_name_to_output_tensors,
+      scope=scope)


 # TODO(akshayka): InputLayer should be a subclass of Layer, and it
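
The added `scope` argument and `AUTO_REUSE` let two `input_layer` calls share embedding variables when given the same scope, which appears to be what the group_as_scope option in the EGES config relies on. A TF1-style sketch of the pattern (an illustration, not the library code itself):

```python
import tensorflow as tf  # TF 1.x semantics


def shared_projection(x, scope):
  # First call creates the variables; later calls with the same
  # scope reuse them thanks to AUTO_REUSE.
  with tf.variable_scope(scope, reuse=tf.AUTO_REUSE):
    return tf.layers.dense(x, 16, name='proj')


a = shared_projection(tf.zeros([2, 8]), 'item')
b = shared_projection(tf.ones([2, 8]), 'item')  # same kernel/bias as `a`
```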
43 changes: 2 additions & 41 deletions easy_rec/python/core/sampler.py
@@ -3,7 +3,6 @@
 from __future__ import division
 from __future__ import print_function

-import json
 import logging
 import math
 import os

@@ -13,6 +12,7 @@
 import tensorflow as tf

 from easy_rec.python.protos.dataset_pb2 import DatasetConfig
+from easy_rec.python.utils import graph_utils

 try:
   import graphlearn as gl

@@ -76,46 +76,7 @@ def __init__(self, fields, num_sample, num_eval_sample=None):
     self._build_field_types(fields)

   def _init_graph(self):
-    if 'TF_CONFIG' in os.environ:
-      tf_config = json.loads(os.environ['TF_CONFIG'])
-      if 'ps' in tf_config['cluster']:
-        # ps mode
-        tf_config = json.loads(os.environ['TF_CONFIG'])
-        ps_count = len(tf_config['cluster']['ps'])
-        task_count = len(tf_config['cluster']['worker']) + 2
-        cluster = {'server_count': ps_count, 'client_count': task_count}
-        if tf_config['task']['type'] in ['chief', 'master']:
-          self._g.init(cluster=cluster, job_name='client', task_index=0)
-        elif tf_config['task']['type'] == 'worker':
-          self._g.init(
-              cluster=cluster,
-              job_name='client',
-              task_index=tf_config['task']['index'] + 2)
-        # TODO(hongsheng.jhs): check cluster has evaluator or not?
-        elif tf_config['task']['type'] == 'evaluator':
-          self._g.init(
-              cluster=cluster,
-              job_name='client',
-              task_index=tf_config['task']['index'] + 1)
-          if self._num_eval_sample is not None and self._num_eval_sample > 0:
-            self._num_sample = self._num_eval_sample
-        elif tf_config['task']['type'] == 'ps':
-          self._g.init(
-              cluster=cluster,
-              job_name='server',
-              task_index=tf_config['task']['index'])
-      else:
-        # worker mode
-        task_count = len(tf_config['cluster']['worker']) + 1
-        if tf_config['task']['type'] in ['chief', 'master']:
-          self._g.init(task_index=0, task_count=task_count)
-        elif tf_config['task']['type'] == 'worker':
-          self._g.init(
-              task_index=tf_config['task']['index'] + 1, task_count=task_count)
-        # TODO(hongsheng.jhs): check cluster has evaluator or not?
-    else:
-      # local mode
-      self._g.init()
+    graph_utils.graph_init(self._g, os.environ.get('TF_CONFIG', None))

   def _build_field_types(self, fields):
     self._attr_names = []
2 changes: 1 addition & 1 deletion easy_rec/python/inference/vector_retrieve.py
@@ -13,7 +13,7 @@

 try:
   import graphlearn as gl
-except:
+except ImportError:
   logging.WARN(
       'GraphLearn is not installed. You can install it by "pip install https://easyrec.oss-cn-beijing.aliyuncs.com/3rdparty/graphlearn-0.7-cp27-cp27mu-linux_x86_64.whl.'  # noqa: E501
   )
1 change: 1 addition & 0 deletions easy_rec/python/input/csv_input.py
@@ -24,6 +24,7 @@ def __init__(self,
     super(CSVInput, self).__init__(data_config, feature_config, input_path,
                                    task_index, task_num)
     self._with_header = data_config.with_header
+    # only for csv file with headers
     self._field_names = None

   def _parse_csv(self, line):