从零实现一个简单的 KV 数据库引擎,支持 GET/SET/SCAN 操作,底层使用 B+ 树索引和 WAL 预写日志。本文记录核心模块的设计与 Rust 实现。
一、整体架构
┌──────────────────┐
│    SQL Parser    │ ← nom 解析简单的 GET/SET/SCAN 语句
├──────────────────┤
│   Query Engine   │ ← 执行层
├──────────────────┤
│   B+Tree Index   │ ← 内存中的 B+ 树
├──────────────────┤
│   Page Manager   │ ← 4KB 页管理
├──────────────────┤
│   WAL (Write-    │ ← 预写日志,保证持久化
│    Ahead Log)    │
├──────────────────┤
│   Disk Storage   │ ← 数据文件
└──────────────────┘
核心设计决策:
- 页大小固定 4KB,与操作系统页面对齐
- B+ 树的叶子节点存储实际的 KV 数据
- 所有写操作先写 WAL,再更新 B+ 树,最后刷盘
- 支持 GET(点查)、SET(写入)、SCAN(范围查询)三种操作
二、Page 管理
页是存储的最小单位。每个页包含一个 header 和若干条记录:
/// Size of one page in bytes; the unit of all data-file I/O.
const PAGE_SIZE: usize = 4096;
/// Number of bytes at the start of every page reserved for the [`PageHeader`].
const PAGE_HEADER_SIZE: usize = 16;

/// Fixed-size header stored in the first `PAGE_HEADER_SIZE` bytes of a page.
///
/// The fields are ordered from the largest alignment to the smallest so that
/// the `#[repr(C)]` layout has no implicit padding and is exactly 16 bytes:
///
/// | bytes  | field           |
/// |--------|-----------------|
/// | 0..4   | `page_id`       |
/// | 4..8   | `right_sibling` |
/// | 8..10  | `num_records`   |
/// | 10..12 | `free_offset`   |
/// | 12     | `page_type`     |
/// | 13..16 | `_padding`      |
#[derive(Debug, Clone, Copy)]
#[repr(C)]
struct PageHeader {
    page_id: u32,
    right_sibling: u32, // right sibling of a leaf node (used by SCAN)
    num_records: u16,
    free_offset: u16, // offset at which the free space starts
    page_type: u8,    // 0 = internal node, 1 = leaf node
    _padding: [u8; 3],
}

/// One page worth of raw bytes: a [`PageHeader`] followed by record space.
///
/// The 8-byte alignment lets the leading bytes be viewed as a `PageHeader`
/// (which needs 4-byte alignment) without a misaligned reference.
#[repr(C, align(8))]
struct Page {
    data: [u8; PAGE_SIZE],
}

// Compile-time guards for the invariants `header()` / `header_mut()` rely on.
const _: () = assert!(std::mem::size_of::<PageHeader>() == PAGE_HEADER_SIZE);
const _: () = assert!(PAGE_HEADER_SIZE <= PAGE_SIZE);
const _: () = assert!(std::mem::align_of::<Page>() >= std::mem::align_of::<PageHeader>());

impl Page {
    /// Creates a zero-filled page whose header records `page_id` and
    /// `page_type`, holds no records, and whose free space starts right
    /// after the header.
    fn new(page_id: u32, page_type: u8) -> Self {
        let mut page = Page { data: [0u8; PAGE_SIZE] };
        // Writing through the typed view avoids reinterpreting a struct as
        // raw bytes and cannot disagree with `header()` about the layout.
        *page.header_mut() = PageHeader {
            page_id,
            right_sibling: 0,
            num_records: 0,
            free_offset: PAGE_HEADER_SIZE as u16,
            page_type,
            _padding: [0; 3],
        };
        page
    }

    /// Returns a read-only view of the header stored at the start of the page.
    fn header(&self) -> &PageHeader {
        // SAFETY: `Page` is `repr(C, align(8))` and `data` is its only field,
        // so `data` starts at an address that satisfies `PageHeader`'s
        // alignment. `data` is at least `size_of::<PageHeader>()` bytes long
        // (checked by the const assertions above). `PageHeader` consists only
        // of integers and a byte array, so every bit pattern is a valid value.
        unsafe { &*self.data.as_ptr().cast::<PageHeader>() }
    }

    /// Returns a mutable view of the header stored at the start of the page.
    fn header_mut(&mut self) -> &mut PageHeader {
        // SAFETY: same layout argument as in `header()`; the `&mut self`
        // borrow guarantees that the returned reference is unique.
        unsafe { &mut *self.data.as_mut_ptr().cast::<PageHeader>() }
    }
}
页管理器负责页的分配、读取和写回:
use std::collections::HashMap;
use std::fs::File;
use std::io::{Read, Write, Seek, SeekFrom};
/// Allocates pages, loads them from the data file on demand and writes them
/// back.
///
/// Loaded and freshly allocated pages are kept in an in-memory map keyed by
/// page id; nothing is ever evicted from it.
struct PageManager {
    file: File,
    next_page_id: u32,
    cache: HashMap<u32, Page>, // simple page cache
}

impl PageManager {
    /// Byte offset of `page_id` inside the data file.
    fn offset_of(page_id: u32) -> u64 {
        u64::from(page_id) * PAGE_SIZE as u64
    }

    /// Opens (creating it if necessary) the data file at `path`.
    ///
    /// The next page id is derived from the current file length, so pages
    /// already present in the file keep their ids.
    ///
    /// # Errors
    /// Returns any I/O error from opening or inspecting the file, and
    /// `InvalidData` if the file holds more pages than a `u32` id can address.
    fn open(path: &str) -> std::io::Result<Self> {
        let file = std::fs::OpenOptions::new()
            .read(true)
            .write(true)
            .create(true)
            .truncate(false) // existing pages must survive a reopen
            .open(path)?;
        let page_count = file.metadata()?.len() / PAGE_SIZE as u64;
        if page_count > u64::from(u32::MAX) {
            return Err(std::io::Error::new(
                std::io::ErrorKind::InvalidData,
                "data file holds more pages than a u32 page id can address",
            ));
        }
        Ok(PageManager {
            file,
            next_page_id: page_count as u32,
            cache: HashMap::new(),
        })
    }

    /// Reserves the next page id and places a fresh page of `page_type` in
    /// the cache. Nothing is written to the file until [`PageManager::flush`].
    ///
    /// # Panics
    /// Panics if the `u32` page id space is exhausted.
    fn allocate(&mut self, page_type: u8) -> u32 {
        let id = self.next_page_id;
        self.next_page_id = id.checked_add(1).expect("page id space exhausted");
        self.cache.insert(id, Page::new(id, page_type));
        id
    }

    /// Returns the page with `page_id`, reading it from the file on a cache
    /// miss.
    ///
    /// # Errors
    /// Returns the I/O error of the seek or read; a page id past the end of
    /// the file surfaces as the `read_exact` error. Nothing is cached on
    /// failure.
    fn get_page(&mut self, page_id: u32) -> std::io::Result<&Page> {
        match self.cache.entry(page_id) {
            std::collections::hash_map::Entry::Occupied(slot) => {
                let page: &Page = slot.into_mut();
                Ok(page)
            }
            std::collections::hash_map::Entry::Vacant(slot) => {
                let mut page = Page { data: [0u8; PAGE_SIZE] };
                self.file.seek(SeekFrom::Start(Self::offset_of(page_id)))?;
                self.file.read_exact(&mut page.data)?;
                let page: &Page = slot.insert(page);
                Ok(page)
            }
        }
    }

    /// Writes the cached copy of `page_id` to its slot in the data file and
    /// forces it to stable storage. Does nothing if the page is not cached.
    ///
    /// # Errors
    /// Returns the I/O error of the seek, write or sync.
    fn flush(&mut self, page_id: u32) -> std::io::Result<()> {
        if let Some(page) = self.cache.get(&page_id) {
            self.file.seek(SeekFrom::Start(Self::offset_of(page_id)))?;
            self.file.write_all(&page.data)?;
            // `File` has no user-space buffer, so `Write::flush` would be a
            // no-op; `sync_data` asks the OS to persist the written bytes.
            self.file.sync_data()?;
        }
        Ok(())
    }
}
三、WAL 预写日志
WAL 是持久化的关键。在修改 B+ 树之前,先把操作记录写入 WAL 文件。如果进程崩溃,可以通过重放 WAL 恢复数据。
use std::io::BufWriter;
/// One logical entry of the write-ahead log.
#[derive(Debug)]
enum WalRecord {
    Set { key: Vec<u8>, value: Vec<u8> },
    Delete { key: Vec<u8> },
    Checkpoint, // marks that data has been flushed; earlier records may be discarded
}

/// Append-only write-ahead log.
///
/// On-disk record format (all lengths little-endian `u32`):
///
/// ```text
/// Set:        [0x01][key_len][key][value_len][value]
/// Delete:     [0x02][key_len][key]
/// Checkpoint: [0xFF]
/// ```
struct Wal {
    writer: BufWriter<File>,
    path: String,
}

impl Wal {
    const TAG_SET: u8 = 0x01;
    const TAG_DELETE: u8 = 0x02;
    const TAG_CHECKPOINT: u8 = 0xFF;

    /// Opens the log at `path` for appending, creating it if it is missing.
    fn open(path: &str) -> std::io::Result<Self> {
        let file = std::fs::OpenOptions::new()
            .create(true)
            .append(true)
            .open(path)?;
        Ok(Wal {
            writer: BufWriter::new(file),
            path: path.to_string(),
        })
    }

    /// Appends `record` to the log and forces it to stable storage before
    /// returning.
    ///
    /// The record is encoded completely before anything is written, so an
    /// oversized field is rejected without leaving a partial record behind.
    ///
    /// # Errors
    /// Returns `InvalidInput` if a key or value is longer than `u32::MAX`
    /// bytes, or the I/O error of the write, flush or sync.
    fn append(&mut self, record: &WalRecord) -> std::io::Result<()> {
        let encoded = Self::encode(record)?;
        self.writer.write_all(&encoded)?;
        self.writer.flush()?;
        // `flush` only hands the bytes to the OS; `sync_data` makes them
        // survive a crash, which is the whole point of a write-ahead log.
        self.writer.get_ref().sync_data()?;
        Ok(())
    }

    /// Replays the log at `path`, passing each record to `callback` in the
    /// order it was written.
    ///
    /// Replay stops at the first record that is incomplete (a write torn by
    /// a crash) or has an unknown type byte; everything before it is still
    /// delivered.
    ///
    /// # Errors
    /// Returns the I/O error of reading the file.
    fn replay<F: FnMut(WalRecord)>(path: &str, mut callback: F) -> std::io::Result<()> {
        let buf = std::fs::read(path)?;
        let mut rest = buf.as_slice();
        while let Some((record, remaining)) = Self::decode(rest) {
            callback(record);
            rest = remaining;
        }
        Ok(())
    }

    /// Serialises `record` into its on-disk form.
    fn encode(record: &WalRecord) -> std::io::Result<Vec<u8>> {
        let mut out = Vec::new();
        match record {
            WalRecord::Set { key, value } => {
                out.push(Self::TAG_SET);
                Self::put_chunk(&mut out, key)?;
                Self::put_chunk(&mut out, value)?;
            }
            WalRecord::Delete { key } => {
                out.push(Self::TAG_DELETE);
                Self::put_chunk(&mut out, key)?;
            }
            WalRecord::Checkpoint => out.push(Self::TAG_CHECKPOINT),
        }
        Ok(out)
    }

    /// Appends a length-prefixed byte string to `out`.
    fn put_chunk(out: &mut Vec<u8>, bytes: &[u8]) -> std::io::Result<()> {
        if bytes.len() > u32::MAX as usize {
            return Err(std::io::Error::new(
                std::io::ErrorKind::InvalidInput,
                "WAL field is longer than u32::MAX bytes",
            ));
        }
        out.extend_from_slice(&(bytes.len() as u32).to_le_bytes());
        out.extend_from_slice(bytes);
        Ok(())
    }

    /// Decodes the record at the start of `input`, returning it together
    /// with the bytes that follow, or `None` if `input` is empty, truncated
    /// or starts with an unknown type byte.
    fn decode(input: &[u8]) -> Option<(WalRecord, &[u8])> {
        let (&tag, body) = input.split_first()?;
        match tag {
            Self::TAG_SET => {
                let (key, body) = Self::take_chunk(body)?;
                let (value, body) = Self::take_chunk(body)?;
                let record = WalRecord::Set {
                    key: key.to_vec(),
                    value: value.to_vec(),
                };
                Some((record, body))
            }
            Self::TAG_DELETE => {
                let (key, body) = Self::take_chunk(body)?;
                Some((WalRecord::Delete { key: key.to_vec() }, body))
            }
            Self::TAG_CHECKPOINT => Some((WalRecord::Checkpoint, body)),
            _ => None,
        }
    }

    /// Splits a length-prefixed byte string off the front of `input`, or
    /// returns `None` if the prefix or the payload is cut short.
    fn take_chunk(input: &[u8]) -> Option<(&[u8], &[u8])> {
        if input.len() < 4 {
            return None;
        }
        let (prefix, rest) = input.split_at(4);
        let len = u32::from_le_bytes([prefix[0], prefix[1], prefix[2], prefix[3]]) as usize;
        if rest.len() < len {
            return None;
        }
        Some(rest.split_at(len))
    }
}
四、B+ 树索引
B+ 树是数据库索引的经典结构。叶子节点存储实际数据并通过链表相连(支持范围扫描),内部节点只存储分隔键和子节点指针。
const B_PLUS_TREE_ORDER: usize = 64; // 每个节点最多 64 个 key
#[derive(Debug, Clone)]
enum BPlusNode {
Internal {
keys: Vec<Vec<u8>>,
children: Vec<u32>, // 子节点的 page_id
},
Leaf {
keys: Vec<Vec<u8>>,
values: Vec<Vec<u8>>,
next_leaf: Option<u32>, // 右兄弟叶子节点
},
}
struct BPlusTree {
root: u32,
page_manager: PageManager,
nodes: HashMap<u32, BPlusNode>, // 简化:内存中维护节点映射
}
impl BPlusTree {
fn new(page_manager: PageManager) -> Self {
let mut tree = BPlusTree {
root: 0,
page_manager,
nodes: HashMap::new(),
};
// 创建初始的空叶子节点作为 root
let root_id = tree.page_manager.allocate(1);
tree.root = root_id;
tree.nodes.insert(root_id, BPlusNode::Leaf {
keys: Vec::new(),
values: Vec::new(),
next_leaf: None,
});
tree
}
fn get(&self, key: &[u8]) -> Option<&Vec<u8>> {
let leaf_id = self.find_leaf(key);
if let Some(BPlusNode::Leaf { keys, values, .. }) = self.nodes.get(&leaf_id) {
if let Ok(idx) = keys.binary_search_by(|k| k.as_slice().cmp(key)) {
return Some(&values[idx]);
}
}
None
}
fn find_leaf(&self, key: &[u8]) -> u32 {
let mut current = self.root;
loop {
match self.nodes.get(¤t) {
Some(BPlusNode::Internal { keys, children }) => {
let idx = keys.partition_point(|k| k.as_slice() <= key);
current = children[idx];
}
Some(BPlusNode::Leaf { .. }) => return current,
None => panic!("node {} not found", current),
}
}
}
fn set(&mut self, key: Vec<u8>, value: Vec<u8>) {
let leaf_id = self.find_leaf(&key);
if let Some(BPlusNode::Leaf { keys, values, .. }) = self.nodes.get_mut(&leaf_id) {
match keys.binary_search_by(|k| k.as_slice().cmp(&key)) {
Ok(idx) => {
// key 已存在,更新 value
values[idx] = value;
}
Err(idx) => {
// 插入新 key
keys.insert(idx, key);
values.insert(idx, value);
// 检查是否需要分裂
if keys.len() >= B_PLUS_TREE_ORDER {
self.split_leaf(leaf_id);
}
}
}
}
}
fn split_leaf(&mut self, leaf_id: u32) {
// 分裂叶子节点:将后半部分移到新节点
let mid = B_PLUS_TREE_ORDER / 2;
let (split_key, new_id);
if let Some(BPlusNode::Leaf { keys, values, next_leaf }) = self.nodes.get_mut(&leaf_id) {
let new_keys = keys.split_off(mid);
let new_values = values.split_off(mid);
split_key = new_keys[0].clone();
let old_next = *next_leaf;
new_id = self.page_manager.allocate(1);
*next_leaf = Some(new_id);
self.nodes.insert(new_id, BPlusNode::Leaf {
keys: new_keys,
values: new_values,
next_leaf: old_next,
});
} else {
return;
}
// 将分裂键插入父节点(简化处理:如果是 root 则新建 root)
if leaf_id == self.root {
let new_root_id = self.page_manager.allocate(0);
self.nodes.insert(new_root_id, BPlusNode::Internal {
keys: vec![split_key],
children: vec![leaf_id, new_id],
});
self.root = new_root_id;
}
// 完整实现需要递归处理父节点分裂,此处省略
}
/// SCAN: 范围查询 [start_key, end_key)
fn scan(&self, start_key: &[u8], end_key: &[u8]) -> Vec<(Vec<u8>, Vec<u8>)> {
let mut results = Vec::new();
let mut leaf_id = self.find_leaf(start_key);
loop {
match self.nodes.get(&leaf_id) {
Some(BPlusNode::Leaf { keys, values, next_leaf }) => {
for (i, k) in keys.iter().enumerate() {
if k.as_slice() >= start_key && k.as_slice() < end_key {
results.push((k.clone(), values[i].clone()));
}
if k.as_slice() >= end_key {
return results;
}
}
match next_leaf {
Some(next) => leaf_id = *next,
None => return results,
}
}
_ => break,
}
}
results
}
}
五、简单的 SQL 解析器
用 nom 解析简单的 GET/SET/SCAN 命令:
use nom::{
branch::alt,
bytes::complete::{tag_no_case, take_while1},
character::complete::{multispace0, multispace1},
combinator::map,
sequence::{preceded, tuple},
IResult,
};
/// A parsed client command.
#[derive(Debug, Clone, PartialEq, Eq)]
enum Command {
    /// `GET key` — point lookup.
    Get(String),
    /// `SET key value` — insert or overwrite.
    Set(String, String),
    /// `SCAN start end` — range query; the two fields are the start and end keys.
    Scan(String, String),
}
/// Parses a bare (unquoted) token: one or more alphanumeric characters,
/// underscores or hyphens.
fn parse_identifier(input: &str) -> IResult<&str, &str> {
    fn is_identifier_char(c: char) -> bool {
        matches!(c, '_' | '-') || c.is_alphanumeric()
    }

    take_while1(is_identifier_char)(input)
}
fn parse_quoted_string(input: &str) -> IResult<&str, &str> {
let (input, _) = tag_no_case("'")(input)?;
let (input, s) = take_while1(|c: char| c != ''')(input)?;
let (input, _) = tag_no_case("'")(input)?;
Ok((input, s))
}
/// Parses a key or value token: a quoted string is tried first, then a bare
/// identifier.
fn parse_value(input: &str) -> IResult<&str, &str> {
    let mut quoted_or_bare = alt((parse_quoted_string, parse_identifier));
    quoted_or_bare(input)
}
fn parse_get(input: &str) -> IResult<&str, Command> {
map(
preceded(
tuple((tag_no_case("GET"), multispace1)),
parse_value,
),
|key| Command::Get(key.to_string()),
)(input)
}
fn parse_set(input: &str) -> IResult<&str, Command> {
map(
tuple((
tag_no_case("SET"),
multispace1,
parse_value,
multispace1,
parse_value,
)),
|(_, _, key, _, value)| Command::Set(key.to_string(), value.to_string()),
)(input)
}
fn parse_scan(input: &str) -> IResult<&str, Command> {
map(
tuple((
tag_no_case("SCAN"),
multispace1,
parse_value,
multispace1,
parse_value,
)),
|(_, _, start, _, end)| Command::Scan(start.to_string(), end.to_string()),
)(input)
}
fn parse_command(input: &str) -> IResult<&str, Command> {
preceded(multispace0, alt((parse_get, parse_set, parse_scan)))(input)
}
六、整合执行引擎
/// Ties the B+ tree index and the write-ahead log together behind a textual
/// command interface.
struct Database {
    tree: BPlusTree,
    wal: Wal,
}

impl Database {
    /// Opens the data file and the WAL, then rebuilds the tree by replaying
    /// the WAL.
    ///
    /// # Errors
    /// Returns any I/O error from opening the files or reading the WAL.
    fn open(data_path: &str, wal_path: &str) -> std::io::Result<Self> {
        let page_manager = PageManager::open(data_path)?;
        let mut tree = BPlusTree::new(page_manager);
        let wal = Wal::open(wal_path)?;
        // Replay the WAL to restore the data.
        if std::path::Path::new(wal_path).exists() {
            Wal::replay(wal_path, |record| match record {
                WalRecord::Set { key, value } => tree.set(key, value),
                // TODO: deletes are not applied yet (the tree has no removal
                // operation), so a replayed Delete record is ignored.
                WalRecord::Delete { .. } => {}
                WalRecord::Checkpoint => {}
            })?;
        }
        Ok(Database { tree, wal })
    }

    /// Parses and runs one command, returning the textual reply.
    ///
    /// * `GET`  — the value (lossily decoded as UTF-8) or `(nil)`.
    /// * `SET`  — `OK`, or `WAL error: …` if the record could not be logged;
    ///   in that case the tree is left untouched.
    /// * `SCAN` — one `key -> value` line per hit, joined with `\n`.
    ///
    /// Input that does not parse yields `Parse error: …`.
    fn execute(&mut self, input: &str) -> String {
        let cmd = match parse_command(input) {
            Ok((_, cmd)) => cmd,
            Err(e) => return format!("Parse error: {}", e),
        };
        match cmd {
            Command::Get(key) => match self.tree.get(key.as_bytes()) {
                Some(val) => String::from_utf8_lossy(val).into_owned(),
                None => "(nil)".to_string(),
            },
            Command::Set(key, value) => {
                let record = WalRecord::Set {
                    key: key.into_bytes(),
                    value: value.into_bytes(),
                };
                // Write-ahead: the tree may only change once the record is
                // logged, and a logging failure must not take the process down.
                if let Err(e) = self.wal.append(&record) {
                    return format!("WAL error: {}", e);
                }
                // Reuse the buffers already owned by the record.
                if let WalRecord::Set { key, value } = record {
                    self.tree.set(key, value);
                }
                "OK".to_string()
            }
            Command::Scan(start, end) => self
                .tree
                .scan(start.as_bytes(), end.as_bytes())
                .iter()
                .map(|(k, v)| {
                    format!(
                        "{} -> {}",
                        String::from_utf8_lossy(k),
                        String::from_utf8_lossy(v)
                    )
                })
                .collect::<Vec<_>>()
                .join("\n"),
        }
    }
}
七、小结
这个实现省略了很多生产级数据库需要的东西:并发控制(锁/MVCC)、内部节点分裂的递归处理、页面淘汰策略(LRU)、崩溃恢复的完整性校验等。但它展示了一个 KV 存储引擎的核心骨架:数据在 B+ 树中组织,页是 I/O 的最小单位,WAL 保证写入的持久性。
Rust 在这类系统编程中的优势很明显——零成本抽象让你可以放心地做内存布局优化,所有权系统防止了悬垂指针和 use-after-free,unsafe 块让底层操作的边界清晰可见。