Storage Engine¶
Overview¶
The Storage engine provides persistent storage for vector data using BoltDB as the underlying key-value store. It handles data serialization, collection management, and integrates with the quantization subsystem for efficient storage.
Architecture¶
graph TB
subgraph StorageEngine
DB[(BoltDB
Key-Value Store)]
Serial[Protobuf
Serialization]
Quant[Quantizer
Integration]
end
Serial --> DB
Quant --> DB
Core Implementation¶
Data Structure¶
Initialization¶
// NewStorage opens (or creates) a BoltDB-backed store at path. When useQuant
// is true, an SQ8 quantizer is attached and applied to vectors on write.
func NewStorage(path string, useQuant bool) (*Storage, error) {
	opts := &bbolt.Options{
		// Bound the wait on the file lock instead of blocking forever.
		Timeout: time.Second * 5,
	}
	db, err := bbolt.Open(path, 0666, opts)
	if err != nil {
		return nil, fmt.Errorf("failed to open database: %w", err)
	}
	store := &Storage{
		db:       db,
		useQuant: useQuant,
	}
	if useQuant {
		store.quantizer = NewSQ8Quantizer()
	}
	return store, nil
}
Collection Management¶
Creating Collections¶
BoltDB automatically creates collections (buckets) when data is first inserted:
// getOrCreateBucket creates the bucket for colName if it does not exist and
// returns it.
//
// NOTE(review): a *bbolt.Bucket is only valid for the lifetime of the
// transaction that produced it. The Update transaction below has already
// committed by the time this function returns, so the returned bucket must
// not be used for reads or writes — doing so is undefined behavior per the
// bbolt documentation. Callers should instead obtain the bucket inside their
// own transaction (as UpsertPoints does); consider removing or redesigning
// this helper.
func (s *Storage) getOrCreateBucket(colName string) (*bbolt.Bucket, error) {
	var bucket *bbolt.Bucket
	err := s.db.Update(func(tx *bbolt.Tx) error {
		var err error
		bucket, err = tx.CreateBucketIfNotExists([]byte(colName))
		return err
	})
	return bucket, err
}
Deleting Collections¶
// DeleteCollection removes the named collection (bucket) and every point
// stored in it. The error from bbolt is returned unchanged if the bucket
// does not exist.
func (s *Storage) DeleteCollection(colName string) error {
	// Fail fast after Close, consistent with the other storage operations
	// (UpsertPoints, GetPoint, DeletePoints, IteratePoints all do this).
	if s.closed {
		return errors.New("storage is closed")
	}
	return s.db.Update(func(tx *bbolt.Tx) error {
		return tx.DeleteBucket([]byte(colName))
	})
}
Point Operations¶
Upsert Points¶
// UpsertPoints inserts or updates the given points in the named collection,
// creating the collection (bucket) on first use. All points are written in a
// single transaction; the first failure aborts and rolls back the batch.
//
// When quantization is enabled, the quantized form is stored under the
// "__quantized_vector" payload key and the raw vector is omitted from the
// stored record. This is done on a local copy so the caller's PointStruct
// values are never mutated (the original code cleared p.Vector and wrote
// into p.Payload in place, surprising callers that reuse their points).
func (s *Storage) UpsertPoints(colName string, points []*PointStruct) error {
	if s.closed {
		return errors.New("storage is closed")
	}
	return s.db.Update(func(tx *bbolt.Tx) error {
		b, err := tx.CreateBucketIfNotExists([]byte(colName))
		if err != nil {
			return err
		}
		for _, p := range points {
			sp := p
			// Handle quantization if enabled — on a copy, not the caller's point.
			if s.useQuant && s.quantizer != nil && len(p.Vector) > 0 {
				payload := make(map[string]interface{}, len(p.Payload)+1)
				for k, v := range p.Payload {
					payload[k] = v
				}
				payload["__quantized_vector"] = s.quantizer.Quantize(p.Vector)
				sp = &PointStruct{
					ID:      p.ID,
					Version: p.Version,
					Vector:  nil, // raw vector replaced by its quantized form
					Payload: payload,
				}
			}
			// Serialize point to protobuf.
			data, err := proto.Marshal(toProtoPoint(sp))
			if err != nil {
				return err
			}
			// Points are keyed by ID within the collection bucket.
			if err := b.Put([]byte(sp.ID), data); err != nil {
				return err
			}
		}
		return nil
	})
}
Retrieve Points¶
// GetPoint loads a single point by ID from the named collection.
// A missing point yields (nil, nil); a missing collection or a closed
// storage yields a non-nil error.
func (s *Storage) GetPoint(colName, pointID string) (*PointStruct, error) {
	if s.closed {
		return nil, errors.New("storage is closed")
	}
	var result *PointStruct
	viewErr := s.db.View(func(tx *bbolt.Tx) error {
		bucket := tx.Bucket([]byte(colName))
		if bucket == nil {
			return errors.New("collection not found")
		}
		raw := bucket.Get([]byte(pointID))
		if raw == nil {
			// Absent key: report success with a nil result.
			return nil
		}
		decoded := &proto.Point{}
		if err := proto.Unmarshal(raw, decoded); err != nil {
			return err
		}
		result = fromProtoPoint(decoded)
		return nil
	})
	return result, viewErr
}
Delete Points¶
// DeletePoints removes the points with the given IDs from the collection.
//
// NOTE(review): the filter parameter is currently ignored — only the explicit
// pointIDs are deleted. Either implement filter-based deletion here or
// document that callers must resolve a Filter to concrete IDs themselves.
func (s *Storage) DeletePoints(colName string, pointIDs []string, filter *Filter) error {
	// Fail fast if the storage has been closed.
	if s.closed {
		return errors.New("storage is closed")
	}
	return s.db.Update(func(tx *bbolt.Tx) error {
		b := tx.Bucket([]byte(colName))
		if b == nil {
			return errors.New("collection not found")
		}
		// Delete each requested key; presumably missing keys are a no-op
		// in bbolt — verify against the bbolt docs if callers rely on it.
		for _, id := range pointIDs {
			if err := b.Delete([]byte(id)); err != nil {
				return err
			}
		}
		return nil
	})
}
Iterate Points¶
// IteratePoints streams every point in the collection through handler in
// key order. The walk stops at the first error returned by handler or by
// deserialization, and that error is propagated to the caller.
func (s *Storage) IteratePoints(colName string, handler func(*PointStruct) error) error {
	if s.closed {
		return errors.New("storage is closed")
	}
	return s.db.View(func(tx *bbolt.Tx) error {
		bucket := tx.Bucket([]byte(colName))
		if bucket == nil {
			return errors.New("collection not found")
		}
		// ForEach walks the bucket with a cursor; a non-nil error from the
		// callback aborts the walk and is returned here.
		return bucket.ForEach(func(_, raw []byte) error {
			decoded := &proto.Point{}
			if err := proto.Unmarshal(raw, decoded); err != nil {
				return err
			}
			return handler(fromProtoPoint(decoded))
		})
	})
}
Serialization¶
Protobuf Serialization¶
// toProtoPoint converts an in-memory PointStruct to its protobuf form.
// The float32 vector is packed little-endian, four bytes per component.
func toProtoPoint(p *PointStruct) *proto.Point {
	// Pack the vector: each float32 becomes 4 little-endian bytes.
	vecBytes := make([]byte, len(p.Vector)*4)
	for i, f := range p.Vector {
		binary.LittleEndian.PutUint32(vecBytes[4*i:], math.Float32bits(f))
	}
	// Convert the payload map entry by entry.
	payload := make(map[string]*proto.Value, len(p.Payload))
	for key, val := range p.Payload {
		payload[key] = interfaceToValue(val)
	}
	return &proto.Point{
		ID:      p.ID,
		Version: p.Version,
		Vector:  vecBytes,
		Payload: payload,
	}
}
Protobuf Deserialization¶
// fromProtoPoint converts a protobuf point back to the in-memory
// PointStruct, unpacking the little-endian byte vector into float32
// components (four bytes per component; any trailing remainder is ignored).
func fromProtoPoint(pbPoint *proto.Point) *PointStruct {
	raw := pbPoint.Vector
	vec := make([]float32, len(raw)/4)
	for i := range vec {
		vec[i] = math.Float32frombits(binary.LittleEndian.Uint32(raw[4*i:]))
	}
	// Convert the payload values back to plain Go types.
	fields := make(map[string]interface{}, len(pbPoint.Payload))
	for key, val := range pbPoint.Payload {
		fields[key] = valueToInterface(val)
	}
	return &PointStruct{
		ID:      pbPoint.ID,
		Version: pbPoint.Version,
		Vector:  vec,
		Payload: fields,
	}
}
BoltDB Configuration¶
Default Options¶
&bbolt.Options{
Timeout: time.Second * 5,
NoSync: false,
NoGrowSync: false,
InitialMmapSize: 256 * 1024 * 1024, // 256MB
}
Performance Tuning¶
| Parameter | Default | Recommended | Use Case |
|---|---|---|---|
| InitialMmapSize | 0 | 1-4GB | Large datasets (avoids remap stalls) |
| Batch size (application-level) | N/A | 10000 | Upserting many points per transaction |
| NoSync | false | true* | Bulk loading (write throughput) |
*Warning: Setting NoSync to true risks data loss on crash.
Thread Safety¶
BoltDB supports concurrent reads and exclusive writes:
- Reads: Multiple goroutines can read simultaneously
- Writes: One writer at a time; read transactions continue concurrently against their own snapshot (MVCC)
- Transactions: Use read-only transactions for bulk reads
Error Handling¶
| Error | Cause | Handling |
|---|---|---|
| Database locked | Concurrent write | Retry with backoff |
| Collection not found | Invalid name | Check before operation |
| Corrupt data | Disk issue | Validate and restore |
| Out of memory | Mmap growth on a large file | Increase InitialMmapSize |
Backup and Recovery¶
Backup¶
// Backup writes a consistent snapshot of the database file to path,
// holding a read-only transaction open for the duration of the copy.
func (s *Storage) Backup(path string) error {
	return s.db.View(func(tx *bbolt.Tx) error {
		// CopyFile streams the database as seen by this transaction.
		return tx.CopyFile(path, 0666)
	})
}
Close and Restore¶
// Close marks the storage as closed — so subsequent operations fail fast —
// and then releases the underlying BoltDB handle.
func (s *Storage) Close() error {
	s.closed = true
	err := s.db.Close()
	return err
}
// Restore replaces the current database file with the backup at path and
// reopens the store. It must not run concurrently with other operations.
//
// Fixes over the original: the database path is captured before Close
// (querying the handle after closing it is unreliable), the Close error is
// no longer ignored, the database is reopened with the same options as
// NewStorage (the original passed nil and lost the lock timeout), and the
// closed flag is reset so the store is usable after a successful restore.
func (s *Storage) Restore(path string) error {
	dbPath := s.db.Path()
	if err := s.db.Close(); err != nil {
		return fmt.Errorf("failed to close database before restore: %w", err)
	}
	if err := os.Rename(path, dbPath); err != nil {
		return err
	}
	db, err := bbolt.Open(dbPath, 0666, &bbolt.Options{
		Timeout: time.Second * 5, // match the options used by NewStorage
	})
	if err != nil {
		return err
	}
	s.db = db
	s.closed = false
	return nil
}
Best Practices¶
- Regular backups: Schedule periodic backups
- Monitor disk space: BoltDB grows continuously
- Use transactions: Batch operations for better performance
- Close properly: Always close the database to ensure data integrity
- Handle quantization: Enable for large datasets to save storage