精选文章 Python爬虫的经典多线程方式,生产者与消费者模型

Python爬虫的经典多线程方式,生产者与消费者模型

作者:Python新世界 时间: 2020-08-05 01:09:37
Python新世界 2020-08-05 01:09:37

在之前的文章当中我们曾经说道,在多线程并发的场景当中,如果我们需要感知线程之间的状态,交换线程之间的信息是一件非常复杂和困难的事情。因为我们没有更高级的系统权限,也没有上帝视角,很难知道目前运行的状态的全貌,所以想要设计出一个稳健运行没有bug的功能,不仅非常困难,而且调试起来非常麻烦。

很多人学习python,不知道从何学起。
很多人学习python,掌握了基本语法过后,不知道在哪里寻找案例上手。
很多已经做案例的人,却不知道如何去学习更加高深的知识。
那么针对这三类人,我给大家提供一个好的学习平台,免费领取视频教程,电子书籍,以及课程的源代码!
QQ群:1097524789

生产消费者模式

在日常开发当中, 从一个线程向另外的线程传输数据又是一件家常便饭的事情 。举个最简单的例子,我们在处理网页请求的时候,需要打印下来这一次请求的相关日志。打印日志是一次IO行为,这是非常消耗时间的,所以我们不能放在请求当中同步进行,否则会影响系统的性能。最好的办法就是启动一系列线程专门负责打印,后端的线程只负责响应请求,相关的日志以消息的形式传送给打印线程打印。

这个简单的不能再简单的功能当中涉及了诸多细节,我们来盘点几个。首先IO线程的数据都是从后台线程来的,假如一段时间内没有请求,那么这些线程都应该休眠,应该在有请求的时候才会启动。其次,如果某一段时间内请求非常多,导致IO线程一时间来不及打印所有的数据,那么当下的请求应该先暂存起来,等IO线程”忙过来“之后再进行处理。

把这些细节都考虑到,自己来设计功能还是挺麻烦的。好在这个问题前人已经替我们想过了,并且得出了一个非常经典的设计模式,使用它可以很好的解决这个问题。这个模式就是 生产消费者模式 。

这个设计模式的原理其实非常简单,我们来看张图就明白了。

Python爬虫的经典多线程方式,生产者与消费者模型1

Python爬虫的经典多线程方式,生产者与消费者模型2

Java并发-- 生产者-消费者模式| 点滴积累

线程根据和数据的关系分为 生产者线程和消费者线程 ,其中生产者线程负责生产数据,产生了数据之后会存储到任务队列当中。消费者线程从这个队列获取需要消费的数据,它和生产者线程之间不会直接交互,避免了线程之间互相依赖的问题。

另外一个细节是这里的任务队列并不是普通的队列,一般情况下是一个 阻塞队列 。也就是说当消费者线程尝试从其中获取数据的时候,如果队列是空的,那么这些消费者线程会自动挂起等待,直到它获得了数据为止。有阻塞队列当然也有非阻塞队列,如果是非阻塞队列的话,当我们尝试从其中获取数据的时候,如果它当中没有数据的话,并不会挂起等待,而是会返回一个空值。

当然阻塞队列的挂起等待时间也是可以设置的,我们可以让它一直等待下去,也可以设置一个最长等待时间 。如果超过这个时间也会返回空,不同的队列应用在不同的场景当中,我们需要根据场景性质做出调整。

代码实现

看完了设计模式的原理,我们下面来试着用代码来实现一下。

在一般的高级语言当中都有现成的队列的库,由于在生产消费者模式当中用到的是阻塞型queue,有阻塞性的队列当然也就有非阻塞型的队列。我们在用之前需要先了解清楚,如果用错了队列会导致整个程序出现问题。在Python当中,我们最常用的queue就是一个 支持多线程场景的阻塞队列 ,所以我们直接拿来用就好了。

由于这个设计模式非常简单,这个代码并不长只有几行:

from queue import Queue
from threading import Thread

def producer(que):
    data = 0
    while True:
        data += 1
        que.put(data)
        
def consumer(que):
    while True:
        data = que.get()
        print(data)
        
        
que = Queue()
t1 = Thread(target=consumer, args=(que, ))
t2 = Thread(target=producer, args=(que, ))
t1.start()
t2.start()

我们运行一下就会发现它是可行的,并且由于队列 先进先出 的限制,可以保证了consumer线程读取到的内容的 顺序和producer生产的顺序是一致的 。

如果我们运行一下这个代码会发现它是不会结束的,因为consumer和producer当中都用到了while True构建的死循环,假设我们希望可以控制程序的结束,应该怎么办?

其实也很简单,我们也可以利用队列。我们创建一个特殊的信号量,约定好当consumer接受到这个特殊值的时候就停止程序。这样当我们要结束程序的时候,我们只需要把这个信号量加入队列即可。

singal = object()

def producer(que):
    data = 0
    while data < 20:
        data += 1
        que.put(data)
    que.put(singal)
        
def consumer(que):
    while True:
        data = que.get()
        if data is singal:
            # 继续插入singal
            que.put(singal)
            break
        print(data)

这里有一个细节是我们在consumer当中,当读取到singal的时候,在跳出循环之前我们又把singal放回了队列。原因也很简单,因为有时候consumer线程不止一个,这个singal上游 只放置了一个,只会被一个线程读取进来 ,其他线程并不会知道已经获得了singal的消息,所以还是会继续执行。

而当consumer关闭之前放入singal就可以保证每一个consumer在关闭的之前都会再传递一个结束的信号给其他未关闭的consumer读取。这样一个一个的传递,就可以保证所有consumer都关闭。

这里还有一个小细节,虽然利用队列可以解决生产者和消费者通信的问题,但是上游的生产者并不知道下游的消费者是否已经执行完成了。假如我们想要知道,应该怎么办?

Python的设计者们也考虑到了这个问题,所以他们在Queue这个类当中加入了 task_done和join方法 。利用task_done,消费者可以通知queue这一个任务已经执行完成了。而通过调用join,可以等待所有的consumer完成。

from queue import Queue
from threading import Thread

def producer(que):
    data = 0
    while data < 20:
        data += 1
        que.put(data)
        
def consumer(que):
    while True:
        data = que.get()
        print(data)
        que.task_done()
        
        
que = Queue()
t1 = Thread(target=consumer, args=(que, ))
t2 = Thread(target=producer, args=(que, ))
t1.start()
t2.start()

que.join()

除了使用task_done之外,我们还可以在que传递的消息当中加入一个Event,这样我们还可以继续感知到每一个Event执行的情况。

优先队列与其他设置

我们之前在介绍一些分布式调度系统的时候曾经说到过,在调度系统当中,调度者会用一个优先队列来管理所有的任务。当有机器空闲的时候,会有限调度那些优先级高的任务。

其实这个调度系统也是基于我们刚才介绍的生产消费者模型开发的,只不过 将调度队列从普通队列换成了优先队列 而已。所以如果我们也希望我们的consumer能够根据任务的优先级来改变执行顺序的话,也可以使用优先队列来进行管理任务。

关于优先队列的实现我们已经很熟悉了,但是有一个问题是我们需要实现挂起等待的阻塞功能。这个我们自己实现是比较麻烦的,但好在我们可以通过调用相关的库来实现。比如threading中的Condition, Condition是一个条件变量可以通知其他线程,也可以实现挂起等待 。

from threading import Thread, Condition

class PriorityQueue:
    def __init__(self):
        self._queue = []
        self._cv = Condition()
        
    def put(self, item, priority):
        with self._cv:
            heapq.heappush(self._queue, (-priority, self._count, item))
            # 通知下游,唤醒wait状态的线程
            self._cv.notify()

    def get(self):
        with self._cv:
            # 如果对列为空则挂起
            while len(self._queue) == 0:
                self._cv.wait()
            # 否则返回优先级最大的
            return heapq.heappop(self._queue)[-1]

最后介绍一下Queue的其他设置,比如我们可以 通过size参数设置队列的大小 ,由于这是一个阻塞式队列,所以如果我们设置了队列的大小,那么当队列被装满的时候,往其中插入数据的操作也会被阻塞。此时producer线程会被挂起,一直到队列不再满为止。

当然我们也可以通过block参数 将队列的操作设置成非阻塞 。比如que.get(block=False),那么当队列为空的时候,将会抛出一个队列为空的异常。同样,que.put(data, block=False)时也一样会得到一个队列已满的异常。

总结

今天这篇文章当中我们主要介绍了多线程场景中经典的生产消费者模式,这个模式在许多场景当中都有使用。比如kafka等消息系统,以及yarn等调度系统等等,几乎只要是涉及到多线程上下游通信的,往往都会用到。也正因此它的使用场景太广了,所以它 经常在各种面试当中出现 ,也可以认为是工程师必须知道的几种基础设计模式之一。

另外,队列也是一个在设计模式以及使用场景当中经常出现的数据结构。从侧面也说明了,为什么算法和数据结构非常重要,许多大公司喜欢问一些算法题,也是因为 有实际的使用场景 ,并且的的确确能锻炼工程师的思维能力。经常有同学问我算法和数据结构的使用案例,这就是一个很好的例子。

勿删,copyright占位
分享文章到微博
分享文章到朋友圈

上一篇:python 类的继承(自己的一点理解)

下一篇:Unity渲染教程(九):复杂材质 https://www.jianshu.com/p/5e3af869870f

您可能感兴趣

  • 大数据内推就一定能进?

    最近到了招聘旺季,发现一些朋友很纠结一个问题:做后端开发和做大数据开发?这个问题还是比较普遍的。 其实,后端开发,更专注于一种技术栈的开发,对于成熟的开发框架而言,的确市面上的竞争压力会比较大,竞聘者除了技术功底够硬,更多的是要对业务充分的熟悉。而大数据开发,由于兴起时间较晚,再加上国家政策的扶持,人才需求远远没有饱和,相比较起来,竞争的确要小一些,薪资和前景更有吸引力。 但这并不意味着面试...

  • 数据存储(1):从数据存储看人类文明-数据存储器发展历程

    传统文本存储 泥版/钟鼎/甲骨/莎草纸/羊皮纸等文字存储 传统的考古学家和历史学家认为,楔形文字起源于美索不达米亚特殊的渔猎生活方式。这是较为通行的看法,西方的各种百科全书大都持这一观点。约在公元前3400年左右,楔形文字雏形产生,多为图像。到公元前 500 年左右, 这种文字甚至成了西亚大部分地区通用的商业交往媒介。楔形文字一直被使用到公元元年前后,使用情景如同现今的拉丁文。 有了文字后,...

  • 提高社会的微生物素养:势在必行!

    写在前面 我思,故我们在! 微生物早已和人类及地球水乳交融,微生物及其功能正在得到铺天盖地的研究。毫不夸张的说,微生物是地球最早的也是最后一个客人,没有微生物,万物皆不复。然而微生物学,作为一门专业课而非通识课程,在大学课堂上才非闪亮登场(高中生物更多的是动植物)。今年的肺炎病毒疫情正在给全世界人民上课,倘若微生物学知识普及开来,何以会如此被动? 上至国家领导,下至村野匹夫,都应了解和学习一...

  • 电子专业毕业后,到底能做什么?

    在我大一刚入学的第一天晚上,班助把我们几个宿舍的男生叫到一起,大概说了些军训的注意事项以及班助是什么。说完以后班助说你们有什么想问的问题,现在可以问了。当时有个人问了一个大家最想问的问题:我们这电子信息专业以后是干嘛的? 转眼之间四年匆匆过去,当我大四要走的时候,坐上载满我行李的出租车,我心里想到,我这马上就该踏上工作岗位了,还不知道我们这专业以后要干嘛,当年刚上大三的班助是怎么忽悠我们的?...

  • 基于 Kafka 与 Debezium 构建实时数据同步

    点击上方 "zhisheng"关注, 星标或置顶一起成长 Flink 从入门到精通 系列文章 起源 在进行架构转型与分库分表之前,我们一直采用非常典型的单体应用架构:主服务是一个 Java WebApp,使用 Nginx 并选择 Session Sticky 分发策略做负载均衡和会话保持;背后是一个 MySQL 主实例,接了若干 Slave 做读写分离。在整个转型开始之前,我们就知道这会是一...

  • 聊一聊前端性能优化

    什么是 CRP? CRP又称关键渲染路径,引用MDN对它的解释: ❝ 关键渲染路径是指浏览器通过把 HTML、CSS 和 JavaScript 转化成屏幕上的像素的步骤顺序。优化关键渲染路径可以提高渲染性能。关键渲染路径包含了 Document Object Model (DOM),CSS Object Model (CSSOM),渲染树和布局。 ❞ 优化关键渲染路径可以提升首屏渲染时间。理...

  • 太强了!P8架构师都还在学的微服务+MySQL+Kafka+boot2.x+虚拟机PDF

    关乎于程序员,除了做项目来提高自身的技术,还有一种提升自己的专业技能就是:多!看!书! 毕竟,书是学习的海洋呢! So,Java程序员你们准备好了吗?双手奉上Java程序员必读之热门书单。 1.《深入浅出Spring Boot2.x》 随着近年来微服务的流行,越来越多的企业需要快速的开发,而Spring Boot除了以注解为主的开发,还有其他的绑定,例如,对服务器进行了绑定和绑定和默认对Sp...

  • 全面综述:图像特征提取与匹配技术

    作者:William 来源:自动驾驶全栈工程师知乎专栏,https://www.zhihu.com/people/william.hyin/columns 特征提取和匹配是许多计算机视觉应用中的一个重要任务,广泛运用在运动结构、图像检索、目标检测等领域。每个计算机视觉初学者最先了解的特征检测器几乎都是1988年发布的HARRIS。在之后的几十年时间内各种各样的特征检测器/描述符如雨后春笋般出...

华为云40多款云服务产品0元试用活动

免费套餐,马上领取!
CSDN

CSDN

中国开发者社区CSDN (Chinese Software Developer Network) 创立于1999年,致力为中国开发者提供知识传播、在线学习、职业发展等全生命周期服务。