
Stream: bulk data

Topic: Parquet Bulk Data format


Gidon Gershinsky (Nov 25 2019 at 14:47):

Hi All. It was good meeting many of you at the EU DevDays last week, and thanks for attending my talk on our FHIR/Parquet
project.

Gidon Gershinsky (Nov 25 2019 at 14:47):

@Josh Mandel - per our discussion, off-the-shelf Apache Spark can transform NDJSON to Parquet files.
I've checked that these commands work (with Synthea patient data):

scala> val patients = spark.read.json("/path/patients.json")
scala> patients.write.parquet("/path/patients.parquet")

The Parquet data set schema is identical to the NDJSON source:

scala> val p_patients = spark.read.parquet("/path/patients.parquet")
scala> val s1 = patients.schema.fields.map(f => (f.name, f.nullable))
scala> val s2 = p_patients.schema.fields.map(f => (f.name, f.nullable))
scala> s1.diff(s2).isEmpty
res1: Boolean = true

Also, the data is the same:

scala> p_patients.except(patients).count
res2: Long = 0

So it looks like the data in the Parquet files is a perfect copy of the data in the NDJSON original.

Gidon Gershinsky (Nov 25 2019 at 14:48):

The next steps could be:

Phase 1) We'll investigate the following performance aspects of Parquet conversion:
a) compression (how much transmission bandwidth is saved with the SNAPPY and GZip compression modes defined in the Parquet standard; see the spark-shell sketch after Phase 3)
b) SQL query efficiency on exported data (columnar projection and predicate pushdown can be imperfect when executed on nested columns; we need to test this).
Once we have enough data and are comfortable with the results, we might add the highlights
to the FHIR spec draft. Since the NDJSON and Parquet schema/data are identical, Parquet could then perhaps be moved from the candidates to the list of bulk formats (spec draft), for further validation by the community.
No other changes are required, other than allowing a 'parquet' value in the accept/_outputFormat parameters (plus, optionally, adding the compression mode to the request).

Phase 2) Per your suggestion, we will then add an advanced feature where the receiver of a bulk export can specify the required Parquet schema.
If not specified, the Parquet schema will be identical to the original.
However, a changed schema can help with analytics performance, since using the original schema might slow down the queries (item 1b above).
I guess we'll need the receiver to send a mapping of the original NDJSON schema to the required Parquet schema (what to flatten, what to drop, and maybe which extensions should be made "first class citizens", per the sql-on-fhir work). Using FHIRPath, profiles, etc. requires investigation. We could also start with the receiver requesting the original NDJSON schema of the data to be exported (to decide what schema to request / what to map).

Phase 3) Once the Parquet encryption mechanism is released, we can add another advanced feature where the receiver of a bulk export can require encryption of the exported data.
Initially, all columns can be encrypted, with different data keys but the same master key. The receiver will send its public key to the FHIR server; the server will use it to encrypt the master key and send it with the data.
Later, we can enable the receiver to specify which columns to encrypt (the rest won't be encrypted). And/or the FHIR server can be pre-configured with the sensitivity of the columns in each table, so the FHIR server itself knows which columns must be encrypted.
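
To make the compression comparison in Phase 1a concrete, here is a minimal spark-shell sketch (the paths are placeholders, and it reuses the patients DataFrame from the commands above) that writes the same data with the codecs defined in the Parquet standard, so the resulting directory sizes can be compared:

// Write the same DataFrame with different Parquet compression codecs.
val patients = spark.read.json("/path/patients.json")

patients.write.option("compression", "snappy").parquet("/path/patients_snappy.parquet")
patients.write.option("compression", "gzip").parquet("/path/patients_gzip.parquet")
patients.write.option("compression", "none").parquet("/path/patients_uncompressed.parquet")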

Does it make sense? Any feedback will be appreciated!

Veliyan Georgiev (Nov 25 2019 at 19:42):

@Gidon Gershinsky What is the schema that you end up getting? How many columns does it generate?

Gidon Gershinsky (Nov 25 2019 at 20:55):

@Veliyan Georgiev This is returned for both NDJSON and Parquet data:

scala> patients.printSchema()
root
 |-- address: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- city: string (nullable = true)
 |    |    |-- country: string (nullable = true)
 |    |    |-- extension: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- extension: array (nullable = true)
 |    |    |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |    |    |-- url: string (nullable = true)
 |    |    |    |    |    |    |-- valueDecimal: double (nullable = true)
 |    |    |    |    |-- url: string (nullable = true)
 |    |    |-- line: array (nullable = true)
 |    |    |    |-- element: string (containsNull = true)
 |    |    |-- postalCode: string (nullable = true)
 |    |    |-- state: string (nullable = true)
 |-- birthDate: string (nullable = true)
 |-- communication: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- language: struct (nullable = true)
 |    |    |    |-- coding: array (nullable = true)
 |    |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |    |-- code: string (nullable = true)
 |    |    |    |    |    |-- display: string (nullable = true)
 |    |    |    |    |    |-- system: string (nullable = true)
 |    |    |    |-- text: string (nullable = true)
 |-- extension: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- extension: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- url: string (nullable = true)
 |    |    |    |    |-- valueCoding: struct (nullable = true)
 |    |    |    |    |    |-- code: string (nullable = true)
 |    |    |    |    |    |-- display: string (nullable = true)
 |    |    |    |    |    |-- system: string (nullable = true)
 |    |    |    |    |-- valueString: string (nullable = true)
 |    |    |-- url: string (nullable = true)
 |    |    |-- valueAddress: struct (nullable = true)
 |    |    |    |-- city: string (nullable = true)
 |    |    |    |-- country: string (nullable = true)
 |    |    |    |-- state: string (nullable = true)
 |    |    |-- valueCode: string (nullable = true)
 |    |    |-- valueDecimal: double (nullable = true)
 |    |    |-- valueString: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- id: string (nullable = true)
 |-- identifier: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- system: string (nullable = true)
 |    |    |-- type: struct (nullable = true)
 |    |    |    |-- coding: array (nullable = true)
 |    |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |    |-- code: string (nullable = true)
 |    |    |    |    |    |-- display: string (nullable = true)
 |    |    |    |    |    |-- system: string (nullable = true)
 |    |    |    |-- text: string (nullable = true)
 |    |    |-- value: string (nullable = true)
 |-- maritalStatus: struct (nullable = true)
 |    |-- coding: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- code: string (nullable = true)
 |    |    |    |-- display: string (nullable = true)
 |    |    |    |-- system: string (nullable = true)
 |    |-- text: string (nullable = true)
 |-- multipleBirthBoolean: boolean (nullable = true)
 |-- name: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- family: string (nullable = true)
 |    |    |-- given: array (nullable = true)
 |    |    |    |-- element: string (containsNull = true)
 |    |    |-- prefix: array (nullable = true)
 |    |    |    |-- element: string (containsNull = true)
 |    |    |-- use: string (nullable = true)
 |-- resourceType: string (nullable = true)
 |-- telecom: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- system: string (nullable = true)
 |    |    |-- use: string (nullable = true)
 |    |    |-- value: string (nullable = true)
 |-- text: struct (nullable = true)
 |    |-- div: string (nullable = true)
 |    |-- status: string (nullable = true)

Josh Mandel (Nov 25 2019 at 21:48):

Thanks for the follow-up here @Gidon Gershinsky, and great meeting you too. Re: schema, I'm still not 100% sure what's happening under the hood in this example. For example, is the .write.parquet() inspecting the data to come up with a schema? Will the schema be different for different NDJSON files (e.g., one full of Patient resources vs one full of Observation resources? Or one full of Observations with .component elements vs one full of Observations without .component elements? Or one with 2-level-deep Questionnaire items vs 3-level-deep?)

Josh Mandel (Nov 25 2019 at 21:49):

In other words, how does the particular instance data shape the schema, and does it matter? Do consumers of the data need to care? Can data be combined arbitrarily without worrying about this, as long as the schemas are "compatible" (like, they might differ but don't contradict each other by using the same field name to capture distinct data)?

Gidon Gershinsky (Nov 26 2019 at 14:31):

@Josh Mandel, the under-the-hood machinery is based on DataFrames, the main construct in Spark (SQL): a distributed table with a schema and data.
Both patients and p_patients are DataFrame objects. When a DataFrame is created from an NDJSON file, as in

val patients = spark.read.json("/path/patients.json")

the schema is inferred from the JSON dataset
https://spark.apache.org/docs/latest/sql-data-sources-json.html

A created DataFrame can then be saved to a file, e.g. in Parquet format:

patients.write.parquet("/path/patients.parquet")

This operation performs "writing Parquet files that automatically preserves the schema of the original data"
https://spark.apache.org/docs/latest/sql-data-sources-parquet.html

Then, when we create a new DataFrame (in another process - the reader of the Bulk Export data, e.g. Spark run by researchers) from this Parquet file,

val p_patients = spark.read.parquet("/path/patients.parquet")

we get an exact copy of the original NDJSON data and schema. By the way, DataFrames loaded from Parquet files get the explicit schema stored in the Parquet metadata, instead of inferring it from the data as is done for NDJSON files.
In any case, the consumers don't need to care; they automatically get the original schema, exactly the same as they have today with NDJSON export.

So the schema of Patients and Observations will certainly be different.

Regarding your questions on Observation components and Questionnaire level depth - I believe Spark will be able to accurately infer the schema, reflecting the level depth or the presence of components. However, I want to verify that, and will set up and run additional tests.

There is also the question of different lines with different schemas for the same Resource type. E.g. all Observations are exported as one NDJSON file, but some Observation lines have a component element, while others don't (if this is not allowed in FHIR, please let me know).
When loading this file as a DataFrame, Spark will likely create a wider schema, with the component column(s). They will be empty for rows without components. Again, something we need to verify; I'll check it.
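
A minimal way to check this in the spark-shell (the file name is hypothetical: an NDJSON file in which only some Observation lines carry a component element):

// Spark infers a merged schema over all lines in the file;
// rows that lack `component` simply get a null in that column.
val obs = spark.read.json("/path/observations_mixed.json")
obs.printSchema()                          // should include a `component` column
obs.filter("component is null").count      // rows exported without components
obs.filter("component is not null").count  // rows exported with components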

This becomes even more interesting when the full FHIR Server dataset (with all Resource types) is exported as one NDJSON file (or a collection of NDJSON files, for staged download, but with each file having many Resource types). Then Spark will probably create a superset of all the schemas. We'll verify the size and efficiency implications when writing this data to Parquet. However, a simple fallback strategy would be to formally suggest/require that Parquet bulk export implementations create separate files for separate Resource types. But again, TBD.

Josh Mandel (Nov 26 2019 at 14:35):

Thanks -- this is really helpful background, and it'll be great to have the results from your verification tests (e.g., mixed resource types, etc.). Note that in FHIR, even for files with a single resource type (like Observations.ndjson), each Observation.contained[] can contain an arbitrary list of FHIR "contained" resources of any type, so it'll be helpful to know the impact of this on performance, too.

Veliyan Georgiev (Dec 02 2019 at 13:20):

@Gidon Gershinsky Hi Gidon, this is very helpful. Caution ahead: what I suspect might happen is that, should you decide to do some unions or joins, you might end up with errors. For example, the schema has multipleBirthBoolean but it doesn't have multipleBirthInteger. What happens under the hood is that as you read schema 1 the columns will be in a certain order based on that schema, and when you read schema 2 they will be in a different order. When you do certain operations like union, the columns will not match. There is an option to match by name rather than by column index, but it is flaky with FHIR (or at least it was for us). The best thing you can do is create your schema from a full (fake) FHIR bundle with all resources and all fields populated with something (especially the polymorphic properties like the one in this example). This way the columns will always be in the proper order.
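
For reference, a small spark-shell sketch of the pitfall described here and the match-by-name alternative (the file names are hypothetical, and the allowMissingColumns flag assumes Spark 3.1+):

val a = spark.read.parquet("/path/patients_export_a.parquet")
val b = spark.read.parquet("/path/patients_export_b.parquet")

// union() matches columns by position, so schemas with different column
// orders (or different column sets) can silently mix up data or fail.
// val bad = a.union(b)

// unionByName() matches columns by name; on Spark 3.1+ the allowMissingColumns
// flag fills columns present on only one side with nulls.
val merged = a.unionByName(b, allowMissingColumns = true)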

Gidon Gershinsky (Dec 03 2019 at 14:49):

@Veliyan Georgiev Hi Veliyan, both NDJSON and Parquet work with column names, explicitly mapped to the in-file data, so there shouldn't be any confusion related to column order etc. In your example, each Parquet file with Patient data will have a column named multipleBirthBoolean (assuming it was present in the NDJSON original). In each of these files, Parquet will be able to correctly set the value of this column for every row, independently of the presence or absence of a multipleBirthInteger column/values in that file. This will allow unions and joins to function properly. But I do agree that we need to proceed with caution. Per the discussion with Josh above, we are currently collecting a set of validation tests; all suggestions on what needs to be verified are very much welcome (preferably with details on how to run them and what the expected result is).

Lee Surprenant (Dec 06 2019 at 16:01):

Phase 2) Per your suggestion, we will then add an advanced feature where the receiver of a bulk export can specify the required Parquet schema.
...
I guess we'll need the receiver to send a mapping of the original NDJSON schema to the required Parquet schema (what to flatten, what to drop, and maybe which extensions should be made "first class citizens", per the sql-on-fhir work). Using FHIRPath, profiles, etc. requires investigation. We could also start with the receiver requesting the original NDJSON schema of the data to be exported (to decide what schema to request / what to map).

Wouldn't it be useful to have some analytic-friendly schema pre-defined in the spec? I'm thinking something where Spark SQL can operate over the format in a manner as similar as possible to (or preferably the same as) what's defined at https://github.com/FHIR/sql-on-fhir/blob/master/sql-on-fhir.md
Then the client can just say "I want the export in the analytic parquet format please"?

Lee Surprenant (Dec 06 2019 at 16:27):

This becomes even more interesting when the full FHIR Server dataset (with all Resource types) is exported as one NDJSON file (or a collection of NDJSON files, for staged download, but with each file having many Resource types). Then Spark will probably create a superset of all the schemas. We'll verify the size and efficiency implications when writing this data to Parquet. However, a simple fallback strategy would be to formally suggest/require that Parquet bulk export implementations create separate files for separate Resource types. But again, TBD.

Per my understanding, FHIR Bulk export already creates only one resource type per file (even for ndjson), so I don't think Parquet would be any different

Lee Surprenant (Dec 06 2019 at 16:30):

see http://hl7.org/fhir/uv/bulkdata/STU1/export/index.html#response---complete-status

Each file SHALL contain resources of only one type, but a server MAY create more than one file for each resource type returned.

Gidon Gershinsky (Dec 08 2019 at 14:14):

see http://hl7.org/fhir/uv/bulkdata/STU1/export/index.html#response---complete-status

Each file SHALL contain resources of only one type, but a server MAY create more than one file for each resource type returned.

Hi @Lee Surprenant, thanks. I had a feeling this could be the case, but it's good to know it is explicitly spelled out in the spec. In addition, we will check the impact of contained resources on the efficiency of Parquet storage, as suggested by Josh.

Gidon Gershinsky (Dec 08 2019 at 15:55):

Wouldn't it be useful to have some analytic-friendly schema pre-defined in the spec? I'm thinking something where Spark SQL can operate over the format in a manner as similar as possible to (or preferably the same as) what's defined at https://github.com/FHIR/sql-on-fhir/blob/master/sql-on-fhir.md
Then the client can just say "I want the export in the analytic parquet format please"?

We are exploring two applications of Parquet: one basic, for general Bulk Export (all use cases, not limited to analytics), and the other advanced, for Bulk Export optimized for analytics. Your comment refers to the second application, but let me step back and sum up both parts being discussed.

"General export": a user can request either "ndjson" or "parquet" format within the current API . This doesn't require any change in the current spec, except for allowing a "parquet" value in the accept header and _outputFormat parameter. Plus potentially an addition of a _compressionType parameter. Why would a user (and a FHIR service provider) want a general-purpose parquet export? For the following reasons:

  • (much) fewer bytes are prepped/stored by the FHIR server and sent over the wire to the user, due to Parquet encoding and compression

  • faster security - instead of TLS handshakes on each file, TLS is performed only once (to get the Parquet keys); the encrypted files are then sent over a regular connection

  • received files support columnar projection and predicate pushdown

The last bullet is a side benefit - while the "general export" parquet is better than ndjson for SQL workloads, it is not as good as an advanced "analytics-friendly export" parquet that is optimized for these workloads. On the other hand, the "general export" parquet is lossless (data and schema identical to the ndjson export; recursive schemas TBD), while the "analytics" parquet is lossy (per sql-on-fhir.md, some elements are dropped).
The main benefits are the first two bullets (faster transfer / less bandwidth & storage). We will run some experiments to estimate the efficiency of the parquet format for a general lossless export. If it is efficient enough, it can be "low hanging fruit" for a spec addition, to start validation by the community.
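
As an illustration of the third bullet, a query like the following (the column names come from the Patient schema above; the filter value is just an example) lets a Parquet reader fetch only the projected columns and skip row groups whose statistics rule out the predicate:

val p_patients = spark.read.parquet("/path/patients.parquet")

// Only the gender and birthDate columns are read from storage (columnar
// projection), and row groups whose min/max statistics exclude 'female'
// can be skipped entirely (predicate pushdown).
p_patients.select("gender", "birthDate").filter("gender = 'female'").count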

"Analytics-friendly export" : by default, a user requesting bulk export in parquet format, would get the lossless general-purpose files. To get the analytics-friendly files, the user would have to request this explicitly, maybe with additional parameters. Unlike the "general export" mode, this one requires substantial additions in the spec - to be chosen among many options that need to be discussed. I agree it might be possible to pre-define (for each resource) an analytics-friendly schema in the spec, according to the sql-on-fhir.md recommendations, that already show how certain elements can be dropped and how extensions can be made first-class fields. It could be a good starting point. But this is not the most efficient format for analytic frameworks like Apache Spark - that are not at their best with nested columns (still kept in sql-on-fhir.md), and run faster with a flat schema. Also, a user might be interested to drop many more elements upon export (if they are irrelevant for his analytic workloads). So, ideally, a user should be able to tell a FHIR server what parquet schema he is interested in (and how the original resources should be mapped to it). Or to choose from a number of pre-defined schemas / profiles.

Josh Mandel (Dec 09 2019 at 22:44):

except for allowing a "parquet" value in the accept header and _outputFormat parameter.

I think just the _outputFormat would change, not the Accept header, since the manifest of results would still be a JSON file with output entries.

Gidon Gershinsky (Dec 10 2019 at 06:56):

except for allowing a "parquet" value in the accept header and _outputFormat parameter.

I think just the _outputFormat would change, not the Accept header, since the manifest of results would still be a JSON file with output entries.

Certainly. The Accept header specifies the format of the "OperationOutcome resource response to the kick-off request"; I had missed that. The spec's "Currently, only application/fhir+json is supported." shouldn't change then.

Gidon Gershinsky (Jan 21 2020 at 13:40):

Hi all, here are the results of the Spark/Parquet/NDJSON experiments, which cover the "general purpose export" (GPE) use case described above.

Regarding the "analytics-friendly export" (AFE) usecase - I have received information from the Apache Spark PMC team that the upcoming Spark versions will include significant improvements in support for nested columns - meaning that analytic workloads will run faster with files produced by GPE, eliminating some inefficiencies in columnar projection and predicate pushdown. The AFE approach will still be the most optimal for analytics - but, given the GPE improvements, and the fact that it doesnt require changes in FHIR Export specification other than allowing for a "parquet" value in the _outputFormat parameter (and a potential addition of a _compressionType parameter) - it makes sense to start with GPE. The AFE approach, that requires significant additions in the FHIR specification, will be considered at a later stage when we have more experience with Parquet GPE in FHIR.

Ok, now for the GPE results. Per the discussion earlier in this thread, the goals of the experiments were to:

  1. show that Parquet can handle complex data types (with contained and component elements, and Questionnaire types with various depth levels)

  2. compare the size of Bulk Export data in NDJSON and Parquet formats

  3. compare SQL query execution time on NDJSON and Parquet files

This post contains a summary of these experiments. The detailed results can be found here.
https://docs.google.com/document/d/1zk0YJ8MVUqBoJ0OK6Gb1lHE1shYIl3xJ1hTl1hxW3pg/edit?usp=sharing

@Josh Mandel, please let me know if this is sufficient for adding the Parquet option to the next FHIR Export specification draft, for validation by the community.

The experiments were performed on data derived from the samples at https://www.hl7.org/fhir/downloads.html

  1. For any type of resource data (Patients; Observations with and without contained and component elements; Questionnaires with item depth ranging from 1 to 5, some with contained elements), the Parquet files have the correct schema, and their contents are fully identical to the original NDJSON files (both schema and data are the same).

  2. Parquet files are much smaller than NDJSON files with the same data (a simple way to measure this is sketched after this list).
    Patient files are ~6 times smaller.
    Observation files are ~40 times smaller.
    The reason for the stronger reduction in 'observations' is that they are less diverse than 'patients': typically, only a few elements change from record to record (such as id and value), while the rest (coding, performer, subject) change slowly or don't change at all. This allows the Parquet 'dictionary encoding' mechanism to store the repeating fields using only their reference ids.

  3. SQL queries run significantly faster on Parquet files than on NDJSON files.
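
For reference, the size comparisons in item 2 can be reproduced in the spark-shell with a small helper (the paths are placeholders); note that a Parquet "file" written by Spark is actually a directory of part files, so the sizes have to be summed:

import java.io.File

// Total on-disk size of a file, or of a Spark output directory (sum of its part files).
def dirSize(f: File): Long =
  if (f.isFile) f.length
  else Option(f.listFiles).map(_.map(dirSize).sum).getOrElse(0L)

val ndjsonBytes  = dirSize(new File("/path/observations.json"))
val parquetBytes = dirSize(new File("/path/observations.parquet"))
println(f"compression ratio: ${ndjsonBytes.toDouble / parquetBytes}%.1f x")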

Matt Sargent (Feb 14 2020 at 19:24):

@Gidon Gershinsky Are the query results in your report the sum of the time to load the file and the time to run the query? Or is the data loaded and cached in memory, with the subsequent queries taking different amounts of time over what should be equivalent data?

I'm assuming it's the former, but wanted to be sure.

Gidon Gershinsky (Feb 16 2020 at 17:46):

@Matt Sargent Correct, it's the former (even though the files are likely loaded from the host memory; the OS would optimize the disk reading). Still, as mentioned in the post, in many real-life use cases the files won't be co-located with the Spark worker host; instead, they would be stored in a distributed file system or in an object store. Then the Parquet in-storage filtering capabilities (columnar projection and predicate pushdown) will further increase the relative difference in query execution time. On the other hand, caching the NDJSON files in Spark memory will decrease the difference, at the price of fetching the full NDJSON datasets from storage and keeping them in memory. Parquet files are not only smaller (for the same data) but also allow fetching a data subset.
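
For completeness, a rough way to separate load time from query time in the spark-shell, along the lines of Matt's question (the path and the filter expression are placeholders):

val obs = spark.read.json("/path/observations.json")

// Materialize and cache the DataFrame in memory, so that subsequent queries
// no longer include file reading and schema inference.
obs.cache()
obs.count()

// spark.time prints the wall-clock time of the enclosed action.
spark.time(obs.filter("status = 'final'").count)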

Josh Mandel (Mar 04 2020 at 16:20):

Thanks @Gidon Gershinsky for the analysis above! I haven't had much time to dig into this technology, but I suspect that as more implementations of the bulk data API come online and people are trying to actually use bulk export files for analysis as well as managing storage, there will be a natural gravitation to the kinds of tools you're exploring. @Edward Yurcisin was describing to me just last week some of the challenges of working with larger export files.

As a practical matter, I'd note that demonstrations and easy-to-use tooling can speak volumes. For example, if you thought about building an open source client that could pull NDJSON data from a server and then automatically convert it into parquet, demonstrating how to run queries against it efficiently -- this, plus comparison benchmarks, might significantly help bring people along.

Gidon Gershinsky (Mar 04 2020 at 17:23):

Hi Josh, sounds good! We do work on such technologies, and plan to open source them. We'll get back with that, including the benchmark results.


Last updated: Apr 12 2022 at 19:14 UTC