스파크 상태의 데이터 프레임 열 업데이트
새로운 스파크 DataFrame API를 보면 데이터프레임 열 수정이 가능한지 여부가 불투명합니다.
행의 값을 변경하려면 어떻게 해야 합니까?x
기둥.y
데이터 프레임에 대한?
인pandas
다음과 같습니다.
df.ix[x,y] = new_value
편집: 아래에 말씀하신 내용을 통합하면 기존 데이터 프레임을 불변으로 수정할 수 없고, 원하는 수정사항으로 새로운 데이터 프레임을 반환할 수 있습니다.
조건에 따라 열에 있는 값을 교체하려는 경우 다음과 같습니다.np.where
:
from pyspark.sql import functions as F
update_func = (F.when(F.col('update_col') == replace_val, new_value)
.otherwise(F.col('update_col')))
df = df.withColumn('new_column_name', update_func)
열에 대한 일부 작업을 수행하고 데이터 프레임에 추가되는 새 열을 생성하려는 경우:
import pyspark.sql.functions as F
import pyspark.sql.types as T
def my_func(col):
do stuff to column here
return transformed_value
# if we assume that my_func returns a string
my_udf = F.UserDefinedFunction(my_func, T.StringType())
df = df.withColumn('new_column_name', my_udf('update_col'))
새 열에 이전 열과 이름이 같도록 하려면 다음 단계를 추가할 수 있습니다.
df = df.drop('update_col').withColumnRenamed('new_column_name', 'update_col')
열을 수정할 수는 없지만 열에 대해 작업을 수행하고 해당 변경 사항을 반영하는 새 DataFrame을 반환할 수 있습니다.그것을 위해 당신은 먼저UserDefinedFunction
해당 기능을 적용한 후 대상 열에만 선택적으로 적용하는 작업을 수행합니다.Python의 경우:
from pyspark.sql.functions import UserDefinedFunction
from pyspark.sql.types import StringType
name = 'target_column'
udf = UserDefinedFunction(lambda x: 'new_value', StringType())
new_df = old_df.select(*[udf(column).alias(name) if column == name else column for column in old_df.columns])
new_df
현재와 동일한 스키마를 가지고 있습니다.old_df
(라고 assuming합니다.old_df.target_column
유형의StringType
(또한) 열에 있는 모든 값을 포함합니다.target_column
될 것이다new_value
.
열을 업데이트할 때 일반적으로 이전 값을 새 값으로 매핑합니다.UDF 없이 파이스파크에서 이를 수행할 수 있는 방법은 다음과 같습니다.
# update df[update_col], mapping old_value --> new_value
from pyspark.sql import functions as F
df = df.withColumn(update_col,
F.when(df[update_col]==old_value,new_value).
otherwise(df[update_col])).
DataFrames
는 RDD를 기반으로 합니다. RDD는 불변의 구조이며 현장에서 요소 업데이트를 허용하지 않습니다.값을 변경하려면 SQL과 같은 DSL이나 RDD 연산을 사용하여 원래의 것을 변환하여 새 DataFrame을 만들어야 합니다.map
.
매우 권장되는 슬라이드 데크:대규모 데이터 과학을 위한 스파크의 데이터 프레임 소개.
이전 DataFrame에 적용된 맵의 결과로부터 새 DataFrame을 생성할 수 있다고 masg가 말하는 것처럼.주어진 DataFrame에 대한 예df
두 행:
val newDf = sqlContext.createDataFrame(df.map(row =>
Row(row.getInt(0) + SOMETHING, applySomeDef(row.getAs[Double]("y")), df.schema)
열의 유형이 변경되는 경우 다음 대신 올바른 스키마를 지정해야 합니다.df.schema
. 의 api를 확인해보세요.org.apache.spark.sql.Row
사용 가능한 방법: https://spark.apache.org/docs/latest/api/java/org/apache/spark/sql/Row.html
[업데이트] 또는 스칼라에서 UDF 사용:
import org.apache.spark.sql.functions._
val toLong = udf[Long, String] (_.toLong)
val modifiedDf = df.withColumn("modifiedColumnName", toLong(df("columnName"))).drop("columnName")
열 이름을 동일하게 유지해야 하는 경우 이름을 다시 지정할 수 있습니다.
modifiedDf.withColumnRenamed("modifiedColumnName", "columnName")
pyspark.sql.functions에서 col을 가져오고 문자열( 문자열 a, 문자열 b, 문자열 c)을 기준으로 다섯 번째 열을 정수(0,1,2)로 업데이트하여 새 DataFrame으로 만듭니다.
from pyspark.sql.functions import col, when
data_frame_temp = data_frame.withColumn("col_5",when(col("col_5") == "string a", 0).when(col("col_5") == "string b", 1).otherwise(2))
언급URL : https://stackoverflow.com/questions/29109916/updating-a-dataframe-column-in-spark
'sourcetip' 카테고리의 다른 글
두 개의 사용자 지정 게시물 유형 동일한 범주 (0) | 2023.10.15 |
---|---|
부분 문자열 색인 가져오기 (0) | 2023.10.15 |
jQuery 1.8+를 포함하여 jQuery Contains 대소문자를 구분하지 않게 하려면 어떻게 해야 합니까? (0) | 2023.10.15 |
재로드 및 #해킹 없이 윈도우 위치를 변경할 수 있는 방법은? (0) | 2023.10.15 |
사용자가 웹페이지의 스크린샷을 찍는 것을 방지하는 방법은 무엇입니까? (0) | 2023.10.15 |