sourcecode

Pyspark: json 문자열 열을 구문 분석합니다.

copyscript 2023. 2. 11. 09:32
반응형

Pyspark: json 문자열 열을 구문 분석합니다.

pyspark 데이터 프레임이 하나 있는데json여기서 각 행은 json의 유니코드 문자열입니다.각 행을 구문 분석하고 각 행이 구문 분석된 json인 새 데이터 프레임을 반환하고 싶습니다.

# Sample Data Frame
jstr1 = u'{"header":{"id":12345,"foo":"bar"},"body":{"id":111000,"name":"foobar","sub_json":{"id":54321,"sub_sub_json":{"col1":20,"col2":"somethong"}}}}'
jstr2 = u'{"header":{"id":12346,"foo":"baz"},"body":{"id":111002,"name":"barfoo","sub_json":{"id":23456,"sub_sub_json":{"col1":30,"col2":"something else"}}}}'
jstr3 = u'{"header":{"id":43256,"foo":"foobaz"},"body":{"id":20192,"name":"bazbar","sub_json":{"id":39283,"sub_sub_json":{"col1":50,"col2":"another thing"}}}}'
df = sql_context.createDataFrame([Row(json=jstr1),Row(json=jstr2),Row(json=jstr3)])

각 행에 대한 매핑을 시도했습니다.json.loads:

(df
  .select('json')
  .rdd
  .map(lambda x: json.loads(x))
  .toDF()
).show()

하지만 이것은TypeError: expected string or buffer

이 문제의 일부분은, R&A에서 변환했을 때,dataframe에 대해서rdd스키마 정보가 손실되어 스키마 정보에 수동으로 입력해 보았습니다.

schema = StructType([StructField('json', StringType(), True)])
rdd = (df
  .select('json')
  .rdd
  .map(lambda x: json.loads(x))
)
new_df = sql_context.createDataFrame(rdd, schema)
new_df.show()

하지만 난 똑같아TypeError.

답을 보면, 이 행들을 평탄하게 만드는 것 같습니다.flatMap도움이 될 수도 있지만, 그것 또한 성공하지 못하고 있습니다.

schema = StructType([StructField('json', StringType(), True)])
rdd = (df
  .select('json')
  .rdd
  .flatMap(lambda x: x)
  .flatMap(lambda x: json.loads(x))
  .map(lambda x: x.get('body'))
)
new_df = sql_context.createDataFrame(rdd, schema)
new_df.show()

다음의 에러가 표시됩니다.AttributeError: 'unicode' object has no attribute 'get'.

Spark 2.1+의 경우 를 사용하여 다음과 같이 데이터 프레임 내의 다른 비json 열을 보존할 수 있습니다.

from pyspark.sql.functions import from_json, col
json_schema = spark.read.json(df.rdd.map(lambda row: row.json)).schema
df.withColumn('json', from_json(col('json'), json_schema))

스파크가 json 문자열 열의 스키마를 파생할 수 있도록 합니다.그 다음에df.json열은 더 이상 StringType이 아니라 올바르게 디코딩된 json 구조(예: 중첩됨)입니다.StrucType및 기타 모든 컬럼df그대로 보존됩니다.

다음과 같이 json 컨텐츠에 액세스할 수 있습니다.

df.select(col('json.header').alias('header'))

json 문자열이 있는 데이터 프레임을 구조화된 데이터 프레임으로 변환하는 것은 실제로 데이터 프레임을 문자열의 RDD로 변환하는 경우 매우 간단합니다(http://spark.apache.org/docs/latest/sql-programming-guide.html#json-datasets) 참조).

예를 들어 다음과 같습니다.

>>> new_df = sql_context.read.json(df.rdd.map(lambda r: r.json))
>>> new_df.printSchema()
root
 |-- body: struct (nullable = true)
 |    |-- id: long (nullable = true)
 |    |-- name: string (nullable = true)
 |    |-- sub_json: struct (nullable = true)
 |    |    |-- id: long (nullable = true)
 |    |    |-- sub_sub_json: struct (nullable = true)
 |    |    |    |-- col1: long (nullable = true)
 |    |    |    |-- col2: string (nullable = true)
 |-- header: struct (nullable = true)
 |    |-- foo: string (nullable = true)
 |    |-- id: long (nullable = true)

JSON 형식이 완전하지 않으면 기존 답변은 작동하지 않습니다.예를 들어, RDD 기반의 스키마 추론에서는 JSON이 곱슬 괄호로 둘러싸여 있는 것을 상정하고 있습니다.{}잘못된 스키마를 제공합니다(에 기재되어 있습니다).null값) 예를 들어 데이터가 다음과 같은 경우:

[
  {
    "a": 1.0,
    "b": 1
  },
  {
    "a": 0.0,
    "b": 2
  }
]

JSON이 다른 JSON 오브젝트에 존재하도록 삭제함으로써 이 문제를 회피하기 위한 함수를 작성했습니다.

def parseJSONCols(df, *cols, sanitize=True):
    """Auto infer the schema of a json column and parse into a struct.

    rdd-based schema inference works if you have well-formatted JSON,
    like ``{"key": "value", ...}``, but breaks if your 'JSON' is just a
    string (``"data"``) or is an array (``[1, 2, 3]``). In those cases you
    can fix everything by wrapping the data in another JSON object
    (``{"key": [1, 2, 3]}``). The ``sanitize`` option (default True)
    automatically performs the wrapping and unwrapping.

    The schema inference is based on this
    `SO Post <https://stackoverflow.com/a/45880574)/>`_.

    Parameters
    ----------
    df : pyspark dataframe
        Dataframe containing the JSON cols.
    *cols : string(s)
        Names of the columns containing JSON.
    sanitize : boolean
        Flag indicating whether you'd like to sanitize your records
        by wrapping and unwrapping them in another JSON object layer.

    Returns
    -------
    pyspark dataframe
        A dataframe with the decoded columns.
    """
    res = df
    for i in cols:

        # sanitize if requested.
        if sanitize:
            res = (
                res.withColumn(
                    i,
                    psf.concat(psf.lit('{"data": '), i, psf.lit('}'))
                )
            )
        # infer schema and apply it
        schema = spark.read.json(res.rdd.map(lambda x: x[i])).schema
        res = res.withColumn(i, psf.from_json(psf.col(i), schema))

        # unpack the wrapped object if needed
        if sanitize:
            res = res.withColumn(i, psf.col(i).data)
    return res

주의:psf=pyspark.sql.functions.

다음은 @nolan-conaway의 간결한(Spark SQL) 버전입니다.parseJSONCols기능.

SELECT 
explode(
    from_json(
        concat('{"data":', 
               '[{"a": 1.0,"b": 1},{"a": 0.0,"b": 2}]', 
               '}'), 
        'data array<struct<a:DOUBLE, b:INT>>'
    ).data) as data;

추신. 폭발 기능도 추가했습니다. :P

몇 가지 HIVE SQL 유형을 알아야 합니다.

from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()

def map2json(dict):
    import json
    return json.dumps(dict)
from pyspark.sql.types import StringType
spark.udf.register("map2json", lambda dict: map2json(dict), StringType())

spark.sql("select map2json(map('a', '1'))").show()

각 JSON의 스키마를 모르는 경우(다른 경우도 있습니다), 다음을 사용할 수 있습니다.

from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
 
# ... here you get your DF

# Assuming the first column of your DF is the JSON to parse
my_df = spark.read.json(my_df.rdd.map(lambda x: x[0]))

데이터 세트에 있는 다른 열은 유지되지 않습니다.출처 : https://github.com/apache/spark/pull/22775

이 답변은 JSON 문자열이 객체가 아닌 JSON 어레이일 경우 컨텍스트 추가를 위한 것입니다(rep이 없기 때문에 코멘트할 수 없습니다).Martin Tapp의 솔리드 답변을 사용하면 열에 null 값이 반환됩니다.

dr;dr

JSON 문자열이 다음과 같은 배열 개체인 경우:

[{"a":1, "b":1.0}]

spark.read.json는 이러한 배열에 포함된 요소의 스키마를 포함하는 데이터 프레임을 반환합니다.는 배열 자체를 포함하지 않습니다. from_json에 들지 않기 , 이는 '', '아까', '아까', '아까', '아까', '아까', '아까'로 유추한 스키마를 정리할 수 .spark.read.json an ArrayType그러면 올바르게 해석됩니다(모든 것에 대해 null 값을 반환하지 않음).

from pyspark.sql import functions as F
from pyspark.sql.types import ArrayType

array_item_schema = \
  spark.read.json(df.rdd.map(lambda row: row['json_string_column'])).schema

json_array_schema = ArrayType(array_item_schema, True)

arrays_df = df.select(F.from_json('json_string_column', json_array_schema).alias('json_arrays'))

objects_df = arrays_df.select(F.explode('json_arrays').alias('objects'))

도입부

놀런 코너웨이의 부록으로, JSON이 그 형태일 때

[
  {
    "a": 1.0,
    "b": 1
  },
  {
    "a": 0.0,
    "b": 2
  }
]

의 pyspark는 pyspark입니다.spark.read.json()는 배열을 단일 행이 아닌 행으로 변환할 개체의 집합으로 간주합니다.

PySpark 3.3.0 쉘에서의 실행 예를 참조하십시오.

>>> myjson        = """[{"a": 1.0,"b": 1},{"a": 2.0,"b": 2}]"""
>>> myotherjson   = """[{"a": 3.0,"b": 3}]"""
>>> rawobjectjson = """{"a": 4.0,"b": 4}"""
>>> spark_read_df = spark.read.json(sc.parallelize([myjson,myotherjson,rawobjectjson]))
>>> spark_read_df.show()
+---+---+
|  a|  b|
+---+---+
|1.0|  1|
|2.0|  2|
|3.0|  3|
|4.0|  4|
+---+---+

>>> spark_read_df.printSchema()
root
 |-- a: double (nullable = true)
 |-- b: long (nullable = true)

라는 것을 알 수 .myjson ★★★★★★★★★★★★★★★★★」myotherjsonJSON 객체의 JSON 배열이 확장되어 포함된 각 객체에 대한 행이 지정되었습니다. 문자열 중 인 JSON 1도 되었습니다.rawobjectjson그냥 생물일 뿐이죠.여기서는 설명서가 조금 부족한 것 같습니다.어레이 오브젝트의 취급에 대해서는 언급하지 않았기 때문입니다.

이제 JSON 문자열 열을 사용하여 데이터 프레임을 만듭니다..rawobjectjson 되겠지만from_json그럼 각 문자열에 동일한 스키마가 필요합니다(존재하는 경우 최상위 배열도 포함됩니다).

>>> from pyspark.sql.types import StructType, StructField, StringType, ArrayType
>>> json_string_data = [
...     (myjson,),
...     (myotherjson,),
... ]
>>> json_df_schema = StructType([
...     StructField('json_strings', StringType(), True),
... ])
>>> raw_json_df = spark.createDataFrame(data=json_string_data, schema=json_df_schema)
>>> raw_json_df.show()
+--------------------+
|        json_strings|
+--------------------+
|[{"a": 1.0,"b": 1...|
| [{"a": 3.0,"b": 3}]|
+--------------------+

그럼 서 제가 .spark.read.jsonfrom_json오브젝트에 JSON 컬럼을 읽었지만, 완전한 컬럼을 계속 반환했습니다.Nolan Conaway가 언급했듯이 스키마가 에 전달될 때 이러한 현상이 발생합니다.from_json지정된 문자열에 적용할 수 없습니다.

되지만, '어레이'로된다는 것입니다.spark_read_df.printSchema() 「」에 의해서 를 나타냅니다.spark.read.json()는 어레이 레벨을 무시합니다.

솔루션

그래서 저는 스키마 내의 최상위 어레이를 읽기만 하면 되는 솔루션을 선택하게 되었습니다.

from pyspark.sql import functions as F

# This one won't work for directly passing to from_json as it ignores top-level arrays in json strings
# (if any)!
# json_object_schema = spark_read_df.schema()

# from_json is a bit more "simple", it directly applies the schema to the string. In this case
# the top level type is actually an array, so a simple fix is to just wrap the schema that
# spark.read.json returned in an ArrayType to match the true JSON string
json_array_schema = ArrayType(spark_read_df.schema, True)

json_extracted_df = raw_json_df.select(
    F.from_json('json_strings', json_array_schema)
        .alias('json_arrays')
)
>>> json_extracted_df.show()
+--------------------+
|         json_arrays|
+--------------------+
|[{1.0, 1}, {2.0, 2}]|
|          [{3.0, 3}]|
+--------------------+

>>> json_extracted_df.printSchema()
root
 |-- json_arrays: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- a: double (nullable = true)
 |    |    |-- b: long (nullable = true)

수 .pyspark.sql.functions.explode:

>>> exploded_df = json_extracted_df.select(F.explode('json_arrays').alias('objects'))
>>> exploded_df.show()
+--------+
| objects|
+--------+
|{1.0, 1}|
|{2.0, 2}|
|{3.0, 3}|
+--------+

>>> exploded_df.printSchema()
root
 |-- objects: struct (nullable = true)
 |    |-- a: double (nullable = true)
 |    |-- b: long (nullable = true)

언급URL : https://stackoverflow.com/questions/41107835/pyspark-parse-a-column-of-json-strings

반응형