development

Spark SQL : 열 목록에 집계 함수 적용

big-blog 2020. 12. 11. 19:04
반응형

Spark SQL : 열 목록에 집계 함수 적용


할 때 데이터 프레임의 모든 (또는 목록) 열에 집계 함수를 적용하는 방법이 groupBy있습니까? 즉, 모든 열에 대해이 작업을 피할 수있는 방법이 있습니까?

df.groupBy("col1")
  .agg(sum("col2").alias("col2"), sum("col3").alias("col3"), ...)

집계 함수를 여러 열에 적용하는 방법에는 여러 가지가 있습니다.

GroupedData클래스를 포함하는 일반적인 기능을위한 방법들을 제공하고 count, max, min, meansum같이 직접 다음 사용될 수있는 :

  • 파이썬 :

    df = sqlContext.createDataFrame(
        [(1.0, 0.3, 1.0), (1.0, 0.5, 0.0), (-1.0, 0.6, 0.5), (-1.0, 5.6, 0.2)],
        ("col1", "col2", "col3"))
    
    df.groupBy("col1").sum()
    
    ## +----+---------+-----------------+---------+
    ## |col1|sum(col1)|        sum(col2)|sum(col3)|
    ## +----+---------+-----------------+---------+
    ## | 1.0|      2.0|              0.8|      1.0|
    ## |-1.0|     -2.0|6.199999999999999|      0.7|
    ## +----+---------+-----------------+---------+
    
  • 스칼라

    val df = sc.parallelize(Seq(
      (1.0, 0.3, 1.0), (1.0, 0.5, 0.0),
      (-1.0, 0.6, 0.5), (-1.0, 5.6, 0.2))
    ).toDF("col1", "col2", "col3")
    
    df.groupBy($"col1").min().show
    
    // +----+---------+---------+---------+
    // |col1|min(col1)|min(col2)|min(col3)|
    // +----+---------+---------+---------+
    // | 1.0|      1.0|      0.3|      0.0|
    // |-1.0|     -1.0|      0.6|      0.2|
    // +----+---------+---------+---------+
    

선택적으로 집계해야하는 열 목록을 전달할 수 있습니다.

df.groupBy("col1").sum("col2", "col3")

또한 키와 함수를 값으로 열과 함께 사전 / 맵을 전달할 수 있습니다.

  • 파이썬

    exprs = {x: "sum" for x in df.columns}
    df.groupBy("col1").agg(exprs).show()
    
    ## +----+---------+
    ## |col1|avg(col3)|
    ## +----+---------+
    ## | 1.0|      0.5|
    ## |-1.0|     0.35|
    ## +----+---------+
    
  • 스칼라

    val exprs = df.columns.map((_ -> "mean")).toMap
    df.groupBy($"col1").agg(exprs).show()
    
    // +----+---------+------------------+---------+
    // |col1|avg(col1)|         avg(col2)|avg(col3)|
    // +----+---------+------------------+---------+
    // | 1.0|      1.0|               0.4|      0.5|
    // |-1.0|     -1.0|3.0999999999999996|     0.35|
    // +----+---------+------------------+---------+
    

마지막으로 varargs를 사용할 수 있습니다.

  • 파이썬

    from pyspark.sql.functions import min
    
    exprs = [min(x) for x in df.columns]
    df.groupBy("col1").agg(*exprs).show()
    
  • 스칼라

    import org.apache.spark.sql.functions.sum
    
    val exprs = df.columns.map(sum(_))
    df.groupBy($"col1").agg(exprs.head, exprs.tail: _*)
    

유사한 효과를 얻을 수있는 다른 방법이 있지만 대부분의 경우 충분합니다.

또한보십시오:


Another example of the same concept - but say - you have 2 different columns - and you want to apply different agg functions to each of them i.e

f.groupBy("col1").agg(sum("col2").alias("col2"), avg("col3").alias("col3"), ...)

Here is the way to achieve it - though I do not yet know how to add the alias in this case

See the example below - Using Maps

val Claim1 = StructType(Seq(StructField("pid", StringType, true),StructField("diag1", StringType, true),StructField("diag2", StringType, true), StructField("allowed", IntegerType, true), StructField("allowed1", IntegerType, true)))
val claimsData1 = Seq(("PID1", "diag1", "diag2", 100, 200), ("PID1", "diag2", "diag3", 300, 600), ("PID1", "diag1", "diag5", 340, 680), ("PID2", "diag3", "diag4", 245, 490), ("PID2", "diag2", "diag1", 124, 248))

val claimRDD1 = sc.parallelize(claimsData1)
val claimRDDRow1 = claimRDD1.map(p => Row(p._1, p._2, p._3, p._4, p._5))
val claimRDD2DF1 = sqlContext.createDataFrame(claimRDDRow1, Claim1)

val l = List("allowed", "allowed1")
val exprs = l.map((_ -> "sum")).toMap
claimRDD2DF1.groupBy("pid").agg(exprs) show false
val exprs = Map("allowed" -> "sum", "allowed1" -> "avg")

claimRDD2DF1.groupBy("pid").agg(exprs) show false

참고URL : https://stackoverflow.com/questions/33882894/spark-sql-apply-aggregate-functions-to-a-list-of-columns

반응형