python-多个进程从同一管道异步读取

如何将数据从一个管道馈送到三个不同的进程?
nulfp = open(os.devnull, "w")
piper = Popen([
"come command",
"some params"
], stdout = PIPE, stderr = nulfp.fileno())
pipe_consumer_1 = Popen([
"come command",
"some params"
], stdin = piper.stdout, stderr = nulfp.fileno())
pipe_consumer_2 = Popen([
"come command",
"some params"
], stdin = piper.stdout, stderr = nulfp.fileno())
pipe_consumer_3 = Popen([
"come command",
"some params"
], stdin = piper.stdout, stderr = nulfp.fileno())
pipe_consumer_1.communicate()
pipe_consumer_2.communicate()
pipe_consumer_3.communicate()
piper.communicate()
如果我运行上面的代码,它将产生一个损坏的文件.这意味着管道使用者可能没有从管道器读取全部输出.
这一个工作正常,但速度慢得多:
nulfp = open(os.devnull, "w")
piper_1 = Popen([
"come command",
"some params"
], stdout = PIPE, stderr = nulfp.fileno())
piper_2 = Popen([
"come command",
"some params"
], stdout = PIPE, stderr = nulfp.fileno())
piper_3 = Popen([
"come command",
"some params"
], stdout = PIPE, stderr = nulfp.fileno())
pipe_consumer_1 = Popen([
"come command",
"some params"
], stdin = piper_1.stdout, stderr = nulfp.fileno())
pipe_consumer_2 = Popen([
"come command",
"some params"
], stdin = piper_2.stdout, stderr = nulfp.fileno())
pipe_consumer_3 = Popen([
"come command",
"some params"
], stdin = piper_3.stdout, stderr = nulfp.fileno())
pipe_consumer_1.communicate()
pipe_consumer_2.communicate()
pipe_consumer_3.communicate()
piper_1.communicate()
piper_2.communicate()
piper_3.communicate()
有什么建议可以使第一个代码片段与第二个代码片段相同?如果我采用第一种工作方法,则该过程将在1/3的时间内完成.
解决方法:
这仅使用一个字节的“块”,但是您可以理解.
from subprocess import Popen, PIPE
cat_proc = '/usr/bin/cat'
consumers = (Popen([cat_proc], stdin = PIPE, stdout = open('consumer1', 'w')),
Popen([cat_proc], stdin = PIPE, stdout = open('consumer2', 'w')),
Popen([cat_proc], stdin = PIPE, stdout = open('consumer3', 'w'))
)
with open('inputfile', 'r') as infile:
for byte in infile:
for consumer in consumers:
consumer.stdin.write(byte)
测试时,使用者输出文件与输入文件匹配.
编辑:
这是从具有1K块的过程中读取的.
from subprocess import Popen, PIPE
cat_proc = '/usr/bin/cat'
consumers = (Popen([cat_proc], stdin = PIPE, stdout = open('consumer1', 'w')),
Popen([cat_proc], stdin = PIPE, stdout = open('consumer2', 'w')),
Popen([cat_proc], stdin = PIPE, stdout = open('consumer3', 'w'))
)
producer = Popen([cat_proc, 'inputfile'], stdout = PIPE)
while True:
byte = producer.stdout.read(1024)
if not byte: break
for consumer in consumers:
consumer.stdin.write(byte)