320GB `YYYY/MM/DD/HH/*.json.gz` -> `YYYY/MM/tenant_id=x/data.parquet`?
I have 1 million gzipped files containing, in total, 350 million newline-separated JSON objects: 26 GB compressed, ~320 GB uncompressed, representing 7 years of data for a multi-tenant application.
I want to create one parquet file per tenant per month. `tenant_id` is a property of each object.
All objects have the same structure. There are ~30 properties. Property values can be missing, booleans may be quoted (`"true"`) or unquoted (`true`), etc. Many of my attempts failed until now, with all the tools I tried throwing an error similar to `column x has differing types across rows` or `number of columns varies across rows`. I have no experience doing this, and every time I saw that error I moved on to the next tool suggested by the internet, until I found `dask`, which helped me move past that.
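To make that concrete, here are two hypothetical event lines of the kind described above (the property names are invented for illustration); naive schema inference over rows like these is what produces the "differing types" errors:

```python
import json

# Two made-up event lines illustrating the row-to-row drift described above.
row_a = json.loads('{"tenant_id": "a", "active": true, "score": 3}')
row_b = json.loads('{"tenant_id": "b", "active": "true"}')  # quoted boolean, "score" missing

print(type(row_a["active"]), type(row_b["active"]))  # <class 'bool'> <class 'str'>
```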
These events were dumped by AWS Firehose in the folder structure `/data/raw_events/YYYY/MM/DD/HH/*.gz`. I want to create one parquet file per month per `tenant_id`, like so: `/data/parquet/YYYY/MM/tenant_id=x/data.parquet`.
At the moment my approach is to run `dask` over each `YYYY/MM`:
```python
import dask.bag

# files_per_partition, output_path and process_event are defined elsewhere
bag = dask.bag.read_text('/data/raw_events/YYYY/MM/**/*', files_per_partition=files_per_partition)
bag = bag.map(process_event.normalise_event)
df = bag.to_dataframe().set_index('tenant_id', drop=True)
df.to_parquet(output_path, partition_on=['tenant_id'], write_metadata_file=False)
```
I `bag.map` with `normalise_event`, which just ensures that all columns are present and have a sane type:
```python
import json

def normalise_event(raw):
    event = json.loads(raw)
    normalised = {}
    normalised['tenant_id'] = event.get('tenant_id', 'unknown')
    # ... same pattern for the remaining ~30 properties
    return normalised
```
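To deal with the quoted/unquoted booleans and missing values mentioned earlier, the per-property part of `normalise_event` could look roughly like this. This is only a sketch: the `to_bool` helper and the `active` property are placeholders I made up, not part of the actual schema:

```python
def to_bool(value, default=False):
    """Coerce values that may arrive as true/false, "true"/"false", or be missing."""
    if isinstance(value, bool):
        return value
    if isinstance(value, str):
        return value.strip().lower() == 'true'
    return default

# Inside normalise_event, for a hypothetical boolean property called "active":
# normalised['active'] = to_bool(event.get('active'))
```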
And provided I run this on a 32-core, 250 GB RAM EC2 instance, it works.

I `set_index('tenant_id')` because I want to create a single parquet file per `tenant_id` per month, but this means that it needs to load the entire dataset into memory.
I feel like I'm missing the insight which would allow me to point dask at a month of data and have it generate one parquet file per `tenant_id`, without having to load the entire month of data for all `tenant_id`s into memory.
The other way of doing this, which I think might work but which I'm avoiding because it would mean more moving parts, is:

1. `YYYY/MM/DD/**/*.gz` -> `YYYY/MM/DD/data.parquet`
2. `YYYY/MM/DD/data.parquet` -> `YYYY/MM/DD/tenant_id=x/data.parquet`
3. `YYYY/MM/DD/tenant_id=x/data.parquet` -> `YYYY/MM/tenant_id=x/data.parquet`
In the first step dask would decompress and parse the JSON and store it as parquet per day. In the second step it would split the dailies per `tenant_id`. In the third step it would merge the dailies into monthlies per `tenant_id`.
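For what it's worth, a minimal sketch of what the first step might look like, reusing the same bag approach and `process_event.normalise_event` from above; `daily_to_parquet` and its arguments are names I made up for illustration:

```python
import dask.bag

import process_event  # the module holding normalise_event, as in the snippet above


def daily_to_parquet(day_glob, day_output_path, files_per_partition=50):
    """Parse one day of gzipped JSON lines and write it as a daily parquet dataset.

    e.g. day_glob='/data/raw_events/2020/01/15/**/*',
         day_output_path='/data/parquet_daily/2020/01/15'
    """
    bag = dask.bag.read_text(day_glob, files_per_partition=files_per_partition)
    df = bag.map(process_event.normalise_event).to_dataframe()
    df.to_parquet(day_output_path, write_metadata_file=False)
```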
What would be the best approach here? I am wishing for a solution which could be spun up, given a month of data to process, and then torn down.

Thank you!