Spark에서 데이터 프레임 열 업데이트
새로운 스파크 데이터 프레임 API를 살펴보면 데이터 프레임 열을 수정할 수 있는지 여부가 명확하지 않습니다.
데이터 프레임의 행 x
열 값을 변경하려면 어떻게해야 y
합니까?
에서에게 pandas
이 될 것이다df.ix[x,y] = new_value
편집 : 아래에 언급 된 내용을 통합하면 기존 데이터 프레임이 변경 불가능하므로 수정할 수 없지만 원하는 수정 사항으로 새 데이터 프레임을 반환 할 수 있습니다.
다음과 같은 조건에 따라 열의 값을 바꾸려는 경우 np.where
:
from pyspark.sql import functions as F
update_func = (F.when(F.col('update_col') == replace_val, new_value)
.otherwise(F.col('update_col')))
df = df.withColumn('new_column_name', update_func)
열에 대해 일부 작업을 수행하고 데이터 프레임에 추가되는 새 열을 생성하려는 경우 :
import pyspark.sql.functions as F
import pyspark.sql.types as T
def my_func(col):
do stuff to column here
return transformed_value
# if we assume that my_func returns a string
my_udf = F.UserDefinedFunction(my_func, T.StringType())
df = df.withColumn('new_column_name', my_udf('update_col'))
새 열의 이름이 이전 열과 동일하게하려면 추가 단계를 추가 할 수 있습니다.
df = df.drop('update_col').withColumnRenamed('new_column_name', 'update_col')
열을 수정할 수는 없지만 열에 대해 작업하고 해당 변경 사항을 반영하는 새 DataFrame을 반환 할 수 있습니다. 이를 위해 먼저 UserDefinedFunction
적용 할 작업을 구현 한 다음 해당 함수를 대상 열에 만 선택적으로 적용합니다. Python에서 :
from pyspark.sql.functions import UserDefinedFunction
from pyspark.sql.types import StringType
name = 'target_column'
udf = UserDefinedFunction(lambda x: 'new_value', StringType())
new_df = old_df.select(*[udf(column).alias(name) if column == name else column for column in old_df.columns])
new_df
지금과 같은 스키마를 갖는다 old_df
(즉, 가정 old_df.target_column
형이었다 StringType
도) 그러나 열에있는 모든 값이 target_column
될 것이다 new_value
.
일반적으로 열을 업데이트 할 때 이전 값을 새 값에 매핑하려고합니다. UDF없이 pyspark에서이를 수행하는 방법은 다음과 같습니다.
# update df[update_col], mapping old_value --> new_value
from pyspark.sql import functions as F
df = df.withColumn(update_col,
F.when(df[update_col]==old_value,new_value).
otherwise(df[update_col])).
DataFrames
RDD를 기반으로합니다. RDD는 변경 불가능한 구조이며 현장에서 요소를 업데이트 할 수 없습니다. 값을 변경하려면 SQL과 같은 DSL 또는 .NET과 같은 RDD 작업을 사용하여 원본을 변환하여 새 DataFrame을 만들어야합니다 map
.
강력 추천 슬라이드 데크 : 대규모 데이터 과학을위한 Spark의 DataFrames 소개 .
maasg가 말한 것처럼 이전 DataFrame에 적용된 맵의 결과에서 새 DataFrame을 만들 수 있습니다. df
두 개의 행이 있는 지정된 DataFrame의 예 :
val newDf = sqlContext.createDataFrame(df.map(row =>
Row(row.getInt(0) + SOMETHING, applySomeDef(row.getAs[Double]("y")), df.schema)
열 유형이 변경되면 대신 올바른 스키마를 제공해야합니다 df.schema
. org.apache.spark.sql.Row
사용 가능한 메소드 에 대한 API를 확인하십시오 . https://spark.apache.org/docs/latest/api/java/org/apache/spark/sql/Row.html
[업데이트] 또는 Scala에서 UDF 사용 :
import org.apache.spark.sql.functions._
val toLong = udf[Long, String] (_.toLong)
val modifiedDf = df.withColumn("modifiedColumnName", toLong(df("columnName"))).drop("columnName")
열 이름을 동일하게 유지해야하는 경우 다시 이름을 바꿀 수 있습니다.
modifiedDf.withColumnRenamed("modifiedColumnName", "columnName")
참고 URL : https://stackoverflow.com/questions/29109916/updating-a-dataframe-column-in-spark
'development' 카테고리의 다른 글
javax.net.ssl.SSLHandshakeException : 웹 서비스 통신 중 핸드 셰이크 중에 원격 호스트가 연결을 닫았습니다. (0) | 2020.11.27 |
---|---|
Swift에서 인수 레이블은 언제 필요합니까? (0) | 2020.11.27 |
IEnumerable을 DataTable로 변환 (0) | 2020.11.27 |
C # GUID 및 SQL 고유 식별자 (0) | 2020.11.27 |
개인 생성자가 필요한 이유는 무엇입니까? (0) | 2020.11.27 |