sourcetip

스파크 상태의 데이터 프레임 열 업데이트

fileupload 2023. 10. 15. 17:34
반응형

스파크 상태의 데이터 프레임 열 업데이트

새로운 스파크 DataFrame API를 보면 데이터프레임 열 수정이 가능한지 여부가 불투명합니다.

행의 값을 변경하려면 어떻게 해야 합니까?x기둥.y데이터 프레임에 대한?

pandas다음과 같습니다.

df.ix[x,y] = new_value

편집: 아래에 말씀하신 내용을 통합하면 기존 데이터 프레임을 불변으로 수정할 수 없고, 원하는 수정사항으로 새로운 데이터 프레임을 반환할 수 있습니다.

조건에 따라 열에 있는 값을 교체하려는 경우 다음과 같습니다.np.where:

from pyspark.sql import functions as F

update_func = (F.when(F.col('update_col') == replace_val, new_value)
                .otherwise(F.col('update_col')))
df = df.withColumn('new_column_name', update_func)

열에 대한 일부 작업을 수행하고 데이터 프레임에 추가되는 새 열을 생성하려는 경우:

import pyspark.sql.functions as F
import pyspark.sql.types as T

def my_func(col):
    do stuff to column here
    return transformed_value

# if we assume that my_func returns a string
my_udf = F.UserDefinedFunction(my_func, T.StringType())

df = df.withColumn('new_column_name', my_udf('update_col'))

새 열에 이전 열과 이름이 같도록 하려면 다음 단계를 추가할 수 있습니다.

df = df.drop('update_col').withColumnRenamed('new_column_name', 'update_col')

열을 수정할 수는 없지만 열에 대해 작업을 수행하고 해당 변경 사항을 반영하는 새 DataFrame을 반환할 수 있습니다.그것을 위해 당신은 먼저UserDefinedFunction해당 기능을 적용한 후 대상 열에만 선택적으로 적용하는 작업을 수행합니다.Python의 경우:

from pyspark.sql.functions import UserDefinedFunction
from pyspark.sql.types import StringType

name = 'target_column'
udf = UserDefinedFunction(lambda x: 'new_value', StringType())
new_df = old_df.select(*[udf(column).alias(name) if column == name else column for column in old_df.columns])

new_df현재와 동일한 스키마를 가지고 있습니다.old_df(라고 assuming합니다.old_df.target_column유형의StringType(또한) 열에 있는 모든 값을 포함합니다.target_column될 것이다new_value.

열을 업데이트할 때 일반적으로 이전 값을 새 값으로 매핑합니다.UDF 없이 파이스파크에서 이를 수행할 수 있는 방법은 다음과 같습니다.

# update df[update_col], mapping old_value --> new_value
from pyspark.sql import functions as F
df = df.withColumn(update_col,
    F.when(df[update_col]==old_value,new_value).
    otherwise(df[update_col])).

DataFrames는 RDD를 기반으로 합니다. RDD는 불변의 구조이며 현장에서 요소 업데이트를 허용하지 않습니다.값을 변경하려면 SQL과 같은 DSL이나 RDD 연산을 사용하여 원래의 것을 변환하여 새 DataFrame을 만들어야 합니다.map.

매우 권장되는 슬라이드 데크:대규모 데이터 과학을 위한 스파크의 데이터 프레임 소개.

이전 DataFrame에 적용된 맵의 결과로부터 새 DataFrame을 생성할 수 있다고 masg가 말하는 것처럼.주어진 DataFrame에 대한 예df두 행:

val newDf = sqlContext.createDataFrame(df.map(row => 
  Row(row.getInt(0) + SOMETHING, applySomeDef(row.getAs[Double]("y")), df.schema)

열의 유형이 변경되는 경우 다음 대신 올바른 스키마를 지정해야 합니다.df.schema. 의 api를 확인해보세요.org.apache.spark.sql.Row사용 가능한 방법: https://spark.apache.org/docs/latest/api/java/org/apache/spark/sql/Row.html

[업데이트] 또는 스칼라에서 UDF 사용:

import org.apache.spark.sql.functions._

val toLong = udf[Long, String] (_.toLong)

val modifiedDf = df.withColumn("modifiedColumnName", toLong(df("columnName"))).drop("columnName")

열 이름을 동일하게 유지해야 하는 경우 이름을 다시 지정할 수 있습니다.

modifiedDf.withColumnRenamed("modifiedColumnName", "columnName")

pyspark.sql.functions에서 col을 가져오고 문자열( 문자열 a, 문자열 b, 문자열 c)을 기준으로 다섯 번째 열을 정수(0,1,2)로 업데이트하여 새 DataFrame으로 만듭니다.

from pyspark.sql.functions import col, when 

data_frame_temp = data_frame.withColumn("col_5",when(col("col_5") == "string a", 0).when(col("col_5") == "string b", 1).otherwise(2))

언급URL : https://stackoverflow.com/questions/29109916/updating-a-dataframe-column-in-spark

반응형