Decompress ZST files on Databricks with this one weird trick!

Who needs distributed processing, anyway?

Recently, I found myself with a pile of Zstandard-compressed JSONL files and needed an expedient way to decompress them. Running vanilla Python code in a Databricks notebook is generally an antipattern, but it gave me a quick solution that worked, and allowed me to get on with the more interesting data engineering.

I don't think we're in DBFS anymore, Toto!

Using vanilla Python to carry out the decompression means that the files live on the disks attached to the driver VM rather than in DBFS. Therefore, there are a few things to watch out for:

  • Data will be lost when the cluster restarts
  • It's easy to run out of disk space

In my approach, I disregarded the first issue: the acquisition process is started manually, and I can retrigger it if needed. The second issue is more serious: when a 2GB file can decompress to a 16GB one, it is easy to fill the disk if you are not careful.

The code below therefore uses dbutils.fs.mv() to move each decompressed file into DBFS as soon as it has been written. Because a move removes the source, the local copy of the decompressed data disappears, which reduces the risk of running out of space. You may still have issues with larger files, as a single file may exceed the available disk space once decompressed. By default, Azure provisions the Databricks VM with 256GB of disk.
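If you want a little more warning before the disk fills up, a rough pre-flight check is one option. The sketch below is not part of my original notebook: the has_room_for name, the expansion factor of 8 (taken from the 2GB to 16GB example above) and the 1GiB safety margin are all assumptions that you should tune to your own data.

```python
import shutil


def has_room_for(compressed_size_bytes, path="/tmp",
                 expansion_factor=8, safety_margin_bytes=1024 ** 3):
    """Return True if `path` probably has room for the decompressed output.

    expansion_factor is a guess at how much the data grows when decompressed
    (hypothetical default of 8, based on a 2GB file becoming 16GB).
    """
    # Free space on the volume that holds `path`
    free_bytes = shutil.disk_usage(path).free
    # Estimated size of the decompressed file
    estimated_output_bytes = compressed_size_bytes * expansion_factor
    return free_bytes >= estimated_output_bytes + safety_margin_bytes
```

Calling has_room_for(os.path.getsize(fileName)) before decompressing a file would let you skip, or at least log, the ones that are likely to be too big. It is only an estimate, since the real compression ratio varies from file to file.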

The Code

import os
import zstandard

#Empty the working directory of anything left over from a previous run.
#dbutils.fs.rm() does not expand wildcards, so remove the whole directory...
dbutils.fs.rm("file:/tmp/decompress", recurse=True)

#...and then create it again.
os.makedirs("/tmp/decompress", exist_ok=True)

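def decompress(fileName):
    print(f"Decompressing {fileName}")
    #Get a decompressor. max_window_size is raised to 2GiB (2**31 bytes) so that
    #files compressed with a large window are accepted rather than rejected.
    dctx = zstandard.ZstdDecompressor(max_window_size=2147483648)
    #Get the name of the file itself: no directory, and nothing after the first dot
    fn = fileName.split('/')[-1].split('.')[0]
    #Build the local path that the decompressed file is written to
    outputPath = f"/tmp/decompress/{fn}.jsonl"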
        
    #Get an input stream and an output stream
    with open(fileName, 'rb') as ifh, open(outputPath, 'wb') as ofh:
        #Copy between the two
        dctx.copy_stream(ifh, ofh)        

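    #Move the file into DBFS, which also removes the local copy.
    #The destination file name is spelled out so the result does not depend
    #on whether dbfs:/mnt/blob/reddit already exists as a directory.
    dbutils.fs.mv(f"file://{outputPath}", f"dbfs:/mnt/blob/reddit/{fn}.jsonl")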

#Decompress everything in our downloads folder
for file in os.listdir("/tmp/downloads"):
    decompress(f"/tmp/downloads/{file}")
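One thing to be aware of in the code above: the loop hands every file in the downloads folder to decompress(), and the output name is cut at the first dot, so a file called data.v2.zst would end up as data.jsonl. A slightly more defensive version of the file handling might look like the sketch below. Both helper names are hypothetical, and I am assuming the compressed files all end in .zst.

```python
from pathlib import Path


def output_name_for(compressed_path):
    """Map e.g. '/tmp/downloads/RC_2019-01.zst' to 'RC_2019-01.jsonl'."""
    name = Path(compressed_path).name
    # Strip only the compression suffix, keeping any other dots in the name
    if name.endswith(".zst"):
        name = name[: -len(".zst")]
    # Avoid a doubled extension for inputs such as 'data.jsonl.zst'
    if name.endswith(".jsonl"):
        name = name[: -len(".jsonl")]
    return f"{name}.jsonl"


def compressed_files(directory):
    """Return the sorted paths of the .zst files directly inside `directory`."""
    return sorted(
        str(p) for p in Path(directory).iterdir()
        if p.is_file() and p.suffix == ".zst"
    )
```

The loop would then iterate over compressed_files("/tmp/downloads"), and decompress() would use output_name_for(fileName) when building its output path.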