여러 인수에 대한 Python 다중 처리 pool.map
Python 다중 처리 라이브러리에 여러 인수를 지원하는 pool.map의 변형이 있습니까?
text = "test"
def harvester(text, case):
X = case[0]
text+ str(X)
if __name__ == '__main__':
pool = multiprocessing.Pool(processes=6)
case = RAW_DATASET
pool.map(harvester(text,case),case, 1)
pool.close()
pool.join()
이에 대한 대답은 버전 및 상황에 따라 다릅니다. 최신 버전의 Python (3.3 이후)에 대한 가장 일반적인 대답은 JF Sebastian에 의해 처음 설명되었습니다 . 1Pool.starmap
일련의 인수 튜플을 허용하는 메소드를 사용합니다 . 그런 다음 각 튜플에서 인수를 자동으로 압축 해제하여 주어진 함수에 전달합니다.
import multiprocessing
from itertools import product
def merge_names(a, b):
return '{} & {}'.format(a, b)
if __name__ == '__main__':
names = ['Brown', 'Wilson', 'Bartlett', 'Rivera', 'Molloy', 'Opie']
with multiprocessing.Pool(processes=3) as pool:
results = pool.starmap(merge_names, product(names, repeat=2))
print(results)
# Output: ['Brown & Brown', 'Brown & Wilson', 'Brown & Bartlett', ...
이전 버전의 Python에서는 인수를 명시 적으로 풀기 위해 도우미 함수를 작성해야합니다. 을 사용하려면 컨텍스트 관리자 with
로 전환 할 래퍼도 작성해야합니다 Pool
. ( 이 점을 지적한 뮤온 에게 감사합니다 .)
import multiprocessing
from itertools import product
from contextlib import contextmanager
def merge_names(a, b):
return '{} & {}'.format(a, b)
def merge_names_unpack(args):
return merge_names(*args)
@contextmanager
def poolcontext(*args, **kwargs):
pool = multiprocessing.Pool(*args, **kwargs)
yield pool
pool.terminate()
if __name__ == '__main__':
names = ['Brown', 'Wilson', 'Bartlett', 'Rivera', 'Molloy', 'Opie']
with poolcontext(processes=3) as pool:
results = pool.map(merge_names_unpack, product(names, repeat=2))
print(results)
# Output: ['Brown & Brown', 'Brown & Wilson', 'Brown & Bartlett', ...
더 간단한 경우에는 고정 된 두 번째 인수 partial
로을 사용할 수 있지만 Python 2.7 이상에서만 사용할 수 있습니다 .
import multiprocessing
from functools import partial
from contextlib import contextmanager
@contextmanager
def poolcontext(*args, **kwargs):
pool = multiprocessing.Pool(*args, **kwargs)
yield pool
pool.terminate()
def merge_names(a, b):
return '{} & {}'.format(a, b)
if __name__ == '__main__':
names = ['Brown', 'Wilson', 'Bartlett', 'Rivera', 'Molloy', 'Opie']
with poolcontext(processes=3) as pool:
results = pool.map(partial(merge_names, b='Sons'), names)
print(results)
# Output: ['Brown & Sons', 'Wilson & Sons', 'Bartlett & Sons', ...
1.이 중 많은 부분이 그의 대답에서 영감을 받았으며, 아마도 그 대답은 아마도 대신 받아 들여졌을 것입니다. 그러나 이것이 최상위에 붙어 있기 때문에 미래 독자를 위해 개선하는 것이 가장 좋았습니다.
여러 인수를 지원하는 pool.map의 변형이 있습니까?
Python 3.3에는 pool.starmap()
메소드가 포함되어 있습니다 .
#!/usr/bin/env python3
from functools import partial
from itertools import repeat
from multiprocessing import Pool, freeze_support
def func(a, b):
return a + b
def main():
a_args = [1,2,3]
second_arg = 1
with Pool() as pool:
L = pool.starmap(func, [(1, 1), (2, 1), (3, 1)])
M = pool.starmap(func, zip(a_args, repeat(second_arg)))
N = pool.map(partial(func, b=second_arg), a_args)
assert L == M == N
if __name__=="__main__":
freeze_support()
main()
이전 버전의 경우 :
#!/usr/bin/env python2
import itertools
from multiprocessing import Pool, freeze_support
def func(a, b):
print a, b
def func_star(a_b):
"""Convert `f([1,2])` to `f(1,2)` call."""
return func(*a_b)
def main():
pool = Pool()
a_args = [1,2,3]
second_arg = 1
pool.map(func_star, itertools.izip(a_args, itertools.repeat(second_arg)))
if __name__=="__main__":
freeze_support()
main()
산출
1 1
2 1
3 1
여기서 어떻게 itertools.izip()
그리고 어떻게 itertools.repeat()
사용 되는지 주목 하십시오.
@unutbu가 언급 한 버그 로 인해 functools.partial()
Python 2.6에서 유사한 기능을 사용할 수 없으므로 간단한 래퍼 함수 func_star()
를 명시 적으로 정의해야합니다. 에서 제안한 해결 방법 도 참조하십시오 .uptimebox
나는 아래가 더 좋을 것이라고 생각한다.
def multi_run_wrapper(args):
return add(*args)
def add(x,y):
return x+y
if __name__ == "__main__":
from multiprocessing import Pool
pool = Pool(4)
results = pool.map(multi_run_wrapper,[(1,2),(2,3),(3,4)])
print results
산출
[3, 5, 7]
파이썬 3.3 이상 과 함께 사용 하기pool.starmap():
from multiprocessing.dummy import Pool as ThreadPool
def write(i, x):
print(i, "---", x)
a = ["1","2","3"]
b = ["4","5","6"]
pool = ThreadPool(2)
pool.starmap(write, zip(a,b))
pool.close()
pool.join()
결과:
1 --- 4
2 --- 5
3 --- 6
원하는 경우 더 많은 인수를 zip () 할 수도 있습니다. zip(a,b,c,d,e)
경우에 당신은 당신이 사용할 필요가 인수로 전달 상수 값 갖고 싶어 import itertools
다음과 zip(itertools.repeat(constant), a)
예를.
JF Sebastian의 itertools에 대해 배웠기 때문에 한 단계 더 나아가 파이썬-2.7 및 python-3.2 (및 이후 버전)의 parmap
병렬화, 오퍼링 map
및 starmap
함수를 처리 하여 여러 위치 인수를 취할 수 있는 패키지를 작성하기로 결정했습니다. .
설치
pip install parmap
병렬화하는 방법 :
import parmap
# If you want to do:
y = [myfunction(x, argument1, argument2) for x in mylist]
# In parallel:
y = parmap.map(myfunction, mylist, argument1, argument2)
# If you want to do:
z = [myfunction(x, y, argument1, argument2) for (x,y) in mylist]
# In parallel:
z = parmap.starmap(myfunction, mylist, argument1, argument2)
# If you want to do:
listx = [1, 2, 3, 4, 5, 6]
listy = [2, 3, 4, 5, 6, 7]
param = 3.14
param2 = 42
listz = []
for (x, y) in zip(listx, listy):
listz.append(myfunction(x, y, param1, param2))
# In parallel:
listz = parmap.starmap(myfunction, zip(listx, listy), param1, param2)
PyPI 및 github 저장소에 파맵을 업로드했습니다 .
예를 들어 다음과 같이 질문에 대답 할 수 있습니다.
import parmap
def harvester(case, text):
X = case[0]
text+ str(X)
if __name__ == "__main__":
case = RAW_DATASET # assuming this is an iterable
parmap.map(harvester, case, "test", chunksize=1)
# "다수의 주장을 취하는 방법".
def f1(args):
a, b, c = args[0] , args[1] , args[2]
return a+b+c
if __name__ == "__main__":
import multiprocessing
pool = multiprocessing.Pool(4)
result1 = pool.map(f1, [ [1,2,3] ])
print(result1)
필요없는 pathos ( 주 : github의 버전 사용)multiprocessing
라는 포크가 있습니다 .지도 함수는 파이썬의 맵에 대한 API를 미러링하므로 map은 여러 인수를 취할 수 있습니다. 을 사용하면 일반적으로 블록 에 갇히지 않고 인터프리터에서 멀티 프로세싱을 수행 할 수도 있습니다 . Pathos는 약간의 업데이트 후 릴리스가 예정되어 있습니다. 주로 python 3.x 로의 변환입니다.starmap
pathos
__main__
Python 2.7.5 (default, Sep 30 2013, 20:15:49)
[GCC 4.2.1 (Apple Inc. build 5566)] on darwin
Type "help", "copyright", "credits" or "license" for more information.
>>> def func(a,b):
... print a,b
...
>>>
>>> from pathos.multiprocessing import ProcessingPool
>>> pool = ProcessingPool(nodes=4)
>>> pool.map(func, [1,2,3], [1,1,1])
1 1
2 1
3 1
[None, None, None]
>>>
>>> # also can pickle stuff like lambdas
>>> result = pool.map(lambda x: x**2, range(10))
>>> result
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
>>>
>>> # also does asynchronous map
>>> result = pool.amap(pow, [1,2,3], [4,5,6])
>>> result.get()
[1, 32, 729]
>>>
>>> # or can return a map iterator
>>> result = pool.imap(pow, [1,2,3], [4,5,6])
>>> result
<processing.pool.IMapIterator object at 0x110c2ffd0>
>>> list(result)
[1, 32, 729]
각각의 새로운 함수에 대한 랩퍼를 작성하지 않도록 다음 두 함수를 사용할 수 있습니다.
import itertools
from multiprocessing import Pool
def universal_worker(input_pair):
function, args = input_pair
return function(*args)
def pool_args(function, *args):
return zip(itertools.repeat(function), zip(*args))
함수를 사용하여 function
인수의 목록과를 arg_0
, arg_1
그리고 arg_2
다음과 같이 :
pool = Pool(n_core)
list_model = pool.map(universal_worker, pool_args(function, arg_0, arg_1, arg_2)
pool.close()
pool.join()
또 다른 간단한 대안은 함수 매개 변수를 튜플에 싸서 전달해야하는 매개 변수를 튜플에 싸는 것입니다. 큰 데이터 조각을 처리 할 때는 적합하지 않습니다. 나는 그것이 각 튜플마다 사본을 만들 것이라고 믿습니다.
from multiprocessing import Pool
def f((a,b,c,d)):
print a,b,c,d
return a + b + c +d
if __name__ == '__main__':
p = Pool(10)
data = [(i+0,i+1,i+2,i+3) for i in xrange(10)]
print(p.map(f, data))
p.close()
p.join()
임의의 순서로 출력을 제공합니다.
0 1 2 3
1 2 3 4
2 3 4 5
3 4 5 6
4 5 6 7
5 6 7 8
7 8 9 10
6 7 8 9
8 9 10 11
9 10 11 12
[6, 10, 14, 18, 22, 26, 30, 34, 38, 42]
python2를위한 더 나은 솔루션 :
from multiprocessing import Pool
def func((i, (a, b))):
print i, a, b
return a + b
pool = Pool(3)
pool.map(func, [(0,(1,2)), (1,(2,3)), (2,(3, 4))])
2 3 4
1 2 3
0 1 2
밖[]:
[3, 5, 7]
더 좋은 방법은 랩퍼 기능 을 손 으로 쓰는 대신 데코레이터 를 사용하는 것입니다 . 특히 매핑 할 함수가 많은 경우 데코레이터는 모든 함수에 대해 래퍼를 작성하지 않으므로 시간을 절약 할 수 있습니다. 일반적으로 데코 레이팅 된 기능은 피클 할 수 없지만 우리는 그것을 피하기 위해 사용할 수 있습니다 . 더 많은 토론은 여기 에서 찾을 수 있습니다 .functools
여기 예제
def unpack_args(func):
from functools import wraps
@wraps(func)
def wrapper(args):
if isinstance(args, dict):
return func(**args)
else:
return func(*args)
return wrapper
@unpack_args
def func(x, y):
return x + y
그런 다음 압축 된 인수로 매핑 할 수 있습니다
np, xlist, ylist = 2, range(10), range(10)
pool = Pool(np)
res = pool.map(func, zip(xlist, ylist))
pool.close()
pool.join()
물론 Pool.starmap
다른 답변에서 언급했듯이 항상 Python 3 (> = 3.3)에서 사용할 수 있습니다 .
다른 방법은 목록 목록을 하나의 인수 루틴으로 전달하는 것입니다.
import os
from multiprocessing import Pool
def task(args):
print "PID =", os.getpid(), ", arg1 =", args[0], ", arg2 =", args[1]
pool = Pool()
pool.map(task, [
[1,2],
[3,4],
[5,6],
[7,8]
])
선호하는 방법으로 인수 목록 목록을 구성 할 수 있습니다.
Python 3.4.4에서 multiprocessing.get_context ()를 사용하여 여러 시작 메소드를 사용하기위한 컨텍스트 오브젝트를 얻을 수 있습니다.
import multiprocessing as mp
def foo(q, h, w):
q.put(h + ' ' + w)
print(h + ' ' + w)
if __name__ == '__main__':
ctx = mp.get_context('spawn')
q = ctx.Queue()
p = ctx.Process(target=foo, args=(q,'hello', 'world'))
p.start()
print(q.get())
p.join()
아니면 그냥 교체
pool.map(harvester(text,case),case, 1)
으로:
pool.apply_async(harvester(text,case),case, 1)
여기에는 많은 답변이 있지만 어떤 버전에서도 작동하는 Python 2/3 호환 코드를 제공하는 것으로 보이지 않습니다. 당신은 당신의 코드를 원한다면 바로 일을 ,이 중 하나를 파이썬 버전에 대한 작동합니다 :
# For python 2/3 compatibility, define pool context manager
# to support the 'with' statement in Python 2
if sys.version_info[0] == 2:
from contextlib import contextmanager
@contextmanager
def multiprocessing_context(*args, **kwargs):
pool = multiprocessing.Pool(*args, **kwargs)
yield pool
pool.terminate()
else:
multiprocessing_context = multiprocessing.Pool
그 후에는 일반적인 파이썬 3 방식으로 멀티 프로세싱을 사용할 수 있습니다. 예를 들면 다음과 같습니다.
def _function_to_run_for_each(x):
return x.lower()
with multiprocessing_context(processes=3) as pool:
results = pool.map(_function_to_run_for_each, ['Bob', 'Sue', 'Tim']) print(results)
Python 2 또는 Python 3에서 작동합니다.
공식 문서에는 반복 가능한 인수 하나만 지원한다고 명시되어 있습니다. 그런 경우 apply_async를 사용하고 싶습니다. 당신의 경우에 나는 할 것입니다 :
from multiprocessing import Process, Pool, Manager
text = "test"
def harvester(text, case, q = None):
X = case[0]
res = text+ str(X)
if q:
q.put(res)
return res
def block_until(q, results_queue, until_counter=0):
i = 0
while i < until_counter:
results_queue.put(q.get())
i+=1
if __name__ == '__main__':
pool = multiprocessing.Pool(processes=6)
case = RAW_DATASET
m = Manager()
q = m.Queue()
results_queue = m.Queue() # when it completes results will reside in this queue
blocking_process = Process(block_until, (q, results_queue, len(case)))
blocking_process.start()
for c in case:
try:
res = pool.apply_async(harvester, (text, case, q = None))
res.get(timeout=0.1)
except:
pass
blocking_process.join()
text = "test"
def unpack(args):
return args[0](*args[1:])
def harvester(text, case):
X = case[0]
text+ str(X)
if __name__ == '__main__':
pool = multiprocessing.Pool(processes=6)
case = RAW_DATASET
# args is a list of tuples
# with the function to execute as the first item in each tuple
args = [(harvester, text, c) for c in case]
# doing it this way, we can pass any function
# and we don't need to define a wrapper for each different function
# if we need to use more than one
pool.map(unpack, args)
pool.close()
pool.join()
이것은 수영장 에서 사용되는 하나의 인수 함수에 여러 인수를 전달하는 데 사용하는 루틴의 예입니다 .
from multiprocessing import Pool
# Wrapper of the function to map:
class makefun:
def __init__(self, var2):
self.var2 = var2
def fun(self, i):
var2 = self.var2
return var1[i] + var2
# Couple of variables for the example:
var1 = [1, 2, 3, 5, 6, 7, 8]
var2 = [9, 10, 11, 12]
# Open the pool:
pool = Pool(processes=2)
# Wrapper loop
for j in range(len(var2)):
# Obtain the function to map
pool_fun = makefun(var2[j]).fun
# Fork loop
for i, value in enumerate(pool.imap(pool_fun, range(len(var1))), 0):
print(var1[i], '+' ,var2[j], '=', value)
# Close the pool
pool.close()
python2의 경우이 트릭을 사용할 수 있습니다
def fun(a,b):
return a+b
pool = multiprocessing.Pool(processes=6)
b=233
pool.map(lambda x:fun(x,b),range(1000))
참고 URL : https://stackoverflow.com/questions/5442910/python-multiprocessing-pool-map-for-multiple-arguments
도와주세요.
'development' 카테고리의 다른 글
collections.defaultdict는 어떻게 작동합니까? (0) | 2020.02.18 |
---|---|
우분투에서 ssh-add로 개인 키를 영구적으로 추가하는 방법은 무엇입니까? (0) | 2020.02.18 |
C #에서 예외를 다시 발생시키는 올바른 방법은 무엇입니까? (0) | 2020.02.18 |
Mac OS X에서 adb 설정 (0) | 2020.02.18 |
배열 상태는 iOS 12 Safari에서 캐시됩니다. (0) | 2020.02.18 |