Data Migrations and Dealing with Over-Engineering

Recently at work, I inherited a task to migrate data from Azure to S3. The task itself was pretty simple: we have bzipped data stored across many blobs in Azure, organized by a complex blob and container naming scheme and spread across many accounts, and it needs to move to S3 and be collated into a single file along the way. In short, a simple one-time ETL (Extract, Transform, Load) from Azure to S3.

The complexity comes from two places: we don't want to move all of the data, only some of it, and the sheer scale of what does need to move; roughly 1 million blobs and ~10GB of data per day for the past 4 years.

The previous owner of the task left before he could get beyond basic design, so I took a look at the amount of data and decided that a simple serial python script wouldn’t cut it. My first reaction was to build something using thespianpy, which I have previously written about here.

The design was pretty simple: a top-level MigrationActor takes a list of data sets that need to be migrated and creates Extract, Transform, and Load actors for each one. The ExtractActor looks up and downloads all of the blobs using the Azure SDK and passes the data to the TransformActor, which bundles and compresses it and hands it off to the LoadActor, which uploads it to S3 and reports back to the MigrationActor that the unit of work is complete. The MigrationActor keeps track of the number of in-flight ETLs and can be configured to match the bandwidth and CPU limits of the box it runs on.
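To make that concrete, here is a minimal sketch of the actor design using thespianpy; the message classes, the shape of a unit of work, and the in-flight cap of 8 are illustrative stand-ins, not the real migration code:

from dataclasses import dataclass

from thespian.actors import Actor, ActorSystem

@dataclass
class Unit:
    name: str  # hypothetical shape of one unit of work

@dataclass
class Extract:  # illustrative message types, not the real ones
    unit: Unit

@dataclass
class Done:
    unit: Unit

class ExtractActor(Actor):
    def receiveMessage(self, msg, sender):
        if isinstance(msg, Extract):
            # The real actor downloads the unit's blobs with the Azure SDK
            # and forwards them to a TransformActor; here we just report back.
            self.send(sender, Done(msg.unit))

class MigrationActor(Actor):
    """Hands out units of work while capping the number of in-flight ETLs."""

    def receiveMessage(self, msg, sender):
        if isinstance(msg, list):  # the initial list of units to migrate
            self.pending, self.in_flight = list(msg), 0
            while self.pending and self.in_flight < 8:  # illustrative cap
                self._start_one()
        elif isinstance(msg, Done):
            self.in_flight -= 1
            if self.pending:
                self._start_one()

    def _start_one(self):
        extractor = self.createActor(ExtractActor)
        self.send(extractor, Extract(self.pending.pop()))
        self.in_flight += 1

if __name__ == "__main__":
    system = ActorSystem()
    system.tell(system.createActor(MigrationActor), [Unit("2020-01-01")])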

After quickly coding it up, I fired it up with a file of 1000 units of work to be migrated. It finished in ~7 minutes, averaging about 1.1 mbps the whole way. I had no idea if that was good or not, but back-of-the-napkin math said the full migration would take ~25 days. I knew we could probably get a little more performance out of a beefy network-optimized EC2 instance, and if we needed to, we could set up a cluster of them and have the thespian system divide the work across machines.

Then I wanted to take a look at another python library that's good for parallel processing: Celery. I could set up a queue of units of work to be migrated and have a bunch of worker nodes pull from it.

At this point I took a step back and started to challenge my assumptions. I have often been critical of others for adding a lot of unnecessary complexity to solve problems; in other words, over-engineering. It happens a lot more often than I originally thought: engineers like to solve problems and show off, so they come up with intricate solutions that are harder to maintain and often cause problems when debugging or extending. I had assumed that python wouldn’t be fast enough to do this work, so, when in doubt, measure. After all, premature optimization is the root of all evil.

So I decided to write up a serial python script which consisted basically of a loop: pull a line out of the input file, find all the blobs for that unit in Azure, download them, combine and gzip them, upload to S3. Rinse, repeat. The result was a much smaller, single-file script that was super easy to read. I ran it against the same test file, and it took 20 minutes, averaging 0.4 mbps. So my assumption that I was getting a lift via thespian was correct; I was getting just under 3x the performance using the actor model.
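The loop itself looked roughly like the sketch below; the connection string, bucket name, and input-file format are placeholders for the real ones:

import bz2
import gzip

import boto3
from azure.storage.blob import BlobServiceClient

azure = BlobServiceClient.from_connection_string("<connection-string>")
s3 = boto3.client("s3")

def migrate_unit(container_name: str, prefix: str, s3_key: str) -> int:
    """Download every blob under `prefix`, concatenate, gzip, and upload to S3."""
    container = azure.get_container_client(container_name)
    chunks = [
        bz2.decompress(container.download_blob(blob.name).readall())
        for blob in container.list_blobs(name_starts_with=prefix)
    ]
    body = gzip.compress(b"".join(chunks))
    s3.put_object(Bucket="migration-bucket", Key=s3_key, Body=body)
    return len(body)  # bytes moved, handy for reporting later

with open("units.txt") as f:  # one unit of work per line
    for line in f:
        migrate_unit(*line.split())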

But while I was in the “measure; don’t guess” mood, I decided to bake off a few different python technologies and see how each performed relative to the complexity it added.

So my next foray was into threading. It had been a long while since I had played with threading in python, and while I have been having fun with it in Rust, I didn’t have any good memories of threading in python. The primary reason people don’t associate multithreading with python is the GIL. The GIL (Global Interpreter Lock) is a lock that keeps python’s interpreter thread safe by only allowing one thread to execute bytecode at a time. So while you can break work up into many threads, only one will actually be executing at any given moment.

The threading script took the serial python script's inner loop (the work done for each unit) and made a function out of it. It then defines a worker function which takes a queue, a function, and a callback; inside a while True loop, it pulls items off of the queue, evaluates the function with that item's arguments, and passes the result to the callback. It then builds a queue.Queue (a thread-safe queue in python), spins up a configured number of threading.Thread workers, passing each one the queue instance and the work function, and finally dumps the entire input file into the queue. It is a very similar design to what I would have built in Celery, just without the RabbitMQ and the extensive setup that requires. The performance-critical parts of this solution are entirely standard library (we still used azure and boto3 for the migration itself).

I fired up the same file expecting it to do not much better than the serial python because of the GIL, and was shocked by the results. It finished the file in 54 seconds, averaging ~8 mbps; 20x faster than the serial version and 7x faster than the thespian version. So I tested it again on a larger input file of 5000 units of work, and it got faster still, at 12 mbps.

This overturned assumption sent me back to the docs. My incorrect assumption was that I would be blocked by the GIL. It turns out that is only true for CPU-bound work, not IO-bound work. With the GIL, only one download or upload can be started at a time, but once an IO operation starts, the thread releases the lock. Since the transformations in this ETL were so lightweight and the vast majority of the time was spent uploading and downloading, threading was an optimal solution: threads rarely contend for the GIL because most of them are waiting on IO at any given moment.
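A toy benchmark makes the effect obvious; here time.sleep stands in for a download and, like real blocking IO, releases the GIL while it waits:

import time
from concurrent.futures import ThreadPoolExecutor

def fake_download(_):
    time.sleep(1)  # blocking "IO": the GIL is released while sleeping

start = time.time()
for i in range(8):
    fake_download(i)
print(f"serial:   {time.time() - start:.1f}s")  # ~8s

start = time.time()
with ThreadPoolExecutor(max_workers=8) as pool:
    list(pool.map(fake_download, range(8)))
print(f"threaded: {time.time() - start:.1f}s")  # ~1s; the waits overlap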

After digging in more, it turns out that multiprocessing would solve the problem at a similar speed, since it gets around the GIL entirely, but it would add complexity since the processes no longer share memory space. It wouldn't be much faster, if at all, because the work is IO bound, not CPU bound. It solves the same problems that thespian solves, except thespian makes the communication between processes easier by abstracting it into actors and messages. My research also showed that Celery is aimed at that same CPU-bound case, so it looks like threading was the best fit for this problem.
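For comparison, swapping the thread pool for a process pool is barely more code, but everything crossing the process boundary must be picklable, and per-process state like SDK clients has to be rebuilt in each worker. A sketch:

from multiprocessing import Pool

def migrate_unit(task: str) -> str:
    # Stand-in for the per-unit ETL; the real version would need to build
    # its own Azure and boto3 clients, since they can't be shared across processes.
    return task.upper()

if __name__ == "__main__":
    tasks = ["unit-1", "unit-2", "unit-3"]  # arguments are pickled to the workers
    with Pool(processes=8) as pool:
        print(pool.map(migrate_unit, tasks))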

The guts of the threading code ended up getting refactored into this:

from queue import Queue
from threading import Thread
from typing import Iterable, Callable, Optional

def parallel_process(work: Iterable, task_func: Callable, num_threads: int, callback: Optional[Callable] = None) -> None:
    """
    Process work in parallel using a queue-worker pattern.

    Breaks `work` up across `num_threads` workers, which call `task_func` for each item in `work`.
    If provided, `callback` is called with each result.
    """

    def pop_task(queue: Queue, cb: Optional[Callable] = None) -> None:
        """
        Thread worker wrapper.

        Pulls from the queue, calls task_func, and optionally calls back with the result.
        """
        while True:
            task = queue.get()
            if task is None:
                break
            try:
                result = task_func(*task)
                if cb:
                    cb(result)
            except Exception as ex:
                print(ex)
            finally:
                queue.task_done()  # lets q.join() unblock once all work is processed

    q = Queue(maxsize=0)

    # Start worker threads
    for _ in range(num_threads):
        try:
            worker = Thread(target=pop_task, args=(q, callback), daemon=True)
            worker.start()
        except Exception as ex:
            # Catch "can't start new thread" if the OS runs out of threads
            print(ex)

    # Put work into the queue
    for task in work:
        q.put(task)

    # Block until all work is complete
    q.join()

This allowed me to go back to the serial version, map the input file into tasks with the extra arguments required to process each unit, and pass the whole thing to parallel_process with a number of threads. I wrote a simple callback function which tracked progress using tqdm and tallied the amount of data moved for reporting purposes.
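Usage ended up looking something like this sketch, reusing the hypothetical migrate_unit from the serial example above; the input format and thread count are illustrative:

from threading import Lock

from tqdm import tqdm

with open("units.txt") as f:
    tasks = [tuple(line.split()) for line in f]  # each tuple is *args for migrate_unit

progress = tqdm(total=len(tasks), unit="unit")
total_bytes = 0
lock = Lock()

def report(bytes_moved: int) -> None:
    """Callback run on worker threads with the size of each uploaded object."""
    global total_bytes
    with lock:
        total_bytes += bytes_moved
    progress.update(1)  # tqdm guards update() with its own internal lock

parallel_process(tasks, migrate_unit, num_threads=50, callback=report)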

Lessons Learned

No matter how senior you get, always challenge your assumptions when you catch yourself over-engineering a problem. Test your assumptions. Don’t guess; measure.

I learned a lot about threads in python after overturning my assumption that python was not good at threading. It turns out it is perfectly decent at threading, as long as you use it in the appropriate situations: IO-bound work, not CPU-bound work.