Python提供了Queue模块来专门实现消息队列Queue对象
Queue对象实现一个fifo队列(其他的还有lifo、priority队列,这里不再介绍)。
queue只有maxsize一个构造参数,用来指定队列容量,指定为0的时候代表容量无限。
主要有以下成员函数:
Queue.qsize():返回消息队列的当前空间。返回的值不一定可靠。
Queue.empty():判断消息队列是否为空,返回True或False。同样不可靠。
Queue.not_empty():判断消息队列是否为非空。同上不可靠。
Queue.full():类似上边,判断消息队列是否满。
Queue.put(item, block=True, timeout=None):往消息队列中存放消息。block可以控制是否阻塞,timeout指定阻塞时候的等待时间。如果不阻塞或者超时,会引起一个full exception。
Queue.put_nowait(item):相当于put(item, False).
Queue.get(block=True, timeout=None):获取一个消息,其他同put。
Queue.task_done():接收消息的线程通过调用这个函数来说明消息对应的任务已完成。
Queue.join():表示等待,等到队列为空,在执行别的操作。
Queue.terminate():表示强制关闭。
例子:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 | #!/usr/bin/env python # -*- coding:utf-8 -*- from multiprocessing import Queue from threading import Thread import time """ 写一个消费者和生产者, 用多线程方式实现, 通过类的重写的方法实现。 """ class Proceducer(Thread): def __init__( self , queue): super (Proceducer, self ).__init__() self .queue = queue def run( self ): try : for i in xrange ( 1 , 10 ): print ( "put data is {0} to queue" . format (i)) self .queue.put(i) except Exception as e: print ( "put data error" ) raise e class Consumer_even(Thread): def __init__( self , queue): super (Consumer_even , self ).__init__() self .queue = queue def run( self ): try : while not self .queue.empty(): number = self .queue.get(block = True , timeout = 3 ) if number % 2 ! = 0 : print ( "get {0} from queue EVEN, thread name is {1}" . format (number, self .getName())) else : self .queue.put(number) time.sleep( 1 ) except Exception as e: raise e class Consumer_odd(Thread): def __init__( self , queue): super (Consumer_odd , self ).__init__() self .queue = queue def run( self ): try : while not self .queue.empty(): number = self .queue.get(block = True , timeout = 3 ) if number % 2 = = 0 : print ( "get {0} from queue ODD" . format (number)) else : self .queue.put(number) time.sleep( 1 ) except Exception as e: raise e def main(): queue = Queue() p = Proceducer(queue = queue) p.start() p.join() time.sleep( 1 ) c1 = Consumer_even(queue = queue) c2 = Consumer_odd(queue = queue) c1.start() c2.start() c1.join() c2.join() print ( "All thread terminate!" ) if __name__ = = '__main__' : main() |
结果:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 | put data is 1 to queue put data is 2 to queue put data is 3 to queue put data is 4 to queue put data is 5 to queue put data is 6 to queue put data is 7 to queue put data is 8 to queue put data is 9 to queue get 1 from queue EVEN, thread name is Thread-2 get 2 from queue ODD get 3 from queue EVEN, thread name is Thread-2 get 4 from queue ODD get 5 from queue EVEN, thread name is Thread-2 get 6 from queue ODD get 7 from queue EVEN, thread name is Thread-2 get 8 from queue ODD get 9 from queue EVEN, thread name is Thread-2 All thread terminate! |