Skip to content

Storage Engine

Overview

The storage engine provides persistent storage for vector data, using BoltDB (bbolt) as the underlying key-value store. It handles data serialization and collection management, and integrates with the quantization subsystem for efficient storage.

Architecture

graph TB
    subgraph StorageEngine
        DB[(BoltDB<br/>Key-Value Store)]
        Serial[Protobuf<br/>Serialization]
        Quant[Quantizer<br/>Integration]
    end
    Serial --> DB
    Quant --> DB

Core Implementation

Data Structure

// Storage bundles the bbolt database handle with the optional quantizer that
// is applied to vectors before they are persisted.
type Storage struct {
    // db is the bbolt database opened by NewStorage.
    db        *bbolt.DB
    // closed is set to true by Close; point operations check it before using db.
    closed    bool
    // quantizer is assigned by NewStorage only when useQuant is true.
    quantizer Quantizer
    // useQuant records whether vectors should be quantized on upsert.
    useQuant  bool
}

Initialization

// NewStorage opens the bbolt database at path (file mode 0666, 5-second
// timeout) and returns a Storage backed by it.
//
// When useQuant is true the returned Storage is given an SQ8 quantizer;
// otherwise its quantizer is left nil. An open failure is returned wrapped.
func NewStorage(path string, useQuant bool) (*Storage, error) {
    opts := &bbolt.Options{Timeout: time.Second * 5}

    handle, openErr := bbolt.Open(path, 0666, opts)
    if openErr != nil {
        return nil, fmt.Errorf("failed to open database: %w", openErr)
    }

    store := &Storage{db: handle, useQuant: useQuant}
    if store.useQuant {
        store.quantizer = NewSQ8Quantizer()
    }
    return store, nil
}

Collection Management

Creating Collections

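Each collection is stored as a BoltDB bucket. BoltDB does not create buckets on its own; the storage engine creates a collection's bucket on demand (via CreateBucketIfNotExists) the first time data is inserted: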

// getOrCreateBucket runs a read-write transaction that creates the bucket
// named colName if it does not exist yet, and returns the bucket together
// with the transaction's error.
//
// NOTE(review): bbolt documents that a *bbolt.Bucket is only valid for the
// lifetime of the transaction that produced it. The value returned here
// outlives the Update call, so callers should presumably treat it only as an
// existence signal and re-fetch the bucket inside their own transaction —
// confirm against callers.
func (s *Storage) getOrCreateBucket(colName string) (*bbolt.Bucket, error) {
    var created *bbolt.Bucket

    ensure := func(tx *bbolt.Tx) error {
        b, createErr := tx.CreateBucketIfNotExists([]byte(colName))
        created = b
        return createErr
    }

    updateErr := s.db.Update(ensure)
    return created, updateErr
}

Deleting Collections

// DeleteCollection removes the bucket named colName in a single read-write
// transaction.
//
// It returns an error if the storage has been closed, or whatever error the
// underlying DeleteBucket call reports.
func (s *Storage) DeleteCollection(colName string) error {
    // Same guard as the point operations, so a closed Storage fails uniformly.
    if s.closed {
        return errors.New("storage is closed")
    }

    return s.db.Update(func(tx *bbolt.Tx) error {
        return tx.DeleteBucket([]byte(colName))
    })
}

Point Operations

Upsert Points

// UpsertPoints writes every point in points into the collection colName,
// creating the collection's bucket if needed. All writes happen in one
// read-write transaction, so an error on any point aborts the whole batch.
//
// When quantization is enabled and a point has a non-empty vector, the stored
// record carries the quantized vector under the "__quantized_vector" payload
// key and an empty Vector. The caller's points are never modified: the
// quantized form is built on a copy.
//
// It returns an error if the storage is closed, if a point is nil, or if
// bucket creation, serialization, or the write fails.
func (s *Storage) UpsertPoints(colName string, points []*PointStruct) error {
    if s.closed {
        return errors.New("storage is closed")
    }

    return s.db.Update(func(tx *bbolt.Tx) error {
        b, err := tx.CreateBucketIfNotExists([]byte(colName))
        if err != nil {
            return err
        }

        for i, p := range points {
            if p == nil {
                return fmt.Errorf("point at index %d is nil", i)
            }

            stored := p
            if s.useQuant && s.quantizer != nil && len(p.Vector) > 0 {
                // Shallow-copy the point and clone its payload map so the
                // caller keeps the original vector and an untouched payload.
                quantizedPoint := *p
                quantizedPoint.Payload = make(map[string]interface{}, len(p.Payload)+1)
                for k, v := range p.Payload {
                    quantizedPoint.Payload[k] = v
                }
                quantizedPoint.Payload["__quantized_vector"] = s.quantizer.Quantize(p.Vector)
                quantizedPoint.Vector = nil // only the quantized form is persisted
                stored = &quantizedPoint
            }

            // Serialize point to protobuf
            data, err := proto.Marshal(toProtoPoint(stored))
            if err != nil {
                return fmt.Errorf("serializing point %q: %w", stored.ID, err)
            }

            // Store in bucket, keyed by point ID
            if err := b.Put([]byte(stored.ID), data); err != nil {
                return fmt.Errorf("storing point %q: %w", stored.ID, err)
            }
        }
        return nil
    })
}

Retrieve Points

// GetPoint looks up pointID in the collection colName inside a read-only
// transaction.
//
// It returns (nil, nil) when the collection exists but holds no value for
// pointID. It returns an error when the storage is closed, the collection
// does not exist, or the stored bytes cannot be unmarshalled.
func (s *Storage) GetPoint(colName, pointID string) (*PointStruct, error) {
    if s.closed {
        return nil, errors.New("storage is closed")
    }

    var found *PointStruct

    lookup := func(tx *bbolt.Tx) error {
        bucket := tx.Bucket([]byte(colName))
        if bucket == nil {
            return errors.New("collection not found")
        }

        raw := bucket.Get([]byte(pointID))
        if raw == nil {
            // No entry for this ID: leave found nil and report success.
            return nil
        }

        decoded := &proto.Point{}
        if decodeErr := proto.Unmarshal(raw, decoded); decodeErr != nil {
            return decodeErr
        }

        found = fromProtoPoint(decoded)
        return nil
    }

    viewErr := s.db.View(lookup)
    return found, viewErr
}

Delete Points

// DeletePoints deletes the entries keyed by pointIDs from the collection
// colName in one read-write transaction; the first failing delete aborts it.
//
// The filter argument is accepted but is not read by this implementation, so
// only the explicitly listed IDs are removed.
//
// It returns an error when the storage is closed or the collection does not
// exist.
func (s *Storage) DeletePoints(colName string, pointIDs []string, filter *Filter) error {
    if s.closed {
        return errors.New("storage is closed")
    }

    remove := func(tx *bbolt.Tx) error {
        bucket := tx.Bucket([]byte(colName))
        if bucket == nil {
            return errors.New("collection not found")
        }

        for i := range pointIDs {
            key := []byte(pointIDs[i])
            if delErr := bucket.Delete(key); delErr != nil {
                return delErr
            }
        }
        return nil
    }

    return s.db.Update(remove)
}

Iterate Points

// IteratePoints decodes every entry of the collection colName inside a
// read-only transaction and passes each resulting point to handler.
//
// Iteration stops at the first unmarshal error or the first error returned by
// handler, and that error is returned. It also returns an error when the
// storage is closed or the collection does not exist.
func (s *Storage) IteratePoints(colName string, handler func(*PointStruct) error) error {
    if s.closed {
        return errors.New("storage is closed")
    }

    return s.db.View(func(tx *bbolt.Tx) error {
        bucket := tx.Bucket([]byte(colName))
        if bucket == nil {
            return errors.New("collection not found")
        }

        // ForEach walks the bucket with a cursor from the first key onward.
        return bucket.ForEach(func(_, raw []byte) error {
            decoded := &proto.Point{}
            if decodeErr := proto.Unmarshal(raw, decoded); decodeErr != nil {
                return decodeErr
            }
            return handler(fromProtoPoint(decoded))
        })
    })
}

Serialization

Protobuf Serialization

// toProtoPoint converts p into its protobuf representation.
//
// The vector is packed as consecutive little-endian IEEE-754 float32 bit
// patterns, four bytes per component. Each payload entry is converted with
// interfaceToValue. The resulting Payload map is always non-nil.
func toProtoPoint(p *PointStruct) *proto.Point {
    const bytesPerFloat = 4

    packed := make([]byte, len(p.Vector)*bytesPerFloat)
    for idx := range p.Vector {
        offset := idx * bytesPerFloat
        binary.LittleEndian.PutUint32(packed[offset:offset+bytesPerFloat], math.Float32bits(p.Vector[idx]))
    }

    fields := make(map[string]*proto.Value)
    for key, val := range p.Payload {
        fields[key] = interfaceToValue(val)
    }

    return &proto.Point{
        ID:      p.ID,
        Version: p.Version,
        Vector:  packed,
        Payload: fields,
    }
}

Protobuf Deserialization

// fromProtoPoint rebuilds a PointStruct from its protobuf representation.
//
// The vector bytes are read as consecutive little-endian float32 bit
// patterns; any trailing bytes that do not fill a whole four-byte group are
// ignored. Each payload entry is converted with valueToInterface. The
// returned Vector and Payload are always non-nil.
func fromProtoPoint(pbPoint *proto.Point) *PointStruct {
    const bytesPerFloat = 4

    raw := pbPoint.Vector
    floats := make([]float32, len(raw)/bytesPerFloat)
    for idx := range floats {
        offset := idx * bytesPerFloat
        word := binary.LittleEndian.Uint32(raw[offset : offset+bytesPerFloat])
        floats[idx] = math.Float32frombits(word)
    }

    fields := make(map[string]interface{})
    for key, val := range pbPoint.Payload {
        fields[key] = valueToInterface(val)
    }

    result := &PointStruct{
        ID:      pbPoint.ID,
        Version: pbPoint.Version,
        Vector:  floats,
        Payload: fields,
    }
    return result
}

BoltDB Configuration

Default Options

// Options passed to bbolt.Open. bbolt.Options has no MmapSize field; the
// initial memory-map size is configured through InitialMmapSize.
&bbolt.Options{
    Timeout:         time.Second * 5,
    NoSync:          false,
    NoGrowSync:      false,
    InitialMmapSize: 256 * 1024 * 1024, // 256MB
}

Performance Tuning

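| Parameter | Default | Recommended | Use Case |
|-----------|---------|-------------|----------|
| MmapSize | 256MB | 1-4GB | Large datasets |
| BucketSize | N/A | 10000 | Many points |
| NoSync | false | true* | Bulk loads where write throughput matters more than crash resilience |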

*Warning: Setting NoSync to true risks data loss on crash.

Thread Safety

BoltDB supports concurrent reads and exclusive writes:

  • Reads: Multiple goroutines can read simultaneously
  • Writes: Only one read-write transaction can be open at a time; additional writers block until it completes
  • Transactions: Use read-only transactions for bulk reads

Error Handling

| Error | Cause | Handling |
|-------|-------|----------|
| Database locked | Concurrent write | Retry with backoff |
| Collection not found | Invalid name | Check before operation |
| Corrupt data | Disk issue | Validate and restore |
| Out of memory | MmapSize exceeded | Increase mmap size |

Backup and Recovery

Backup

// Backup copies the whole database to the file at path (mode 0666) from
// inside a read-only transaction.
//
// It returns an error if the storage has been closed or if the copy fails.
func (s *Storage) Backup(path string) error {
    // Same guard as the point operations, so a closed Storage fails uniformly.
    if s.closed {
        return errors.New("storage is closed")
    }

    return s.db.View(func(tx *bbolt.Tx) error {
        return tx.CopyFile(path, 0666)
    })
}

Restore

// Close flags the storage as closed, so later point operations fail fast with
// "storage is closed", and then closes the underlying database, returning the
// error from that close.
func (s *Storage) Close() error {
    s.closed = true

    closeErr := s.db.Close()
    return closeErr
}

// Restore replaces the live database file with the file at path and reopens
// the database.
//
// The backup is moved into place with os.Rename, so it no longer exists at
// path after a successful restore. The database is reopened with the same
// mode and timeout that NewStorage uses. If the rename fails, the original
// file is still in place and is reopened, so the Storage stays usable. If
// reopening fails, the Storage is left marked as closed.
//
// It returns an error if the storage is already closed, or if closing,
// renaming, or reopening fails.
func (s *Storage) Restore(path string) error {
    if s.closed {
        return errors.New("storage is closed")
    }

    // Read the destination before closing; a closed handle may no longer
    // report its path.
    dbPath := s.db.Path()

    if err := s.db.Close(); err != nil {
        return fmt.Errorf("closing database before restore: %w", err)
    }
    // Until a reopen succeeds there is no usable database behind s.db.
    s.closed = true

    renameErr := os.Rename(path, dbPath)

    // Reopen whether or not the rename worked. Only replace s.db on success,
    // so it never becomes nil.
    db, openErr := bbolt.Open(dbPath, 0666, &bbolt.Options{
        Timeout: time.Second * 5,
    })
    if openErr == nil {
        s.db = db
        s.closed = false
    }

    if renameErr != nil {
        return fmt.Errorf("replacing database file: %w", renameErr)
    }
    if openErr != nil {
        return fmt.Errorf("reopening database after restore: %w", openErr)
    }
    return nil
}

Best Practices

  1. Regular backups: Schedule periodic backups
  2. Monitor disk space: The BoltDB file grows as data is added and does not shrink when data is deleted
  3. Use transactions: Batch operations for better performance
  4. Close properly: Always close the database to ensure data integrity
  5. Handle quantization: Enable for large datasets to save storage