diff --git a/3D_app/GNG_3D.py b/3D_app/GNG_3D.py new file mode 100644 index 0000000..99fceca --- /dev/null +++ b/3D_app/GNG_3D.py @@ -0,0 +1,219 @@ +import matplotlib.pyplot as plt +from mpl_toolkits.mplot3d import Axes3D +import numpy as np +from sklearn.datasets import make_swiss_roll +import igraph as ig +from numpy.linalg import norm +from sklearn.datasets import make_moons +from tqdm import tqdm +from sys import stdout + +class GrowingNeuralGas: + + def __init__(self, max_neurons, max_iter, max_age, eb, en, alpha, beta, l, dataset): + + ''' + --------------------------------------------- + Growing Neural Gas' Parameter Declarations + --------------------------------------------- + 1. max_iter ; Maximum # of iterations the + 2. max_age ; Maximum # of age + 3. eb ; fraction of distance betweeen nearest neuron and input signal + 4. en ; fraction of distance betweeen neighboring neurons and input signal + 5. alpha ; multiplying scalar for local error + 6. beta ; multiplying scalar for global error + 7. l + 8. dataset + ''' + + # Parameters declared by user + self.max_neurons = max_neurons + self.max_iter = max_iter + self.max_age = max_age + self.eb = eb + self.en = en + self.alpha = alpha + self.beta = beta + self.l = l + self.dataset_original = dataset.copy() + self.dataset = dataset.copy() + + # Variable for tracking learning evolution + self.verts_evolve = [] + self.edges_evolve = [] + + def initialize_gng(self): + ''' + Initialize Growing Neural Gas + ''' + # Get random datapoints from target dataset + t0 = np.random.randint(0, int(self.dataset.shape[0] / 2)) + t1 = np.random.randint(int(self.dataset.shape[0] / 2), self.dataset.shape[0]) + # Initialize Growing Neural Gas + self.gng = ig.Graph() + self.gng.add_vertex(weight=self.dataset[t0, :].astype(np.float64), error=0) + self.gng.add_vertex(weight=self.dataset[t1, :].astype(np.float64), error=0) + self.gng.add_edge(0, 1, age=0) + + def learning_position(self): + for _ in range(self.l): + # Step 1. Get a random datapoint from target dataset + t = np.random.randint(0, self.dataset.shape[0]) + random_input = self.dataset[t, :] + + # Step 2. Find 2 nearest neuron from random_input + nearest_index = np.array([norm(weight[:2] - random_input[:2])**2 for weight in self.gng.vs['weight']]).argsort() + neuron_s1 = self.gng.vs[nearest_index[0]] + neuron_s2 = self.gng.vs[nearest_index[1]] + + # Step 3. Increase the age of all neighboring edges from nearest neuron (neuron_s1) + for edge_id in self.gng.incident(neuron_s1.index): + self.gng.es[edge_id]['age'] += 1 + + # Step 4. Add error to the nearest neuron + self.gng.vs[neuron_s1.index]['error'] += norm(neuron_s1['weight'][:2] - random_input[:2]) + + # Step 5.1. Update position of nearest neuron + neuron_s1['weight'][:2] += (self.eb * (random_input[:2] - neuron_s1['weight'][:2])) + neuron_s1['weight'][2] += (self.eb * (random_input[2] - neuron_s1['weight'][2])) + # Step 5.2. Update position of nearest neuron's neighbors + for neuron in self.gng.vs[self.gng.neighbors(neuron_s1.index)]: + neuron['weight'][:2] += (self.en * (random_input[:2] - neuron['weight'][:2])) + neuron['weight'][2] += (self.en * (random_input[2] - neuron['weight'][2])) + + # Step 6. Update edge of nearest neurons + EDGE_FLAG = self.gng.get_eid(neuron_s1.index, neuron_s2.index, directed=False, error=False) + if EDGE_FLAG == -1: + self.gng.add_edge(neuron_s1.index, neuron_s2.index, age=0) + else: + self.gng.es[EDGE_FLAG]['age'] = 0 + + # Step 7.1. Delete aging edge + for edge in self.gng.es: + src = edge.source + tgt = edge.target + if edge['age'] > self.max_age: + self.gng.delete_edges(edge.index) + + # Step 7.2. Delete isolated neuron + for neuron in self.gng.vs: + if len(self.gng.incident(neuron)) == 0: + self.gng.delete_vertices(neuron) + + # Step 8. Reduce global error + for neuron in self.gng.vs: + neuron['error'] *= self.beta + + # Step 9.1. Remove generated random input from target dataset + self.dataset = np.delete(self.dataset, t, axis=0) + # Step 9.2. Reset dataset if datapoints are depleted + if self.dataset.shape[0] == 1: + self.dataset = self.dataset_original.copy() + + def update_neuron(self): + # Adding new neuron from previous learning + # Get neuron q and f + if len(self.gng.vs) <= self.max_neurons: + error_index = np.array([error for error in self.gng.vs['error']]).argsort() + neuron_q = self.gng.vs[error_index[-1]] + error = np.array([(neuron['error'], neuron.index) for neuron in self.gng.vs[self.gng.neighbors(neuron_q.index)]]) + error = np.sort(error, axis=0) + neuron_f = self.gng.vs[int(error[-1, 1])] + # Add neuron between neuron q and f + self.gng.add_vertex(weight=(neuron_q['weight'] + neuron_f['weight']) / 2, error=0) + neuron_r = self.gng.vs[len(self.gng.vs) - 1] + # Delete edge between neuron q and f + self.gng.delete_edges(self.gng.get_eid(neuron_q.index, neuron_f.index)) + # Create edge between q-r and r-f + self.gng.add_edge(neuron_q.index, neuron_r.index, age=0) + self.gng.add_edge(neuron_r.index, neuron_f.index, age=0) + # Update neuron error + neuron_q['error'] *= self.alpha + neuron_f['error'] *= self.alpha + neuron_r['error'] = neuron_q['error'] + + def learn(self): + # Initialize GNG + self.initialize_gng() + # GNG learning iteration + for iter, _ in zip(range(0, self.max_iter), tqdm(range(self.max_iter))): + # Track evolution + self.verts_evolve.append(np.array([neuron['weight'] for neuron in self.gng.vs])) + self.edges_evolve.append(np.array([(neuron.source + 1, neuron.target + 1) for neuron in self.gng.es])) + # Learn new posititon + self.learning_position() + self.update_neuron() + return self.gng + + def plot_gng(self, active1,active2, data_point_size=1, neuron_size=30): + """ + Function to plot the neurons and connections of the GNG in 3D space, with color representing the intensity. + Parameters: + - data_point_size: Size of the data points in the plot. + - neuron_size: Size of the neurons in the plot. + """ + fig = plt.figure() + ax = fig.add_subplot(111, projection='3d') + + # Plot the original data points + if (active1==True): + scatter = ax.scatter(self.dataset_original[:, 0], self.dataset_original[:, 1], self.dataset_original[:, 2], c=self.dataset_original[:, 3], s=data_point_size, cmap='viridis', alpha=0.5, label='Data') + + # Plot the neurons + neuron_positions = np.array([neuron['weight'] for neuron in self.gng.vs]) + scatter=ax.scatter(neuron_positions[:, 0], neuron_positions[:, 1], neuron_positions[:, 2], c=neuron_positions[:, 3], s=neuron_size, cmap='viridis', label='Neurons') + fig.colorbar(scatter, ax=ax, label='Intensity') + # Plot the connections + if(active2==True): + for edge in self.gng.es: + src, tgt = edge.source, edge.target + src_pos, tgt_pos = self.gng.vs[src]['weight'], self.gng.vs[tgt]['weight'] + ax.plot([src_pos[0], tgt_pos[0]], [src_pos[1], tgt_pos[1]], [src_pos[2], tgt_pos[2]], 'k-', alpha=0.5) + + ax.set_title('Growing Neural Gas Result') + ax.legend() + plt.show() + + + + +def plot_gng(x1,x2,active2, neuron_size=30): + """ + Function to plot the neurons and connections of the GNG in 3D space, with color representing the intensity. + Parameters: + - data_point_size: Size of the data points in the plot. + - neuron_size: Size of the neurons in the plot. + """ + fig = plt.figure() + ax = fig.add_subplot(111, projection='3d') + + # Plot the original data points + #if (active1==True): + #scatter = ax.scatter(self.dataset_original[:, 0], self.dataset_original[:, 1], self.dataset_original[:, 2], c=self.dataset_original[:, 3], s=data_point_size, cmap='viridis', alpha=0.5, label='Data') + + # Plot the neurons + neuron_positions = x1 + scatter=ax.scatter(neuron_positions[:, 0], neuron_positions[:, 1], neuron_positions[:, 2], c=neuron_positions[:, 3], s=neuron_size, cmap='viridis', label='Neurons') + fig.colorbar(scatter, ax=ax, label='Intensity') + # Plot the connections + if(active2==True): + for edge in x2: + src, tgt = edge[0], edge[1] + src_pos, tgt_pos = x1[src][:], x1[tgt][:] + ax.plot([src_pos[0], tgt_pos[0]], [src_pos[1], tgt_pos[1]], [src_pos[2], tgt_pos[2]], 'k-', alpha=0.5) + + + + ax.set_title('Growing Neural Gas Result') + ax.legend() + plt.show() + + +def position_point_espace(data, seuil,y): + tab = [] + size_data = np.shape(data) + for i in range(size_data[0]): + for j in range(size_data[1]): + if data[i, j] > seuil: + tab.append([y,i, j, data[i, j]]) + return tab \ No newline at end of file