Source code for grb.attack.speit

import random

import numpy as np
import scipy.sparse as sp
import torch
import torch.nn.functional as F

from .base import InjectionAttack, EarlyStop
from ..evaluator import metric
from ..utils import utils


class SPEIT(InjectionAttack):
    r"""

    Description
    -----------
    SPEIT graph injection attack, 1st place solution of KDD CUP 2020
    Graph Adversarial Attack & Defense.
    (`SPEIT <https://github.com/Stanislas0/KDD_CUP_2020_MLTrack2_SPEIT>`__).

    Parameters
    ----------
    lr : float
        Learning rate of feature optimization process.
    n_epoch : int
        Epoch of perturbations.
    n_inject_max : int
        Maximum number of injected nodes.
    n_edge_max : int
        Maximum number of edges of injected nodes.
    feat_lim_min : float
        Minimum limit of features.
    feat_lim_max : float
        Maximum limit of features.
    loss : func of torch.nn.functional, optional
        Loss function compatible with ``torch.nn.functional``. Default: ``F.nll_loss``.
    eval_metric : func of grb.evaluator.metric, optional
        Evaluation metric. Default: ``metric.eval_acc``.
    inject_mode : str, optional
        Mode of injection. Choose from ``["random", "random-inter", "multi-layer"]``.
        Default: ``random``.
    device : str, optional
        Device used to host data. Default: ``cpu``.
    early_stop : bool, optional
        Whether to early stop. Default: ``False``.
    verbose : bool, optional
        Whether to display logs. Default: ``True``.

    """

    # Settings used for every EarlyStop tracker this attack creates, so that a
    # tracker re-created after an early stop behaves like the initial one.
    _EARLY_STOP_PATIENCE = 1000
    _EARLY_STOP_EPSILON = 1e-4
    # Number of second-layer nodes drawn (with replacement) for each
    # first-layer node in 'multi-layer' mode.
    _N_LAYER_2_LINKS = 10

    def __init__(self,
                 lr,
                 n_epoch,
                 n_inject_max,
                 n_edge_max,
                 feat_lim_min,
                 feat_lim_max,
                 loss=F.nll_loss,
                 eval_metric=metric.eval_acc,
                 inject_mode='random',
                 device='cpu',
                 early_stop=False,
                 verbose=True):
        self.device = device
        self.lr = lr
        self.n_epoch = n_epoch
        self.n_inject_max = n_inject_max
        self.n_edge_max = n_edge_max
        self.feat_lim_min = feat_lim_min
        self.feat_lim_max = feat_lim_max
        self.loss = loss
        self.eval_metric = eval_metric
        self.inject_mode = inject_mode
        self.verbose = verbose

        # Early stop: keep the falsy flag as-is when disabled so that
        # ``if self.early_stop`` can be used as the on/off switch.
        if early_stop:
            self.early_stop = self._new_early_stop()
        else:
            self.early_stop = early_stop

    def _new_early_stop(self):
        """Create a fresh early-stop tracker with this attack's settings."""
        return EarlyStop(patience=self._EARLY_STOP_PATIENCE,
                         epsilon=self._EARLY_STOP_EPSILON)

    def attack(self, model, adj, features, target_mask, adj_norm_func):
        r"""

        Description
        -----------
        Attack process consists of injection and feature update.

        Parameters
        ----------
        model : torch.nn.module
            Model implemented based on ``torch.nn.module``.
        adj : scipy.sparse.csr.csr_matrix
            Adjacency matrix in form of ``N * N`` sparse matrix.
        features : torch.FloatTensor
            Features in form of ``N * D`` torch float tensor.
        target_mask : torch.Tensor
            Mask of attack target nodes in form of ``N * 1`` torch bool tensor.
        adj_norm_func : func of utils.normalize
            Function that normalizes adjacency matrix.

        Returns
        -------
        adj_attack : scipy.sparse matrix
            Adversarial adjacency matrix in form of
            :math:`(N + N_{inject})\times(N + N_{inject})` sparse matrix.
        features_attack : torch.FloatTensor
            Features of nodes after attacks in form of
            :math:`N_{inject} \times D` torch float tensor.

        """

        model.to(self.device)
        n_total, n_feat = features.shape
        features = utils.feat_preprocess(features=features, device=self.device)
        adj_tensor = utils.adj_preprocess(adj=adj,
                                          adj_norm_func=adj_norm_func,
                                          model_type=model.model_type,
                                          device=self.device)

        # The clean predictions are only used as reference labels: evaluate
        # in eval mode (``update_features`` switches to it anyway) and skip
        # building an autograd graph for them.
        model.eval()
        with torch.no_grad():
            pred_orig = model(features, adj_tensor)
        origin_labels = torch.argmax(pred_orig, dim=1)

        adj_attack = self.injection(adj=adj,
                                    n_inject=self.n_inject_max,
                                    n_node=n_total,
                                    target_mask=target_mask,
                                    mode=self.inject_mode)

        # Injected features start from zero and are optimized afterwards.
        features_attack = np.zeros([self.n_inject_max, n_feat])
        features_attack = self.update_features(model=model,
                                               adj_attack=adj_attack,
                                               features=features,
                                               features_attack=features_attack,
                                               origin_labels=origin_labels,
                                               target_mask=target_mask,
                                               adj_norm_func=adj_norm_func)

        return adj_attack, features_attack

    def injection(self, adj, n_inject, n_node, target_mask, target_node=None, mode='random-inter'):
        r"""

        Description
        -----------
        Randomly inject nodes to target nodes.

        Parameters
        ----------
        adj : scipy.sparse.csr.csr_matrix
            Adjacency matrix in form of ``N * N`` sparse matrix.
        n_inject : int
            Number of injection.
        n_node : int
            Number of all nodes.
        target_mask : torch.Tensor
            Mask of attack target nodes in form of ``N * 1`` torch bool tensor.
        target_node : np.ndarray, optional
            Positions (within the set of target nodes, i.e. values in
            ``[0, n_target)``) to attack with priority. Only used by the
            non-``random`` modes. If ``None``, every target position is used
            in ascending order. Default: ``None``.
        mode : str, optional
            Mode of injection. Choose from ``["random", "random-inter", "multi-layer"]``.
            Default: ``random-inter``.

        Returns
        -------
        adj_attack : scipy.sparse.coo_matrix
            Adversarial adjacency matrix in form of
            :math:`(N + N_{inject})\times(N + N_{inject})` sparse matrix.

        """

        test_index = torch.where(target_mask)[0]
        n_test = test_index.shape[0]

        if mode == 'random':
            return self._inject_random(adj=adj,
                                       n_inject=n_inject,
                                       n_node=n_node,
                                       test_index=test_index.cpu().numpy())

        if target_node is None:
            # Without an explicit priority list, attack all target positions.
            target_node = np.arange(n_test)

        # Dense block of shape (n_inject, n_test + n_inject):
        # [edges to targets | edges among injected nodes].
        inject_block = self._build_inject_block(target_node, n_inject, n_test, mode)

        # The target columns are placed after ``n_node - n_test`` empty
        # columns, i.e. target position k is mapped to node
        # ``n_node - n_test + k``.
        adj_inject = sp.hstack([sp.csr_matrix((n_inject, n_node - n_test)), inject_block])
        adj_inject = sp.csr_matrix(adj_inject)

        # Append the injected rows below and the injected columns to the right.
        adj_attack = sp.vstack([adj, adj_inject[:, :n_node]])
        adj_attack = sp.hstack([adj_attack, adj_inject.T])

        return sp.coo_matrix(adj_attack)

    def _inject_random(self, adj, n_inject, n_node, test_index):
        """Link every injected node to distinct, uniformly sampled targets.

        ``test_index`` is a NumPy array holding the node ids of the targets.
        Each injected node receives ``min(n_edge_max, len(test_index))``
        undirected edges, each to a different target node.
        """
        n_test = len(test_index)
        # A node cannot be linked to more distinct targets than exist.
        n_links = min(self.n_edge_max, n_test)

        inject_ids = []
        target_ids = []
        for i in range(n_inject):
            # Sampling without replacement guarantees distinct targets.
            chosen = np.asarray(random.sample(range(n_test), n_links), dtype=np.int64)
            inject_ids.extend([n_node + i] * n_links)
            target_ids.extend(test_index[chosen].tolist())
        inject_ids = np.asarray(inject_ids, dtype=np.int64)
        target_ids = np.asarray(target_ids, dtype=np.int64)

        # Grow the original matrix with empty rows/columns for injected nodes.
        add1 = sp.csr_matrix((n_inject, n_node))
        add2 = sp.csr_matrix((n_node + n_inject, n_inject))
        padded = sp.coo_matrix(sp.hstack([sp.vstack([adj, add1]), add2]))

        # Add each new edge in both directions to keep the matrix symmetric.
        rows = np.concatenate([padded.row, inject_ids, target_ids])
        cols = np.concatenate([padded.col, target_ids, inject_ids])
        data = np.concatenate([padded.data, np.ones(2 * inject_ids.size)])

        return sp.coo_matrix((data, (rows, cols)), shape=padded.shape)

    def _build_inject_block(self, target_node_list, n_inject, n_test, mode='random-inter'):
        """Build the dense ``(n_inject, n_test + n_inject)`` injection block.

        Columns ``[0, n_test)`` hold edges to target positions, columns
        ``[n_test, n_test + n_inject)`` hold edges among injected nodes.
        For an unknown ``mode`` an error message is printed and the all-zero
        block is returned.
        """
        adj = np.zeros((n_inject, n_test + n_inject))

        if mode == 'random-inter':
            n_inject_edges = np.zeros(n_inject)
            active_edges = list(range(n_inject))
            # create edges between injected node and target node
            active_edges = self._link_targets(adj, target_node_list, active_edges,
                                              n_inject_edges, self.n_edge_max)
            # create edges between injected nodes
            self._link_injected(adj, n_test, active_edges,
                                n_inject_edges, self.n_edge_max)
        elif mode == 'multi-layer':
            n_inject_edges = np.zeros(n_inject)
            n_inject_layer_1, n_inject_layer_2 = int(n_inject * 0.9), int(n_inject * 0.1)
            n_edge_max_layer_1 = int(self.n_edge_max * 0.9)
            active_edges = list(range(n_inject_layer_1))
            # create edges between first-layer injected node and target node
            active_edges = self._link_targets(adj, target_node_list, active_edges,
                                              n_inject_edges, n_edge_max_layer_1)
            # create edges between first-layer injected nodes
            self._link_injected(adj, n_test, active_edges,
                                n_inject_edges, n_edge_max_layer_1)

            # Second-layer nodes occupy rows
            # [n_inject_layer_1, n_inject_layer_1 + n_inject_layer_2) and are
            # attached to first-layer nodes only.
            noise_active_layer2 = list(range(n_inject_layer_2))
            noise_edge_layer2 = np.zeros(n_inject_layer_2)
            for i in range(n_inject_layer_1):
                if not noise_active_layer2:
                    break
                inject_list = np.random.choice(noise_active_layer2, self._N_LAYER_2_LINKS)
                noise_edge_layer2[inject_list] += 1
                noise_active_layer2 = self._drop_saturated(noise_active_layer2,
                                                           noise_edge_layer2,
                                                           threshold=self.n_edge_max)
                adj[inject_list + n_inject_layer_1, i + n_test] = 1
                adj[i, inject_list + n_inject_layer_1 + n_test] = 1
        else:
            print("Mode ERROR: 'mode' should be one of ['random', 'random-inter', 'multi-layer']")

        return adj

    @staticmethod
    def _drop_saturated(active_edges, n_inject_edges, threshold):
        """Return the active nodes whose edge count is still below ``threshold``.

        A new list is built instead of removing items while iterating, which
        would skip the element following each removed one.
        """
        return [i for i in active_edges if n_inject_edges[i] < threshold]

    @staticmethod
    def _pick_partners(adj, n_test, n_inter, inject_id, candidates):
        """Pick up to ``n_inter`` candidates not yet linked to ``inject_id``.

        ``candidates[0]`` is ``inject_id`` itself and is skipped. Edges among
        injected nodes live in the columns starting at ``n_test``, so that is
        where an existing link has to be looked up.
        """
        partners = []
        for other in candidates[1:]:
            if len(partners) >= n_inter:
                break
            if adj[inject_id, n_test + other] == 0:
                partners.append(other)
        return partners

    @classmethod
    def _link_targets(cls, adj, target_node_list, active_edges, n_inject_edges, threshold):
        """Attach each target to one randomly chosen active injected node.

        Stops as soon as no injected node has budget left. Updates ``adj`` and
        ``n_inject_edges`` in place and returns the remaining active nodes.
        """
        for target in target_node_list:
            if not active_edges:
                break
            inject_id = int(np.random.choice(active_edges, 1)[0])
            n_inject_edges[inject_id] += 1
            active_edges = cls._drop_saturated(active_edges, n_inject_edges, threshold)
            adj[inject_id, target] = 1
        return active_edges

    @classmethod
    def _link_injected(cls, adj, n_test, active_edges, n_inject_edges, threshold):
        """Spend remaining edge budget on links among active injected nodes.

        In every round the least-loaded active node is linked to other active
        nodes (least-loaded first) until its budget is used up or no unlinked
        candidate is left. Updates ``adj`` and ``n_inject_edges`` in place and
        returns the remaining active nodes.
        """
        # The number of rounds is fixed by the initial amount of active nodes.
        for _ in range(len(active_edges)):
            if not active_edges:
                break
            inject_tmp_list = sorted(active_edges, key=lambda x: n_inject_edges[x])
            inject_id = inject_tmp_list[0]
            n_inter = threshold - n_inject_edges[inject_id]
            inject_list = cls._pick_partners(adj, n_test, n_inter, inject_id, inject_tmp_list)
            if inject_list:
                n_inject_edges[inject_list] += 1
                n_inject_edges[inject_id] += len(inject_list)
                # Write both directions so the injected block stays symmetric.
                adj[inject_id, n_test + np.array(inject_list)] = 1
                adj[inject_list, n_test + inject_id] = 1
            active_edges = cls._drop_saturated(active_edges, n_inject_edges, threshold)
        return active_edges

    def update_features(self, model, adj_attack, features, features_attack, origin_labels, target_mask,
                        adj_norm_func):
        r"""

        Description
        -----------
        Adversarial feature generation of injected nodes.

        Parameters
        ----------
        model : torch.nn.module
            Model implemented based on ``torch.nn.module``.
        adj_attack : scipy.sparse matrix
            Adversarial adjacency matrix in form of
            :math:`(N + N_{inject})\times(N + N_{inject})` sparse matrix.
        features : torch.FloatTensor
            Features in form of ``N * D`` torch float tensor.
        features_attack : torch.FloatTensor
            Features of nodes after attacks in form of
            :math:`N_{inject} \times D` torch float tensor.
        origin_labels : torch.LongTensor
            Labels of target nodes originally predicted by the model.
        target_mask : torch.Tensor
            Mask of target nodes in form of ``N * 1`` torch bool tensor.
        adj_norm_func : func of utils.normalize
            Function that normalizes adjacency matrix.

        Returns
        -------
        features_attack : torch.FloatTensor
            Updated features of nodes after attacks in form of
            :math:`N_{inject} \times D` torch float tensor.

        """

        lr = self.lr
        n_epoch = self.n_epoch
        feat_lim_min, feat_lim_max = self.feat_lim_min, self.feat_lim_max
        n_total = features.shape[0]

        adj_attacked_tensor = utils.adj_preprocess(adj=adj_attack,
                                                   adj_norm_func=adj_norm_func,
                                                   model_type=model.model_type,
                                                   device=self.device)
        features_attack = utils.feat_preprocess(features=features_attack, device=self.device)
        features_attack.requires_grad_(True)
        # Only the injected features are optimized; the model stays fixed.
        optimizer = torch.optim.Adam([features_attack], lr=lr)
        model.eval()

        # Reference labels of the targets do not change between epochs.
        target_labels = origin_labels[target_mask]

        for i in range(n_epoch):
            features_concat = torch.cat((features, features_attack), dim=0)
            pred = model(features_concat, adj_attacked_tensor)
            target_pred = pred[:n_total][target_mask]
            # Negated loss: the optimizer maximizes the loss on the targets.
            pred_loss = -self.loss(target_pred, target_labels).to(self.device)

            optimizer.zero_grad()
            # The graph is rebuilt by the forward pass of every epoch, so it
            # does not need to be retained after backward.
            pred_loss.backward()
            optimizer.step()

            # Project the features back into the allowed range.
            with torch.no_grad():
                features_attack.clamp_(feat_lim_min, feat_lim_max)

            test_score = self.eval_metric(target_pred, target_labels)

            if self.early_stop:
                self.early_stop(test_score)
                if self.early_stop.stop:
                    print("Attacking: Early stopped.")
                    # Reset the tracker so the attack object can be reused.
                    self.early_stop = self._new_early_stop()
                    return features_attack

            if self.verbose:
                print(
                    "Attacking: Epoch {}, Loss: {:.5f}, Surrogate test score: {:.5f}".format(i, pred_loss, test_score),
                    end='\r' if i != n_epoch - 1 else '\n')

        return features_attack