development

Python에서 다중 처리 대기열을 사용하는 방법은 무엇입니까?

big-blog 2020. 11. 18. 09:20
반응형

Python에서 다중 처리 대기열을 사용하는 방법은 무엇입니까?


다중 처리 대기열이 파이썬에서 작동하는 방식과 구현 방법을 이해하는 데 많은 문제가 있습니다. 공유 파일의 데이터에 액세스하는 두 개의 파이썬 모듈이 있다고 가정 해 보겠습니다.이 두 모듈을 작성자와 판독기라고합시다. 내 계획은 독자와 작성자 모두 요청을 두 개의 개별 다중 처리 대기열에 넣은 다음 세 번째 프로세스가 이러한 요청을 루프에서 팝하고 실행하도록하는 것입니다.

내 주요 문제는 multiprocessing.queue를 올바르게 구현하는 방법을 정말로 모른다는 것입니다. 각 프로세스에 대해 개체를 실제로 인스턴스화 할 수 없다는 것입니다. 개별 대기열이 될 것이기 때문입니다. 모든 프로세스가 공유 대기열과 관련이 있는지 확인하는 방법 (또는 이 경우 대기열)


내 주요 문제는 multiprocessing.queue를 올바르게 구현하는 방법을 정말로 모른다는 것입니다. 각 프로세스에 대해 개체를 실제로 인스턴스화 할 수 없다는 것입니다. 개별 대기열이 될 것이기 때문입니다. 모든 프로세스가 공유 대기열과 관련이 있는지 확인하는 방법 (또는 이 경우 대기열)

이것은 하나의 큐를 공유하는 리더와 라이터의 간단한 예입니다 ... 라이터는 리더에게 많은 정수를 보냅니다. 작성자가 숫자가 부족하면 'DONE'을 보내 독자가 읽기 루프를 벗어나는 것을 알 수 있습니다.

from multiprocessing import Process, Queue
import time
import sys

def reader_proc(queue):
    ## Read from the queue; this will be spawned as a separate Process
    while True:
        msg = queue.get()         # Read from the queue and do nothing
        if (msg == 'DONE'):
            break

def writer(count, queue):
    ## Write to the queue
    for ii in range(0, count):
        queue.put(ii)             # Write 'count' numbers into the queue
    queue.put('DONE')

if __name__=='__main__':
    pqueue = Queue() # writer() writes to pqueue from _this_ process
    for count in [10**4, 10**5, 10**6]:             
        ### reader_proc() reads from pqueue as a separate process
        reader_p = Process(target=reader_proc, args=((pqueue),))
        reader_p.daemon = True
        reader_p.start()        # Launch reader_proc() as a separate python process

        _start = time.time()
        writer(count, pqueue)    # Send a lot of stuff to reader()
        reader_p.join()         # Wait for the reader to finish
        print("Sending {0} numbers to Queue() took {1} seconds".format(count, 
            (time.time() - _start)))

" from queue import Queue"에는라는 모듈이 없습니다 . queue대신 multiprocessing사용해야합니다. 따라서 " from multiprocessing import Queue" 처럼 보일 것입니다 .


다음의 죽은 간단한 사용의 multiprocessing.Queuemultiprocessing.Process그 발신자가 별도의 프로세스로 "이벤트"플러스 인수를 보낼 수 있도록 그 파견하는 과정에 "do_"방법에 이벤트. (Python 3.4 이상)

import multiprocessing as mp
import collections

Msg = collections.namedtuple('Msg', ['event', 'args'])

class BaseProcess(mp.Process):
    """A process backed by an internal queue for simple one-way message passing.
    """
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.queue = mp.Queue()

    def send(self, event, *args):
        """Puts the event and args as a `Msg` on the queue
        """
       msg = Msg(event, args)
       self.queue.put(msg)

    def dispatch(self, msg):
        event, args = msg

        handler = getattr(self, "do_%s" % event, None)
        if not handler:
            raise NotImplementedError("Process has no handler for [%s]" % event)

        handler(*args)

    def run(self):
        while True:
            msg = self.queue.get()
            self.dispatch(msg)

용법:

class MyProcess(BaseProcess):
    def do_helloworld(self, arg1, arg2):
        print(arg1, arg2)

if __name__ == "__main__":
    process = MyProcess()
    process.start()
    process.send('helloworld', 'hello', 'world')

send(가), 부모 프로세스에서 발생하는 do_*자식 프로세스에서 발생합니다.

I left out any exception handling that would obviously interrupt the run loop and exit the child process. You can also customize it by overriding run to control blocking or whatever else.

This is really only useful in situations where you have a single worker process, but I think it's a relevant answer to this question to demonstrate a common scenario with a little more object-orientation.

참고URL : https://stackoverflow.com/questions/11515944/how-to-use-multiprocessing-queue-in-python

반응형