Implementing a Simple Database Engine in Rust

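This article builds a simple KV database engine from scratch. It supports GET/SET/SCAN operations, with a B+ tree index and a write-ahead log (WAL) underneath. What follows documents the design of the core modules and their Rust implementation.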

1. Overall Architecture

┌──────────────────┐
│   SQL Parser     │  ← nom parses simple GET/SET/SCAN statements
├──────────────────┤
│  Query Engine    │  ← execution layer
├──────────────────┤
│  B+Tree Index    │  ← in-memory B+ tree
├──────────────────┤
│  Page Manager    │  ← 4 KB page management
├──────────────────┤
│  WAL (Write-     │  ← write-ahead log, provides durability
│   Ahead Log)     │
├──────────────────┤
│  Disk Storage    │  ← data file
└──────────────────┘

Key design decisions:

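  • Pages have a fixed size of 4 KB, matching the memory page size that is the default on most x86-64 systems
  • Leaf nodes of the B+ tree store the actual KV data
  • Every write goes to the WAL first, then updates the B+ tree, and is flushed to disk last
  • Three operations are supported: GET (point lookup), SET (write), and SCAN (range query)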
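
The write ordering and the three operations listed above can be sketched with in-memory stand-ins before any real I/O is involved. This is only an illustration under assumptions: `MiniStore` is a hypothetical type, its `log` vector stands in for the WAL file, and a `BTreeMap` stands in for the B+ tree. None of these names belong to the engine built in the rest of the article.

```rust
use std::collections::BTreeMap;

/// Hypothetical in-memory stand-in for the engine: `log` plays the role of
/// the WAL file and `index` the role of the B+ tree.
struct MiniStore {
    log: Vec<(Vec<u8>, Vec<u8>)>,
    index: BTreeMap<Vec<u8>, Vec<u8>>,
}

impl MiniStore {
    fn new() -> Self {
        MiniStore { log: Vec::new(), index: BTreeMap::new() }
    }

    /// SET: record the write in the log first, then apply it to the index.
    fn set(&mut self, key: &[u8], value: &[u8]) {
        self.log.push((key.to_vec(), value.to_vec()));
        self.index.insert(key.to_vec(), value.to_vec());
    }

    /// GET: point lookup.
    fn get(&self, key: &[u8]) -> Option<&[u8]> {
        self.index.get(key).map(|v| v.as_slice())
    }

    /// SCAN: half-open range [start, end), returned in key order.
    fn scan(&self, start: &[u8], end: &[u8]) -> Vec<(&[u8], &[u8])> {
        if start >= end {
            return Vec::new(); // BTreeMap::range panics on an inverted range
        }
        self.index
            .range(start.to_vec()..end.to_vec())
            .map(|(k, v)| (k.as_slice(), v.as_slice()))
            .collect()
    }

    /// Simulated crash recovery: rebuild the index from the log alone.
    fn recover(log: Vec<(Vec<u8>, Vec<u8>)>) -> Self {
        let mut index = BTreeMap::new();
        for (k, v) in &log {
            index.insert(k.clone(), v.clone());
        }
        MiniStore { log, index }
    }
}
```

Because every SET reaches the log before the index, `MiniStore::recover` can rebuild the same state from the log alone, which is the property the real WAL is meant to provide.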

2. Page Management

A page is the smallest unit of storage. Each page holds a header followed by a number of records:

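const PAGE_SIZE: usize = 4096;
const PAGE_HEADER_SIZE: usize = 16;

// Fields are ordered from widest to narrowest so that #[repr(C)] inserts no
// implicit padding and the struct is exactly PAGE_HEADER_SIZE bytes.
#[derive(Debug, Clone, Copy)]
#[repr(C)]
struct PageHeader {
    page_id: u32,
    right_sibling: u32,  // right sibling of a leaf node (used by SCAN)
    num_records: u16,
    free_offset: u16,    // offset where free space begins
    page_type: u8,       // 0 = internal node, 1 = leaf node
    _padding: [u8; 3],
}

// Compile-time check that the layout matches the reserved header area.
const _: () = assert!(std::mem::size_of::<PageHeader>() == PAGE_HEADER_SIZE);

// The alignment lets header()/header_mut() view the first bytes of `data`
// as a PageHeader without creating a misaligned reference.
#[repr(C, align(8))]
struct Page {
    data: [u8; PAGE_SIZE],
}

impl Page {
    fn new(page_id: u32, page_type: u8) -> Self {
        let mut page = Page { data: [0u8; PAGE_SIZE] };
        let header = PageHeader {
            page_id,
            right_sibling: 0,
            num_records: 0,
            free_offset: PAGE_HEADER_SIZE as u16,
            page_type,
            _padding: [0; 3],
        };
        // Write the header into the first PAGE_HEADER_SIZE bytes.
        // SAFETY: PageHeader is repr(C) with no implicit padding, so every
        // byte of `header` is initialized.
        let header_bytes = unsafe {
            std::slice::from_raw_parts(
                &header as *const PageHeader as *const u8,
                std::mem::size_of::<PageHeader>(),
            )
        };
        page.data[..PAGE_HEADER_SIZE].copy_from_slice(header_bytes);
        page
    }

    fn header(&self) -> &PageHeader {
        // SAFETY: `data` is 8-byte aligned and longer than the header, and
        // any bit pattern is a valid PageHeader.
        unsafe { &*(self.data.as_ptr() as *const PageHeader) }
    }

    fn header_mut(&mut self) -> &mut PageHeader {
        // SAFETY: same reasoning as header(); the borrow of `self` is unique.
        unsafe { &mut *(self.data.as_mut_ptr() as *mut PageHeader) }
    }
}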
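
Reinterpreting page bytes as a struct ties the on-disk format to the host's endianness and to the compiler's struct layout. As a hedged alternative, the header can be serialized field by field. The sketch below assumes a hypothetical `Header` type and byte offsets chosen purely for illustration; it is not the layout used by `PageHeader` above.

```rust
const PAGE_HEADER_SIZE: usize = 16;

/// Hypothetical safe mirror of the page header. The byte offsets used in
/// `encode`/`decode` are an assumption made for this sketch.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct Header {
    page_id: u32,
    page_type: u8,
    num_records: u16,
    free_offset: u16,
    right_sibling: u32,
}

impl Header {
    /// Serialize into the first 16 bytes of `buf`, little-endian.
    /// Panics if `buf` is shorter than PAGE_HEADER_SIZE.
    fn encode(&self, buf: &mut [u8]) {
        buf[0..4].copy_from_slice(&self.page_id.to_le_bytes());
        buf[4] = self.page_type;
        buf[5..7].copy_from_slice(&self.num_records.to_le_bytes());
        buf[7..9].copy_from_slice(&self.free_offset.to_le_bytes());
        buf[9..13].copy_from_slice(&self.right_sibling.to_le_bytes());
        // Bytes 13..16 are reserved and kept at zero.
        for b in &mut buf[13..PAGE_HEADER_SIZE] {
            *b = 0;
        }
    }

    /// Read the header back from the first 16 bytes of `buf`.
    fn decode(buf: &[u8]) -> Header {
        Header {
            page_id: u32::from_le_bytes([buf[0], buf[1], buf[2], buf[3]]),
            page_type: buf[4],
            num_records: u16::from_le_bytes([buf[5], buf[6]]),
            free_offset: u16::from_le_bytes([buf[7], buf[8]]),
            right_sibling: u32::from_le_bytes([buf[9], buf[10], buf[11], buf[12]]),
        }
    }
}
```

The trade-off is a copy on every header access in exchange for no `unsafe`, no alignment requirement on the page buffer, and a file format that reads the same on big-endian and little-endian machines.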

The page manager is responsible for allocating pages, reading them, and writing them back:

use std::collections::HashMap;
use std::fs::File;
use std::io::{Read, Write, Seek, SeekFrom};

struct PageManager {
    file: File,
    next_page_id: u32,
    cache: HashMap<u32, Page>,  // a simple page cache
}

impl PageManager {
    fn open(path: &str) -> std::io::Result<Self> {
        let file = std::fs::OpenOptions::new()
            .read(true).write(true).create(true)
            .open(path)?;
        let file_len = file.metadata()?.len();
        let next_page_id = (file_len / PAGE_SIZE as u64) as u32;
        Ok(PageManager {
            file,
            next_page_id,
            cache: HashMap::new(),
        })
    }

    fn allocate(&mut self, page_type: u8) -> u32 {
        let id = self.next_page_id;
        self.next_page_id += 1;
        let page = Page::new(id, page_type);
        self.cache.insert(id, page);
        id
    }

    fn get_page(&mut self, page_id: u32) -> std::io::Result<&Page> {
        if !self.cache.contains_key(&page_id) {
            let mut page = Page { data: [0u8; PAGE_SIZE] };
            self.file.seek(SeekFrom::Start(page_id as u64 * PAGE_SIZE as u64))?;
            self.file.read_exact(&mut page.data)?;
            self.cache.insert(page_id, page);
        }
        Ok(self.cache.get(&page_id).unwrap())
    }

    fn flush(&mut self, page_id: u32) -> std::io::Result<()> {
        if let Some(page) = self.cache.get(&page_id) {
            self.file.seek(SeekFrom::Start(page_id as u64 * PAGE_SIZE as u64))?;
            self.file.write_all(&page.data)?;
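            // File is unbuffered, so flush() alone does not force data to
            // disk; sync_data() asks the OS to persist the written bytes.
            self.file.sync_data()?;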
        }
        Ok(())
    }
}
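
The mapping between a page id and its position in the data file is plain arithmetic, and it can be exercised without touching the filesystem. The following is a minimal sketch, assuming hypothetical free functions `page_offset`, `write_page`, `read_page`, and `memory_file`, with a `std::io::Cursor` standing in for the data file.

```rust
use std::io::{Cursor, Read, Seek, SeekFrom, Write};

const PAGE_SIZE: usize = 4096;

/// Byte offset of a page inside the data file.
fn page_offset(page_id: u32) -> u64 {
    page_id as u64 * PAGE_SIZE as u64
}

/// Write one page into its slot; works for any seekable writer.
fn write_page<W: Write + Seek>(
    w: &mut W,
    page_id: u32,
    data: &[u8; PAGE_SIZE],
) -> std::io::Result<()> {
    w.seek(SeekFrom::Start(page_offset(page_id)))?;
    w.write_all(data)
}

/// Read one page from its slot; fails if the slot is not fully present.
fn read_page<R: Read + Seek>(r: &mut R, page_id: u32) -> std::io::Result<[u8; PAGE_SIZE]> {
    let mut data = [0u8; PAGE_SIZE];
    r.seek(SeekFrom::Start(page_offset(page_id)))?;
    r.read_exact(&mut data)?;
    Ok(data)
}

/// Hypothetical helper: an in-memory "file" for experiments and tests.
fn memory_file() -> Cursor<Vec<u8>> {
    Cursor::new(Vec::new())
}
```

Writing page 2 into an empty `memory_file()` grows the buffer to three pages, with pages 0 and 1 zero-filled, and reading page 3 returns an error because that slot does not exist yet.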

3. WAL (Write-Ahead Log)

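The WAL is the key to durability. Before the B+ tree is modified, the operation is first recorded in the WAL file. If the process crashes, the data can be recovered by replaying the WAL.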

use std::io::BufWriter;

#[derive(Debug)]
enum WalRecord {
    Set { key: Vec<u8>, value: Vec<u8> },
    Delete { key: Vec<u8> },
    Checkpoint,  // marks that data has been flushed; earlier records can be discarded
}

struct Wal {
    writer: BufWriter<File>,
    path: String,
}

impl Wal {
    fn open(path: &str) -> std::io::Result<Self> {
        let file = std::fs::OpenOptions::new()
            .create(true).append(true).open(path)?;
        Ok(Wal {
            writer: BufWriter::new(file),
            path: path.to_string(),
        })
    }

    fn append(&mut self, record: &WalRecord) -> std::io::Result<()> {
        // Format: [type: 1B][key_len: 4B][key][value_len: 4B][value]
        match record {
            WalRecord::Set { key, value } => {
                self.writer.write_all(&[0x01])?;
                self.writer.write_all(&(key.len() as u32).to_le_bytes())?;
                self.writer.write_all(key)?;
                self.writer.write_all(&(value.len() as u32).to_le_bytes())?;
                self.writer.write_all(value)?;
            }
            WalRecord::Delete { key } => {
                self.writer.write_all(&[0x02])?;
                self.writer.write_all(&(key.len() as u32).to_le_bytes())?;
                self.writer.write_all(key)?;
            }
            WalRecord::Checkpoint => {
                self.writer.write_all(&[0xFF])?;
            }
        }
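        // flush() only drains the BufWriter into the OS; sync_data() asks
        // the OS to persist the bytes before we report success.
        self.writer.flush()?;
        self.writer.get_ref().sync_data()?;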
        Ok(())
    }

    fn replay<F: FnMut(WalRecord)>(path: &str, mut callback: F) -> std::io::Result<()> {
        let mut file = File::open(path)?;
        let mut buf = Vec::new();
        file.read_to_end(&mut buf)?;

        // Reads one length-prefixed field at `*pos`. Returns None if the log
        // ends mid-record, e.g. after a crash during append.
        fn read_field(buf: &[u8], pos: &mut usize) -> Option<Vec<u8>> {
            let len_bytes = buf.get(*pos..*pos + 4)?;
            let len = u32::from_le_bytes(len_bytes.try_into().unwrap()) as usize;
            let field = buf.get(*pos + 4..*pos + 4 + len)?;
            *pos += 4 + len;
            Some(field.to_vec())
        }

        let mut pos = 0;
        while pos < buf.len() {
            let mut next = pos + 1; // skip the type byte
            let record = match buf[pos] {
                0x01 => {
                    let Some(key) = read_field(&buf, &mut next) else { break };
                    let Some(value) = read_field(&buf, &mut next) else { break };
                    WalRecord::Set { key, value }
                }
                0x02 => {
                    let Some(key) = read_field(&buf, &mut next) else { break };
                    WalRecord::Delete { key }
                }
                0xFF => WalRecord::Checkpoint,
                _ => break, // unknown type byte: stop replaying
            };
            pos = next;
            callback(record);
        }
        Ok(())
    }
}
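
The record layout can also be checked in isolation, including what happens when the log ends in the middle of a record. The sketch below is a self-contained illustration: `Rec`, `encode`, `take_field`, `decode_one`, and `decode_all` are hypothetical names that mirror the `[type][len][bytes]` layout described above. Like the format itself, it carries no checksum, so it can detect a short tail but not corrupted bytes.

```rust
/// Hypothetical record type mirroring the SET/DELETE/CHECKPOINT layout.
#[derive(Debug, PartialEq)]
enum Rec {
    Set(Vec<u8>, Vec<u8>),
    Delete(Vec<u8>),
    Checkpoint,
}

/// Append one record to `out`: a type byte, then length-prefixed fields.
fn encode(rec: &Rec, out: &mut Vec<u8>) {
    fn put(out: &mut Vec<u8>, bytes: &[u8]) {
        out.extend_from_slice(&(bytes.len() as u32).to_le_bytes());
        out.extend_from_slice(bytes);
    }
    match rec {
        Rec::Set(k, v) => {
            out.push(0x01);
            put(out, k);
            put(out, v);
        }
        Rec::Delete(k) => {
            out.push(0x02);
            put(out, k);
        }
        Rec::Checkpoint => out.push(0xFF),
    }
}

/// Read a length-prefixed field at `*pos`; None if the buffer ends early.
fn take_field(buf: &[u8], pos: &mut usize) -> Option<Vec<u8>> {
    let len_bytes = buf.get(*pos..pos.checked_add(4)?)?;
    let len =
        u32::from_le_bytes([len_bytes[0], len_bytes[1], len_bytes[2], len_bytes[3]]) as usize;
    let start = *pos + 4;
    let field = buf.get(start..start.checked_add(len)?)?;
    *pos = start + len;
    Some(field.to_vec())
}

/// Decode the record at `*pos`, advancing `*pos` only on success.
fn decode_one(buf: &[u8], pos: &mut usize) -> Option<Rec> {
    let mut cur = *pos + 1;
    let rec = match *buf.get(*pos)? {
        0x01 => {
            let k = take_field(buf, &mut cur)?;
            let v = take_field(buf, &mut cur)?;
            Rec::Set(k, v)
        }
        0x02 => Rec::Delete(take_field(buf, &mut cur)?),
        0xFF => Rec::Checkpoint,
        _ => return None,
    };
    *pos = cur;
    Some(rec)
}

/// Decode every complete record; stop at a torn or unknown tail.
fn decode_all(buf: &[u8]) -> Vec<Rec> {
    let mut out = Vec::new();
    let mut pos = 0;
    while let Some(rec) = decode_one(buf, &mut pos) {
        out.push(rec);
    }
    out
}
```

Cutting a few bytes off the end of an encoded log makes `decode_all` return only the records that were written completely, which is the behaviour recovery needs after a crash in the middle of an append.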

4. B+ Tree Index

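The B+ tree is the classic structure for database indexes. Leaf nodes store the actual data and are linked into a list (which supports range scans), while internal nodes store only separator keys and child pointers.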

const B_PLUS_TREE_ORDER: usize = 64; // a leaf is split once it reaches 64 keys

#[derive(Debug, Clone)]
enum BPlusNode {
    Internal {
        keys: Vec<Vec<u8>>,
        children: Vec<u32>,  // page_ids of the child nodes
    },
    Leaf {
        keys: Vec<Vec<u8>>,
        values: Vec<Vec<u8>>,
        next_leaf: Option<u32>,  // right sibling leaf
    },
}

struct BPlusTree {
    root: u32,
    page_manager: PageManager,
    nodes: HashMap<u32, BPlusNode>,  // simplification: node map kept in memory
}

impl BPlusTree {
    fn new(page_manager: PageManager) -> Self {
        let mut tree = BPlusTree {
            root: 0,
            page_manager,
            nodes: HashMap::new(),
        };
        // Create an initial empty leaf node to serve as the root.
        let root_id = tree.page_manager.allocate(1);
        tree.root = root_id;
        tree.nodes.insert(root_id, BPlusNode::Leaf {
            keys: Vec::new(),
            values: Vec::new(),
            next_leaf: None,
        });
        tree
    }

    fn get(&self, key: &[u8]) -> Option<&Vec<u8>> {
        let leaf_id = self.find_leaf(key);
        if let Some(BPlusNode::Leaf { keys, values, .. }) = self.nodes.get(&leaf_id) {
            if let Ok(idx) = keys.binary_search_by(|k| k.as_slice().cmp(key)) {
                return Some(&values[idx]);
            }
        }
        None
    }

    fn find_leaf(&self, key: &[u8]) -> u32 {
        let mut current = self.root;
        loop {
            match self.nodes.get(&current) {
                Some(BPlusNode::Internal { keys, children }) => {
                    let idx = keys.partition_point(|k| k.as_slice() <= key);
                    current = children[idx];
                }
                Some(BPlusNode::Leaf { .. }) => return current,
                None => panic!("node {} not found", current),
            }
        }
    }

    fn set(&mut self, key: Vec<u8>, value: Vec<u8>) {
        let leaf_id = self.find_leaf(&key);

        if let Some(BPlusNode::Leaf { keys, values, .. }) = self.nodes.get_mut(&leaf_id) {
            match keys.binary_search_by(|k| k.as_slice().cmp(&key)) {
                Ok(idx) => {
                    // Key already exists: overwrite the value.
                    values[idx] = value;
                }
                Err(idx) => {
                    // Insert the new key at its sorted position.
                    keys.insert(idx, key);
                    values.insert(idx, value);

                    // Split the leaf if it is now full.
                    if keys.len() >= B_PLUS_TREE_ORDER {
                        self.split_leaf(leaf_id);
                    }
                }
            }
        }
    }

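    fn split_leaf(&mut self, leaf_id: u32) {
        // Split a leaf: move the upper half of its entries into a new node.
        let mid = B_PLUS_TREE_ORDER / 2;
        let (split_key, new_id);

        if let Some(BPlusNode::Leaf { keys, values, next_leaf }) = self.nodes.get_mut(&leaf_id) {
            let new_keys = keys.split_off(mid);
            let new_values = values.split_off(mid);
            split_key = new_keys[0].clone();
            let old_next = *next_leaf;

            new_id = self.page_manager.allocate(1);
            *next_leaf = Some(new_id);

            self.nodes.insert(new_id, BPlusNode::Leaf {
                keys: new_keys,
                values: new_values,
                next_leaf: old_next,
            });
        } else {
            return;
        }

        // Insert the split key into the parent node.
        if leaf_id == self.root {
            // The root itself was a leaf: create a new internal root.
            let new_root_id = self.page_manager.allocate(0);
            self.nodes.insert(new_root_id, BPlusNode::Internal {
                keys: vec![split_key],
                children: vec![leaf_id, new_id],
            });
            self.root = new_root_id;
        } else if let Some(BPlusNode::Internal { keys, children }) = self.nodes.get_mut(&self.root) {
            // Simplification: internal nodes are never split here, so the
            // root is the parent of every leaf and simply grows wider.
            // Without this step the new leaf would be unreachable by GET.
            let idx = keys.partition_point(|k| *k <= split_key);
            keys.insert(idx, split_key);
            children.insert(idx + 1, new_id);
        }
        // A full implementation would split internal nodes recursively; omitted here.
    }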

    /// SCAN: range query over [start_key, end_key)
    fn scan(&self, start_key: &[u8], end_key: &[u8]) -> Vec<(Vec<u8>, Vec<u8>)> {
        let mut results = Vec::new();
        let mut leaf_id = self.find_leaf(start_key);

        loop {
            match self.nodes.get(&leaf_id) {
                Some(BPlusNode::Leaf { keys, values, next_leaf }) => {
                    for (i, k) in keys.iter().enumerate() {
                        if k.as_slice() >= start_key && k.as_slice() < end_key {
                            results.push((k.clone(), values[i].clone()));
                        }
                        if k.as_slice() >= end_key {
                            return results;
                        }
                    }
                    match next_leaf {
                        Some(next) => leaf_id = *next,
                        None => return results,
                    }
                }
                _ => break,
            }
        }
        results
    }
}
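
Two rules in the code above are easy to get wrong by one position: which child `find_leaf` descends into, and which key `split_leaf` promotes as the separator. They can be checked in isolation with a small sketch. `child_index` and `split_sorted` are hypothetical helper names introduced only for this illustration.

```rust
/// Index of the child to descend into, given an internal node's separator
/// keys. Keys equal to a separator go to the child on its right.
fn child_index(separators: &[Vec<u8>], key: &[u8]) -> usize {
    separators.partition_point(|s| s.as_slice() <= key)
}

/// Split a sorted, non-empty key list in half. `keys` keeps the lower half;
/// the function returns the upper half and the separator key, which is a
/// copy of the first key of the upper half.
fn split_sorted(keys: &mut Vec<Vec<u8>>) -> (Vec<Vec<u8>>, Vec<u8>) {
    let mid = keys.len() / 2;
    let right = keys.split_off(mid);
    let separator = right[0].clone();
    (right, separator)
}
```

With separators `["g", "p"]`, a lookup for `"a"` goes to child 0, `"g"` and `"h"` go to child 1, and `"p"` and `"z"` go to child 2. Because the separator stays in the right-hand leaf, a lookup for the separator itself must route right, and the `<=` in `partition_point` guarantees that.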

5. A Simple SQL Parser

nom is used to parse the simple GET/SET/SCAN commands (the function-style combinators below follow the nom 7 API):

use nom::{
    branch::alt,
    bytes::complete::{tag_no_case, take_while1},
    character::complete::{multispace0, multispace1},
    combinator::map,
    sequence::{preceded, tuple},
    IResult,
};

#[derive(Debug)]
enum Command {
    Get(String),
    Set(String, String),
    Scan(String, String),
}

fn parse_identifier(input: &str) -> IResult<&str, &str> {
    take_while1(|c: char| c.is_alphanumeric() || c == '_' || c == '-')(input)
}

fn parse_quoted_string(input: &str) -> IResult<&str, &str> {
    let (input, _) = tag_no_case("'")(input)?;
    let (input, s) = take_while1(|c: char| c != '\'')(input)?;
    let (input, _) = tag_no_case("'")(input)?;
    Ok((input, s))
}

fn parse_value(input: &str) -> IResult<&str, &str> {
    alt((parse_quoted_string, parse_identifier))(input)
}

fn parse_get(input: &str) -> IResult<&str, Command> {
    map(
        preceded(
            tuple((tag_no_case("GET"), multispace1)),
            parse_value,
        ),
        |key| Command::Get(key.to_string()),
    )(input)
}

fn parse_set(input: &str) -> IResult<&str, Command> {
    map(
        tuple((
            tag_no_case("SET"),
            multispace1,
            parse_value,
            multispace1,
            parse_value,
        )),
        |(_, _, key, _, value)| Command::Set(key.to_string(), value.to_string()),
    )(input)
}

fn parse_scan(input: &str) -> IResult<&str, Command> {
    map(
        tuple((
            tag_no_case("SCAN"),
            multispace1,
            parse_value,
            multispace1,
            parse_value,
        )),
        |(_, _, start, _, end)| Command::Scan(start.to_string(), end.to_string()),
    )(input)
}

fn parse_command(input: &str) -> IResult<&str, Command> {
    preceded(multispace0, alt((parse_get, parse_set, parse_scan)))(input)
}
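
To experiment with the grammar without pulling in nom, roughly the same commands can be recognized with the standard library alone. This is a swapped-in technique, a hand-written tokenizer rather than parser combinators, and `Cmd`, `tokenize`, and `parse` are hypothetical names. It is also not an exact equivalent: unlike the nom version, it rejects trailing input, accepts empty quoted strings, and does not restrict the characters allowed in bare words.

```rust
#[derive(Debug, PartialEq)]
enum Cmd {
    Get(String),
    Set(String, String),
    Scan(String, String),
}

/// Split a line into tokens: bare words or 'single-quoted strings'.
/// Returns None if a quoted string is never closed.
fn tokenize(input: &str) -> Option<Vec<String>> {
    let mut tokens = Vec::new();
    let mut chars = input.chars().peekable();
    while let Some(&c) = chars.peek() {
        if c.is_whitespace() {
            chars.next();
        } else if c == '\'' {
            chars.next(); // consume the opening quote
            let mut s = String::new();
            loop {
                match chars.next()? {
                    '\'' => break, // closing quote
                    ch => s.push(ch),
                }
            }
            tokens.push(s);
        } else {
            let mut s = String::new();
            while let Some(&ch) = chars.peek() {
                if ch.is_whitespace() {
                    break;
                }
                s.push(ch);
                chars.next();
            }
            tokens.push(s);
        }
    }
    Some(tokens)
}

/// Parse one command; the verb is matched case-insensitively.
fn parse(input: &str) -> Option<Cmd> {
    let mut tokens = tokenize(input)?.into_iter();
    let verb = tokens.next()?.to_ascii_uppercase();
    let cmd = match verb.as_str() {
        "GET" => Cmd::Get(tokens.next()?),
        "SET" => Cmd::Set(tokens.next()?, tokens.next()?),
        "SCAN" => Cmd::Scan(tokens.next()?, tokens.next()?),
        _ => return None,
    };
    if tokens.next().is_some() {
        return None; // unexpected trailing tokens
    }
    Some(cmd)
}
```

For example, `parse("set k 'hello world'")` yields `Cmd::Set` with key `k` and value `hello world`, while `parse("SET k")` and `parse("SET k 'unterminated")` both return `None`.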

6. Putting the Execution Engine Together

struct Database {
    tree: BPlusTree,
    wal: Wal,
}

impl Database {
    fn open(data_path: &str, wal_path: &str) -> std::io::Result<Self> {
        let page_manager = PageManager::open(data_path)?;
        let mut tree = BPlusTree::new(page_manager);
        let wal = Wal::open(wal_path)?;

        // Replay the WAL to recover data.
        if std::path::Path::new(wal_path).exists() {
            Wal::replay(wal_path, |record| {
                match record {
                    WalRecord::Set { key, value } => tree.set(key, value),
                    WalRecord::Delete { .. } => { /* TODO */ }
                    WalRecord::Checkpoint => {}
                }
            })?;
        }

        Ok(Database { tree, wal })
    }

    fn execute(&mut self, input: &str) -> String {
        match parse_command(input) {
            Ok((_, cmd)) => match cmd {
                Command::Get(key) => {
                    match self.tree.get(key.as_bytes()) {
                        Some(val) => String::from_utf8_lossy(val).to_string(),
                        None => "(nil)".to_string(),
                    }
                }
                Command::Set(key, value) => {
                    let record = WalRecord::Set {
                        key: key.as_bytes().to_vec(),
                        value: value.as_bytes().to_vec(),
                    };
                    self.wal.append(&record).unwrap();
                    self.tree.set(key.into_bytes(), value.into_bytes());
                    "OK".to_string()
                }
                Command::Scan(start, end) => {
                    let results = self.tree.scan(start.as_bytes(), end.as_bytes());
                    results.iter()
                        .map(|(k, v)| format!(
                            "{} -> {}",
                            String::from_utf8_lossy(k),
                            String::from_utf8_lossy(v)
                        ))
                        .collect::<Vec<_>>()
                        .join("\n")
                }
            },
            Err(e) => format!("Parse error: {}", e),
        }
    }
}

7. Summary

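This implementation leaves out much of what a production database needs: concurrency control (locks/MVCC), recursive splitting of internal nodes, a page eviction policy (LRU), integrity checks for crash recovery, and more. It still shows the core skeleton of a KV storage engine: data is organized in a B+ tree, the page is the smallest unit of I/O, and the WAL makes writes durable.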

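Rust's strengths for this kind of systems programming are clear: zero-cost abstractions let you optimize memory layout with confidence, the ownership system rules out dangling pointers and use-after-free in safe code, and unsafe blocks make the boundaries of low-level operations clearly visible.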