A Simple and Predictable Big Data Stack
Where things stand today
It has been noted that the complexity of big data frameworks like Hadoop and Spark makes them less productive tools than their small data counterparts: Python, R and Excel. The degree is contested, but in our experience what might take an hour to implement on megabytes of data in Python can easily take man-months or even man-years to implement on terabytes of data with Spark.
One case study we conducted saw a client – one of the world’s largest technology companies – spend two years of a three-person team’s time trying to scale up a simple statistical analysis involving a matrix multiplication. Their “small data” equivalent was less than 100 lines and would take under an hour to write.
What’s really interesting about this work for us is enabling the exploration of what we see as under-explored territory: the intersection of large-scale data science and large-scale simulation. As Doyne Farmer says, to catapult simulation science to a level of usefulness such that it is as readily applicable as machine learning, it needs to be integrated to work alongside those other disciplines.
The cost of complexity and unpredictability in the current Big Data stack
My background spans composing machine code from ISA specs to large distributed systems to machine learning. No part of the Spark stack should be particularly alien to me. And indeed, I’ve seen nothing that in isolation isn’t comprehensible. Yet Spark as a whole – and particularly when something goes wrong – I find impenetrable. Spark itself totals 100k lines; its standard resource manager YARN and distributed filesystem HDFS each approach 1M lines. When something goes wrong, it could be anywhere in those 2M+ lines – not to mention the interaction with the 10M line JVM and its memory management. At this scale of complexity, I find it almost impossible to develop any kind of rigorous understanding of Spark as a whole. Instead, one is limited to reasoning about it as an emergence of many, many flaky components.
How does this manifest? In our experience this is the key difference between the predictable productivity of “small data” Python, and the unpredictable, unproductive “big data” Spark. As an example, when you `join` two pandas `DataFrame`s, you know that:
- either you’ll get the desired result;
- or the result didn’t fully fit in memory.
“Fitting in memory” is a fairly sensible rule you can readily internalise and use to guide what you attempt. With Spark, by contrast, upon `join`ing two `Dataset`s you will find that:
- either you’ll get the desired result;
- or some component will time out;
- or there’ll be some serialisation error;
- or there’ll be some memory issue during an intermediate step;
- or your cluster will seemingly hang at some arbitrary point.
And here’s the problem! What could cause you not to get the desired result is unpredictable, usually unrelated to your algorithm, and not necessarily readily resolvable. “Failure” in Spark is a fractal of complexity, and resolving these failures is a dark art.
Some examples of specific failures are described by Facebook here and by my colleague Francis here. Note how none of them are related to the algorithm – i.e. the business logic – and all are related to design decisions or bugs in Spark.
Can we do better?
Firstly, what does “better” look like? The response we typically hear is that Spark’s still early, bugs are being fixed, it’s constantly getting better. After all, with a team of researchers one can eventually sort 1PB of data. There’s no denying that is cool.
There’s also no denying that bugs are constantly being found and fixed. One can believe that Spark is indeed “constantly getting better”. To our eye however, it is still in a different league – not uncommonly 3-5 orders of magnitude, increasing with the data scale, away from the productivity of “small data” tools. Furthermore, we haven’t seen it narrowing that gap by any substantial amount over the last four years; nor do we see it narrowing that gap by any substantial amount in the future.
So what do we want from a “better” solution, such that it offers comparable productivity to our “small data” tools? When we `join` two `DataFrame`s, we need to know that:
- either we’ll get the desired result;
- or the user’s cloud instance budget was exhausted.
And nothing else. We need to guarantee the desired result, barring some truly unrecoverable force majeure event like the budget running out, or the majority of hardware instances failing.
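One way to picture this contract is as a return type with a single, narrow error case. The sketch below is purely illustrative: `ForceMajeure`, the toy `join` and the unit-per-comparison `budget` are hypothetical names and assumptions of ours, not an actual API. It only shows that the caller has exactly two outcomes to reason about.

```rust
/// Hypothetical: the only reasons an operation is allowed to fail.
#[derive(Debug, PartialEq)]
enum ForceMajeure {
    BudgetExhausted,
    MajorityOfInstancesFailed,
}

/// Toy nested-loop inner join on the key.
/// Assumption for illustration: every comparison spends one unit of budget.
fn join<'a>(
    left: &[(u32, &'a str)],
    right: &[(u32, &'a str)],
    budget: &mut u64,
) -> Result<Vec<(u32, &'a str, &'a str)>, ForceMajeure> {
    let mut out = Vec::new();
    for &(lk, lv) in left {
        for &(rk, rv) in right {
            if *budget == 0 {
                // The one failure the caller has to plan for.
                return Err(ForceMajeure::BudgetExhausted);
            }
            *budget -= 1;
            if lk == rk {
                out.push((lk, lv, rv));
            }
        }
    }
    Ok(out)
}
```

There is no variant for timeouts, serialisation errors or intermediate memory issues: in this picture those are the system’s problem to absorb, not the user’s.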
What does that look like?
And is it even possible? The relevant data point for me here is that every issue I or my colleagues have ever bumped into with Spark is resolvable. I’ll tackle the biggest “groups” of them:
Opaque memory errors are omnipresent when trying to scale on Spark. Some, for example over-provisioning a machine, or heap exhaustion by user code, are understandable and resolvable by adjusting configuration or reworking user code to require less memory respectively. However, many are much more opaque, and require workarounds or changes to Spark itself.
These errors occur because of hard-to-predict, and generally unbounded memory allocation. And crucially, they are unrelated to the algorithm the user is trying to run. The proximate cause of an OOM error when a user tries to perform a “group by” is unrelated to that operation, unrelated to the user’s articulation of their algorithm, and ultimately unrelated to the user’s intent. They wanted to group some data, now they have to fiddle with some arcane, non-obvious, unpredictable idiosyncrasies of Spark.
Can this be avoided? Well… yes. One needs to dynamically allocate more resources to handle growth in the amount of total data. But besides that? One doesn’t actually require unbounded dynamic memory allocation at all. All of the OOM (out of memory) errors are something akin to `malloc` failing due to a lack of memory available at that time. We can avoid this whole class of failures by never calling `malloc` dynamically.
In Spark, the user is responsible for deciding the size and number of partitions – while there are mechanisms to automate this, in our experience when operating at scale it becomes necessary to be explicit about it.
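As a minimal sketch of what “never allocating dynamically” can look like in practice, consider a buffer whose memory is reserved once, up front, and which reports being full instead of growing. `BoundedBuffer` and `Full` are hypothetical names for illustration, not part of any real system described here.

```rust
/// A byte buffer whose capacity is fixed at construction.
/// It never grows: when full, `push` returns an error instead of allocating.
struct BoundedBuffer {
    data: Box<[u8]>, // allocated exactly once, in `with_capacity`
    len: usize,
}

/// Returned when the bytes do not fit in the remaining space.
#[derive(Debug, PartialEq)]
struct Full;

impl BoundedBuffer {
    fn with_capacity(capacity: usize) -> Self {
        BoundedBuffer {
            data: vec![0u8; capacity].into_boxed_slice(),
            len: 0,
        }
    }

    /// Append `bytes` if they fit; otherwise leave the buffer untouched.
    fn push(&mut self, bytes: &[u8]) -> Result<(), Full> {
        if bytes.len() > self.data.len() - self.len {
            return Err(Full);
        }
        self.data[self.len..self.len + bytes.len()].copy_from_slice(bytes);
        self.len += bytes.len();
        Ok(())
    }

    fn as_slice(&self) -> &[u8] {
        &self.data[..self.len]
    }
}
```

The point is that “full” becomes an ordinary, predictable condition that the caller handles (for example by moving on to the next partition), rather than an allocation failure surfacing somewhere deep in the stack.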
If at any point the total elements for a partition exhaust the available memory, then an OOM occurs. It can be extremely difficult to predict with any confidence what a workable number of partitions could be – and this gets harder the larger and less predictable/structured your data is. For example, you might be processing log data where a typical key has tens of values, but outlier keys have hundreds of thousands – we saw this in web traffic data where a few bots were dwarfing regular users. This will typically cause an OOM as you operate on the data – and require massaging and filtering to work around.
A solution is to partition at the byte level rather than the element level. There is naturally an upper bound on the number of bytes that can be stored on a partition – why not use exactly that rather than guessing partition sizes? One issue is that `T`s are traditionally stored in memory – and they may include handles to the heap, e.g. a String. Given this, and issues like fragmentation and lack of control over the heap, we can’t provide a reliable lower bound of bytes worth of `T`s that can be stored, so is there any alternative? How about if we think of it as bytes when it’s at rest, and as a `T` only when we’re passing it to user functions?
Spark stores its `T`s in memory, serialising to bytes when it needs to send a `T` over the network and deserialising to a `T` again at the other end. We want to be able to reason about a partition of `T`s as a collection of bytes. If we instead store the serialised bytes of `T` at rest, and only deserialise to a `T` when we pass it to a user function, this means that we can have our partition boundaries wherever we want – they don’t have to match the `T` boundaries. Thus a `T` could straddle the end of one partition and the beginning of the next; a particularly large `T` could even span multiple partitions.
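To make the idea concrete, here is a small sketch in which records are stored as bytes at rest and cut into fixed-size partitions with no regard for record boundaries. The length-prefixed encoding and the function names are assumptions chosen for illustration; the real wire format is not described here.

```rust
/// Serialise each record as a 4-byte little-endian length followed by its UTF-8 bytes.
/// (Assumed encoding, for illustration only.)
fn serialise(records: &[&str]) -> Vec<u8> {
    let mut out = Vec::new();
    for r in records {
        out.extend_from_slice(&(r.len() as u32).to_le_bytes());
        out.extend_from_slice(r.as_bytes());
    }
    out
}

/// Cut the byte stream into partitions of exactly `partition_bytes`
/// (the last may be shorter), ignoring where records begin and end.
fn partition(bytes: &[u8], partition_bytes: usize) -> Vec<Vec<u8>> {
    assert!(partition_bytes > 0, "partitions must hold at least one byte");
    bytes.chunks(partition_bytes).map(|c| c.to_vec()).collect()
}

/// Rebuild records by streaming bytes across partition boundaries,
/// so a record may start in one partition and finish in a later one.
fn deserialise(partitions: &[Vec<u8>]) -> Vec<String> {
    let mut bytes = partitions.iter().flatten().copied();
    let mut records = Vec::new();
    loop {
        let mut len_buf = [0u8; 4];
        // A clean end of stream can only occur before a length prefix.
        match bytes.next() {
            None => break,
            Some(b) => len_buf[0] = b,
        }
        for slot in len_buf[1..].iter_mut() {
            *slot = bytes.next().expect("truncated length prefix");
        }
        let len = u32::from_le_bytes(len_buf) as usize;
        let body: Vec<u8> = bytes.by_ref().take(len).collect();
        assert_eq!(body.len(), len, "truncated record");
        records.push(String::from_utf8(body).expect("invalid UTF-8"));
    }
    records
}
```

With 8-byte partitions, a 20-byte string plus its prefix necessarily spans several partitions, yet it round-trips unchanged. Every partition has a size known in advance, whatever the shape of the data.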
4. Miscellaneous Bugs
I’m referring here to mistakes in the articulation of programs, rather than manifestations of design decisions. They are of course resolvable, however the challenge is to minimise the number that users might ever bump into. Our experience leads us to take a dual approach here:
- Keep code as simple as possible – the less edge case-y the code, the shallower its bugs are
- Aggressive testing – leveraging intuition and computing resources to hunt for bugs.
However, Spark has evolved over the last 8 years in a way that diverges sharply from both of these: most commits are introducing new edge cases… to rectify behaviour from other edge cases. The idea of aggressively fuzzing Spark at scale with random data and queries is unfortunately a non-starter, as getting even a single success is extremely unlikely.
Key Design Decisions
We decided to use Rust fairly early on, as it’s (in our opinion) the most powerful language that still retains complete control over memory management. Particularly beneficial is that:
- As a compiled language, it runs at native speed.
- It’s memory safe, so users used to Java never see “segmentation faults” well known to C/C++ programmers.
- It avoids garbage collection (a frequent cause of issues in Java) yet makes it difficult to incur memory leaks (a common problem in C/C++).
Backing data structure
On each partition we want to store an ordered collection of bytes – of capacity around 5GB – to and from which we can add and remove bytes at arbitrary indices in `Θ(1)`. A doubly linked list would give us this property, but also a terrible amount of space and dereferencing overhead. A doubly linked list of circular buffers offers a nice compromise – a bounded small cost on inserts and deletes, and many elements stored per pointer pair.
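A rough sketch of that shape, using the standard library’s `LinkedList` and `VecDeque` as stand-ins, might look as follows. `ChunkedBytes` is a hypothetical name, and there is one important simplification: this version walks the list to find the right chunk, so it does not achieve the constant-time access described above. A real implementation would presumably keep a cursor to the chunk being worked on. What the sketch does show is the bounded cost per edit: at most one chunk’s worth of bytes is ever shifted or split.

```rust
use std::collections::{LinkedList, VecDeque};

/// Bytes stored as a doubly linked list of small ring buffers.
struct ChunkedBytes {
    chunks: LinkedList<VecDeque<u8>>,
    chunk_capacity: usize, // upper bound on bytes per chunk
    len: usize,
}

impl ChunkedBytes {
    fn new(chunk_capacity: usize) -> Self {
        assert!(chunk_capacity >= 2, "chunks must hold at least two bytes");
        ChunkedBytes { chunks: LinkedList::new(), chunk_capacity, len: 0 }
    }

    /// Insert `byte` so that it ends up at position `index`.
    fn insert(&mut self, index: usize, byte: u8) {
        assert!(index <= self.len, "index out of bounds");
        self.len += 1;
        let mut offset = index;
        let mut full_at = None;
        // Simplification: linear walk to find the chunk containing `index`.
        for (i, chunk) in self.chunks.iter_mut().enumerate() {
            if offset <= chunk.len() {
                if chunk.len() < self.chunk_capacity {
                    chunk.insert(offset, byte); // shifts at most one chunk of bytes
                    return;
                }
                full_at = Some(i);
                break;
            }
            offset -= chunk.len();
        }
        match full_at {
            // Only reachable when the list has no chunks yet.
            None => {
                let mut chunk = VecDeque::with_capacity(self.chunk_capacity);
                chunk.push_back(byte);
                self.chunks.push_back(chunk);
            }
            // The target chunk is full: split it in two and link in the new half.
            Some(i) => {
                let mut tail = self.chunks.split_off(i + 1);
                let left = self.chunks.back_mut().expect("chunk i exists");
                let mut right = left.split_off(left.len() / 2);
                if offset <= left.len() {
                    left.insert(offset, byte);
                } else {
                    right.insert(offset - left.len(), byte);
                }
                self.chunks.push_back(right);
                self.chunks.append(&mut tail);
            }
        }
    }

    /// Remove and return the byte at `index`, if any.
    /// For brevity, emptied chunks are left in place rather than unlinked.
    fn remove(&mut self, index: usize) -> Option<u8> {
        if index >= self.len {
            return None;
        }
        let mut offset = index;
        for chunk in self.chunks.iter_mut() {
            if offset < chunk.len() {
                self.len -= 1;
                return chunk.remove(offset);
            }
            offset -= chunk.len();
        }
        None
    }

    fn to_vec(&self) -> Vec<u8> {
        self.chunks.iter().flatten().copied().collect()
    }
}
```

The chunk capacity is the knob that trades pointer overhead against the amount of data moved per insert or delete.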
Almost all traditional approaches to I/O sacrifice at least one of perfect parallelism or bounded allocation. One approach that we identified as promising – and that crucially didn’t necessitate the sacrifice of either – is pipes. Pipes provide a way to compose streaming operations in constant space, enabling us to keep our application single-threaded, perfectly parallel, and allocation-bounded.
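The constant-space property can be illustrated with Rust’s standard `Read` and `Write` traits standing in for the two ends of a pipe. This is only an analogy under our own assumptions: `pump` is a hypothetical helper, and the actual pipe abstraction may differ. The point is that a stage needs one fixed buffer, however much data flows through it.

```rust
use std::io::{self, Read, Write};

/// Stream bytes from `src` to `dst` through one fixed-size buffer,
/// applying `stage` to every byte. Memory use is independent of stream length.
fn pump<R: Read, W: Write>(
    mut src: R,
    mut dst: W,
    buf: &mut [u8],
    stage: impl Fn(u8) -> u8,
) -> io::Result<u64> {
    assert!(!buf.is_empty(), "the buffer must hold at least one byte");
    let mut total = 0u64;
    loop {
        let n = match src.read(buf) {
            Ok(0) => return Ok(total), // end of stream
            Ok(n) => n,
            Err(e) if e.kind() == io::ErrorKind::Interrupted => continue,
            Err(e) => return Err(e),
        };
        for b in &mut buf[..n] {
            *b = stage(*b);
        }
        dst.write_all(&buf[..n])?;
        total += n as u64;
    }
}
```

Because each stage only ever holds `buf.len()` bytes, chaining stages multiplies a small constant rather than the size of the data.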
A key question is what kinds of failures do we need to be able to continue in the face of, and what can we accept as force majeure?
- Linux hang/crash/unrecoverable error raised by syscall: we architect to minimise this risk for the time being, with a mind to eventually mitigate by moving functionality like networking to userspace (i.e. kernel bypass).
- Network partition: we’re okay with progress potentially being paused due to network partitions. In our experience, network partitions on the clouds over 30 seconds are exceptionally rare – we’ve only ever observed them as symptoms of serious datacenter-wide problems.
- Hardware instance failure: we’ll accept this as a force majeure event for the time being, though with a mind to implementing configurable redundancy in future to survive such failures.
- Running out of hardware instances: hitting some specified limit, e.g. the user’s budget on their cloud provider, is force majeure – there is nothing we can do but fail.
Heuristics, magic values and configuration unrelated to the user’s algorithm
As a constant source of issues in Spark, these should be avoided except where absolutely unavoidable. We tackle the most common sources of Spark issues as follows:
- Timeouts: Spark contains of the order of 1000 references to “timeout” – heartbeats, RPC timeouts, communication with orchestrators, et cetera, and they’re a regular source (or manifestation) of problems – particularly when they interact with long GC pauses. There are also multiple timeouts in Linux’s implementation of TCP. Going right back to computing first principles and given our design decision regarding network partitions above, there are only two timeouts that seem to be necessary: one to detect lost packets, and one to detect failed hardware.
- Partition count: This is generally user specified in Spark, and getting it wrong can result in degraded performance or failure – typically OOM. It is useful to be able to adjust the parallelism of the solution however – for example doubling the workers should give each half the work, approximately halving latency. This should be configurable, but tested to extremes such that it’s guaranteed not to cause a crash.
- Buffer sizes: These should exist only where necessary – namely channel buffer size (informed by the bandwidth-delay product), number of channel slots (informed by maximum potential number of partitions), and size of heap for user functions. We so far haven’t seen cause for any other.
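Two of these quantities follow from simple arithmetic, sketched below. The function names, units and example figures are our own assumptions for illustration. The bandwidth-delay product is the number of bytes that can be in flight on a link, which is what a channel buffer needs to hold to keep that link busy. The even split shows why doubling the workers roughly halves each one’s share, including at the extremes.

```rust
use std::ops::Range;

/// Bandwidth-delay product in bytes: bandwidth (bits/s) x round-trip time (s) / 8.
/// A u128 intermediate avoids overflow for large inputs.
fn bandwidth_delay_product(bandwidth_bits_per_sec: u64, rtt_micros: u64) -> u64 {
    let bits_in_flight =
        bandwidth_bits_per_sec as u128 * rtt_micros as u128 / 1_000_000;
    (bits_in_flight / 8) as u64
}

/// Split `total_bytes` into one contiguous byte range per worker.
/// Shares differ by at most one byte; extra workers get empty ranges.
fn split_evenly(total_bytes: u64, workers: u64) -> Vec<Range<u64>> {
    if workers == 0 {
        return Vec::new();
    }
    let base = total_bytes / workers;
    let extra = total_bytes % workers; // the first `extra` workers take one more byte
    let mut start = 0u64;
    (0..workers)
        .map(|i| {
            let len = base + if i < extra { 1 } else { 0 };
            let range = start..start + len;
            start += len;
            range
        })
        .collect()
}
```

For example, a 10 Gbit/s link with a 500 µs round trip has 625,000 bytes in flight, which suggests a channel buffer of roughly that size.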
In this blog post, we looked at the causes of errors in different levels of the big data stack. Despite being debuggable in isolation, these errors can compound each other to the point that any system built on this framework becomes almost incomprehensible from a performance or debuggability perspective.
Using the Hadean platform, we have built the underpinnings of the most important container in big data – the list. Through precise engineering and careful design choices, we have demonstrated how this data structure, so easily implemented at small scale yet deceptively complex at large scale, can be made reliable, massively increasing productivity.