Rust 项目实战(二):Mini-LSM

本文属于我的 Rust 学习笔记 系列。

Rust 入门学习笔记以实际例子为主,讲解部分不是从零开始的,所以不建议纯萌新观看,读者最好拥有任意一种面向对象语言的基础,然后自己多多少少看过 Rust 的基本语法,刷过一点 rustlings

来源:原子之音。当然也包含个人的一些补充。 视频 代码

Rust 进阶学习笔记以及实战的来源则五花八门,将会标注在下一行⬇️。

本节出处:迟神的 LSM in a Week 项目。

搞了这么久 Paimon,不研究 LSM Tree 的具体代码实现怎么行。久仰迟神大名,正好借学 Rust 的机会来整一波。想知道这是个什么,可以看知乎上的项目简介

参考答案: 官方 民间

这玩意的官方答案基本上就不是给人看的,尤其是对于 Rust 新手来说,谁会能想到这样写啊!民间版就易读很多。其实不看答案也没问题,迟神给每个子任务都写了测试,只要测试能跑过就说明(至少在当前阶段)代码没什么大问题。

我的代码实现,我会尽量把每个子任务的 commit 拆开,方便回溯。


All

这是项目的整体结构,共三周,每周 7 小节,可以看到还是非常强大和全面的。

本文暂时省略 LSM 的介绍,详细信息可以参考 RocksDB Wiki

本文不能代替项目文档,读者如有疑问还需优先查看项目文档。项目的环境安装、测试准备等在本文中也不再赘述,请看项目文档。

Week 1:Mini-LSM

week1

在 Week 1,我们将构建存储格式,系统的读写路径,并实现一个可用的基于 LSM 树的键值存储。本章结束时,引擎理应具备除持久化外,一个 LSM 树的全部必备功能。

Day1:Memtables

本节是内存表读写的实现。这是一个 LSM 树的最基本内容,放在最前面理所应当。

Task1:基于跳表的内存表

第一个任务的内容很简单,只需要修改mem_table.rs,补全创建内存表的方法,然后实现内存表基础的 get 和 put。

其实第一步就卡住我了,主要原因是一上来就引入了一大堆没见过的第三方库的结构,不过目前用到的不算多所以没卡太久。我们查看 MemTable 的定义

pub struct MemTable {
    map: Arc<SkipMap<Bytes, Bytes>>,
    wal: Option<Wal>,
    id: usize,
    approximate_size: Arc<AtomicUsize>,
}

文档说这个跳表提供了和 Rust 标准库中 BTreeMap 类似的 insert、get 和 iter 接口,那就好说了。先写初始化,既然是初始化,那就给结构体的所有字段赋空值就行。如果不知道默认值或者构造方法,可以直接点进源码去查看。这是我的实现:

/// Create a new mem-table.
pub fn create(_id: usize) -> Self {
    Self {
        map: Arc::new(SkipMap::new()),
        wal: None,
        id: _id,
        approximate_size: Arc::new(std::sync::atomic::AtomicUsize::new(0)),
    }
}

其实理论上应该把变量名开头的下划线去掉,因为这时候这些变量已经是使用了的,并非未使用的。不过我知道写完 day1 才意识到这一点,算了就这样吧,以后再说。

接下来写 get。get 肯定就是根据键去取结构体中 map 属性对应的值了。

/// Get a value by key.
pub fn get(&self, _key: &[u8]) -> Option<Bytes> {
    self.map.get(_key).map(|f| f.value().clone())
}

我这里用了闭包,当然也可以不用,也就是多几个 if/else 的事。同理写 put,put 不需要返回 Option,写起来就更简单了,直接 insert 就行:

pub fn put(&self, _key: &[u8], _value: &[u8]) -> Result<()> {
    self.map
        .insert(Bytes::copy_from_slice(_key), Bytes::copy_from_slice(_value));
    Ok(())
}

&[u8] 转 Bytes 的写法可以从 map_bound 方法抄。运行测试,完美!task1 的两个都通过了。

Task2:单个内存表

第二个任务也很简单,需要在存储引擎中进行内存表的读写。本节要求我们实现lsm_storage.rs的LsmStorageInner::get 、 LsmStorageInner::put 和 LsmStorageInner::delete。显然,这个任务需要我们调用第一个任务实现的方法。

注意文档中有句关键的提示:

As our memtable implementation only requires an immutable reference for put, you ONLY need to take the read lock on state in order to modify the memtable.

什么锁?怎么获取锁?什么都不知道怎么办?一翻上下翻找后,我发现 MiniLsm 这个结构体默认已经实现了一些方法,主要来看这个方法:

pub fn force_flush(&self) -> Result<()> {
    if !self.inner.state.read().memtable.is_empty() {
        self.inner
            .force_freeze_memtable(&self.inner.state_lock.lock())?;
    }
    if !self.inner.state.read().imm_memtables.is_empty() {
        self.inner.force_flush_next_imm_memtable()?;
    }
    Ok(())
}

具体在做什么不用管,我们只需要知道它调用的self.inner就是我们正在开发的 LsmStorageInner。这样就有一个大概思路了。如法炮制,还是先来实现 get。这里会比任务 1 中复杂一些,主要在于需要获取锁,然后返回的结果又用 Result 包了一下。其实也没复杂多少,顺手的事:

/// Get a key from the storage. In day 7, this can be further optimized by using a bloom filter.
pub fn get(&self, _key: &[u8]) -> Result<Option<Bytes>> {
    Ok(self.state.read().memtable.get(_key).and_then(|bytes| {
        if bytes.is_empty() {
            None
        } else {
            Some(bytes.clone())
        }
    }))
}

这里我依然用了闭包,不用闭包的话这块写出来好丑。然后写 put,根据提示,我们依然只需要获取读锁,那就非常简单了:

/// Put a key-value pair into the storage by writing into the current memtable.
pub fn put(&self, _key: &[u8], _value: &[u8]) -> Result<()> {
    self.state.read().memtable.put(_key, _value)
}

delete 更简单,根据提示,本课程中的删除就是写一个空值覆盖掉原本的值。那就正好利用上了我们刚实现好的 put 方法了:

/// Remove a key from the storage by writing an empty value.
pub fn delete(&self, _key: &[u8]) -> Result<()> {
    self.put(_key, &[])
}

可以发现并不难嘛。写到这里,我就渐渐有了信心,不慌了。

Task3:冻结内存表

好家伙,刚说完不难,就给我来了个下马威:这么长的说明文字!

其实仔细看下来也还好。本节就是要实现 force_freeze_memtable 方法。正当一头雾水无从下手的时候,我突然从上一个 practice 中得到了一个思路:完全可以面向测试用例编程嘛!看看测试是怎么做的就好了!

#[test]
fn test_task3_storage_integration() {
    // ... 
    storage
        .force_freeze_memtable(&storage.state_lock.lock())
        .unwrap();
    assert_eq!(storage.state.read().imm_memtables.len(), 1);
    let previous_approximate_size = storage.state.read().imm_memtables[0].approximate_size();
    assert!(previous_approximate_size >= 15);
    storage.put(b"1", b"2333").unwrap();
    storage.put(b"2", b"23333").unwrap();
    storage.put(b"3", b"233333").unwrap();
    storage
        .force_freeze_memtable(&storage.state_lock.lock())
        .unwrap();
    assert_eq!(storage.state.read().imm_memtables.len(), 2);
    assert!(
        storage.state.read().imm_memtables[1].approximate_size() == previous_approximate_size,
        "wrong order of memtables?"
    );
    assert!(storage.state.read().imm_memtables[0].approximate_size() > previous_approximate_size);
}

#[test]
fn test_task3_freeze_on_capacity() {
    let dir = tempdir().unwrap();
    let mut options = LsmStorageOptions::default_for_week1_test();
    options.target_sst_size = 1024;
    options.num_memtable_limit = 1000;
    let storage = Arc::new(LsmStorageInner::open(dir.path(), options).unwrap());
    for _ in 0..1000 {
        storage.put(b"1", b"2333").unwrap();
    }
    let num_imm_memtables = storage.state.read().imm_memtables.len();
    assert!(num_imm_memtables >= 1, "no memtable frozen?");
    for _ in 0..1000 {
        storage.delete(b"1").unwrap();
    }
    assert!(
        storage.state.read().imm_memtables.len() > num_imm_memtables,
        "no more memtable frozen?"
    );
}

可以看到,这两个测试分别是主动和被动调用了 force_freeze_memtable。并且这里涉及到一个变量 approximate_size 和两个配置 target_sst_size、num_memtable_limit。也就是说还需要估算一下 memtable 的大小。

先来写 force_freeze_memtable。这个方法要做的事情,就是在它无论因为什么而被调用到了的时候,就把当前 memtable 刷到 imm_memtables里面。注意到文档中还有句话

You can simply assign the next memtable id as self.next_sst_id(). Note that the imm_memtables stores the memtables from the latest one to the earliest one. That is to say, imm_memtables.first() should be the last frozen memtable.

我们先来看看这个 next_sst_id:

pub(crate) fn next_sst_id(&self) -> usize {
    self.next_sst_id
        .fetch_add(1, std::sync::atomic::Ordering::SeqCst)
}

next_sst_id 是一个 AtomicUsize。注意到 approximate_size 也是一个 AtomicUsize,这就是在提示我们 approximate_size 仿照这个改就可以了。此外,这段话还告诉我们 imm_memtables 需要头插,那没办法了,只能这么写:

// _state_lock_observer 暂时不知道是做什么用的,先不管它
pub fn force_freeze_memtable(&self, _state_lock_observer: &MutexGuard<'_, ()>) -> Result<()> {
    // 获取写锁的可变引用
    let mut state = self.state.write();
    let state = Arc::make_mut(&mut state);
    // replace 的作用是获取第一个参数的值并拿到所有权,然后将原来的参数替换成第二个参数中的值。
    // 我承认这个地方看了答案并问了 AI,我自己是不知道这个 API 的
    let old_mem = std::mem::replace(
        &mut state.memtable,
        Arc::new(MemTable::create(self.next_sst_id())),
    );
    state.imm_memtables.insert(0, old_mem);
    Ok(())
}

然后我们发现,主动调用的测试是要验证 approximate_size 的,那行吧,开抄。仿照 next_sst_id 在 mem_table.rs 的 put 方法里加一行就行:

pub fn put(&self, _key: &[u8], _value: &[u8]) -> Result<()> {
    self.approximate_size.fetch_add(
        _key.len() + _value.len(),
        // 这里的顺序无所谓,完全可以用 Relaxed,不必死板地照抄
        // 具体不展开解释了,点开看看源码里的注释即可
        std::sync::atomic::Ordering::Relaxed,
    );
    self.map
        .insert(Bytes::copy_from_slice(_key), Bytes::copy_from_slice(_value));
    Ok(())
}

这个时候,主动调用的测试就通过了。

然后再来搞被动调用的。这需要我们处理增/删内存表元素的代码,每修改一个,就要做一次判断,看有没有达到配置的上限。

/// Put a key-value pair into the storage by writing into the current memtable.
pub fn put(&self, _key: &[u8], _value: &[u8]) -> Result<()> {
    // 不能再简化成一行,需要把读锁拆出来了
    let guard = self.state.read();
    let res = guard.memtable.put(_key, _value);
    if res.is_ok() {
        // 从 UT case 中得知需要做一下判断
        if guard.memtable.approximate_size() > self.options.target_sst_size {
            // 参数不会传?直接照抄 force_flush 方法
            return self.force_freeze_memtable(&self.state_lock.lock());
        }
    }
    res
}

运行测试——坏了,怎么卡住了?卡住肯定是死锁了。为什么会卡住?还记不记得 force_freeze_memtable 中拿了写锁,但是这个方法里调用 force_freeze_memtable 的时候,读锁还在作用域没释放呢。那咋办呢?只能手动 drop 了。我们把变量也改个名字吧,看的更清楚:

/// Put a key-value pair into the storage by writing into the current memtable.
pub fn put(&self, _key: &[u8], _value: &[u8]) -> Result<()> {
    let read_lock = self.state.read();
    let res = read_lock.memtable.put(_key, _value);
    if res.is_ok() {
        if read_lock.memtable.approximate_size() > self.options.target_sst_size {
            // 在拿写锁前释放读锁
            drop(read_lock);
            return self.force_freeze_memtable(&self.state_lock.lock());
        }
    }
    res
}

好!这时候运行测试,一切正常了!

补充:仔细看看文档,能够看到了这段

Because there could be multiple threads getting data into the storage engine, force_freeze_memtable might be called concurrently from multiple threads. You will need to think about how to avoid race conditions in this case.

原来还需要我们处理并发问题,并且测试测不出来。好吧,我就勉为其难地先加一个双重检查锁:

/// Put a key-value pair into the storage by writing into the current memtable.
pub fn put(&self, _key: &[u8], _value: &[u8]) -> Result<()> {
    let read_lock = self.state.read();
    let res = read_lock.memtable.put(_key, _value);
    if res.is_ok() {
        if read_lock.memtable.approximate_size() > self.options.target_sst_size {
            let state_lock = self.state_lock.lock();
            if read_lock.memtable.approximate_size() > self.options.target_sst_size {
                drop(read_lock);
                return self.force_freeze_memtable(&state_lock);
            }
        }
    }
    res
}

此处有个问题,如果相同的键多次写入,approximate_size 会一直增加,但实际 map 里的内容并没增加,内存表大小的估值就不准了。从文档看这是符合预期的,不过我没太理解,等我以后想通了再回来修改这一段。

Task4:获取路径

最后一个 Task 相对就比较简单了,其实就是说得能拿到历史数据。这是显然的:当前的 get 方法只从 memtable 里查找键,完全没考虑 immemtables,这哪儿行呢?写进去的数一旦满了刷盘就不作数了是吧?

所以就修改 get 吧。原来的结构肯定不能要了,还是优先闭包,这里显然要用 Some。把 memtable 的改造好后,如法炮制,写个遍历就好了。注意,我们之前向 imm_memtables 写的时候就是从前往后写的,所以读的时候也只需按顺序读,只要读到键,就可以根据他的值是不是空来返回结果,不需要继续读剩下的 imm_memtable 了。所以这么写完全 OK:

/// Get a key from the storage. In day 7, this can be further optimized by using a bloom filter.
pub fn get(&self, _key: &[u8]) -> Result<Option<Bytes>> {
    let read_lock = self.state.read();
    if let Some(bytes) = read_lock.memtable.get(_key) {
        return Ok((!bytes.is_empty()).then(|| bytes.clone()));
    }
    for imm_memtable in &read_lock.imm_memtables {
        if let Some(bytes) = imm_memtable.get(_key) {
            return Ok((!bytes.is_empty()).then(|| bytes.clone()));
        }
    }
    Ok(None)
}

很好,最后一个测试也通过了!


连做题带写博客,截至目前已经花了我 5 个小时,这才只是 Day1!没办法,毕竟还不熟悉,慢慢来吧。文档最后还有几个思考题,我顶不住了,先下线了。以后再补充吧。

注意

施工中


📝 系列导航