Read XML using Spark - example for parsing OSM changeset data



Edit: fixed the XML schema, which had an error.

Databricks created a library, spark-xml, that allows Spark to parse XML files.

Here I’ll convert an OpenStreetMap dump of changesets that can be downloaded from planet.osm.org. The XML file is compressed with the BZ2 codec, but thankfully Spark handles that out of the box.

The example will use Python, but the Scala code would look very similar.
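To get a feel for the structure before defining a schema, you can stream-decompress just the first few lines with Python’s standard bz2 module. This is an optional sketch; it assumes the dump has already been downloaded to the local path used later in this post.

import bz2

# Stream-decompress only the beginning of the archive and print a few lines.
# The path matches the local download used in the load() call further down.
with bz2.open("/home/tomasz/Downloads/changesets-230116.osm.bz2", mode="rt", encoding="utf-8") as fh:
    for _ in range(5):
        print(fh.readline().rstrip())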

Create session

from multiprocessing import cpu_count

from pyspark.sql import SparkSession

spark = (
    SparkSession
    .builder
    .master(f"local[{cpu_count() - 1}]")
    .appName("Spark App")
    .config('spark.driver.extraJavaOptions', '-Duser.timezone=UTC')  # important so timestamps won't be auto converted to different TZ
    .config('spark.executor.extraJavaOptions', '-Duser.timezone=UTC')
    .config('spark.sql.session.timeZone', 'UTC')
    .config("spark.executor.memory", "3g")  # some params that theoretically should let spark use more memory, facultative
    .config("spark.driver.memory", "10g")
    .config("spark.memory.offHeap.enabled", "true")
    .config("spark.memory.offHeap.size", "3g")
    .config('spark.jars.packages', 'com.databricks:spark-xml_2.12:0.16.0')  # use Databricks XML library
    .getOrCreate()
)
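Optionally, a quick sanity check that the session came up with the settings we asked for:

# Optional sanity check: confirm the Spark version and that the UTC setting took effect.
print(spark.version)
print(spark.conf.get("spark.sql.session.timeZone"))  # expected: UTC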

Prepare schema

While Spark can autodetect the schema, doing so is very slow because it would need to decompress and parse the entire file first. We’ll define the schema manually so it’s faster.
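For reference, the inference path would be just the snippet below (no .schema() call). It is shown only as a sketch, since on the full planet dump it has to decompress and scan the whole archive; spark-xml also exposes a samplingRatio option to infer from a subset of rows.

# Slow path, for comparison only: let spark-xml infer the schema itself.
inferred = (
    spark.read
    .format("com.databricks.spark.xml")
    .option("rowTag", "changeset")
    .load("/home/tomasz/Downloads/changesets-230116.osm.bz2")
)
inferred.printSchema()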

A fragment of the XML file:

<?xml version="1.0" encoding="UTF-8"?>
<osm license="http://opendatacommons.org/licenses/odbl/1-0/" copyright="OpenStreetMap and contributors" version="0.6" generator="planet-dump-ng 1.2.4" attribution="http://www.openstreetmap.org/copyright" timestamp="2023-01-16T01:00:02Z">
 <bound box="-90,-180,90,180" origin="http://www.openstreetmap.org/api/0.6"/>
 <changeset id="1" created_at="2005-04-09T19:54:13Z" closed_at="2005-04-09T20:54:39Z" open="false" user="Steve" uid="1" min_lat="51.5288506" min_lon="-0.1465242" max_lat="51.5288620" max_lon="-0.1464925" num_changes="2" comments_count="33"/>
 <changeset id="909337" created_at="2009-04-23T08:06:48Z" closed_at="2009-04-23T08:06:51Z" open="false" user="Alberto58" uid="91650" min_lat="44.4486102" min_lon="11.6230873" max_lat="44.4623357" max_lon="11.6425428" num_changes="40" comments_count="0">
  <tag k="comment" v="Via Graffio"/>
  <tag k="created_by" v="JOSM"/>
 </changeset>
</osm>

Spark schema:

from pyspark.sql.types import StructType, StructField, StringType, IntegerType, TimestampType, DoubleType, ArrayType
from pyspark.sql import functions as f

schema = StructType([
    StructField("id", IntegerType(), nullable=False),
    StructField("created_at", TimestampType()),
    StructField("closed_at", TimestampType()),
    StructField("open", StringType()),
    StructField("user", StringType()),
    StructField("uid", IntegerType()),
    StructField("min_lat", DoubleType()),
    StructField("min_lon", DoubleType()),
    StructField("max_lat", DoubleType()),
    StructField("max_lon", DoubleType()),
    StructField("num_changes", IntegerType()),
    StructField("comments_count", IntegerType()),
    StructField("tag", ArrayType(StructType([
        StructField("k", StringType()),
        StructField("v", StringType()),
    ]))),
])

Create DataFrame

df = (
    spark.read
    .format("com.databricks.spark.xml")
    .schema(schema)
    .option("mode", "FAILFAST")  # throw an error if any row can't be parsed
    .option("rootTag", "osm")
    .option("rowTag", "changeset")
    .option("inferSchema", "false")  # we specify schema manually above
    .option("ignoreNamespace", "true")  # no namespaces but why not be explicit
    .option("attributePrefix", "")  # don't add any prefix to attributes as we know there won't be any conflict in names
    .load("/home/tomasz/Downloads/changesets-230116.osm.bz2")
)
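Nothing heavy happens yet because the read is lazy; a quick look at the schema and a few rows confirms the options did what we wanted (show() is the point where decompression actually starts):

df.printSchema()
df.show(5, truncate=False)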

Adjust DataFrame format

There may be a way to specify a MapType in the schema straight away, but I was too lazy to try, so we read the tags into an array of structs and then convert it to a MapType.

df = (
    df
    # change the array of (k, v) structs to a map - it will be easier to query later
    # (map_from_arrays builds the map from the parallel arrays of keys and values)
    .withColumn("tags", f.when(f.col("tag").isNotNull(), f.map_from_arrays(f.col("tag.k"), f.col("tag.v"))).otherwise(None))
    .drop("tag")
    # add column with year which we will use to partition dataset
    .withColumn("opened_year", f.year(f.col("created_at")))
    # standardize "open" column to boolean
    .withColumn("open", f.when(f.col("open") == f.lit("true"), True).when(f.col("open") == f.lit("false"), False).otherwise(None))
    # drop user name
    .drop("user")
)
# pull out some commonly used values from map into separate columns to make querying faster
df = (
    df
    .withColumn("created_by", f.col("tags")["created_by"])
    .withColumn("source", f.col("tags")["source"])
    .withColumn("locale", f.col("tags")["locale"])
    .withColumn("bot", f.col("tags")["bot"])
    .withColumn("review_requested", f.col("tags")["review_requested"])
    .withColumn("hashtags", f.col("tags")["hashtags"])
)
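With tags as a map and the common keys promoted to columns, queries become one-liners. A small example, counting changesets per editor using the created_by column extracted above:

# Example query: which editors created the most changesets.
(
    df
    .groupBy("created_by")
    .count()
    .orderBy(f.col("count").desc())
    .show(10, truncate=False)
)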

Save DataFrame to Parquet

Normally you wouldn’t repartition the dataset, but for my use case I wanted to have exactly one file per partition.

(
    df
    .repartition("opened_year")
    .write
    .partitionBy("opened_year")
    .mode("overwrite")
    .format("parquet")
    .option("spark.sql.parquet.compression.codec", "zstd")
    .save("/home/tomasz/PycharmProjects/sedona_xml_bz2/parquet_partitioned_1_file_per_partition/")
)
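The result is plain partitioned Parquet, so reading it back is straightforward; a filter on opened_year only touches the matching partition directories (a minimal sketch, reusing the output path from above):

# Read the partitioned dataset back; filtering on opened_year prunes partitions.
changesets = spark.read.parquet("/home/tomasz/PycharmProjects/sedona_xml_bz2/parquet_partitioned_1_file_per_partition/")
changesets.filter(f.col("opened_year") == 2022).count()  # pick any year present in the data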