Source code for helios.layouts.force_directed

"""Force-Directed Layout oct-tree"""

import numpy as np
import heliosFR

from fury.stream.tools import IntervalTimer
from helios.layouts.base import NetworkLayoutAsync


class HeliosFr(NetworkLayoutAsync):
    """A 2D/3D Force-directed layout method

    This method is a wrapper for the helios force-directed algorithm,
    heliosFR. HeliosFr is a force-directed layout algorithm that is based
    on oct-trees. The algorithm is designed to work with a large number
    of nodes and edges.

    References
    ----------
    [1] Fruchterman, T. M. J., & Reingold, E. M. (1991). Graph Drawing by
    Force-Directed Placement. Software: Practice and Experience, 21(11).

    """

    def __init__(
        self, edges, network_draw, viscosity=0.3,
        a=0.0006, b=1, max_workers=8, update_interval_workers=0,
        velocities=None
    ):
        """Build the heliosFR layout object from the network data.

        Parameters
        ----------
        edges : ndarray
            Converted to a C-contiguous ``uint64`` array before being
            handed to ``heliosFR.FRLayout``.
        network_draw : NetworkDraw
            Its ``positions`` attribute supplies the initial node
            positions (converted to a C-contiguous ``float32`` array) and
            the number of nodes (``positions.shape[0]``).
        viscosity : float, optional
            Forwarded unchanged to ``heliosFR.FRLayout``.
        a : float, optional
            Forwarded unchanged to ``heliosFR.FRLayout``.
        b : float, optional
            Forwarded unchanged to ``heliosFR.FRLayout``.
        max_workers : int, optional
            number of threads
        update_interval_workers : float, optional
            When you set this to a value greater than zero the helios-fr
            will wait to perform each step. This can be used to reduce the
            CPU consumption
        velocities : ndarray, optional
            Initial node velocities. When omitted, a zero-filled
            ``float32`` array of shape ``(nodes_count, 3)`` is used.
            A supplied array is converted to a C-contiguous ``float32``
            array, the same normalization applied to the positions.

        """
        self._started = False
        self._network_draw = network_draw
        self._interval_timer = None
        self._nodes_count = network_draw.positions.shape[0]
        self._update_interval_workers = update_interval_workers
        self._positions = np.ascontiguousarray(
            network_draw.positions, dtype=np.float32)
        self._edges = np.ascontiguousarray(edges, dtype=np.uint64)
        if velocities is None:
            velocities = np.zeros((self._nodes_count, 3), dtype=np.float32)
        else:
            # Give the native layout the same dtype/memory layout as the
            # default buffer; this is a no-op (no copy) when the caller
            # already passed a C-contiguous float32 array.
            velocities = np.ascontiguousarray(velocities, dtype=np.float32)
        self._layout = heliosFR.FRLayout(
            self._edges, self._positions, velocities,
            a, b, viscosity,
            maxWorkers=max_workers,
            updateInterval=self._update_interval_workers)

    def start(self, ms=15):
        """Starts the helios force-directed layout using other threads
        (async). Does nothing if the layout is already started.

        Parameters
        ----------
        ms : float, optional
            Interval in milliseconds to update the node and edge
            positions in the network visualization. A value lower than
            ``update_interval_workers`` is raised to that value. When
            ``ms`` is not greater than zero no refresh timer is created.

        """
        if self._started:
            return
        self._layout.start()
        self._started = True
        if ms > 0:
            # Never refresh more often than the configured worker interval.
            if ms < self._update_interval_workers:
                ms = self._update_interval_workers
            # IntervalTimer receives ms/1000, i.e. the interval in seconds.
            # NOTE(review): ``self.update`` is not defined in this class;
            # presumably inherited from NetworkLayoutAsync - confirm.
            self._interval_timer = IntervalTimer(
                ms/1000, self.update)

    def stop(self):
        """This will stop the threads running the helios force-directed
        algorithm. Does nothing if the layout is not started.

        """
        if not self._started:
            return
        # Stop the refresh timer (if any) before stopping the layout.
        if self._interval_timer is not None:
            self._interval_timer.stop()
            self._interval_timer = None
        self._layout.stop()
        self._started = False

    def steps(self, iterations):
        """A Sync version of the helios force-directed algorithm.

        Runs ``iterations`` iterations through the layout object and then
        calls ``self.update()`` once.

        Parameters
        ----------
        iterations : int

        """
        self._layout.iterate(iterations=iterations)
        self.update()