development

Python 다중 처리 풀 맵 호출의 진행 상황을 표시 하시겠습니까?

big-blog 2020. 10. 11. 10:54
반응형

Python 다중 처리 풀 맵 호출의 진행 상황을 표시 하시겠습니까?


imap_unordered()호출 을 통해 다중 처리 풀 작업 집합을 성공적으로 수행하는 스크립트가 있습니다 .

p = multiprocessing.Pool()
rs = p.imap_unordered(do_work, xrange(num_tasks))
p.close() # No more work
p.join() # Wait for completion

그러나 내 num_tasks약은 약 250,000이므로 join()10 초 정도 메인 스레드를 잠그고 주 프로세스가 잠겨 있지 않음을 보여주기 위해 점차적으로 명령 줄에 에코 아웃 할 수 있기를 바랍니다. 다음과 같은 것 :

p = multiprocessing.Pool()
rs = p.imap_unordered(do_work, xrange(num_tasks))
p.close() # No more work
while (True):
  remaining = rs.tasks_remaining() # How many of the map call haven't been done yet?
  if (remaining == 0): break # Jump out of while loop
  print "Waiting for", remaining, "tasks to complete..."
  time.sleep(2)

남은 작업 수를 나타내는 결과 개체 또는 풀 자체에 대한 메서드가 있습니까? multiprocessing.Value개체를 카운터로 사용하려고했지만 ( 작업을 수행 한 후 작업을 do_work호출 함 counter.value += 1) 카운터는 증가를 중지하기 전에 총 값의 ~ 85 % 만 얻습니다.


결과 세트의 개인 속성에 액세스 할 필요가 없습니다.

from __future__ import division
import sys

for i, _ in enumerate(p.imap_unordered(do_work, xrange(num_tasks)), 1):
    sys.stderr.write('\rdone {0:%}'.format(i/num_tasks))

내가 개인적으로 좋아하는 것은 작업이 병렬로 실행되고 커밋되는 동안 멋진 작은 진행률 표시 줄과 완료 ETA를 제공합니다.

from multiprocessing import Pool
import tqdm

pool = Pool(processes=8)
for _ in tqdm.tqdm(pool.imap_unordered(do_work, tasks), total=len(tasks)):
    pass

좀 더 자세히 살펴보면서 직접 답을 찾았 __dict__습니다. imap_unordered결과 객체를 살펴보면 _index작업이 완료 될 때마다 증가 하는 속성이 있다는 것을 알았습니다 . 따라서 이것은 로깅을 위해 작동하며 while루프에 래핑됩니다 .

p = multiprocessing.Pool()
rs = p.imap_unordered(do_work, xrange(num_tasks))
p.close() # No more work
while (True):
  completed = rs._index
  if (completed == num_tasks): break
  print "Waiting for", num_tasks-completed, "tasks to complete..."
  time.sleep(2)

그러나 결과 개체는 약간 다르지만를 imap_unorderedfor a map_async로 바꾸면 실행 속도가 훨씬 빨라지는 것을 알았습니다. 대신의 결과 개체 map_async에는 _number_left속성과 ready()메서드가 있습니다.

p = multiprocessing.Pool()
rs = p.map_async(do_work, xrange(num_tasks))
p.close() # No more work
while (True):
  if (rs.ready()): break
  remaining = rs._number_left
  print "Waiting for", remaining, "tasks to complete..."
  time.sleep(0.5)

진행 상황을 확인하려고 할 때 이미 작업이 완료되었음을 알았습니다. 이것은 tqdm 사용하여 나를 위해 일한 것입니다 .

pip install tqdm

from multiprocessing import Pool
from tqdm import tqdm

tasks = range(5)
pool = Pool()
pbar = tqdm(total=len(tasks))

def do_work(x):
    # do something with x
    pbar.update(1)

pool.imap_unordered(do_work, tasks)
pool.close()
pool.join()
pbar.close()

이것은 차단 여부에 관계없이 모든 종류의 다중 처리에서 작동합니다.


나는 이것이 다소 오래된 질문이라는 것을 알고 있지만 파이썬에서 작업 풀의 진행 상황을 추적하고 싶을 때 수행하는 작업은 다음과 같습니다.

from progressbar import ProgressBar, SimpleProgress
import multiprocessing as mp
from time import sleep

def my_function(letter):
    sleep(2)
    return letter+letter

dummy_args = ["A", "B", "C", "D"]
pool = mp.Pool(processes=2)

results = []

pbar = ProgressBar(widgets=[SimpleProgress()], maxval=len(dummy_args)).start()

r = [pool.apply_async(my_function, (x,), callback=results.append) for x in dummy_args]

while len(results) != len(dummy_args):
    pbar.update(len(results))
    sleep(0.5)
pbar.finish()

print results

기본적으로 callbak와 함께 apply_async를 사용하므로 (이 경우 반환 된 값을 목록에 추가) 다른 작업을 수행하기 위해 기다릴 필요가 없습니다. 그런 다음 while 루프 내에서 작업의 진행 상황을 확인합니다. 이번에는 더 멋지게 보이도록 위젯을 추가했습니다.

출력 :

4 of 4                                                                         
['AA', 'BB', 'CC', 'DD']

도움이되기를 바랍니다.


진행률 인쇄물을 만들기 위해 사용자 지정 클래스를 만들었습니다. Maby는 다음을 도와줍니다.

from multiprocessing import Pool, cpu_count


class ParallelSim(object):
    def __init__(self, processes=cpu_count()):
        self.pool = Pool(processes=processes)
        self.total_processes = 0
        self.completed_processes = 0
        self.results = []

    def add(self, func, args):
        self.pool.apply_async(func=func, args=args, callback=self.complete)
        self.total_processes += 1

    def complete(self, result):
        self.results.extend(result)
        self.completed_processes += 1
        print('Progress: {:.2f}%'.format((self.completed_processes/self.total_processes)*100))

    def run(self):
        self.pool.close()
        self.pool.join()

    def get_results(self):
        return self.results

Tim이 제안한대로 tqdmimap사용 하여이 문제를 해결할 수 있습니다 . 방금이 문제를 발견하고 imap_unordered솔루션을 조정 하여 매핑 결과에 액세스 할 수 있도록했습니다. 작동 방식은 다음과 같습니다.

from multiprocessing import Pool
import tqdm

pool = multiprocessing.Pool(processes=4)
mapped_values = list(tqdm.tqdm(pool.imap_unordered(do_work, range(num_tasks)), total=len(values)))

In case you don't care about the values returned from your jobs, you don't need to assign the list to any variable.


Try this simple Queue based approach, which can also be used with pooling. Be mindful that printing anything after the initiation of the progress bar will cause it to be moved, at least for this particular progress bar. (PyPI's progress 1.5)

import time
from progress.bar import Bar

def status_bar( queue_stat, n_groups, n ):

    bar = Bar('progress', max = n)  

    finished = 0
    while finished < n_groups:

        while queue_stat.empty():
            time.sleep(0.01)

        gotten = queue_stat.get()
        if gotten == 'finished':
            finished += 1
        else:
            bar.next()
    bar.finish()


def process_data( queue_data, queue_stat, group):

    for i in group:

        ... do stuff resulting in new_data

        queue_stat.put(1)

    queue_stat.put('finished')  
    queue_data.put(new_data)

def multiprocess():

    new_data = []

    groups = [[1,2,3],[4,5,6],[7,8,9]]
    combined = sum(groups,[])

    queue_data = multiprocessing.Queue()
    queue_stat = multiprocessing.Queue()

    for i, group in enumerate(groups): 

        if i == 0:

            p = multiprocessing.Process(target = status_bar,
                args=(queue_stat,len(groups),len(combined)))
                processes.append(p)
                p.start()

        p = multiprocessing.Process(target = process_data,
        args=(queue_data, queue_stat, group))
        processes.append(p)
        p.start()

    for i in range(len(groups)):
        data = queue_data.get() 
        new_data += data

    for p in processes:
        p.join()

참고URL : https://stackoverflow.com/questions/5666576/show-the-progress-of-a-python-multiprocessing-pool-map-call

반응형