rust 实战 - 实现一个线程工作池 ThreadPool

   2023-02-09 学习力0
核心提示:如何实现一个线程池线程池:一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用

如何实现一个线程池

线程池:一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量。 例如,对于计算密集型任务,线程数一般取cpu数量+2比较合适,线程数过多会导致额外的线程切换开销。

如何定义线程池Pool呢,首先最大线程数量肯定要作为线程池的一个属性,并且在new Pool时创建指定的线程。

线程池Pool

pub struct Pool {
  max_workers: usize, // 定义最大线程数
}

impl Pool {
  fn new(max_workers: usize) -> Pool {}
  fn execute<F>(&self, f:F) where F: FnOnce() + 'static + Send {}
}

execute来执行任务,F: FnOnce() + 'static + Send 是使用thread::spawn线程执行需要满足的trait, 代表F是一个能在线程里执行的闭包函数。

另一点自然而然会想到在Pool添加一个线程数组, 这个线程数组就是用来执行任务的。比如Vec<Thread> balabala。这里的线程是活的,是一个个不断接受任务然后执行的实体。
可以看作在一个线程里不断执行获取任务并执行的Worker。

struct Worker where
{
    _id: usize, // worker 编号
}

要怎么把任务发送给Worker执行呢?mpsc(multi producer single consumer) 多生产者单消费者可以满足我们的需求,let (tx, rx) = mpsc::channel() 可以获取到一对发送端和接收端。
把发送端添加到Pool里面,把接收端添加到Worker里面。Pool通过channel将任务发送给多个worker消费执行。

这里有一点需要特别注意,channel的接收端receiver需要安全的在多个线程间共享,因此需要用Arc<Mutex::<T>>来包裹起来,也就是用锁来解决并发冲突。

Pool的完整定义

pub struct Pool {
    workers: Vec<Worker>,
    max_workers: usize,
    sender: mpsc::Sender<Message>
}

该是时候定义我们要发给Worker的消息Message了
定义如下的枚举值

type Job = Box<dyn FnOnce() + 'static + Send>;
enum Message {
    ByeBye,
    NewJob(Job),
}

Job是一个要发送给Worker执行的闭包函数,这里ByeBye用来通知Worker可以终止当前的执行,退出线程。

只剩下实现Worker和Pool的具体逻辑了。

Worker的实现

impl Worker
{
    fn new(id: usize, receiver: Arc::<Mutex<mpsc::Receiver<Message>>>) -> Worker {
        let t = thread::spawn( move || {
            loop {
                let receiver = receiver.lock().unwrap();
                let message=  receiver.recv().unwrap();
                match message {
                    Message::NewJob(job) => {
                        println!("do job from worker[{}]", id);
                        job();
                    },
                    Message::ByeBye => {
                        println!("ByeBye from worker[{}]", id);
                        break
                    },
                }  
            }
        });

        Worker {
            _id: id,
            t: Some(t),
        }
    }
}

let message = receiver.lock().unwrap().recv().unwrap(); 这里获取锁后从receiver获取到消息体,然后let message结束后rust的生命周期会自动释放掉锁。
但如果写成

while let message = receiver.lock().unwrap().recv().unwrap() {
};

while let 后面整个括号都是一个作用域,要在这个作用域结束后,锁才会释放,比上面let message要锁定久时间。
rust的mutex锁没有对应的unlock方法,由mutex的生命周期管理。

我们给Pool实现Drop trait, 让Pool被销毁时,自动暂停掉worker线程的执行。

impl Drop for Pool {
    fn drop(&mut self) {
        for _ in 0..self.max_workers {
            self.sender.send(Message::ByeBye).unwrap();
        }
        for w in self.workers.iter_mut() {
            if let Some(t) = w.t.take() {
                t.join().unwrap();
            }
        }
    }
}

drop方法里面用了两个循环,而不是在一个循环里做完两件事?

for w in self.workers.iter_mut() {
    if let Some(t) = w.t.take() {
        self.sender.send(Message::ByeBye).unwrap();
        t.join().unwrap();
    }
}

这里面隐藏了一个会造成死锁的陷阱,比如两个Worker, 在单个循环里面迭代所有Worker,再将终止信息发送给通道后,直接调用join,
我们预期是第一个worker要收到消息,并且等他执行完。当情况可能是第二个worker获取到了消息,第一个worker没有获取到,那接下来的join就会阻塞造成死锁。

注意到没有,Worker是被包装在Option内的,这里有两个点需要注意

  1. t.join 需要持有t的所有权
  2. 在我们这种情况下,self.workers只能作为引用被for循环迭代。

这里考虑让Worker持有Option<JoinHandle<()>>,后续可以通过在Option上调用take方法将Some变体的值移出来,并在原来的位置留下None变体。
换而言之,让运行中的worker持有Some的变体,清理worker时,可以使用None替换掉Some,从而让Worker失去可以运行的线程

struct Worker where
{
    _id: usize,
    t: Option<JoinHandle<()>>,
}

要点总结

  • Mutex依赖于生命周期管理锁的释放,使用的时候需要注意是否逾期持有锁
  • Vec<Option<T>> 可以解决某些情况下需要T所有权的场景

完整代码

use std::thread::{self, JoinHandle};
use std::sync::{Arc, mpsc, Mutex};


type Job = Box<dyn FnOnce() + 'static + Send>;
enum Message {
    ByeBye,
    NewJob(Job),
}

struct Worker where
{
    _id: usize,
    t: Option<JoinHandle<()>>,
}

impl Worker
{
    fn new(id: usize, receiver: Arc::<Mutex<mpsc::Receiver<Message>>>) -> Worker {
        let t = thread::spawn( move || {
            loop {
                let message = receiver.lock().unwrap().recv().unwrap();
                match message {
                    Message::NewJob(job) => {
                        println!("do job from worker[{}]", id);
                        job();
                    },
                    Message::ByeBye => {
                        println!("ByeBye from worker[{}]", id);
                        break
                    },
                }  
            }
        });

        Worker {
            _id: id,
            t: Some(t),
        }
    }
}

pub struct Pool {
    workers: Vec<Worker>,
    max_workers: usize,
    sender: mpsc::Sender<Message>
}

impl Pool where {
    pub fn new(max_workers: usize) -> Pool {
        if max_workers == 0 {
            panic!("max_workers must be greater than zero!")
        }
        let (tx, rx) = mpsc::channel();

        let mut workers = Vec::with_capacity(max_workers);
        let receiver = Arc::new(Mutex::new(rx));
        for i in 0..max_workers {
            workers.push(Worker::new(i, Arc::clone(&receiver)));
        }

        Pool { workers: workers, max_workers: max_workers, sender: tx }
    }
    
    pub fn execute<F>(&self, f:F) where F: FnOnce() + 'static + Send
    {

        let job = Message::NewJob(Box::new(f));
        self.sender.send(job).unwrap();
    }
}

impl Drop for Pool {
    fn drop(&mut self) {
        for _ in 0..self.max_workers {
            self.sender.send(Message::ByeBye).unwrap();
        }
        for w in self.workers {
            if let Some(t) = w.t.take() {
                t.join().unwrap();
            }
        }
    }
}


#[cfg(test)]
mod tests {
    use super::*;
    #[test]
    fn it_works() {
        let p = Pool::new(4);
        p.execute(|| println!("do new job1"));
        p.execute(|| println!("do new job2"));
        p.execute(|| println!("do new job3"));
        p.execute(|| println!("do new job4"));
    }
}
 
反对 0举报 0 评论 0
 

免责声明:本文仅代表作者个人观点,与乐学笔记(本网)无关。其原创性以及文中陈述文字和内容未经本站证实,对本文以及其中全部或者部分内容、文字的真实性、完整性、及时性本站不作任何保证或承诺,请读者仅作参考,并请自行核实相关内容。
    本网站有部分内容均转载自其它媒体,转载目的在于传递更多信息,并不代表本网赞同其观点和对其真实性负责,若因作品内容、知识产权、版权和其他问题,请及时提供相关证明等材料并与我们留言联系,本网站将在规定时间内给予删除等相关处理.

  • bloom-server 基于 rust 编写的 rest api cache 中间件
    bloom-server 基于 rust 编写的 rest api cache
    bloom-server 基于 rust 编写的 rest api cache 中间件,他位于lb 与api worker 之间,使用redis 作为缓存内容存储, 我们需要做的就是配置proxy,同时他使用基于share 的概念,进行cache 的分布存储,包含了请求端口(proxy,访问数据) 以及cache 控制端口(
    03-08
  • #新闻拍一拍# Oracle 调研如何避免让 Java 开发者投奔 Rust 和 Kotlin | Linux 中国
    #新闻拍一拍# Oracle 调研如何避免让 Java 开发
     导读:• 英特尔对迟迟不被 Linux 主线接受的 SGX Enclave 进行了第 38 次修订 • ARM 支持开源的 Panfrost Gallium3D 驱动本文字数:977,阅读时长大约:1分钟作者:硬核老王Oracle 调研如何避免让 Java 开发者投奔 Rust 和 KotlinOracle 委托分析公司 Omd
    03-08
  • Linux系统下Rust快速安装:国内镜像加速
    Linux系统下Rust快速安装:国内镜像加速
    官方网址和方法Install Rust - Rust Programming Language然而速度慢得让人难以置信。利用国内镜像进行windows的Linux子系统的Rust安装。rust 使用国内镜像,快速安装方法参考:RUST安装慢怎么办,使用镜像方式安装_网络_为中华之崛起而编程-CSDN博客我的操作
    03-08
  • Rust到底值不值得学--Rust对比、特色和理念
    前言其实我一直弄不明白一点,那就是计算机技术的发展,是让这个世界变得简单了,还是变得更复杂了。当然这只是一个玩笑,可别把这个问题当真。然而对于IT从业者来说,这可不是一个玩笑。几乎每一次的技术发展,都让这个生态变得更为复杂。“英年早秃”已经成
    03-08
  • 超33000行新代码,为Linux内核添加Rust支持的补丁已准备就绪
    超33000行新代码,为Linux内核添加Rust支持的补
    https://mp.weixin.qq.com/s/oKw9aBJSdmRoO6-rbLAkNw7 月 4 日,一套修订后的补丁被提交至 Linux 内核的邮件列表中,该补丁为在 Linux 内核中以 Rust 作为辅助编程语言提供了支持,借助 Rust 可以提高 Linux 内核和内存的安全。整套补丁包含 17 个子项,不光
    03-08
  • 【译】Rust 的 Result 类型入门
    【译】Rust 的 Result 类型入门
    A Primer on Rust’s Result Type 译文原文链接:https://medium.com/@JoeKreydt/a-primer-on-rusts-result-type-66363cf18e6a原文作者:Joe Kreydt译文出处:https://github.com/suhanyujie/article-transfer-rs译者:suhanyujietips:水平有限,翻译不当之
    03-08
  • Rust实战系列-基本语法
    Rust实战系列-基本语法
    主要介绍 Rust 的语法、基本类型和数据结构,通过实现一个简单版 grep 命令行工具,来理解 Rust 独有的特性。本文是《Rust in action》学习总结系列的第二部分,更多内容请看已发布文章:一、Rust实战系列-Rust介绍“主要介绍 Rust 的语法、基本类型和数据结
    03-08
  • 全栈程序员的新玩具Rust(三)板条箱
    上次用到了stdout,这次我们来写一个更复杂一点的游戏rust的标准库叫做std,默认就会引入。这次我们要用到一个随机数函数,而随机数比较尴尬的一点是这玩意不在标准库中,我们要额外依赖一个库。很多编程方案都有自己的模块化库系统,rust也不例外,不过rust
    02-10
  • 全栈程序员的新玩具Rust(六)第一个WASM程序
    全栈程序员的新玩具Rust(六)第一个WASM程序
    先上代码https://gitee.com/lightsever/rust_study/tree/master/wasm_hello01webassembly就不用再赘述了,耳朵里面快磨出茧子来了。rustwasm是火狐自家的玩具,让我们来继续做实验,让rust飞起来吧。环境安装安装好rust环境之后仍然需要 一个 wasm 工具包carg
    02-10
  • 【Rust】标准库-Result rust数据库
    环境Rust 1.56.1VSCode 1.61.2概念参考:https://doc.rust-lang.org/stable/rust-by-example/std/result.html示例main.rsmod checked {#[derive(Debug)]pub enum MathError {DivisionByZero,NonPositiveLogarithm,NegativeSquareRoot,}pub type MathResult =
    02-09
点击排行