import json
import os
import pickle
import random
import time
import scipy.sparse as sp
from urllib import request
import numpy as np
import pandas as pd
import scipy
import torch
from ..evaluator import metric
pd.set_option('display.width', 1000)
def build_adj(attr, edge_index, adj_type='csr'):
    r"""
    Description
    -----------
    Build a scipy sparse adjacency matrix from edge attributes and indices.

    Parameters
    ----------
    attr : torch.Tensor or numpy.array
        Edge attributes (one value per edge).
    edge_index : torch.Tensor or numpy.array or tuple
        Edge indices as a ``2 * E`` array, or a ``(row, col)`` pair of
        tensors/arrays/sequences.
    adj_type : str, optional
        Type of sparse matrix, choose from ["csr", "coo"]. Default: ``csr``.

    Returns
    -------
    adj : scipy.sparse matrix
        Adjacency matrix in the requested sparse format.

    Raises
    ------
    ValueError
        If ``adj_type`` is not one of the supported formats
        (the original silently fell through to an ``UnboundLocalError``).
    """
    if isinstance(attr, torch.Tensor):
        attr = attr.detach().cpu().numpy()
    if isinstance(edge_index, torch.Tensor):
        edge_index = edge_index.detach().cpu().numpy()
    if isinstance(edge_index, tuple):
        # Accept tuples of tensors, arrays, or plain sequences; the original
        # called .numpy() unconditionally and crashed on non-tensor elements.
        edge_index = [e.detach().cpu().numpy() if isinstance(e, torch.Tensor)
                      else np.asarray(e) for e in edge_index]
    if adj_type == 'csr':
        return sp.csr_matrix((attr, edge_index))
    if adj_type == 'coo':
        return sp.coo_matrix((attr, edge_index))
    raise ValueError("adj_type should be 'csr' or 'coo', got {!r}.".format(adj_type))
def adj_to_tensor(adj):
    r"""
    Description
    -----------
    Convert adjacency matrix in scipy sparse format to torch sparse tensor.

    Parameters
    ----------
    adj : scipy.sparse.csr.csr_matrix
        Adjacency matrix in form of ``N * N`` sparse matrix.

    Returns
    -------
    adj_tensor : torch.Tensor
        Adjacency matrix in form of ``N * N`` sparse tensor.
    """
    # tocoo() is a no-op for COO input; this replaces the fragile
    # ``type(adj) != scipy.sparse.coo.coo_matrix`` comparison, which relied
    # on a private scipy module that newer scipy versions no longer expose.
    adj = adj.tocoo()
    indices = torch.from_numpy(np.vstack((adj.row, adj.col))).long()
    values = torch.FloatTensor(adj.data)
    # torch.sparse_coo_tensor supersedes the deprecated torch.sparse.FloatTensor.
    adj_tensor = torch.sparse_coo_tensor(indices, values, torch.Size(adj.shape))
    return adj_tensor
def adj_preprocess(adj, adj_norm_func=None, mask=None, model_type="torch", device='cpu'):
    r"""
    Description
    -----------
    Preprocess the adjacency matrix.

    Parameters
    ----------
    adj : scipy.sparse.csr.csr_matrix or a tuple
        Adjacency matrix in form of ``N * N`` sparse matrix.
    adj_norm_func : func of utils.normalize, optional
        Function that normalizes adjacency matrix. Default: ``None``.
    mask : torch.Tensor, optional
        Mask of nodes in form of ``N * 1`` torch bool tensor. Default: ``None``.
    model_type : str, optional
        Type of model's backend, choose from ["torch", "cogdl", "dgl"]. Default: ``"torch"``.
    device : str, optional
        Device used to host data. Default: ``cpu``.

    Returns
    -------
    adj : torch.Tensor or a tuple
        Adjacency matrix in form of ``N * N`` sparse tensor or a tuple.
    """
    if adj_norm_func is not None:
        adj = adj_norm_func(adj)
    if model_type == "torch":
        def _prepare(adj_):
            # Mask first (on the scipy/torch matrix), then convert and move.
            if mask is not None:
                adj_ = adj_[mask][:, mask]
            if not isinstance(adj_, torch.Tensor):
                adj_ = adj_to_tensor(adj_)
            return adj_.to(device)

        if isinstance(adj, (tuple, list)):
            adj = [_prepare(adj_) for adj_ in adj]
        else:
            adj = _prepare(adj)
    elif model_type == "dgl":
        # Fix: accept list as well as tuple, matching the "torch" branch;
        # the original only handled tuples here.
        if isinstance(adj, (tuple, list)):
            if mask is not None:
                adj = [adj_[mask][:, mask] for adj_ in adj]
            else:
                adj = list(adj)
        elif mask is not None:
            adj = adj[mask][:, mask]
        # dgl matrices are left on the host format; no tensor conversion.
    return adj
def feat_preprocess(features, feat_norm=None, device='cpu'):
    r"""
    Description
    -----------
    Preprocess the features.

    Parameters
    ----------
    features : torch.Tensor or numpy.array
        Features in form of torch tensor or numpy array.
    feat_norm : str, optional
        Type of features normalization, choose from ["arctan", "tanh", None]. Default: ``None``.
    device : str, optional
        Device used to host data. Default: ``cpu``.

    Returns
    -------
    features : torch.Tensor
        Features in form of torch float tensor on chosen device.
    """
    def _normalize(feat, norm):
        # np ufuncs keep torch tensors as tensors via __array_wrap__.
        if norm == "arctan":
            return 2 * np.arctan(feat) / np.pi
        if norm == "tanh":
            return np.tanh(feat)
        return feat

    if not isinstance(features, torch.Tensor):
        features = torch.FloatTensor(features)
    elif features.type() != 'torch.FloatTensor':
        features = features.float()
    if feat_norm is not None:
        features = _normalize(features, feat_norm)
    return features.to(device)
def label_preprocess(labels, device='cpu'):
    r"""
    Description
    -----------
    Convert labels to a torch long tensor on the chosen device.

    Parameters
    ----------
    labels : torch.Tensor
        Labels in form of torch tensor.
    device : str, optional
        Device used to host data. Default: ``cpu``.

    Returns
    -------
    labels : torch.Tensor
        Labels in form of torch long tensor on chosen device.
    """
    if not isinstance(labels, torch.Tensor):
        labels = torch.LongTensor(labels)
    elif labels.type() != 'torch.LongTensor':
        labels = labels.long()
    return labels.to(device)
def inference(model, features, adj, feat_norm=None, adj_norm_func=None, device="cpu"):
    """
    Description
    -----------
    Inference of model.

    Parameters
    ----------
    model : torch.nn.module
        Model implemented based on ``torch.nn.module``.
    features : torch.Tensor or numpy.array
        Features in form of torch tensor or numpy array.
    adj : scipy.sparse.csr.csr_matrix
        Adjacency matrix in form of ``N * N`` sparse matrix.
    feat_norm : str, optional
        Type of features normalization, choose from ["arctan", "tanh", None].
        Falls back to ``model.feat_norm`` when ``None``. Default: ``None``.
    adj_norm_func : func of utils.normalize, optional
        Function that normalizes adjacency matrix.
        Falls back to ``model.adj_norm_func`` when ``None``. Default: ``None``.
    device : str, optional
        Device used to host data. Default: ``cpu``.

    Returns
    -------
    logits : torch.Tensor
        Output logits of model.
    """
    model.to(device)
    model.eval()
    # Explicit overrides win; otherwise use the model's own preprocessing.
    norm_func = adj_norm_func if adj_norm_func is not None else model.adj_norm_func
    norm_type = feat_norm if feat_norm is not None else model.feat_norm
    adj = adj_preprocess(adj,
                         adj_norm_func=norm_func,
                         model_type=model.model_type,
                         device=device)
    features = feat_preprocess(features, feat_norm=norm_type, device=device)
    return model(features, adj)
def evaluate(model, features, adj, labels, feat_norm=None, adj_norm_func=None, eval_metric=metric.eval_acc,
             mask=None, device="cpu"):
    """
    Description
    -----------
    Evaluate a model on (possibly masked) nodes.

    Parameters
    ----------
    model : torch.nn.module
        Model implemented based on ``torch.nn.module``.
    features : torch.Tensor or numpy.array
        Features in form of torch tensor or numpy array.
    adj : scipy.sparse.csr.csr_matrix
        Adjacency matrix in form of ``N * N`` sparse matrix.
    labels : torch.Tensor or numpy.array
        Labels in form of torch tensor or numpy array.
    feat_norm : str, optional
        Type of features normalization, choose from ["arctan", "tanh", None]. Default: ``None``.
    adj_norm_func : func of utils.normalize, optional
        Function that normalizes adjacency matrix. Default: ``None``.
    eval_metric : func of grb.metric, optional
        Evaluation metric, like accuracy or F1 score. Default: ``grb.metric.eval_acc``.
    mask : torch.tensor, optional
        Mask of target nodes. Default: ``None``.
    device : str, optional
        Device used to host data. Default: ``cpu``.

    Returns
    -------
    score : float
        Score on masked nodes.
    """
    model.to(device)
    model.eval()
    norm_func = adj_norm_func if adj_norm_func is not None else model.adj_norm_func
    norm_type = feat_norm if feat_norm is not None else model.feat_norm
    adj = adj_preprocess(adj,
                         adj_norm_func=norm_func,
                         model_type=model.model_type,
                         device=device)
    features = feat_preprocess(features, feat_norm=norm_type, device=device)
    labels = label_preprocess(labels=labels, device=device)
    logits = model(features, adj)
    n_labeled = labels.shape[0]
    if logits.shape[0] > n_labeled:
        # Keep only logits for labelled nodes; extra rows are presumably
        # injected/attack nodes without labels — verify against caller.
        logits = logits[:n_labeled]
    return eval_metric(logits, labels, mask)
def fix_seed(seed=0):
    r"""
    Description
    -----------
    Fix random process by a seed.

    Seeds Python's ``random``, numpy, and torch (CPU and all CUDA devices),
    and configures cuDNN for deterministic behavior.

    Parameters
    ----------
    seed : int, optional
        Random seed. Default: ``0``.
    """
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)
    torch.backends.cudnn.deterministic = True
    # Fix: deterministic=True alone is insufficient — benchmark mode lets
    # cuDNN pick algorithms nondeterministically across runs (see the
    # PyTorch reproducibility notes).
    torch.backends.cudnn.benchmark = False
def get_num_params(model):
    r"""
    Description
    -----------
    Count the trainable parameters of a model.

    (The original docstring was a copy-paste error describing sparse-matrix
    conversion.)

    Parameters
    ----------
    model : torch.nn.module
        Model implemented based on ``torch.nn.module``.

    Returns
    -------
    int
        Total number of elements across all parameters with
        ``requires_grad=True``.
    """
    # p.numel() returns a plain int; no need to round-trip through numpy.
    return sum(p.numel() for p in model.parameters() if p.requires_grad)
def save_features(features, file_dir, file_name='features.npy'):
    r"""
    Description
    -----------
    Save generated adversarial features in ``.npy`` format.

    Parameters
    ----------
    features : torch.Tensor or numpy.array
        Features in form of torch tensor or numpy array. ``None`` is ignored.
    file_dir : str
        Directory to save the file.
    file_name : str, optional
        Name of file to save. Default: ``features.npy``.
    """
    if features is None:
        return
    # exist_ok avoids the check-then-create race of exists() + makedirs().
    os.makedirs(file_dir, exist_ok=True)
    if isinstance(features, torch.Tensor):
        features = features.detach().cpu().numpy()
    # Fix: the original unconditionally called .cpu().detach() and thus
    # crashed on the documented numpy.array input.
    np.save(os.path.join(file_dir, file_name), features)
def save_adj(adj, file_dir, file_name='adj.pkl'):
    r"""
    Description
    -----------
    Save generated adversarial adjacency matrix via pickle.

    Parameters
    ----------
    adj : scipy.sparse.csr.csr_matrix or a tuple
        Adjacency matrix in form of ``N * N`` sparse matrix. ``None`` is ignored.
    file_dir : str
        Directory to save the file.
    file_name : str, optional
        Name of file to save. Default: ``adj.pkl``.
    """
    if adj is None:
        return
    # exist_ok avoids the check-then-create race of exists() + makedirs().
    os.makedirs(file_dir, exist_ok=True)
    with open(os.path.join(file_dir, file_name), 'wb') as f:
        pickle.dump(adj, f)
def save_model(model, save_dir, name, verbose=True):
    r"""
    Description
    -----------
    Save trained model.

    Parameters
    ----------
    model : torch.nn.module
        Model implemented based on ``torch.nn.module``.
    save_dir : str or None
        Directory to save the model. If ``None``, a timestamped
        ``./tmp_*`` directory is created automatically.
    name : str
        Name of saved model.
    verbose : bool, optional
        Whether to display logs. Default: ``True``.
        (Fix: the original docstring wrongly documented ``False``.)
    """
    if save_dir is None:
        cur_time = time.strftime("%Y_%m_%d_%H_%M_%S", time.localtime())
        save_dir = "./tmp_{}".format(cur_time)
    # Single creation point; exist_ok avoids the double makedirs of the
    # original and its check-then-create race.
    os.makedirs(save_dir, exist_ok=True)
    save_path = os.path.join(save_dir, name)
    torch.save(model, save_path)
    if verbose:
        print("Model saved in '{}'.".format(save_path))
def get_index_induc(index_a, index_b):
    r"""
    Description
    -----------
    Get index under the inductive training setting.

    Merges two sorted index sequences and returns, for each sequence, the
    positions its elements occupy in the merged ordering.

    Parameters
    ----------
    index_a : tuple
        Tuple of index.
    index_b : tuple
        Tuple of index.

    Returns
    -------
    index_a_new : list
        List of mapped index positions for ``index_a``.
    index_b_new : list
        List of mapped index positions for ``index_b``.
    """
    pos_a, pos_b = [], []
    i_a, i_b = 0, 0
    len_a, len_b = len(index_a), len(index_b)
    pos = 0
    while i_a < len_a and i_b < len_b:
        if index_a[i_a] < index_b[i_b]:
            pos_a.append(pos)
            i_a += 1
        else:
            # Ties go to index_b, matching the original merge order.
            pos_b.append(pos)
            i_b += 1
        pos += 1
    # At most one tail remains; hand out the remaining positions in order.
    pos_a.extend(range(pos, pos + (len_a - i_a)))
    pos += len_a - i_a
    pos_b.extend(range(pos, pos + (len_b - i_b)))
    return pos_a, pos_b
def download(url, save_path):
    r"""
    Description
    -----------
    Download dataset from URL.

    Parameters
    ----------
    url : str
        URL to the dataset.
    save_path : str
        Path to save the downloaded dataset.
    """
    print("Downloading from {}.".format(url))
    try:
        response = request.urlopen(url)
    except Exception as e:
        # Best-effort: report the failure and abort the process.
        print(e)
        print("Failed to download the dataset.")
        exit(1)
    payload = response.read()
    with open(save_path, "wb") as out_file:
        out_file.write(payload)
    print("Saved to {}.".format(save_path))
def save_dict_to_xlsx(result_dict, file_dir, file_name="result.xlsx", index=0, verbose=False):
    r"""
    Description
    -----------
    Save result dictionary to .xlsx file.

    Parameters
    ----------
    result_dict : dict
        Dictionary containing evaluation results.
    file_dir : str
        Directory to save the file.
    file_name : str, optional
        Name of saved file. Default: ``result.xlsx``.
    index : int, optional
        Index of dataframe. Default: ``0``.
    verbose : bool, optional
        Whether to display logs. Default: ``False``.
    """
    # exist_ok avoids the check-then-create race of exists() + makedirs().
    os.makedirs(file_dir, exist_ok=True)
    df = pd.DataFrame(result_dict, index=[index])
    df.to_excel(os.path.join(file_dir, file_name), index=True)
    if verbose:
        print(df)
def save_df_to_xlsx(df, file_dir, file_name="result.xlsx", verbose=False):
    r"""
    Description
    -----------
    Save dataframe to .xlsx file.

    Parameters
    ----------
    df : pandas.DataFrame
        Dataframe containing evaluation results.
    file_dir : str
        Directory to save the file.
    file_name : str, optional
        Name of saved file. Default: ``result.xlsx``.
    verbose : bool, optional
        Whether to display logs. Default: ``False``.
    """
    # exist_ok avoids the check-then-create race of exists() + makedirs().
    os.makedirs(file_dir, exist_ok=True)
    df.to_excel(os.path.join(file_dir, file_name), index=True)
    if verbose:
        print(df)
def save_df_to_csv(df, file_dir, file_name="result.csv", verbose=False):
    r"""
    Description
    -----------
    Save dataframe to .csv file.

    Parameters
    ----------
    df : pandas.DataFrame
        Dataframe containing evaluation results.
    file_dir : str
        Directory to save the file.
    file_name : str, optional
        Name of saved file. Default: ``result.csv``.
    verbose : bool, optional
        Whether to display logs. Default: ``False``.
    """
    # exist_ok avoids the check-then-create race of exists() + makedirs().
    os.makedirs(file_dir, exist_ok=True)
    df.to_csv(os.path.join(file_dir, file_name), index=True)
    if verbose:
        print(df)
def save_dict_to_json(result_dict, file_dir, file_name, verbose=False):
    r"""
    Description
    -----------
    Save dictionary to .json file.

    Parameters
    ----------
    result_dict : dict
        Dictionary containing evaluation results.
    file_dir : str
        Directory to save the file.
    file_name : str
        Name of saved file.
    verbose : bool, optional
        Whether to display logs. Default: ``False``.
    """
    # exist_ok avoids the check-then-create race of exists() + makedirs().
    os.makedirs(file_dir, exist_ok=True)
    with open(os.path.join(file_dir, file_name), 'w') as f:
        json.dump(result_dict, f)
    if verbose:
        print(result_dict)
def check_symmetry(adj):
    r"""
    Description
    -----------
    Check if the adjacency matrix is symmetric.

    Only the trailing ``N * N`` column block (columns aligned with the rows)
    is compared against its transpose, where ``N = adj.shape[0]``.

    Parameters
    ----------
    adj : scipy.sparse.csr.csr_matrix
        Adjacency matrix in form of ``N * N`` sparse matrix.

    Returns
    -------
    bool
    """
    n = adj.shape[0]
    block = adj[:, -n:]
    # Symmetric iff every one of the n*n entries compares equal.
    return bool(np.sum(block.T == block) == n ** 2)
def check_feat_range(features, feat_lim_min, feat_lim_max):
    r"""
    Description
    -----------
    Check if the generated features are within the limited range.

    Parameters
    ----------
    features : torch.Tensor
        Features in form of torch tensor.
    feat_lim_min : float
        Minimum limit of feature range.
    feat_lim_max : float
        Maximum limit of feature range.

    Returns
    -------
    bool
    """
    if isinstance(features, torch.Tensor):
        features = features.detach().cpu().numpy()
    within = feat_lim_min <= np.min(features) and np.max(features) <= feat_lim_max
    return bool(within)