Concurrently Download Large Files from GCS.
There is nothing more annoying than sitting around waiting for files to download. That was true when I was in high school staring at LimeWire, and it's still true today, especially when you're a data engineer who's supposed to make data pipelines fast. You're in luck! Yes, it is possible to download a large file from Google Cloud Storage (GCS) concurrently in Python. It took a little digging in Google's terrible documentation for their Python cloud storage wrapper (hear the snark), but I found a diamond in the rough.
BytesIO, ProcessPoolExecutor, and get_blob().
Anyone working in or around Google Cloud is probably downloading files at some point, possibly large ones. If that happens inside a data pipeline, it's an obvious IO bottleneck, and short of living in the data center, some form of concurrency is the only way to speed it up. At first this seems like a problem too complicated to solve, but it's not. All it takes is BytesIO (which we've talked about before), a ProcessPoolExecutor, and the correct method in the Python GCS package, which happens to be get_blob().
I'm going to throw down some code first, then we can talk about it. This code assumes that breaking a file up into 4 chunks, each sent to a different process, is going to be fastest; feel free to experiment. (I don't show auth here, that's up to you.)
import io

from google.cloud import storage
from concurrent.futures import ProcessPoolExecutor


def split_byte_size(size: int, uri: str, bucket: str, key: str) -> list:
    """Split the blob's total size into 4 contiguous, non-overlapping byte ranges."""
    byte_list = []
    split = int(size / 4)
    byte_list.append({"uri": uri, "start": 0, "end": split, "bucket": bucket, "key": key})
    byte_list.append({"uri": uri, "start": split + 1, "end": (split + 1) + split, "bucket": bucket, "key": key})
    byte_list.append({"uri": uri, "start": (split + 1) * 2, "end": ((split + 1) * 2) + split, "bucket": bucket, "key": key})
    byte_list.append({"uri": uri, "start": (split + 1) * 3, "end": size, "bucket": bucket, "key": key})
    return byte_list


def downloader(input: dict) -> bytes:
    """Download one byte range of the object and return it as raw bytes."""
    storage_client = storage.Client()
    bucket_object = storage_client.get_bucket(bucket_or_name=input["bucket"])
    blob = bucket_object.blob(input["key"])
    in_memory_file = io.BytesIO()
    # start and end are inclusive byte offsets for this chunk.
    blob.download_to_file(in_memory_file, start=input["start"], end=input["end"])
    # Return raw bytes: results coming back from a ProcessPoolExecutor must be
    # picklable, and a BytesIO object is not.
    return in_memory_file.getvalue()


# bucket, key, and uri are assumed to already hold your bucket name, object key, and source URI.
storage_client = storage.Client()
bucket_object = storage_client.get_bucket(bucket)
blob = bucket_object.get_blob(key)

split_bytes = split_byte_size(blob.size, uri, bucket, key)

with ProcessPoolExecutor(4) as ex:
    results = ex.map(downloader, split_bytes)

in_memory_file = io.BytesIO()
for result in results:
    in_memory_file.write(result)
in_memory_file.seek(0)
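One caveat worth flagging (my note, not part of the original post): on platforms where Python spawns worker processes rather than forking (Windows, and macOS by default on recent Pythons), anything that kicks off a ProcessPoolExecutor needs to live under an if __name__ == "__main__": guard, otherwise the workers re-import and re-execute your module. A minimal sketch, where download_concurrently is a hypothetical wrapper around the snippet above and the bucket/key/uri values are placeholders:

def download_concurrently(bucket: str, key: str, uri: str) -> io.BytesIO:
    # Hypothetical wrapper: size lookup, split, concurrent download, re-assemble.
    storage_client = storage.Client()
    blob = storage_client.get_bucket(bucket).get_blob(key)
    split_bytes = split_byte_size(blob.size, uri, bucket, key)
    with ProcessPoolExecutor(4) as ex:
        results = ex.map(downloader, split_bytes)
    in_memory_file = io.BytesIO()
    for result in results:
        in_memory_file.write(result)
    in_memory_file.seek(0)
    return in_memory_file


if __name__ == "__main__":
    # Hypothetical values, swap in your own bucket, key, and uri.
    data = download_concurrently("my-bucket", "path/to/big_file.csv", "gs://my-bucket/path/to/big_file.csv")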
Get byte size, split bytes, download byte chunks, re-assemble chunks.
The logic flow here is as follows: we want to download a file in chunks, then re-assemble those chunks into a file. So what do we need to do?
- Get the size in bytes of the object we want to download.
- Split those bytes up into the chunks we want to download. (This must be precise.)
- Download those chunks in separate processes.
- Re-assemble those byte chunks into a file.
Get Byte Size.
Simple enough. Set up the storage client, set up the bucket, and use the bucket to call get_blob() on the file key you want. You must call get_blob() here; unlike blob(), it actually fetches the object's metadata, so it's the one that populates the size property on the blob object.
storage_client = storage.Client()
bucket_object = storage_client.get_bucket(bucket)
blob = bucket_object.get_blob(key)
size = blob.size
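One small addition of my own, not from the original post: get_blob() returns None instead of raising when the object key doesn't exist, so it's worth failing loudly here before you start slicing byte ranges.

blob = bucket_object.get_blob(key)
if blob is None:
    # get_blob() quietly returns None for a missing object key.
    raise FileNotFoundError(f"gs://{bucket}/{key} does not exist")
size = blob.size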
Get Byte Chunks Ready.
This has to be exactly right; if we want a complete file that isn't corrupt, there is no room for error. You can see we're just passing in the size, plus other info about the object, so each dictionary is ready to ship off to a process pool with everything it needs for a download. What I'm doing below with the splits is making sure the chunks are contiguous, with no overlap between the byte start and end of each one (remember that start and end are inclusive offsets, hence the +1s).
def split_byte_size(size: int, uri: str, bucket: str, key: str) -> list:
    """Split the blob's total size into 4 contiguous, non-overlapping byte ranges."""
    byte_list = []
    split = int(size / 4)
    byte_list.append({"uri": uri, "start": 0, "end": split, "bucket": bucket, "key": key})
    byte_list.append({"uri": uri, "start": split + 1, "end": (split + 1) + split, "bucket": bucket, "key": key})
    byte_list.append({"uri": uri, "start": (split + 1) * 2, "end": ((split + 1) * 2) + split, "bucket": bucket, "key": key})
    byte_list.append({"uri": uri, "start": (split + 1) * 3, "end": size, "bucket": bucket, "key": key})
    return byte_list
split_bytes = split_byte_size(blob.size, uri, bucket, key)
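If four chunks isn't the sweet spot for your file sizes or your machine, the same inclusive-range math generalizes to any chunk count. Here's a sketch of my own (split_byte_ranges is a hypothetical helper, not part of the original post):

def split_byte_ranges(size: int, chunks: int, uri: str, bucket: str, key: str) -> list:
    """Split `size` bytes into `chunks` contiguous, non-overlapping inclusive ranges."""
    step = size // chunks
    ranges = []
    start = 0
    for i in range(chunks):
        # The last chunk absorbs any remainder so the ranges always cover the whole object.
        end = size if i == chunks - 1 else start + step
        ranges.append({"uri": uri, "start": start, "end": end, "bucket": bucket, "key": key})
        start = end + 1
    return ranges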
Download File Chunks.
Now that we have our chunks ready to be shipped to separate processes, let 'er rip! The magic happens in the blob.download_to_file() method, which takes a start and end, our byte locations of course. Each process downloads its chunk into a BytesIO() in memory and hands back the raw bytes (raw bytes rather than the BytesIO itself, because whatever a ProcessPoolExecutor returns has to be picklable, and BytesIO objects aren't).
def downloader(input: dict) -> bytes:
    storage_client = storage.Client()
    bucket_object = storage_client.get_bucket(bucket_or_name=input["bucket"])
    blob = bucket_object.blob(input["key"])
    in_memory_file = io.BytesIO()
    # start and end are inclusive byte offsets for this chunk.
    blob.download_to_file(in_memory_file, start=input["start"], end=input["end"])
    # Return raw bytes so the result can be pickled back to the parent process.
    return in_memory_file.getvalue()
with ProcessPoolExecutor(4) as ex:
    results = ex.map(downloader, split_bytes)
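A design note of my own, not from the original post: the work here is network-bound rather than CPU-bound, so a ThreadPoolExecutor can be a reasonable drop-in if spinning up processes (and pickling everything between them) feels heavy. A minimal sketch under that assumption:

from concurrent.futures import ThreadPoolExecutor

# Threads share memory, so nothing gets pickled, and the same downloader() works unchanged.
with ThreadPoolExecutor(max_workers=4) as ex:
    results = ex.map(downloader, split_bytes)

Which one wins depends on your chunk count, file sizes, and network, so it's worth benchmarking both.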
Re-Assemble The File.
Last but not least we need to re-assemble the file. Not much magic going on here. We take the iterator of our results and spin through it in order. Each result is the raw bytes of one chunk, which we write into a final BytesIO object, then seek back to the start so it's ready to read.
in_memory_file = io.BytesIO()
for result in results:
    in_memory_file.write(result)
in_memory_file.seek(0)
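If you're paranoid (I am), a quick sanity check that the re-assembled buffer is the same size as the source object catches any off-by-one mistakes in the split math. My addition, not in the original; the local path below is hypothetical:

# The re-assembled buffer should be exactly as long as the object in GCS.
assert in_memory_file.getbuffer().nbytes == blob.size, "chunk math is off somewhere"

# From here the in-memory file behaves like any other file object, e.g.:
with open("local_copy.dat", "wb") as f:  # hypothetical local path
    f.write(in_memory_file.getbuffer())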
That's it. Now you have an in-memory file of what you wanted, faster than sitting around waiting for it to all arrive at once.
Have fun!