스칼라 대 파이썬의 스파크 성능
나는 스칼라보다 파이썬을 선호합니다. 그러나 Spark는 기본적으로 스칼라로 작성되었으므로 코드가 스칼라에서 Python 버전보다 더 빨리 실행될 것으로 예상되었습니다.
그 가정으로, 나는 1GB의 데이터에 대해 매우 일반적인 전처리 코드의 스칼라 버전을 배우고 작성하려고 생각했습니다. 데이터는 Kaggle의 SpringLeaf 경쟁에서 선택 됩니다. 데이터에 대한 개요를 제공하기 위해 (1936 차원 및 145232 행 포함). 데이터는 int, float, string, boolean과 같은 다양한 유형으로 구성됩니다. Spark 처리에 8 개 중 6 개 코어를 사용하고 있습니다. 그렇기 minPartitions=6
때문에 모든 코어에 처리해야 할 것이 있습니다.
스칼라 코드
val input = sc.textFile("train.csv", minPartitions=6)
val input2 = input.mapPartitionsWithIndex { (idx, iter) =>
if (idx == 0) iter.drop(1) else iter }
val delim1 = "\001"
def separateCols(line: String): Array[String] = {
val line2 = line.replaceAll("true", "1")
val line3 = line2.replaceAll("false", "0")
val vals: Array[String] = line3.split(",")
for((x,i) <- vals.view.zipWithIndex) {
vals(i) = "VAR_%04d".format(i) + delim1 + x
}
vals
}
val input3 = input2.flatMap(separateCols)
def toKeyVal(line: String): (String, String) = {
val vals = line.split(delim1)
(vals(0), vals(1))
}
val input4 = input3.map(toKeyVal)
def valsConcat(val1: String, val2: String): String = {
val1 + "," + val2
}
val input5 = input4.reduceByKey(valsConcat)
input5.saveAsTextFile("output")
파이썬 코드
input = sc.textFile('train.csv', minPartitions=6)
DELIM_1 = '\001'
def drop_first_line(index, itr):
if index == 0:
return iter(list(itr)[1:])
else:
return itr
input2 = input.mapPartitionsWithIndex(drop_first_line)
def separate_cols(line):
line = line.replace('true', '1').replace('false', '0')
vals = line.split(',')
vals2 = ['VAR_%04d%s%s' %(e, DELIM_1, val.strip('\"'))
for e, val in enumerate(vals)]
return vals2
input3 = input2.flatMap(separate_cols)
def to_key_val(kv):
key, val = kv.split(DELIM_1)
return (key, val)
input4 = input3.map(to_key_val)
def vals_concat(v1, v2):
return v1 + ',' + v2
input5 = input4.reduceByKey(vals_concat)
input5.saveAsTextFile('output')
스칼라 공연 0 단계 (38 분), 1 단계 (18 초)
Python Performance Stage 0 (11 분), Stage 1 (7 초)
둘 다 서로 다른 DAG 시각화 그래프를 생성합니다 (두 그림 모두 Scala ( map
) 및 Python ( reduceByKey
)에 대해 서로 다른 스테이지 0 함수를 표시하기 때문에 )
그러나 본질적으로 두 코드 모두 데이터를 (dimension_id, 문자열 값 목록) RDD로 변환하여 디스크에 저장하려고합니다. 출력은 각 차원에 대한 다양한 통계를 계산하는 데 사용됩니다.
성능면에서 이와 같은 실제 데이터에 대한 스칼라 코드 는 Python 버전보다 4 배 느리게 실행되는 것 같습니다 . 저에게 희소식은 파이썬을 유지하기 위해 좋은 동기를 부여했다는 것입니다. 나쁜 소식은 왜 그런지 이해하지 못했습니까?
코드에 대한 원래 답변은 아래에서 확인할 수 있습니다.
우선, 각각 고유 한 성능 고려 사항이있는 서로 다른 유형의 API를 구별해야합니다.
RDD API
(JVM 기반 오케스트레이션으로 순수한 Python 구조)
이것은 파이썬 코드의 성능과 PySpark 구현의 세부 사항에 가장 큰 영향을받는 구성 요소입니다. 파이썬 성능은 문제가되지 않지만, 고려해야 할 최소한의 요소는 다음과 같습니다.
- JVM 통신 오버 헤드 실제로 파이썬 실행기와주고받는 모든 데이터는 소켓과 JVM 작업자를 통해 전달되어야합니다. 이것은 비교적 효율적인 로컬 통신이지만 여전히 무료는 아닙니다.
프로세스 기반 실행기 (Python) 대 스레드 기반 (단일 JVM 다중 스레드) 실행기 (Scala). 각 Python 실행 프로그램은 자체 프로세스에서 실행됩니다. 부작용으로, JVM보다 강력한 격리 기능을 제공하고 실행기 수명주기에 대한 일부 제어 기능을 제공하지만 잠재적으로 메모리 사용량이 크게 증가합니다.
- 인터프리터 메모리 풋 프린트
- 로드 된 라이브러리의 설치 공간
- 덜 효율적인 방송 (각 프로세스에는 자체 방송 사본이 필요함)
파이썬 코드 자체의 성능. 일반적으로 스칼라는 파이썬보다 빠르지 만 작업마다 다릅니다. 또한 Numba 와 같은 JIT , C 확장 ( Cython ) 또는 Theano 와 같은 특수 라이브러리를 포함한 여러 옵션이 있습니다 . 마지막으로
ML / MLlib (또는 단순히 NumPy 스택)를 사용하지 않으면대체 통역사로 PyPy 를 사용하십시오 . SPARK-3094를 참조하십시오 .- PySpark 구성은
spark.python.worker.reuse
각 작업에 대해 Python 프로세스를 포크하는 것과 기존 프로세스를 재사용하는 것 중에서 선택할 수있는 옵션을 제공합니다 . 후자의 옵션은 값 비싼 가비지 수집 (체계적인 테스트의 결과보다 더 인상적 임)을 피하는 데 유용하지만, 전자 (기본값)는 값 비싼 방송 및 수입의 경우에 최적입니다. - CPython에서 첫 번째 가비지 수집 방법으로 사용되는 참조 횟수는 일반적인 Spark 워크로드 (스트림과 같은 처리, 참조주기 없음)에서 잘 작동하며 긴 GC 일시 중지 위험을 줄입니다.
MLlib
(파이썬과 JVM 실행 혼합)
기본 고려 사항은 몇 가지 추가 문제와 함께 이전과 거의 동일합니다. MLlib와 함께 사용되는 기본 구조는 일반 Python RDD 객체이지만 모든 알고리즘은 Scala를 사용하여 직접 실행됩니다.
It means an additional cost of converting Python objects to Scala objects and the other way around, increased memory usage and some additional limitations we'll cover later.
As of now (Spark 2.x), the RDD-based API is in a maintenance mode and is scheduled to be removed in Spark 3.0.
DataFrame API and Spark ML
(JVM execution with Python code limited to the driver)
These are probably the best choice for standard data processing tasks. Since Python code is mostly limited to high-level logical operations on the driver, there should be no performance difference between Python and Scala.
A single exception is usage of row-wise Python UDFs which are significantly less efficient than their Scala equivalents. While there is some chance for improvements (there has been substantial development in Spark 2.0.0), the biggest limitation is full roundtrip between internal representation (JVM) and Python interpreter. If possible, you should favor a composition of built-in expressions (example. Python UDF behavior has been improved in Spark 2.0.0, but it is still suboptimal compared to native execution. This may improved in the future with introduction of the vectorized UDFs (SPARK-21190).
또한 사이에 불필요한 데이터 전달을 방지해야 DataFrames
하고 RDDs
. 파이썬 인터프리터와의 데이터 전송은 말할 것도없고 값 비싼 직렬화 및 역 직렬화가 필요합니다.
Py4J 통화는 대기 시간이 상당히 길다는 점에 주목할 가치가 있습니다. 여기에는 다음과 같은 간단한 호출이 포함됩니다.
from pyspark.sql.functions import col
col("foo")
일반적으로 중요하지는 않지만 (오버 헤드는 일정하고 데이터 양에 의존하지 않습니다) 소프트 실시간 응용 프로그램의 경우 Java 래퍼 캐싱 / 재사용을 고려할 수 있습니다.
GraphX 및 Spark 데이터 세트
현재 (Spark 1.6 2.1) 어느 쪽도 PySpark API를 제공하지 않으므로 PySpark가 Scala보다 무한히 나쁘다고 말할 수 있습니다.
실제로, GraphX 개발은 거의 완전히 정지 및 프로젝트와 유지 관리 모드에서 현재 로 폐쇄 관련 JIRA 티켓을 수정하지 않습니다 . GraphFrames 라이브러리는 Python 바인딩을 사용하는 대체 그래프 처리 라이브러리를 제공합니다.
데이터 세트주관적으로 말하면 Datasets
파이썬 에는 정적으로 유형을 지정할 수있는 장소가 많지 않으며 현재 Scala 구현이 너무 단순하더라도과 동일한 성능 이점을 제공하지 않습니다 DataFrame
.
스트리밍
지금까지 내가 본 것 중에서 Python보다 Scala를 사용하는 것이 좋습니다. PySpark가 구조화 된 스트림을 지원할 경우 향후 변경 될 수 있지만 현재 Scala API는 훨씬 강력하고 포괄적이며 효율적으로 보입니다. 내 경험은 상당히 제한적입니다.
Spark 2.x의 구조적 스트리밍은 언어 간 격차를 줄이는 것처럼 보이지만 현재는 아직 초기 단계입니다. 그럼에도 불구하고 RDD 기반 API는 이미 Databricks 문서 (액세스 날짜 2017-03-03) 에서 "레거시 스트리밍"으로 참조 되므로 추가 통합 노력을 기대하는 것이 합리적입니다.
비 성능 고려 사항
기능 패리티모든 Spark 기능이 PySpark API를 통해 노출되는 것은 아닙니다. 필요한 부품이 이미 구현되어 있는지 확인하고 가능한 제한 사항을 이해하십시오.
MLlib 및 유사한 혼합 컨텍스트를 사용할 때 특히 중요합니다 ( 작업에서 Java / Scala 함수 호출 참조 ). 와 같이 PySpark API의 일부를 공정하게하기 위해 mllib.linalg
Scala보다 포괄적 인 메소드 세트를 제공합니다.
PySpark API는 스칼라 대응 API를 밀접하게 반영하므로 정확히 Pythonic이 아닙니다. 언어간에 매핑하기가 쉽지만 동시에 파이썬 코드는 이해하기가 훨씬 어려울 수 있습니다.
복잡한 아키텍처PySpark 데이터 흐름은 순수한 JVM 실행에 비해 비교적 복잡합니다. PySpark 프로그램에 대해 추론하거나 디버그하기가 훨씬 어렵습니다. 또한 스칼라와 JVM에 대한 기본적인 이해는 일반적으로 거의 필요합니다.
Spark 2.x 이상Dataset
동결 된 RDD API를 통해 API 로의 지속적인 변화 는 파이썬 사용자에게 기회와 도전을 제공합니다. API의 상위 레벨 부분은 Python에서 노출하기가 훨씬 쉽지만 고급 기능은 직접 사용하기가 거의 불가능합니다 .
Moreover native Python functions continue to be second class citizen in the SQL world. Hopefully this will improve in the future with Apache Arrow serialization (current efforts target data collection
but UDF serde is a long term goal).
For projects strongly depending on the Python codebase, pure Python alternatives (like Dask or Ray) could be an interesting alternative.
It doesn't have to be one vs. the other
The Spark DataFrame (SQL, Dataset) API provides an elegant way to integrate Scala/Java code in PySpark application. You can use DataFrames
to expose data to a native JVM code and read back the results. I've explained some options somewhere else and you can find a working example of Python-Scala roundtrip in How to use a Scala class inside Pyspark.
It can be further augmented by introducing User Defined Types (see How to define schema for custom type in Spark SQL?).
What is wrong with code provided in the question
(Disclaimer: Pythonista point of view. Most likely I've missed some Scala tricks)
First of all, there is one part in your code which doesn't make sense at all. If you already have (key, value)
pairs created using zipWithIndex
or enumerate
what is the point in creating string just to split it right afterwards? flatMap
doesn't work recursively so you can simply yield tuples and skip following map
whatsoever.
Another part I find problematic is reduceByKey
. Generally speaking, reduceByKey
is useful if applying aggregate function can reduce the amount of data that has to be shuffled. Since you simply concatenate strings there is nothing to gain here. Ignoring low-level stuff, like the number of references, the amount of data you have to transfer is exactly the same as for groupByKey
.
Normally I wouldn't dwell on that, but as far as I can tell it is a bottleneck in your Scala code. Joining strings on JVM is a rather expensive operation (see for example: Is string concatenation in scala as costly as it is in Java?). It means that something like this _.reduceByKey((v1: String, v2: String) => v1 + ',' + v2)
which is equivalent to input4.reduceByKey(valsConcat)
in your code is not a good idea.
If you want to avoid groupByKey
you can try to use aggregateByKey
with StringBuilder
. Something similar to this should do the trick:
rdd.aggregateByKey(new StringBuilder)(
(acc, e) => {
if(!acc.isEmpty) acc.append(",").append(e)
else acc.append(e)
},
(acc1, acc2) => {
if(acc1.isEmpty | acc2.isEmpty) acc1.addString(acc2)
else acc1.append(",").addString(acc2)
}
)
but I doubt it is worth all the fuss.
Keeping the above in mind, I've rewritten your code as follows:
Scala:
val input = sc.textFile("train.csv", 6).mapPartitionsWithIndex{
(idx, iter) => if (idx == 0) iter.drop(1) else iter
}
val pairs = input.flatMap(line => line.split(",").zipWithIndex.map{
case ("true", i) => (i, "1")
case ("false", i) => (i, "0")
case p => p.swap
})
val result = pairs.groupByKey.map{
case (k, vals) => {
val valsString = vals.mkString(",")
s"$k,$valsString"
}
}
result.saveAsTextFile("scalaout")
Python:
def drop_first_line(index, itr):
if index == 0:
return iter(list(itr)[1:])
else:
return itr
def separate_cols(line):
line = line.replace('true', '1').replace('false', '0')
vals = line.split(',')
for (i, x) in enumerate(vals):
yield (i, x)
input = (sc
.textFile('train.csv', minPartitions=6)
.mapPartitionsWithIndex(drop_first_line))
pairs = input.flatMap(separate_cols)
result = (pairs
.groupByKey()
.map(lambda kv: "{0},{1}".format(kv[0], ",".join(kv[1]))))
result.saveAsTextFile("pythonout")
Results
In local[6]
mode (Intel(R) Xeon(R) CPU E3-1245 V2 @ 3.40GHz) with 4GB memory per executor it takes (n = 3):
- Scala - mean: 250.00s, stdev: 12.49
- Python - mean: 246.66s, stdev: 1.15
I am pretty sure that most of that time is spent on shuffling, serializing, deserializing and other secondary tasks. Just for fun, here's naive single-threaded code in Python that performs the same task on this machine in less than a minute:
def go():
with open("train.csv") as fr:
lines = [
line.replace('true', '1').replace('false', '0').split(",")
for line in fr]
return zip(*lines[1:])
Extension to above answers -
Scala proves faster in many ways compare to python but there are some valid reasons why python is becoming more popular that scala, let see few of them —
Python for Apache Spark is pretty easy to learn and use. However, this not the only reason why Pyspark is a better choice than Scala. There’s more.
Python API for Spark may be slower on the cluster, but at the end, data scientists can do a lot more with it as compared to Scala. The complexity of Scala is absent. The interface is simple and comprehensive.
Talking about the readability of code, maintenance and familiarity with Python API for Apache Spark is far better than Scala.
Python comes with several libraries related to machine learning and natural language processing. This aids in data analysis and also has statistics that are much mature and time-tested. For instance, numpy, pandas, scikit-learn, seaborn and matplotlib.
Note: Most data scientists use a hybrid approach where they use the best of both the APIs.
Lastly, Scala community often turns out to be lot less helpful to programmers. This makes Python a much valuable learning. If you have enough experience with any statically typed programming language like Java, you can stop worrying about not using Scala altogether.
참고URL : https://stackoverflow.com/questions/32464122/spark-performance-for-scala-vs-python
'development' 카테고리의 다른 글
PhantomJS를 사용하여 양식을 제출하는 방법 (0) | 2020.06.02 |
---|---|
VS2012의 테마를 어디에서 찾을 수 있습니까 (0) | 2020.06.02 |
단일 선택 단추 (INPUT type =“radio”)에 대한 OnChange 이벤트 핸들러가 하나의 값으로 작동하지 않습니다 (0) | 2020.06.02 |
MapReduce에 대한 간단한 설명? (0) | 2020.06.02 |
가져 오기 오류 : numpy라는 모듈이 없습니다. (0) | 2020.06.02 |