development

Spark에서 데이터 프레임 열 업데이트

big-blog 2020. 11. 27. 20:58
반응형

Spark에서 데이터 프레임 열 업데이트


새로운 스파크 데이터 프레임 API를 살펴보면 데이터 프레임 열을 수정할 수 있는지 여부가 명확하지 않습니다.

데이터 프레임의 x값을 변경하려면 어떻게해야 y합니까?

에서에게 pandas이 될 것이다df.ix[x,y] = new_value

편집 : 아래에 언급 된 내용을 통합하면 기존 데이터 프레임이 변경 불가능하므로 수정할 수 없지만 원하는 수정 사항으로 새 데이터 프레임을 반환 할 수 있습니다.

다음과 같은 조건에 따라 열의 값을 바꾸려는 경우 np.where:

from pyspark.sql import functions as F

update_func = (F.when(F.col('update_col') == replace_val, new_value)
                .otherwise(F.col('update_col')))
df = df.withColumn('new_column_name', update_func)

열에 대해 일부 작업을 수행하고 데이터 프레임에 추가되는 새 열을 생성하려는 경우 :

import pyspark.sql.functions as F
import pyspark.sql.types as T

def my_func(col):
    do stuff to column here
    return transformed_value

# if we assume that my_func returns a string
my_udf = F.UserDefinedFunction(my_func, T.StringType())

df = df.withColumn('new_column_name', my_udf('update_col'))

새 열의 이름이 이전 열과 동일하게하려면 추가 단계를 추가 할 수 있습니다.

df = df.drop('update_col').withColumnRenamed('new_column_name', 'update_col')

열을 수정할 수는 없지만 열에 대해 작업하고 해당 변경 사항을 반영하는 새 DataFrame을 반환 할 수 있습니다. 이를 위해 먼저 UserDefinedFunction적용 할 작업을 구현 한 다음 해당 함수를 대상 열에 만 선택적으로 적용합니다. Python에서 :

from pyspark.sql.functions import UserDefinedFunction
from pyspark.sql.types import StringType

name = 'target_column'
udf = UserDefinedFunction(lambda x: 'new_value', StringType())
new_df = old_df.select(*[udf(column).alias(name) if column == name else column for column in old_df.columns])

new_df지금과 같은 스키마를 갖는다 old_df(즉, 가정 old_df.target_column형이었다 StringType도) 그러나 열에있는 모든 값이 target_column될 것이다 new_value.


일반적으로 열을 업데이트 할 때 이전 값을 새 값에 매핑하려고합니다. UDF없이 pyspark에서이를 수행하는 방법은 다음과 같습니다.

# update df[update_col], mapping old_value --> new_value
from pyspark.sql import functions as F
df = df.withColumn(update_col,
    F.when(df[update_col]==old_value,new_value).
    otherwise(df[update_col])).

DataFramesRDD를 기반으로합니다. RDD는 변경 불가능한 구조이며 현장에서 요소를 업데이트 할 수 없습니다. 값을 변경하려면 SQL과 같은 DSL 또는 .NET과 같은 RDD 작업을 사용하여 원본을 변환하여 새 DataFrame을 만들어야합니다 map.

강력 추천 슬라이드 데크 : 대규모 데이터 과학을위한 Spark의 DataFrames 소개 .


maasg가 말한 것처럼 이전 DataFrame에 적용된 맵의 결과에서 새 DataFrame을 만들 수 있습니다. df두 개의 행이 있는 지정된 DataFrame의 예 :

val newDf = sqlContext.createDataFrame(df.map(row => 
  Row(row.getInt(0) + SOMETHING, applySomeDef(row.getAs[Double]("y")), df.schema)

열 유형이 변경되면 대신 올바른 스키마를 제공해야합니다 df.schema. org.apache.spark.sql.Row사용 가능한 메소드 대한 API를 확인하십시오 . https://spark.apache.org/docs/latest/api/java/org/apache/spark/sql/Row.html

[업데이트] 또는 Scala에서 UDF 사용 :

import org.apache.spark.sql.functions._

val toLong = udf[Long, String] (_.toLong)

val modifiedDf = df.withColumn("modifiedColumnName", toLong(df("columnName"))).drop("columnName")

열 이름을 동일하게 유지해야하는 경우 다시 이름을 바꿀 수 있습니다.

modifiedDf.withColumnRenamed("modifiedColumnName", "columnName")

참고 URL : https://stackoverflow.com/questions/29109916/updating-a-dataframe-column-in-spark

반응형