Keep all neighbor data in a single Vec

This commit is contained in:
Dirkjan Ochtman 2021-02-17 15:14:14 +01:00
parent d6b0171e7a
commit b7c61fc752
2 changed files with 189 additions and 155 deletions

View File

@ -1,6 +1,7 @@
use std::cmp::{max, Ordering, Reverse}; use std::cmp::{Ordering, Reverse};
use std::collections::BinaryHeap; use std::collections::BinaryHeap;
use std::collections::HashSet; use std::collections::HashSet;
use std::ops::Range;
#[cfg(feature = "indicatif")] #[cfg(feature = "indicatif")]
use std::sync::atomic::{self, AtomicUsize}; use std::sync::atomic::{self, AtomicUsize};
@ -10,13 +11,13 @@ use ordered_float::OrderedFloat;
use parking_lot::{Mutex, RwLock}; use parking_lot::{Mutex, RwLock};
use rand::rngs::SmallRng; use rand::rngs::SmallRng;
use rand::{Rng, SeedableRng}; use rand::{Rng, SeedableRng};
use rayon::iter::{IndexedParallelIterator, IntoParallelIterator, ParallelIterator}; use rayon::iter::{IntoParallelIterator, ParallelIterator};
#[cfg(feature = "serde")] #[cfg(feature = "serde")]
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
mod types; mod types;
pub use types::PointId; pub use types::PointId;
use types::{Candidate, Layer, LayerId, UpperNode, Visited, ZeroNode, INVALID}; use types::{Candidate, Layer, LayerId, Meta, Visited, ZeroNode, INVALID};
/// Parameters for building the `Hnsw` /// Parameters for building the `Hnsw`
pub struct Builder { pub struct Builder {
@ -125,8 +126,8 @@ impl Default for Heuristic {
pub struct Hnsw<P> { pub struct Hnsw<P> {
ef_search: usize, ef_search: usize,
points: Vec<P>, points: Vec<P>,
zero: Vec<ZeroNode>, meta: Meta,
layers: Vec<Vec<UpperNode>>, neighbors: Vec<PointId>,
} }
impl<P> Hnsw<P> impl<P> Hnsw<P>
@ -157,28 +158,15 @@ where
return ( return (
Self { Self {
ef_search, ef_search,
zero: Vec::new(),
points: Vec::new(), points: Vec::new(),
layers: Vec::new(), neighbors: Vec::new(),
meta: Meta::default(),
}, },
Vec::new(), Vec::new(),
); );
} }
// Determine the number and size of layers. let meta = Meta::new(ml, points.len());
let mut sizes = Vec::new();
let mut num = points.len();
loop {
let next = (num as f32 * ml) as usize;
if next < M {
break;
}
sizes.push((num - next, num));
num = next;
}
sizes.push((num, num));
sizes.reverse();
// Give all points a random layer and sort the list of nodes by descending order for // Give all points a random layer and sort the list of nodes by descending order for
// construction. This allows us to copy higher layers to lower layers as construction // construction. This allows us to copy higher layers to lower layers as construction
@ -193,74 +181,51 @@ where
let mut new_points = Vec::with_capacity(points.len()); let mut new_points = Vec::with_capacity(points.len());
let mut new_nodes = Vec::with_capacity(points.len()); let mut new_nodes = Vec::with_capacity(points.len());
let mut out = vec![INVALID; points.len()]; let mut out = vec![INVALID; points.len()];
for (_, idx) in shuffled { let mut at_layer = meta.next_lower(None).unwrap();
for (i, (_, idx)) in shuffled.into_iter().enumerate() {
let pid = PointId(new_nodes.len() as u32); let pid = PointId(new_nodes.len() as u32);
let layer = sizes if i == at_layer.1 {
.iter() at_layer = meta.next_lower(Some(at_layer.0)).unwrap();
.enumerate() }
.find_map(|(i, &size)| match (pid.0 as usize) < size.1 {
true => Some(i),
false => None,
})
.unwrap();
new_points.push(points[idx].clone()); new_points.push(points[idx].clone());
new_nodes.push((LayerId(sizes.len() - layer - 1), pid)); new_nodes.push((at_layer.0, pid));
out[idx] = pid; out[idx] = pid;
} }
let (points, nodes) = (new_points, new_nodes); let (points, nodes) = (new_points, new_nodes);
debug_assert_eq!(nodes.first().unwrap().0, LayerId(meta.len() - 1));
debug_assert_eq!(nodes.last().unwrap().0, LayerId(0)); debug_assert_eq!(nodes.last().unwrap().0, LayerId(0));
debug_assert_eq!(nodes.first().unwrap().0, LayerId(sizes.len() - 1));
// The layer from the first node is our top layer, or the zero layer if we have no nodes.
let top = match nodes.first() {
Some((top, _)) => *top,
None => LayerId(0),
};
// Figure out how many nodes will go on each layer. This helps us allocate memory capacity
// for each layer in advance, and also helps enable batch insertion of points.
let num_layers = sizes.len();
let mut ranges = Vec::with_capacity(top.0);
for (i, (size, cumulative)) in sizes.into_iter().enumerate() {
let start = cumulative - size;
// Skip the first point, since we insert the enter point separately
ranges.push((LayerId(num_layers - i - 1), max(start, 1)..cumulative));
}
// Insert the first point so that we have an enter point to start searches with. // Insert the first point so that we have an enter point to start searches with.
let mut layers = vec![vec![]; top.0]; let mut neighbors = vec![INVALID; meta.neighbors()];
let zero = points let mut layers = meta.layers_mut(&mut neighbors);
.iter() let (zero, upper) = layers.split_first_mut().unwrap();
.map(|_| RwLock::new(ZeroNode::default())) let zero = zero.zero_nodes();
.collect::<Vec<_>>();
let pool = SearchPool::new(points.len()); let pool = SearchPool::new(points.len());
#[cfg(feature = "indicatif")] #[cfg(feature = "indicatif")]
let done = AtomicUsize::new(0); let done = AtomicUsize::new(0);
for (layer, range) in ranges { for layer in meta.descending() {
let num = if layer.is_zero() { M * 2 } else { M }; let num = if layer.is_zero() { M * 2 } else { M };
#[cfg(feature = "indicatif")] #[cfg(feature = "indicatif")]
if let Some(bar) = &progress { if let Some(bar) = &progress {
bar.set_message(format!("Building index (layer {})", layer.0)); bar.set_message(format!("Building index (layer {})", layer.0));
} }
let end = range.end; let Range { start, end } = meta.points(layer);
nodes[range].into_par_iter().for_each(|(_, pid)| { nodes[start..end].into_par_iter().for_each(|(_, pid)| {
let node = zero.as_slice()[*pid].write(); let node = zero.as_slice()[*pid].write();
let (mut search, mut insertion) = pool.pop(); let (mut search, mut insertion) = pool.pop();
let point = &points.as_slice()[*pid]; let point = &points.as_slice()[*pid];
search.reset(); search.reset();
search.push(PointId(0), point, &points); search.push(PointId(0), point, &points);
for cur in top.descend() { for cur in meta.descending() {
search.ef = if cur <= layer { ef_construction } else { 1 }; search.ef = if cur <= layer { ef_construction } else { 1 };
match cur > layer { match cur > layer {
true => { true => {
search.search(point, layers[cur.0 - 1].as_slice(), &points, num); search.search(point, upper[cur.0 - 1].as_ref(), &points, num);
search.cull(); search.cull();
} }
false => { false => {
@ -292,15 +257,10 @@ where
pool.push((search, insertion)); pool.push((search, insertion));
}); });
// For layers above the zero layer, make a copy of the current state of the zero layer // Copy the current state of the zero layer
// with `nearest` truncated to `M` elements. match layer.0 {
if !layer.is_zero() { 0 => break,
let mut upper = Vec::new(); n => upper[n - 1].copy_from_zero(&zero[..end]),
(&zero[..end])
.into_par_iter()
.map(|zero| UpperNode::from_zero(&zero.read()))
.collect_into_vec(&mut upper);
layers[layer.0 - 1] = upper;
} }
} }
@ -312,9 +272,9 @@ where
( (
Self { Self {
ef_search, ef_search,
zero: zero.into_iter().map(|node| node.into_inner()).collect(), neighbors,
meta,
points, points,
layers,
}, },
out, out,
) )
@ -337,17 +297,15 @@ where
search.visited.reserve_capacity(self.points.len()); search.visited.reserve_capacity(self.points.len());
search.push(PointId(0), point, &self.points); search.push(PointId(0), point, &self.points);
for cur in LayerId(self.layers.len()).descend() { for cur in self.meta.descending() {
let (ef, num) = match cur.is_zero() { let (ef, num) = match cur.is_zero() {
true => (self.ef_search, M * 2), true => (self.ef_search, M * 2),
false => (1, M), false => (1, M),
}; };
search.ef = ef; search.ef = ef;
match cur.0 { let layer = self.meta.layer(cur, &self.neighbors);
0 => search.search(point, self.zero.as_slice(), &self.points, num), search.search(point, layer, &self.points, num);
l => search.search(point, self.layers[l - 1].as_slice(), &self.points, num),
}
if !cur.is_zero() { if !cur.is_zero() {
search.cull(); search.cull();
@ -376,12 +334,12 @@ where
/// ///
/// Creates the new node, initializing its `nearest` array and updates the nearest neighbors /// Creates the new node, initializing its `nearest` array and updates the nearest neighbors
/// for the new node's neighbors if necessary before appending the new node to the layer. /// for the new node's neighbors if necessary before appending the new node to the layer.
fn insert<P: Point>( fn insert<'a, P: Point>(
new: PointId, new: PointId,
mut node: parking_lot::RwLockWriteGuard<ZeroNode>, mut node: parking_lot::RwLockWriteGuard<ZeroNode<'a>>,
insertion: &mut Search, insertion: &mut Search,
search: &mut Search, search: &mut Search,
layer: &[RwLock<ZeroNode>], layer: &'a [RwLock<ZeroNode<'a>>],
points: &[P], points: &[P],
heuristic: &Option<Heuristic>, heuristic: &Option<Heuristic>,
) { ) {

View File

@ -1,15 +1,157 @@
use std::cmp::max;
use std::hash::Hash; use std::hash::Hash;
use std::ops::{Deref, Index}; use std::ops::{Deref, Index, Range};
use ordered_float::OrderedFloat; use ordered_float::OrderedFloat;
use parking_lot::{MappedRwLockReadGuard, RwLock, RwLockReadGuard}; use parking_lot::{MappedRwLockReadGuard, RwLock, RwLockReadGuard};
use rayon::iter::{IndexedParallelIterator, ParallelIterator};
use rayon::slice::ParallelSliceMut;
#[cfg(feature = "serde")] #[cfg(feature = "serde")]
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
#[cfg(feature = "serde-big-array")]
use serde_big_array::big_array;
use crate::{Hnsw, Point, M}; use crate::{Hnsw, Point, M};
/// Layout description for every layer of the index.
///
/// The inner `Vec` is ordered bottom-up as built by `Meta::new`: entry 0 describes the zero
/// layer and the last entry describes the top layer.
#[cfg_attr(feature = "serde", derive(Deserialize, Serialize))]
#[derive(Debug, Default)]
pub(crate) struct Meta(pub(crate) Vec<LayerMeta>);
impl Meta {
    /// Computes the layer layout for `num` points using the level multiplier `ml`.
    ///
    /// Entry 0 is the zero layer and holds all `num` points; every further entry holds
    /// `(previous as f32 * ml) as usize` points. A layer that would hold fewer than `M`
    /// points is not created, which ends the loop.
    ///
    /// NOTE(review): the loop only terminates when `ml` actually shrinks the point count; an
    /// `ml >= 1.0` would keep `next >= num`. Confirm callers rule that out.
    pub(crate) fn new(ml: f32, mut num: usize) -> Self {
        let mut inner = Vec::new();
        let mut neighbors = 0;
        loop {
            let mut next = (num as f32 * ml) as usize;
            if next < M {
                next = 0;
            }

            let start = neighbors;
            // The zero layer reserves `M * 2` neighbor slots per point, upper layers `M`.
            neighbors += num * M * if inner.is_empty() { 2 } else { 1 };
            inner.push(LayerMeta {
                max: num - next,
                total: num,
                start,
                end: neighbors,
            });

            if next == 0 {
                break;
            }
            num = next;
        }

        Self(inner)
    }

    /// Returns the layer directly below `cur` together with that layer's total point count.
    ///
    /// With `cur == None` the top layer is returned. Yields `None` when there is no such
    /// layer, i.e. when `cur` is already the zero layer or when no layers exist at all.
    /// Subtractions are checked so that these cases return `None` instead of underflowing.
    pub(crate) fn next_lower(&self, cur: Option<LayerId>) -> Option<(LayerId, usize)> {
        let idx = match cur {
            Some(layer) => layer.0.checked_sub(1)?,
            None => self.len().checked_sub(1)?,
        };
        self.0.get(idx).map(|meta| (LayerId(idx), meta.total))
    }

    /// Borrows the part of the flat `neighbors` list that belongs to `layer`.
    ///
    /// Panics if `layer` is not a known layer or `neighbors` is shorter than the layer's range.
    pub(crate) fn layer<'a>(&self, layer: LayerId, neighbors: &'a [PointId]) -> LayerSlice<'a> {
        let meta = &self.0[layer.0];
        LayerSlice {
            neighbors: &neighbors[meta.start..meta.end],
            stride: if layer.is_zero() { M * 2 } else { M },
        }
    }

    /// Splits the flat `neighbors` list into one disjoint mutable slice per layer.
    ///
    /// The result is ordered like the layout itself: index 0 is the zero layer. Panics if
    /// `neighbors` is shorter than the combined length of all layers.
    pub(crate) fn layers_mut<'a>(
        &self,
        mut neighbors: &'a mut [PointId],
    ) -> Vec<LayerSliceMut<'a>> {
        let mut layers = Vec::with_capacity(self.0.len());
        for (idx, meta) in self.0.iter().enumerate() {
            let len = meta.end - meta.start;
            // Pick the stride from the layer index, matching `layer()`.
            let stride = if idx == 0 { M * 2 } else { M };
            let (cur, rest) = neighbors.split_at_mut(len);
            layers.push(LayerSliceMut {
                neighbors: cur,
                stride,
            });
            neighbors = rest;
        }
        layers
    }

    /// Iterates over all layer ids from the top layer down to the zero layer.
    pub(crate) fn descending<'a>(&'a self) -> impl Iterator<Item = LayerId> + 'a {
        (0..self.0.len()).rev().map(LayerId)
    }

    /// Range of point indices that get inserted while building `layer`.
    ///
    /// The range covers the points that first appear on this layer and never starts below 1,
    /// so index 0 is excluded (the build code pushes `PointId(0)` as the search entry point).
    pub(crate) fn points(&self, layer: LayerId) -> Range<usize> {
        let meta = &self.0[layer.0];
        max(meta.total - meta.max, 1)..meta.total
    }

    /// Total number of neighbor slots needed for all layers; `0` when there are no layers.
    pub(crate) fn neighbors(&self) -> usize {
        self.0.last().map_or(0, |meta| meta.end)
    }

    /// Number of layers, including the zero layer.
    pub(crate) fn len(&self) -> usize {
        self.0.len()
    }
}
/// Bookkeeping for a single layer: how many points it holds and where its neighbor slots
/// are located in the flat neighbor list.
#[cfg_attr(feature = "serde", derive(Deserialize, Serialize))]
#[derive(Debug)]
pub(crate) struct LayerMeta {
/// Points on this layer that are not also on the next layer up (`num - next` in `Meta::new`).
max: usize,
/// Total number of points present on this layer.
total: usize,
/// Offset of this layer's first neighbor slot in the flat neighbor list.
start: usize,
/// Offset one past this layer's last neighbor slot in the flat neighbor list.
end: usize,
}
/// Mutable view of one layer's neighbor slots, organized as rows of `stride` entries.
pub(crate) struct LayerSliceMut<'a> {
/// The neighbor slots belonging to this layer.
neighbors: &'a mut [PointId],
/// Number of consecutive slots that form one row (`M * 2` on the zero layer, else `M`).
stride: usize,
}
impl<'a> LayerSliceMut<'a> {
pub(crate) fn copy_from_zero(&mut self, zero: &[RwLock<ZeroNode<'_>>]) {
let stride = self.stride;
self.neighbors
.par_chunks_mut(stride)
.zip(zero)
.for_each(|(dst, src)| {
dst.copy_from_slice(&src.read()[..stride]);
});
}
pub(crate) fn zero_nodes(&mut self) -> Vec<RwLock<ZeroNode<'_>>> {
self.neighbors
.chunks_exact_mut(self.stride)
.map(|n| RwLock::new(ZeroNode(n)))
.collect::<Vec<_>>()
}
pub(crate) fn as_ref(&self) -> LayerSlice<'_> {
LayerSlice {
neighbors: self.neighbors.as_ref(),
stride: self.stride,
}
}
}
/// Read-only view of one layer's neighbor slots, organized as rows of `stride` entries.
pub(crate) struct LayerSlice<'a> {
/// The neighbor slots belonging to this layer.
neighbors: &'a [PointId],
/// Number of consecutive slots per point; `nearest_iter` reads row `pid * stride`.
stride: usize,
}
impl<'a> Layer for LayerSlice<'a> {
    type Slice = &'a [PointId];

    /// Iterates over the neighbor row stored for `pid` on this layer.
    ///
    /// Panics if the row for `pid` lies outside this layer's slots.
    fn nearest_iter(&self, pid: PointId) -> NearestIter<Self::Slice> {
        let stride = self.stride;
        let begin = stride * pid.0 as usize;
        let end = begin + stride;
        assert!(self.neighbors.len() >= end);

        let row: Self::Slice = &self.neighbors[begin..end];
        NearestIter::new(row)
    }
}
pub(crate) struct Visited { pub(crate) struct Visited {
store: Vec<u8>, store: Vec<u8>,
generation: u8, generation: u8,
@ -58,36 +200,10 @@ impl Visited {
} }
} }
#[cfg_attr(feature = "serde", derive(Deserialize, Serialize))] #[derive(Debug)]
#[derive(Clone, Copy, Debug, Default)] pub(crate) struct ZeroNode<'a>(pub(crate) &'a mut [PointId]);
pub(crate) struct UpperNode([PointId; M]);
impl UpperNode { impl<'a> ZeroNode<'a> {
pub(crate) fn from_zero(node: &ZeroNode) -> Self {
let mut nearest = [INVALID; M];
nearest.copy_from_slice(&node.0[..M]);
Self(nearest)
}
}
impl<'a> Layer for &'a [UpperNode] {
type Slice = &'a [PointId];
fn nearest_iter(&self, pid: PointId) -> NearestIter<Self::Slice> {
NearestIter::new(&self[pid.0 as usize].0)
}
}
#[cfg_attr(feature = "serde", derive(Deserialize, Serialize))]
#[derive(Clone, Copy, Debug)]
pub(crate) struct ZeroNode(
#[cfg_attr(feature = "serde", serde(with = "BigArray"))] pub(crate) [PointId; M * 2],
);
#[cfg(feature = "serde-big-array")]
big_array! { BigArray; }
impl ZeroNode {
pub(crate) fn rewrite(&mut self, mut iter: impl Iterator<Item = PointId>) { pub(crate) fn rewrite(&mut self, mut iter: impl Iterator<Item = PointId>) {
for slot in self.0.iter_mut() { for slot in self.0.iter_mut() {
if let Some(pid) = iter.next() { if let Some(pid) = iter.next() {
@ -120,13 +236,7 @@ impl ZeroNode {
} }
} }
impl Default for ZeroNode { impl<'a> Deref for ZeroNode<'a> {
fn default() -> ZeroNode {
ZeroNode([INVALID; M * 2])
}
}
impl Deref for ZeroNode {
type Target = [PointId]; type Target = [PointId];
fn deref(&self) -> &Self::Target { fn deref(&self) -> &Self::Target {
@ -134,15 +244,7 @@ impl Deref for ZeroNode {
} }
} }
impl<'a> Layer for &'a [ZeroNode] { impl<'a> Layer for &'a [RwLock<ZeroNode<'a>>] {
type Slice = &'a [PointId];
fn nearest_iter(&self, pid: PointId) -> NearestIter<Self::Slice> {
NearestIter::new(&self[pid.0 as usize])
}
}
impl<'a> Layer for &'a [RwLock<ZeroNode>] {
type Slice = MappedRwLockReadGuard<'a, [PointId]>; type Slice = MappedRwLockReadGuard<'a, [PointId]>;
fn nearest_iter(&self, pid: PointId) -> NearestIter<Self::Slice> { fn nearest_iter(&self, pid: PointId) -> NearestIter<Self::Slice> {
@ -198,37 +300,11 @@ where
pub(crate) struct LayerId(pub usize); pub(crate) struct LayerId(pub usize);
impl LayerId { impl LayerId {
pub(crate) fn descend(&self) -> impl Iterator<Item = LayerId> {
DescendingLayerIter { next: Some(self.0) }
}
pub(crate) fn is_zero(&self) -> bool { pub(crate) fn is_zero(&self) -> bool {
self.0 == 0 self.0 == 0
} }
} }
struct DescendingLayerIter {
next: Option<usize>,
}
impl Iterator for DescendingLayerIter {
type Item = LayerId;
fn next(&mut self) -> Option<Self::Item> {
Some(LayerId(match self.next? {
0 => {
self.next = None;
0
}
next => {
self.next = Some(next - 1);
next
}
}))
}
}
/// A potential nearest neighbor
#[derive(Clone, Copy, Debug, Eq, Ord, PartialEq, PartialOrd)] #[derive(Clone, Copy, Debug, Eq, Ord, PartialEq, PartialOrd)]
pub struct Candidate { pub struct Candidate {
pub(crate) distance: OrderedFloat<f32>, pub(crate) distance: OrderedFloat<f32>,
@ -284,8 +360,8 @@ impl<P: Point> Index<PointId> for [P] {
} }
} }
impl Index<PointId> for [RwLock<ZeroNode>] { impl<'a> Index<PointId> for [RwLock<ZeroNode<'a>>] {
type Output = RwLock<ZeroNode>; type Output = RwLock<ZeroNode<'a>>;
fn index(&self, index: PointId) -> &Self::Output { fn index(&self, index: PointId) -> &Self::Output {
&self[index.0 as usize] &self[index.0 as usize]