Source code for graphy.qualityfuncs
"""Module implements quality functions for graph decompositions.
"""
from __future__ import division, print_function, absolute_import
import six
range = six.moves.range
import numpy as np
import scipy
import scipy.sparse as sp
from . import partitions
[docs]
class QualityFunction(object):
def __init__(self):
"""Base class for implementing a quality function.
"""
# precompute quantities here
pass
[docs]
def quality(self, membership):
"""Returns a quality score corresponding to membership vector.
Parameters
----------
membership : np.array
Membership vector
Returns
-------
float
Quality
"""
raise NotImplementedError
[docs]
def find_optimal(self, initial_membership=None, num_runs=1, debug_level=0):
"""Find optimal decomposition.
Parameters
----------
initial_membership : np.array, optional
Initial membership assignment. If None specified, each component is
assigned to separate subsystem.
num_runs : int, optional
Number of runs to try, can improve quality of decompositions. Default
is 1.
debug_level : int, optional
Amount of debugging information to display, from 0 (no debugging
information) to 3 (maximal debugging information)
Returns
-------
np.array
Optimal membership array
float
Q value corresponding to optimal membership
"""
return partitions.greedy_search(self.quality, self.N,
initial_membership=initial_membership,
num_runs=num_runs,
debug_level=debug_level)
[docs]
class Modularity(QualityFunction):
def __init__(self, mx):
"""Class that implements Newman's modularity quality function.
Parameters
----------
mx : 2-dimensional np.array
Connectivity matrix of graph to partition into communities.
"""
mx = mx if sp.isspmatrix(mx) else np.asarray(mx)
self.mx = mx.astype('float')
self.ks = self.mx.sum(axis=1)
self.total_stubs = self.ks.sum()
self.N = mx.shape[0]
[docs]
def quality(self, membership):
if len(membership) != self.N:
raise ValueError('Membership should be array of length %d, not %d' %
(self.N, len(membership)))
q = 0
for comm in set(membership):
ixs = membership == comm
q += (self.mx[ixs,:][:,ixs].sum() - self.ks[ixs].sum()**2/self.total_stubs)
return q / self.total_stubs
[docs]
class DirectedModularity(Modularity):
def __init__(self, mx):
"""Class that implements Newman's directed modularity quality function.
EA Leicht, MEJ Newman, "Community structure in directed networks", PRL, 2007.
http://arxiv.org/abs/0709.4500
Parameters
----------
mx : 2-dimensional np.array
Connectivity matrix of graph to partition into communities.
"""
super(DirectedModularity, self).__init__(mx)
self.k_in = self.mx.sum(axis=0).flat
self.k_out = self.mx.sum(axis=1).flat
[docs]
def quality(self, membership):
q = 0.0
for comm in set(membership):
ndxs = membership == comm
q += (self.mx[ndxs,:][:,ndxs].sum() - (self.k_in[ndxs].sum()*self.k_out[ndxs].sum()) / self.total_stubs)
return q / self.total_stubs
[docs]
class InfoMapCodeLength(QualityFunction):
def __init__(self, mx, teleportation_prob=0.0):
"""Class that implements Rosvall's `map equation' quality function.
Parameters
----------
mx : 2-dimensional np.array
Connectivity matrix of graph to partition into communities.
teleportation_prob : float (default 0.0)
Probability of teleporting to random node.
"""
mx = np.asarray(mx)
self.N = mx.shape[0]
mx = mx.astype('float')
# Create transition matrix
self.trans = mx/mx.sum(axis=0)[None,:]
if teleportation_prob > 0:
teleportation_trans = np.ones(mx.shape)
# np.fill_diagonal(teleportation_trans, 0)
teleportation_trans /= teleportation_trans.sum(axis=0)[None,:]
self.trans = (1-teleportation_prob)*self.trans + teleportation_prob*teleportation_trans
# Get equilibrium probs and entropy
evals, evecs = scipy.linalg.eig(self.trans)
equil = np.real(evecs[:,np.isclose(evals,1)])
self.equil = np.ravel(equil / equil.sum())
self.equil_entropy = self.normed_entropy(self.equil)
[docs]
@staticmethod
def normed_entropy(probs):
#probs = probs[probs != 0.0]
psum = sum(probs)
probs = probs / (psum if psum != 0 else 1)
return -np.sum(probs * np.log2(probs))
[docs]
def quality(self, membership, extrainfo=False):
comms = set(membership)
K = len(comms)
exit_probs = []
total_code_len = 0.0
for c in comms:
ixs = membership == c
comm_node_probs = self.equil[ixs]
comm_p = comm_node_probs.sum()
stay_prob = (self.trans[:,ixs][ixs,:].dot(comm_node_probs)).sum()
comm_exit_prob = (comm_p - stay_prob)
if comm_exit_prob > 0:
plist = np.array(comm_node_probs.tolist() + [comm_exit_prob,])
else:
plist = comm_node_probs
comm_code_len = (comm_p + comm_exit_prob) * self.normed_entropy(plist)
total_code_len += comm_code_len
exit_probs.append(comm_exit_prob)
if K > 1:
total_code_len += sum(exit_probs) * self.normed_entropy(np.array(exit_probs))
# Return negative because we want to minimize, not maximize
return -total_code_len