320GB `YYYY/MM/DD/HH/*.json.gz` -> `YYYY/MM/tenant_id=x/data.parquet`?

I have 1 million gzipped files which contain, in total, 350 million newline-separated JSON objects: 26 GB compressed, ~320 GB uncompressed, representing 7 years of data for a multi-tenant application.

I want to create one parquet file per tenant per month. tenant_id is a property of each object.

All objects have the same structure. There are ~30 properties. Property values can be missing, booleans may arrive quoted ("true") or unquoted (true), etc. Many of my attempts have failed so far, with all the tools I tried throwing an error along the lines of "column x has differing types across rows" or "number of columns varies across rows". I have no experience doing this, and every time I saw that error I moved on to the next tool suggested by the internet, until I found dask, which helped me move past that.
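For example, two made-up records (not my real schema) in the same file might look like this, which is exactly what trips up row-by-row schema inference:

{"tenant_id": "acme", "active": true, "count": 3}
{"tenant_id": "acme", "active": "true"}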

These events were dumped by AWS Firehose in the folder structure `/data/raw_events/YYYY/MM/DD/HH/*.gz`.

I want to create one parquet file per month per tenant_id, like so: `/data/parquet/YYYY/MM/tenant_id=x/data.parquet`.

At the moment my approach is to run dask over each YYYY/MM:

import dask.bag

# Read one month of gzipped JSON lines, normalise each record, then
# shuffle by tenant_id so to_parquet can write one directory per tenant.
bag = dask.bag.read_text('/data/raw_events/YYYY/MM/**/*', files_per_partition=files_per_partition)
bag = bag.map(process_event.normalise_event)

df = bag.to_dataframe().set_index('tenant_id', drop=True)
df.to_parquet(output_path, partition_on=['tenant_id'], write_metadata_file=False)

I map normalise_event over the bag, which just ensures that all columns are present and have a sane type:

import json

def normalise_event(raw):
    event = json.loads(raw)
    normalised = {}

    normalised['tenant_id'] = event.get('tenant_id', 'unknown')
    # ... the other ~30 properties are filled in and coerced the same way
    return normalised
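To give an idea of what the per-property coercion could look like, here is a sketch for one of the boolean properties; the property name and helper are illustrative, not my actual code:

def to_bool(value, default=False):
    # Accept real booleans as well as the quoted strings "true"/"false".
    if isinstance(value, bool):
        return value
    if isinstance(value, str):
        return value.strip().lower() == 'true'
    return default

# inside normalise_event:
# normalised['is_active'] = to_bool(event.get('is_active'))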

And provided I run this on a 32-core, 250 GB RAM EC2 instance, it works.

I set_index('tenant_id') because I want to create a single parquet file per tenant_id per month, but this means the entire month of data needs to be loaded into memory.

I feel like I'm missing the insight which will allow me to point dask at a month of data, and have it generate one parquet file per tenant_id, without having to load the entire month of data for all tenant_ids into memory.

The other way of doing this, which I think might work but am avoiding because it would mean more moving parts, is:

  1. `YYYY/MM/DD/**/*.gz` -> `YYYY/MM/DD/data.parquet`
  2. `YYYY/MM/DD/data.parquet` -> `YYYY/MM/DD/tenant_id=x/data.parquet`
  3. `YYYY/MM/DD/tenant_id=x/data.parquet` -> `YYYY/MM/tenant_id=x/data.parquet`

In the first step dask would decompress and parse the json and store it as parquet per day.

In the second step it would split the dailies per tenant_id.

In the third step it would merge the dailies into monthlies per tenant_id.
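A rough, untested sketch of that staged pipeline (paths, chunk sizes, and function names are illustrative, it reuses normalise_event from above, and steps 1 and 2 are collapsed into a single write by using partition_on):

import dask.bag as db
import dask.dataframe as dd

def day_to_tenant_parquet(year, month, day):
    # Steps 1+2: parse one day of gzipped JSON and write it partitioned by tenant_id.
    bag = db.read_text(f'/data/raw_events/{year}/{month}/{day}/**/*.gz',
                       files_per_partition=50)
    df = bag.map(normalise_event).to_dataframe()
    df.to_parquet(f'/data/daily_parquet/{year}/{month}/{day}/',
                  partition_on=['tenant_id'], write_metadata_file=False)

def merge_month(year, month, tenant_id):
    # Step 3: merge one tenant's dailies into a single monthly file
    # (the target directory has to exist before writing).
    df = dd.read_parquet(f'/data/daily_parquet/{year}/{month}/*/tenant_id={tenant_id}/*.parquet')
    df.compute().to_parquet(f'/data/parquet/{year}/{month}/tenant_id={tenant_id}/data.parquet')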

What would be the best approach here? I am hoping for a solution which could be spun up, given a month of data to process, and then torn down.

Thank you!

