AWS Tip

Best AWS, DevOps, Serverless, and more from top Medium writers .

Follow publication

AWS Glue — Querying Nested JSON with Relationalize Transform

--

AWS Glue has transform Relationalize that can convert nested JSON into columns that you can then write to S3 or import into relational databases. As an example -

Initial Schema:

>>> df.printSchema()
root
|-- Id: string (nullable = true)
|-- LastUpdated: long (nullable = true)
|-- LastUpdatedBy: string (nullable = true)
|-- Properties: struct (nullable = true)
| |-- choices: string (nullable = true)
| |-- database: string (nullable = true)
| |-- object: string (nullable = true)
| |-- store_time: string (nullable = true)

After Relationalize transformation:

>>> df.printSchema()
root
|-- Id: string (nullable = true)
|-- LastUpdated: long (nullable = true)
|-- LastUpdatedBy: string (nullable = true)
|-- Properties.choices: string (nullable = true)
|-- Properties.database: string(nullable=true)
|-- Properties.object: string (nullable = true)
|-- Properties.stored_time: string (nullable = true)

What if “Properties” was “string” type? Would relationalize work in that case? The answer is NO.

In this blog I will walk you through on how you can use relationalize if the column is “string” type.

I have a AWS Glue job to pull the data from DynamoDB in another account. Lets look at one of the records from table:-

{'LastUpdatedBy': 'System', 
'FixedProperties': '{"choices":"one",
"timetstamp":"1577194786280",
"Id":"bce07b2a8ef5",
"score":null}',
'LastUpdated': Decimal('1578964270325'),
'StatementId': 'c24ad711696e'
}

The “FixedProperties” key is a string containing json records. Now lets look at steps to convert it to struct type.

1. Create AWS Glue DynamicFrame.

dynamic_dframe = glueContext.create_dynamic_frame.from_rdd(spark.sparkContext.parallelize(table_items),'table_items')

2. Describe the Glue DynamicFrame Schema.

dynamic_dframe.printSchema()root
|-- FixedProperties: string
|-- LastUpdated: decimal
|-- LastUpdatedBy: string
|-- StatementId: string

As you would have noticed, Glue DynamicFrame recognizes key “FixedProperties” as string because value is in single quotes. So how do we convert it to struct type?

One of the way is to use pyspark functionality — to_json. To use this you will first need to convert the Glue DynamicFrame to Apache Spark dataframe using .toDF()

If the schema is the same for all records you can convert to a struct type by defining the schema like this:

schema = StructType([StructField("choices", StringType(), True),
StructField("Id", StringType(), True),
StructField("score", StringType(), True),
StructField("timetstamp", StringType(), True)],
)
df.withColumn("FixedProperties", from_json(
col("FixedProperties"), schema)).show(truncate=False)
#+-----------------+-------------+-------------+------------------------------------+
#|StatementId |LastUpdated |LastUpdatedBy|FixedProperties |
#+-----------------+-------------+-------------+------------------------------------+
#|c24ad711696e |1578964270325|System |[one, bce07b2a8ef5, , 1577194786280]|
#+-----------------+-------------+-------------+------------------------------------+

The other way which I would say is the simpler way, is using AWS Glue “Unbox” transformer. It unboxes string into DynamicFrame.

unbox = Unbox.apply(frame = dynamic_dframe, path = "FixedProperties", format="json")unbox.printSchema()root
|-- FixedProperties: struct
| |-- choices: string
| |-- Id: string
| |-- timetstamp: string
| |-- score: null
|-- LastUpdated: decimal
|-- LastUpdatedBy: string
|-- StatementId: string

Isn’t that simple :)

3. The “LastUpdated” contains epoch time so lets convert to timestamp.

spark_df = unbox.toDF()spark_df_transformed = (
spark_df
.withColumn("LastUpdated", f.from_unixtime(f.col("LastUpdated")/1000).cast(t.TimestampType()))
.withColumn("Stored_time", f.from_unixtime(f.col("FixedProperties.timestamp")/1000).cast(t.TimestampType()))
)

4. Review the Spark dataframe schema.

spark_df_transformed.printSchema()root
|-- FixedProperties: struct (nullable = true)
| |-- choices: string (nullable = true)
| |-- Id: string (nullable = true)
| |-- timestamp: string (nullable = true)
| |-- score: null (nullable = true)
|-- LastUpdated: timestamp (nullable = true)
|-- LastUpdatedBy: string (nullable = true)
|-- StatementId: string (nullable = true)
|-- Stored_timestamp: timestamp (nullable = true)

5. Covert the Apache Spark dataframe to Glue DynamicFrame and drop the FixedProperties.timestamp. (there could be better way of doing it.)

dy_df = DynamicFrame.fromDF(spark_df_transformed, glueContext, "dy_df")dropped_fpt = DropFields.apply(frame = dy_df, paths = ["FixedProperties.timestamp"],transformation_ctx="dropped_fpt")dropped_fpt.printSchema()root
|-- FixedProperties: struct (nullable = true)
| |-- choices: string (nullable = true)
| |-- Id: string (nullable = true)
| |-- score: null (nullable = true)
|-- LastUpdated: timestamp (nullable = true)
|-- LastUpdatedBy: string (nullable = true)
|-- StatementId: string (nullable = true)
|-- Stored_timestamp: timestamp (nullable = true)

Drum roll, please……

6. Finally, time to use relationalize to un-nest the struct type.

tempDir = "s3://content-demo/temp/"dfc = dropped_fpt.relationalize("root", tempDir)dyf_out = dfc.select("root")dyf_out.printSchema()root
|-- FixedProperties.choices: string
|-- FixedProperties.Id: string
|-- FixedProperties.score: null
|-- LastUpdated: timestamp
|-- LastUpdatedBy: string
|-- StatementId: string
|-- Stored_timestamp: timestamp
dyf_out.toDF().show()

7. Apply mapping using the ApplyMapping Class.

# Rename, cast, and nest with apply_mapping
final_data = ApplyMapping.apply(frame=dyf_out,
mappings = [
("`FixedProperties.choices`", "string", "choices", "string"),
("`FixedProperties.Id`", "string", "id", "string"),
("`FixedProperties.score`", "string", "score", "string"),
("Stored_time", "timestamp", "stored_time", "timestamp"),
("lastupdated", "timestamp", "last_updated", "timestamp"),
("lastupdatedby", "string", "last_updated_by", "string"),
("statementid", "string", "statement_id", "string")],
transformation_ctx = "final_data")

8. Write the DynamicFrame to S3.

S3_PARQUET = "s3://abc-content-dev/a1/db/dt={}".format(datetime.utcnow().strftime("%Y-%m-%d-%H-%M"))# Write it out in Parquet
glueContext.write_dynamic_frame.from_options(
frame=final_data,
connection_type = "s3",
connection_options = {"path": S3_PARQUET},
format = "parquet")

Once the parquet files are written to S3, you can use a AWS Glue crawler to populate the Glue Data Catalog with table and query the data from Athena.

If you have enjoyed reading it and it has helped you then you can buy me a coffee :)

The viewpoints expressed are mine and not of my employer. The views, opinions and comments expressed by visitors are theirs alone and may not reflect my opinion. If you enjoyed reading this article feel free to connect with me on LinkedIn. If you’re new to Medium, sign up using my Membership Referral.

Sign up to discover human stories that deepen your understanding of the world.

--

--

Published in AWS Tip

Best AWS, DevOps, Serverless, and more from top Medium writers .

Written by Anand Prakash

Avid learner of technology solutions around Machine Learning, Big-Data, Databases. 5x AWS Certified | 5x Oracle Certified.

Responses (3)

Write a response