Python Seventh Step: 线程

Jan 12, 2018


本文

http://afra55.github.io/2018/01/12/python-seventh-step/

进程和线程

进程。计算机 程序 只是 存储 在 磁盘 上 的 可执行 二进制( 或 其他 类型) 文件。 只有 把 它们 加载 到 内存 中 并被 操作系统 调用, 才 拥有 其 生命 期。 进程( 有时 称为 重量级 进程) 则是 一个 执行 中的 程序。 每个 进程 都 拥有 自己的 地址 空间、 内存、 数据 栈 以及 其他 用于 跟踪 执行 的 辅助 数据。 操作 系统管理 其上 所有 进程 的 执行, 并为 这些 进程 合理 地 分配 时间。 进程 也可以 通过 派生( fork 或 spawn) 新的 进程 来 执行 其他 任务, 不过 因为 每个 新进 程 也都 拥有 自己的 内存 和 数据 栈 等, 所以 只能 采用 进程 间 通信( IPC) 的 方式 共享 信息

线程( 有时候 称为 轻量级 进程) 与 进程 类似, 不过 它们 是在 同一个 进程 下 执行 的, 并 共享 相同 的 上下文。 可以 将它 们 认为 是在 一个 主 进程 或“ 主 线程” 中 并行 运行 的 一些“ 迷你 进程”

线程 一般 是以 并发 方式 执行 的, 正是 由于 这种 并行 和数 据 共享 机制, 使得 多任务 间的 协作 成为 可能

在单核CPU系统中,是以线程让步的形式来实现并发,让步是指线程在运行过程中被中断或临时挂起(睡眠)

Python 的线程

Python 代码 的 执行 是由 Python 虚拟 机( 又名 解释器 主 循环) 进行 控制 的

Python 在主循环中同时只能有一个控制线程在执行

在任意给定时刻,只有一个程序在运行

对 Python 虚拟 机 的 访问 是由 全局 解释器 锁( GIL) 控制 的。 这个 锁 就是 用来 保证 同时 只能 有一个 线程 运行 的

设置 GIL
切换进入一个线程去运行
执行一个操作:指定数量的字节码指令 或 线程主动让出控制权(可以通过调用 time.sleep(0) 实现)
把线程设置为睡眠状态即切出线程
解锁 GIL
重复上面步骤

当一个线程完成其函数的执行时就会退出,还可以通过 thread.exit() 退出或者 sys.exit(),或者抛出 SystemExit 异常来退出

不能直接终止一个线程

thread 和 threading 模块用来创建和管理线程

thread 模块提供了基本的线程和锁定支持, 当主线程退出时,其他线程都会被强制结束并不会发出警告或适当清理

threading 模块提供了更高级更全面的线程管理

Queue 模块,用于创建队列数据结构在多线程之间进行数据共享

注意:避免使用 thread 模块,尽量使用 threading 模块

多线程适用于 I/O 密集型应用而不是计算密集型应用

thread 模块

锁 对象( lock object, 也叫 原 语 锁、 简单 锁、 互斥 锁、 互斥 和 二进制 信号 量)

thread 模块函数 描述
start_new_thread(function, args, kwargs=None) 派生一个新的线程,使用 args元组 和 kwargs 来执行函数 function
allocate_lock() 分配 LockType 锁对象
exit() 线程退出指令
LockType 锁对象方法 描述
acquire(blocking=True, timeout=-1) 取得锁,默认锁定锁对象,可能阻塞
locked() 判断是否在锁定状态,如果是则返回 True
release() 释放锁

示例

import _thread
from time import sleep, ctime


def loop_0():
    flag = '0'
    print(flag, ctime())
    sleep(6)
    print(flag, ctime())


def loop_1():
    flag = '1'
    print(flag, ctime())
    sleep(3)
    print(flag, ctime())


def main():
    print('start', ctime())
    _thread.start_new_thread(loop_0, ())
    _thread.start_new_thread(loop_1, ())
    sleep(7)    # 主线程睡眠
    print('all down', ctime())


if __name__ == '__main__':
    main()

输出

start Mon Jan 15 14:58:50 2018
1 Mon Jan 15 14:58:50 2018
0 Mon Jan 15 14:58:50 2018
1 Mon Jan 15 14:58:53 2018
0 Mon Jan 15 14:58:56 2018
all down Mon Jan 15 14:58:57 2018

Process finished with exit code 0

若果不阻止主线程执行,则

start Mon Jan 15 15:02:43 2018
all down Mon Jan 15 15:02:43 2018
0 Mon Jan 15 15:02:43 2018
1 Mon Jan 15 15:02:43 2018

Process finished with exit code 0

使用锁对象来实现主线程的挂起

import _thread
from time import sleep, ctime


sleep_list = [6, 3]     # 存储睡眠时间的列表


def loop(index, s_time, lock):
    print(index, 'start', ctime())
    sleep(s_time)
    print(index, 'end', ctime())
    lock.release()  # 释放锁


def main():
    print('start', ctime())
    locks = []
    index_list = range(len(sleep_list))

    for i in index_list:
        lock = _thread.allocate_lock()  # 获得锁对象
        lock.acquire()  # 取得锁, 并锁定锁
        locks.append(lock)  # 将锁放入列表

    for i in index_list:
        _thread.start_new_thread(loop, (i, sleep_list[i], locks[i]))    # 派生线程,传入元组参数

    for i in index_list:    # 暂停主循环
        while locks[i].locked():    # 当锁还在锁定状态时,进行循环
            pass

    print('all down', ctime())


if __name__ == '__main__':
    main()

输出

start Mon Jan 15 15:16:15 2018
1 start Mon Jan 15 15:16:15 2018
0 start Mon Jan 15 15:16:15 2018
1 end Mon Jan 15 15:16:18 2018
0 end Mon Jan 15 15:16:21 2018
all down Mon Jan 15 15:16:21 2018

Process finished with exit code 0

threading 模块

对象 描述
Thread 表示执行线程的对象
Lock 锁原语对象
RLock 重入锁,令单一线程可以(再次)获得已持有的锁
Condition 条件变量对象, 让一个线程等待另一个线程的所需满足条件,例如某个值改变或状态改变
Event 条件变量的通用版本,任意数量的线程等待某个条件的发生,在该事件发生后,所有等待的线程都被激活
Semaphore 为线程间共享的有限资源提供一个‘计数器’, 如果没有可用资源时会被阻塞
BoundedSemaphore Semaphore 的子类,不允许超过初始值 
Timer Thread 子类,在运行前需要等待一段时间
Barrier ‘障碍’,必须达到指定数量的线程后才可以继续
函数 描述
activeCount/active_count() 当前活动的 Thread 对象个数
currentThread/current_thread() 返回当前的 Thread 对象
enumerate() 返回当前活动的 Thread 对象列表
settrace(func) 为所有线程设置一个 trace 对象
setprofile(func) 为所有线程设置一个 profile 对象

守护线程 一般 是一 个 等待 客户 端 请求 服务 的 服务器。 如果 没有 客户 端 请求, 守护 线程 就是 空闲 的。 如果 把 一个 线程 设置 为 守护 线程, 就 表示 这个 线程 是 不重 要的, 进程 退出 时不 需要 等待 这个 线程 执行 完成

主线程在所有非守护线程退出后才会退出

注:如果多线程共享改变同一个数据时,应使用一个锁来使这个数据唯一

Thread

Thread对象属性 描述
name 线程名
ident 线程的标识符
daemon 布尔类型,表示该线程是否时守护线程
Thread对象方法 描述
init(group=None, target=None, name=None, args=(), kwargs=None, *, daemon=None) 实例化线程对象
start() 开始执行该线程
run() 定义线程功能的方法,通常在子类中被重写
join(timeout=None) 直到启动的线程终止前一直挂起,除非给定了 timeout 时间,否则一直阻塞
getName() 返回线程名字
setName(name) 设定线程名字
isAlive/is_alive() 布尔值,表示该线程是否存活
isDaemon() 布尔值,判断该线程是否是守护线程
setDaemon(daemonic) 设置守护线程标识,必须在 start() 之前调用

传递函数的示例

import threading
from time import sleep, ctime


sleep_list = [6, 3]     # 存储睡眠时间的列表


def loop(index, s_time):
    print(index, 'start', ctime())
    sleep(s_time)
    print(index, 'end', ctime())


def main():
    print('start', ctime())
    threads = []
    index_list = range(len(sleep_list))

    for i in index_list:
        t = threading.Thread(target=loop, args=(i, sleep_list[i]))
        threads.append(t)

    for i in index_list:
        threads[i].start()

    for i in index_list:
        threads[i].join()   # 挂起主线程,等待其他线程结束, 可以不调用 join() 主动挂起, 让主线程进行其他操作

    print('all down', ctime())


if __name__ == '__main__':
    main()

输出

start Mon Jan 15 16:45:55 2018
0 start Mon Jan 15 16:45:55 2018
1 start Mon Jan 15 16:45:55 2018
1 end Mon Jan 15 16:45:58 2018
0 end Mon Jan 15 16:46:01 2018
all down Mon Jan 15 16:46:01 2018

Process finished with exit code 0

注释掉 join() 后执行输出

start Mon Jan 15 16:47:14 2018
0 start Mon Jan 15 16:47:14 2018
1 start Mon Jan 15 16:47:14 2018
all down Mon Jan 15 16:47:14 2018
1 end Mon Jan 15 16:47:17 2018
0 end Mon Jan 15 16:47:20 2018

Process finished with exit code 0

传递可调用的类的示例

import threading
from time import sleep, ctime


sleep_list = [6, 3]     # 存储睡眠时间的列表


class ThreadFunc(object):

    def __init__(self, func, args, name=''):
        self.name = name
        self.func = func
        self.args = args

    def __call__(self, *args, **kwargs):
        """
        创建新线程时,会调用这个特殊方法
        """
        self.func(*self.args)   # 加 * 指元组的值一一对应函数的参数


def loop(index, s_time):
    print(index, 'start', ctime())
    sleep(s_time)
    print(index, 'end', ctime())


def main():
    print('start', ctime())
    threads = []
    index_list = range(len(sleep_list))

    for i in index_list:
        t = threading.Thread(target=ThreadFunc(loop, args=(i, sleep_list[i]), name=loop.__name__))
        threads.append(t)

    for i in index_list:
        threads[i].start()

    for i in index_list:
        threads[i].join()   # 挂起主线程,等待其他线程结束, 可以不调用 join() 主动挂起, 让主线程进行其他操作

    print('all down', ctime())


if __name__ == '__main__':
    main()

派生 Thread 子类,创建子类实例

import threading
from time import sleep, ctime


sleep_list = [6, 3]     # 存储睡眠时间的列表


class MyThread(threading.Thread):

    def __init__(self, func, args, name=None):
        super().__init__()
        self.name = name
        self.func = func
        self.args = args

    def run(self):
        """
        重写 run() 方法
        """
        self.func(*self.args)


def loop(index, s_time):
    print(index, 'start', ctime())
    sleep(s_time)
    print(index, 'end', ctime())


def main():
    print('start', ctime())
    threads = []
    index_list = range(len(sleep_list))

    for i in index_list:
        t = MyThread(loop, args=(i, sleep_list[i]), name=loop.__name__)
        threads.append(t)

    for i in index_list:
        threads[i].start()

    for i in index_list:
        threads[i].join()   # 挂起主线程,等待其他线程结束, 可以不调用 join() 主动挂起, 让主线程进行其他操作

    print('all down', ctime())


if __name__ == '__main__':
    main()

信号量

信号 量 是最 古老 的 同步 原 语 之一。 它是 一个 计数器, 当 资源 消耗 时 递减, 当 资源 释放 时 递增。

模拟 一个 简化 的 糖果 机。 这个 特制 的 机器 只有 5 个 可 用的 槽 来 保持 库存( 糖果)。 如果 所 有的 槽 都 满了, 糖果 就不能 再加 到这 个 机器 中了; 相似 地, 如果 每个 槽 都 空了, 想要 购买 的 消费者 就 无法 买到 糖果 了

from atexit import register
from random import randrange
from threading import BoundedSemaphore, Lock, Thread
from time import sleep, ctime

lock = Lock()   # 一个锁
MAX = 5         # 最大值
candytray = BoundedSemaphore(MAX)   # 信号量,即计数器


def refill():
    lock.acquire()
    print('填装糖果...', end=' ')
    try:
        candytray.release()     # 增加信号量
    except ValueError:          # 当大于最大值时
        print('已经满了')
    else:
        print('OK')
    lock.release()


def buy():
    lock.acquire()
    print('购买糖果...', end=' ')
    if candytray.acquire(False):    # 减少信号量, 不阻塞
        print('OK')
    else:
        print('卖完了')
    lock.release()


def producer(loops):
    for i in range(loops):
        refill()    # 制作糖果
        sleep(randrange(3))


def consumer(loops):
    for i in range(loops):
        buy()       # 购买糖果
        sleep(randrange(3))


def _main():
    print('开始时间:', ctime())
    nloops = randrange(2, 6)
    print('糖果机 (最大库存数 %d)!' % MAX)
    Thread(target=consumer, args=(randrange(
        nloops, nloops + MAX + 2),)).start()  # buyer
    Thread(target=producer, args=(nloops,)).start()  # vendor


@register       # register 注解,用于在主线程结束时调用该注解方法
def _atexit():
    print('完成时间:', ctime())


if __name__ == '__main__':
    _main()

输出

开始时间: Tue Jan 16 15:31:40 2018
糖果机 (最大库存数 5)!
购买糖果... OK
填装糖果... OK
购买糖果... OK
填装糖果... OK
购买糖果... OK
购买糖果... OK
填装糖果... OK
完成时间: Tue Jan 16 15:31:47 2018

Process finished with exit code 0

queue 模块

队列

queue 模块的类 描述
Queue(maxsize=0) 创建一个先入先出队列。如果给定最大值,则队列会在没有空间时阻塞,否则是无限队列
LifoQueue(maxsize=0) 创建一个后入先出队列。如果给定最大值,则队列会在没有空间时阻塞,否则是无限队列
PriorityQueue(maxsize=0) 创建一个优先级队列。如果给定最大值,则队列会在没有空间时阻塞,否则是无限队列
queue 异常 描述
Empty 当对空队列调用 get*() 方法时抛出异常
Full 当对已满的队列使用 put*() 方法时抛出异常
queue 对象的方法 描述
qsize() 返回队列大小(由于反回时,该值可能被其他线程修改,so 这是个近似值)
empty() 返回布尔值,判断队列是否为空
full() 返回布尔值,判断队列是否已满
put(item, block=True, timeout=None) 将 item 放入队列。如果 block 是 True 和 timeout 是 None,则队列会在有可用空间之前阻塞, 通过设置 timeout 来设置最多阻塞时间(秒);如果 block 为 False 则队列在没有空间时 put() 会抛出 Full 异常
put_nowait(item) 即 put(item, block=False)
get(block=true, timeout=none) 从队列中取得元素。如果 block 是 True 和 timeout 是 None, 则队列会在有可用元素之前阻塞,通过设置 timeout 来设置最多阻塞时间(秒);如果 block 为 False 则队列在没有可用元素时 get() 会抛出 Empty 异常
get_nowait() 即 get(block=False)
task_done() 用于表示队列中的某个元素执行完成, 该方法会被 join() 使用
join() 在队列中所有元素执行完毕并调用上面的 task_done() 之前,保持阻塞

一个消费和生产的例子

import threading
from random import randrange
from time import sleep, ctime
import queue


class MyThread(threading.Thread):
    def __init__(self, func, args, name='', verb=False):
        threading.Thread.__init__(self)
        self.name = name
        self.func = func
        self.args = args
        self.verb = verb

    def get_result(self):
        return self.res

    def run(self):
        if self.verb:
            print('starting', self.name, 'at:', ctime())
        self.res = self.func(*self.args)
        if self.verb:
            print(self.name, 'finished at:', ctime())


def write_q(temp_queue):
    print('放入队列一个...', end='')
    temp_queue.put('xxx', True)     # 放入队列
    print("size now", temp_queue.qsize())


def read_q(temp_queue):
    val = temp_queue.get(True)  # 从队列取出
    print('从队列取出一个... size now', temp_queue.qsize())


def writer(temp_queue, loops):
    for i in range(loops):
        write_q(temp_queue)
        sleep(randrange(1, 4))


def reader(temp_queue, loops):
    for i in range(loops):
        read_q(temp_queue)
        sleep(randrange(2, 6))


funcs = [writer, reader]
nfuncs = range(len(funcs))


def main():
    n_loops = randrange(2, 6)
    q = queue.Queue(32)     # 队列

    threads = []
    for i in nfuncs:
        t = MyThread(funcs[i], (q, n_loops), funcs[i].__name__)
        threads.append(t)

    for i in nfuncs:
        threads[i].start()

    for i in nfuncs:
        threads[i].join()

    print('all DONE')


if __name__ == '__main__':
    main()

相关模块

subprocess 模块:可用于派生进程,可以 单纯 地 执行任务, 或者 通过 标准 文件( stdin、 stdout、 stderr) 进行 进程 间 通信

multiprocessing 模块:允许 为多 核 或 多 CPU 派生 进程, 其 接口 与 threading 模块 非常 相似。 该 模块 同样 也 包括 在 共享 任务 的 进程 间 传输 数据 的 多种 方式

concurrent.futures 模块:不再 需要 过分 关注 同步 和 线程/ 进程 的 管理 了, 只需 要 指定 一个 给定 了“ worker” 数量 的 线程/ 进程 池, 提交 任务, 然后 整理 结果

mutex 模块:互斥对象

SocketServer 模块:创建管理线程控制的 TCP/UDP 服务器