You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
2.6 KiB
2.6 KiB
生产者消费者模型
概念
生产者消费者模式就是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。这个阻塞队列就是用来给生产者和消费者解耦的。
生产者直接将商品交给消费者,如果消费者承受能力不佳,就会发生崩溃,这个时候需要另一个角色--仓库。
生产者 --- 仓库 --- 消费者
优点
解耦–生产者-消费者之间不直接通信,降低了耦合度。
并发
测试代码
import time
import random
from multiprocessing import Queue,Process
def consumer(name,q):
while True:
res=q.get()
if res is None:break
time.sleep(random.randint(1,3))
print('\033[40m 食客%s 开始吃%s'%(name,res))
def producer(name,q):
for i in range(5):
time.sleep(random.randint(1,2))
res='炒粉%s'%i
q.put(res)
print('\033[40m 厨师%s 生产了%s'%(name,res))
if __name__ == '__main__':
q=Queue()#一个队列
p1=Process(target=producer,args=('老刘',q))
c1=Process(target=consumer,args=('小明',q))
# c2=Process(target=consumer,args=('小红',q))
p1.start()
c1.start()
# c2.start()
p1.join()
q.put(None)
发布者订阅者模型
测试代码
发布者
#!/usr/bin/env python
#coding:utf-8
import redis
pool=redis.ConnectionPool(host='127.0.0.1',
port=6379,db=0,
password='123456')
r=redis.StrictRedis(connection_pool=pool)
while True:
msg=input("publish: >>")
if msg=="over":
print("停止发布")
break
r.publish('spub',msg)
订阅者
#!/usr/bin/env python
import redis
pool=redis.ConnectionPool(host='127.0.0.1',
port=6379,db=0,
password='123456')
r=redis.StrictRedis(connection_pool=pool)
p=r.pubsub()
p.subscribe("spub","cctv1")
for item in p.listen():
print("Listen on channel : %s "%item['channel'].decode())
if item['type']=='message':
data=item['data'].decode()
print("From %s get message : %s"%(item['channel'].decode(),item['data'].decode()))
if item['data']=='over':
print(item['channel'].decode(),'停止发布')
break
p.unsubscribe('spub')
print("取消订阅")