Python 다중 처리 풀 맵 호출의 진행 상황을 표시 하시겠습니까?
imap_unordered()
호출 을 통해 다중 처리 풀 작업 집합을 성공적으로 수행하는 스크립트가 있습니다 .
p = multiprocessing.Pool()
rs = p.imap_unordered(do_work, xrange(num_tasks))
p.close() # No more work
p.join() # Wait for completion
그러나 내 num_tasks
약은 약 250,000이므로 join()
10 초 정도 메인 스레드를 잠그고 주 프로세스가 잠겨 있지 않음을 보여주기 위해 점차적으로 명령 줄에 에코 아웃 할 수 있기를 바랍니다. 다음과 같은 것 :
p = multiprocessing.Pool()
rs = p.imap_unordered(do_work, xrange(num_tasks))
p.close() # No more work
while (True):
remaining = rs.tasks_remaining() # How many of the map call haven't been done yet?
if (remaining == 0): break # Jump out of while loop
print "Waiting for", remaining, "tasks to complete..."
time.sleep(2)
남은 작업 수를 나타내는 결과 개체 또는 풀 자체에 대한 메서드가 있습니까? multiprocessing.Value
개체를 카운터로 사용하려고했지만 ( 작업을 수행 한 후 작업을 do_work
호출 함 counter.value += 1
) 카운터는 증가를 중지하기 전에 총 값의 ~ 85 % 만 얻습니다.
결과 세트의 개인 속성에 액세스 할 필요가 없습니다.
from __future__ import division
import sys
for i, _ in enumerate(p.imap_unordered(do_work, xrange(num_tasks)), 1):
sys.stderr.write('\rdone {0:%}'.format(i/num_tasks))
내가 개인적으로 좋아하는 것은 작업이 병렬로 실행되고 커밋되는 동안 멋진 작은 진행률 표시 줄과 완료 ETA를 제공합니다.
from multiprocessing import Pool
import tqdm
pool = Pool(processes=8)
for _ in tqdm.tqdm(pool.imap_unordered(do_work, tasks), total=len(tasks)):
pass
좀 더 자세히 살펴보면서 직접 답을 찾았 __dict__
습니다. imap_unordered
결과 객체를 살펴보면 _index
작업이 완료 될 때마다 증가 하는 속성이 있다는 것을 알았습니다 . 따라서 이것은 로깅을 위해 작동하며 while
루프에 래핑됩니다 .
p = multiprocessing.Pool()
rs = p.imap_unordered(do_work, xrange(num_tasks))
p.close() # No more work
while (True):
completed = rs._index
if (completed == num_tasks): break
print "Waiting for", num_tasks-completed, "tasks to complete..."
time.sleep(2)
그러나 결과 개체는 약간 다르지만를 imap_unordered
for a map_async
로 바꾸면 실행 속도가 훨씬 빨라지는 것을 알았습니다. 대신의 결과 개체 map_async
에는 _number_left
속성과 ready()
메서드가 있습니다.
p = multiprocessing.Pool()
rs = p.map_async(do_work, xrange(num_tasks))
p.close() # No more work
while (True):
if (rs.ready()): break
remaining = rs._number_left
print "Waiting for", remaining, "tasks to complete..."
time.sleep(0.5)
진행 상황을 확인하려고 할 때 이미 작업이 완료되었음을 알았습니다. 이것은 tqdm 사용하여 나를 위해 일한 것입니다 .
pip install tqdm
from multiprocessing import Pool
from tqdm import tqdm
tasks = range(5)
pool = Pool()
pbar = tqdm(total=len(tasks))
def do_work(x):
# do something with x
pbar.update(1)
pool.imap_unordered(do_work, tasks)
pool.close()
pool.join()
pbar.close()
이것은 차단 여부에 관계없이 모든 종류의 다중 처리에서 작동합니다.
나는 이것이 다소 오래된 질문이라는 것을 알고 있지만 파이썬에서 작업 풀의 진행 상황을 추적하고 싶을 때 수행하는 작업은 다음과 같습니다.
from progressbar import ProgressBar, SimpleProgress
import multiprocessing as mp
from time import sleep
def my_function(letter):
sleep(2)
return letter+letter
dummy_args = ["A", "B", "C", "D"]
pool = mp.Pool(processes=2)
results = []
pbar = ProgressBar(widgets=[SimpleProgress()], maxval=len(dummy_args)).start()
r = [pool.apply_async(my_function, (x,), callback=results.append) for x in dummy_args]
while len(results) != len(dummy_args):
pbar.update(len(results))
sleep(0.5)
pbar.finish()
print results
기본적으로 callbak와 함께 apply_async를 사용하므로 (이 경우 반환 된 값을 목록에 추가) 다른 작업을 수행하기 위해 기다릴 필요가 없습니다. 그런 다음 while 루프 내에서 작업의 진행 상황을 확인합니다. 이번에는 더 멋지게 보이도록 위젯을 추가했습니다.
출력 :
4 of 4
['AA', 'BB', 'CC', 'DD']
도움이되기를 바랍니다.
진행률 인쇄물을 만들기 위해 사용자 지정 클래스를 만들었습니다. Maby는 다음을 도와줍니다.
from multiprocessing import Pool, cpu_count
class ParallelSim(object):
def __init__(self, processes=cpu_count()):
self.pool = Pool(processes=processes)
self.total_processes = 0
self.completed_processes = 0
self.results = []
def add(self, func, args):
self.pool.apply_async(func=func, args=args, callback=self.complete)
self.total_processes += 1
def complete(self, result):
self.results.extend(result)
self.completed_processes += 1
print('Progress: {:.2f}%'.format((self.completed_processes/self.total_processes)*100))
def run(self):
self.pool.close()
self.pool.join()
def get_results(self):
return self.results
Tim이 제안한대로 tqdm
및 imap
을 사용 하여이 문제를 해결할 수 있습니다 . 방금이 문제를 발견하고 imap_unordered
솔루션을 조정 하여 매핑 결과에 액세스 할 수 있도록했습니다. 작동 방식은 다음과 같습니다.
from multiprocessing import Pool
import tqdm
pool = multiprocessing.Pool(processes=4)
mapped_values = list(tqdm.tqdm(pool.imap_unordered(do_work, range(num_tasks)), total=len(values)))
In case you don't care about the values returned from your jobs, you don't need to assign the list to any variable.
Try this simple Queue based approach, which can also be used with pooling. Be mindful that printing anything after the initiation of the progress bar will cause it to be moved, at least for this particular progress bar. (PyPI's progress 1.5)
import time
from progress.bar import Bar
def status_bar( queue_stat, n_groups, n ):
bar = Bar('progress', max = n)
finished = 0
while finished < n_groups:
while queue_stat.empty():
time.sleep(0.01)
gotten = queue_stat.get()
if gotten == 'finished':
finished += 1
else:
bar.next()
bar.finish()
def process_data( queue_data, queue_stat, group):
for i in group:
... do stuff resulting in new_data
queue_stat.put(1)
queue_stat.put('finished')
queue_data.put(new_data)
def multiprocess():
new_data = []
groups = [[1,2,3],[4,5,6],[7,8,9]]
combined = sum(groups,[])
queue_data = multiprocessing.Queue()
queue_stat = multiprocessing.Queue()
for i, group in enumerate(groups):
if i == 0:
p = multiprocessing.Process(target = status_bar,
args=(queue_stat,len(groups),len(combined)))
processes.append(p)
p.start()
p = multiprocessing.Process(target = process_data,
args=(queue_data, queue_stat, group))
processes.append(p)
p.start()
for i in range(len(groups)):
data = queue_data.get()
new_data += data
for p in processes:
p.join()
'development' 카테고리의 다른 글
Tmux의 어떤 창에 초점이 맞춰져 있는지 어떻게 알 수 있습니까? (0) | 2020.10.12 |
---|---|
백그라운드에서 CoreBluetooth 애플리케이션이 정확히 무엇을 할 수 있습니까? (0) | 2020.10.12 |
DBCP-다른 데이터베이스에 대한 validationQuery (0) | 2020.10.11 |
mysql에서 이름 문자열을 분할하는 방법은 무엇입니까? (0) | 2020.10.11 |
Kotlin 오류 : org.jetbrains.kotlin : kotlin-stdlib-jre7 : 1.0.7을 찾을 수 없습니다. (0) | 2020.10.11 |