本文主要介绍了python 多线程实现多任务的方法示例,文中通过示例代码介绍的非常详细,需要的朋友们下面随着小编来一起学习学习吧
1 多线程实现多任务
1.1 什么是线程?
进程是操作系统分配程序执行资源的单位,而线程是进程的一个实体,是CPU调度和分配的单位。一个进程肯定有一个主线程,我们可以在一个进程里创建多个线程来实现多任务。
1.2 一个程序实现多任务的方法
实现多任务,我们可以用几种方法。
(1)在主进程里面开启多个子进程,主进程和多个子进程一起处理任务。
(2)在主进程里开启多个子线程,主线程和多个子线程一起处理任务。
(3)在主进程里开启多个协程,多个协程一起处理任务。
注意:因为用多个线程一起处理任务,会产生线程安全问题,所以在开发中一般使用多进程+多协程来实现多任务。
1.3 多线程的创建方式
1.3.1 创建threading.Thread对象
1 2 3 |
|
我们来模拟一下多线程实现多任务。
假如你在用网易云音乐一边听歌一边下载。网易云音乐就是一个进程。假设网易云音乐内部程序是用多线程来实现多任务的,网易云音乐开两个子线程。一个用来缓存音乐,用于现在的播放。一个用来下载用户要下载的音乐的。这时候的代码框架是这样的:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 |
|
输出:
观察上面的输出代码可以知道:
CPU是按照时间片轮询的方式来执行子线程的。cpu内部会合理分配时间片。时间片到a程序的时候,a程序如果在休眠,就会自动切换到b程序。
严谨来说,CPU在某个时间点,只在执行一个任务,但是由于CPU运行速度和切换速度快,因为看起来像多个任务在一起执行而已。
1.3.2 继承threading.Thread,并重写run
除了上面的方法创建线程,还有另一种方法。可以编写一个类,继承threaing.Thread类,然后重写父类的run方法。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 |
|
输出:
运行时无序的,说明已经启用了多任务。
下面是threading.Thread提供的线程对象方法和属性:
start():创建线程后通过start启动线程,等待CPU调度,为run函数执行做准备;
run():线程开始执行的入口函数,函数体中会调用用户编写的target函数,或者执行被重载的run函数;
join([timeout]):阻塞挂起调用该函数的线程,直到被调用线程执行完成或超时。通常会在主线程中调用该方法,等待其他线程执行完成。
name、getName()&setName():线程名称相关的操作;
ident:整数类型的线程标识符,线程开始执行前(调用start之前)为None;
isAlive()、is_alive():start函数执行之后到run函数执行完之前都为True;
daemon、isDaemon()&setDaemon():守护线程相关;
1.4 线程何时开启,何时结束
(1)子线程何时开启,何时运行 当调用thread.start()时 开启线程,再运行线程的代码
(2)子线程何时结束 子线程把target指向的函数中的语句执行完毕后,或者线程中的run函数代码执行完毕后,立即结束当前子线程
(3)查看当前线程数量 通过threading.enumerate()可枚举当前运行的所有线程
(4)主线程何时结束 所有子线程执行完毕后,主线程才结束
示例一:
1 2 3 4 5 6 7 8 9 10 11 |
|
输出:
为什么主进程(主线程)的代码会先出现呢?因为CPU采用时间片轮询的方式,如果轮询到子线程,发现他要休眠1s,他会先去运行主线程。所以说CPU的时间片轮询方式可以保证CPU的最佳运行。
那如果我想主进程输出的那句话运行在结尾呢?该怎么办呢?这时候就需要用到 join() 方法了。
1.5 线程的 join() 方法
1 2 3 4 5 6 7 8 9 10 11 12 |
|
输出:
join() 方法可以阻塞主线程(注意只能阻塞主线程,其他子线程是不能阻塞的),直到 t1 子线程执行完,再解阻塞。
1.6 多线程共享全局变量出现的问题
我们开两个子线程,全局变量是0,我们每个线程对他自加1,每个线程加一百万次,这时候就会出现问题了,来,看代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 |
|
输出
1459526 # 第一个子线程结束后全局变量一共加到这个数
1588806 # 第二个子线程结束后全局变量一共加到这个数
1588806 # 两个线程都结束后,全局变量一共加到这个数
奇怪了,我不是每个线程都自加一百万次吗?照理来说,应该最后的结果是200万才对的呀。问题出在哪里呢?
我们知道CPU是采用时间片轮询的方式进行几个线程的执行。
假设我CPU先轮询到work1(),num此时为100,在我运行到第10行时,时间结束了!此时,赋值了,但是还没有自加!即temp=100,num=100。
然后,时间片轮询到了work2(),进行赋值自加。num=101了。
又回到work1()的断点处,num=temp+1,temp=100,所以num=101。
就这样!num少了一次自加!在次数多了之后,这样的错误积累在一起,结果只得到158806!
这就是线程安全问题!
1.7 互斥锁可以弥补部分线程安全问题。(互斥锁和GIL锁是不一样的东西!)
当多个线程几乎同时修改某一个共享数据的时候,需要进行同步控制
线程同步能够保证多个线程安全访问竞争资源,最简单的同步机制是引入互斥锁。
互斥锁为资源引入一个状态:锁定/非锁定
某个线程要更改共享数据时,先将其锁定,此时资源的状态为“锁定”,其他线程不能更改;直到该线程释放资源,将资源的状态变成“非锁定”,其他的线程才能再次锁定该资源。互斥锁保证了每次只有一个线程进行写入操作,从而保证了多线程情况下数据的正确性。
互斥锁有三个常用步骤:
1 2 3 |
|
下面让我们用互斥锁来解决上面例子的线程安全问题。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 |
|
输出:
1945267 # 第一个子线程结束后全局变量一共加到这个数
2000000 # 第二个子线程结束后全局变量一共加到这个数
2000000 # 两个线程都结束后,全局变量一共加到这个数
1.8 线程池ThreadPoolExecutor
从Python3.2
开始,标准库为我们提供了concurrent.futures
模块,它提供了ThreadPoolExecutor
和ProcessPoolExecutor
两个类,实现了对threading
和multiprocessing
的进一步抽象(这里主要关注线程池),不仅可以帮我们自动调度线程,还可以做到:
主线程可以获取某一个线程(或者任务的)的状态,以及返回值。
当一个线程完成的时候,主线程能够立即知道。
让多线程和多进程的编码接口一致。
1.8.1 创建线程池
示例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 |
|
输出:
ThreadPoolExecutor构造实例的时候,传入max_workers参数来设置线程池中最多能同时运行的线程数目。
使用submit函数来提交线程需要执行的任务(函数名和参数)到线程池中,并返回该任务的句柄(类似于文件、画图),注意submit()不是阻塞的,而是立即返回。
通过submit函数返回的任务句柄,能够使用done()方法判断该任务是否结束。上面的例子可以看出,由于任务有2s的延时,在task1提交后立刻判断,task1还未完成,而在延时4s之后判断,task1就完成了。
使用cancel()方法可以取消提交的任务,如果任务已经在线程池中运行了,就取消不了。这个例子中,线程池的大小设置为2,任务已经在运行了,所以取消失败。如果改变线程池的大小为1,那么先提交的是task1,task2还在排队等候,这是时候就可以成功取消。
使用result()方法可以获取任务的返回值。查看内部代码,发现这个方法是阻塞的。
1.8.2 as_completed
上面虽然提供了判断任务是否结束的方法,但是不能在主线程中一直判断啊。有时候我们是得知某个任务结束了,就去获取结果,而不是一直判断每个任务有没有结束。这是就可以使用as_completed
方法一次取出所有任务的结果。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 |
|
as_completed()
方法是一个生成器,在没有任务完成的时候,会阻塞,在有某个任务完成的时候,会yield
这个任务,就能执行for循环下面的语句,然后继续阻塞住,循环到所有的任务结束。从结果也可以看出,先完成的任务会先通知主线程。
1.8.3 map
除了上面的as_completed
方法,还可以使用executor.map
方法,但是有一点不同。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 |
|
使用map
方法,无需提前使用submit
方法,map
方法与python
标准库中的map
含义相同,都是将序列中的每个元素都执行同一个函数。上面的代码就是对urls
的每个元素都执行get_html
函数,并分配各线程池。可以看到执行结果与上面的as_completed
方法的结果不同,输出顺序和urls
列表的顺序相同,就算2s的任务先执行完成,也会先打印出3s的任务先完成,再打印2s的任务完成。
1.8.4 wait
wait
方法可以让主线程阻塞,直到满足设定的要求。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 |
|
wait
方法接收3个参数,等待的任务序列、超时时间以及等待条件。等待条件return_when
默认为ALL_COMPLETED
,表明要等待所有的任务都结束。可以看到运行结果中,确实是所有任务都完成了,主线程才打印出main
。等待条件还可以设置为FIRST_COMPLETED
,表示第一个任务完成就停止等待。
2 多进程实行多任务
2.1 多线程的创建方式
创建进程的方式和创建线程的方式类似:
实例化一个multiprocessing.Process的对象,并传入一个初始化函数对象(initial function )作为新建进程执行入口;
继承multiprocessing.Process,并重写run函数;
2.1.1 方式1
在开始之前,我们要知道什么是进程。道理很简单,你平时电脑打开QQ客户端,就是一个进程。再打开一个QQ客户端,又是一个进程。那么,在python中如何用一篇代码就可以开启几个进程呢?通过一个简单的例子来演示:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 |
|
输出:
可以看到子进程对象是由multiprocessing模块中的Process类创建的。除了p1,p2两个被创建的子进程外。当然还有主进程。主进程就是我们从头到尾的代码,包括子进程也是由主进程创建的。
注意的点有:
(1)首先解释一下并发:并发就是当任务数大于cpu核数时,通过操作系统的各种任务调度算法,实现多个任务“一起”执行。(实际上总有一些任务不在执行,因为切换任务相当快,看上去想同时执行而已。)
(2)当是并发的情况下,子进程与主进程的运行都是没有顺序的,CPU会采用时间片轮询的方式,哪个程序先要运行就先运行哪个。
(3)主进程会默认等待所有子进程执行完毕后,它才会退出。所以在上面的例子中,p1,p2子进程是死循环进程,主进程的最后一句代码print("I am main task")虽然运行完了,但是主进程并不会关闭,他会一直等待着子进程。
(4)主进程默认创建的是非守护进程。注意,结合3.和5.看。
(5)但是!如果子进程是守护进程的话,那么主进程运行完最后一句代码后,主进程会直接关闭,不管你子进程运行完了没有!
2.1.2 方式2
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 |
|
输出:
这里可以思考一下,如果像多线程一样,存在一个全局的变量share_data,不同进程同时访问share_data会有问题吗?
由于每一个进程拥有独立的内存地址空间且互相隔离,因此不同进程看到的share_data是不同的、分别位于不同的地址空间,同时访问不会有问题。这里需要注意一下。
2.2 守护进程
测试下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 |
|
输出:
I am main task
输出结果是不是有点奇怪。为什么p1,p2子进程都没有输出的?
让我们来整理一下思路:
创建p1,p2子进程
设置p1,p2子进程为守护进程
p1,p2子进程开启
p1,p2子进程代码里面都有休眠时间,所以cpu为了不浪费时间,先做主进程后续的代码。
执行主进程后续的代码,print("I am main task")
主进程后续的代码执行完成了,所以剩下的子进程是守护进程的,全都要关闭了。但是,如果主进程的代码执行完了,有两个子进程,一个是守护的,一个非守护的,怎么办呢?其实,他会等待非守护的那个子进程运行完,然后三个进程一起关闭。
p1,p2还在休眠时间内就被终结生命了,所以什么输出都没有。
例如,把P1设为非守护进程:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 |
|
输出:
里面涉及到两个知识点:
(1)当主进程结束后,会发一个消息给子进程(守护进程),守护进程收到消息,则立即结束
(2)CPU是按照时间片轮询的方式来运行多进程的。哪个合适的哪个运行,如果你的子进程里都有time.sleep。那我CPU为了不浪费资源,肯定先去干点其他的事情啊。
那么,守护进程随时会被中断,他的存在意义在哪里的?
其实,守护进程主要用来做与业务无关的任务,无关紧要的任务,可有可无的任务,比如内存垃圾回收,某些方法的执行时间的计时等。
2.3 创建的子进程要传入参数
1 2 3 4 5 6 7 8 9 10 11 12 13 14 |
|
输出:
子进程要运行的函数需要传入变量a,b,一个元组,一个字典。我们创建子进程的时候,变量a,b要放进元组里面,task函数取的时候会把前两个取出来,分别赋值给a,b了。
2.4 子进程几个常用的方法
p.start | 开始执行子线程 |
p.name | 查看子进程的名称 |
p.pid | 查看子进程的id |
p.is_alive | 判断子进程是否存活 |
p.join(timeout) |
阻塞主进程,当子进程p运行完毕后,再解开阻塞,让主进程运行后续的代码 如果timeout=2,就是阻塞主进程2s,这2s内主进程不能运行后续的代码。过了2s后,就算子进程没有运行完毕,主进程也能运行后续的代码 |
p.terminate | 终止子进程p的运行 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 |
评论 0 |
发表评论 取消回复