Scaling Monte Carlo Simulations On Apache Spark. Can We Do Better?
Consider a map operation that transforms a list of doubles into a list of tuples, each containing three double-precision values. We’ve just tripled the amount of data we have. If the original list was partitioned so that it used ½ the memory on a worker machine, the output needs 1.5 times that machine’s memory; with the input still held while the map runs, only ½ the memory is free, so roughly two thirds of the new elements must be held elsewhere and the data has to be repartitioned.
If we were producing strings rather than fixed-size objects, the problem would become even harder, since we could not predict the size of partitions in advance.
Spark is a big-data framework generally considered to be an industry standard; Amazon provides the ability to run Spark under its Elastic MapReduce (EMR) framework. The key structure provided by Spark is the Resilient Distributed Dataset (RDD). The RDD is effectively a parallel list on which it is possible to perform common functional operations such as maps and folds. Spark is mainly written in Scala, but can easily be used from Java and Python.
Monte Carlo Financial Simulations
At Hadean, we’re working to lower the barrier to distributed computing by making it simpler and more reliable. One use case we decided to investigate was Spark used for Monte Carlo financial simulations, specifically to calculate Value at Risk (VaR). Big-data company Cloudera has provided example Spark code to do this in a previous blog post. The logs and scripts used to run the Spark examples in this post have been placed on GitHub. The financial details of the simulation are beyond the scope of this post, but the steps for a simulation of size N can be summarized as follows:
- Construct a list of p random seeds (where p is the number of worker nodes).
- Parallelize the list so that one random seed is present on each worker.
- A flatmap operation is applied to the random seeds. Each random seed is used to seed a multivariate Normal random number generator. Given a random vector sampled from the distribution, inner products are evaluated against different financial “instruments”. These are summed, and the final value is considered the result of the trial. N/p trials are run on each worker. Since this operation is a flatmap rather than a map, the result is a list of ~N double-precision values, one per trial.
- The 5% VaR value corresponds to the trial value at index N/20 in the list of trial results, were the list sorted. In the Cloudera code, this is implemented by using the takeOrdered() function to retrieve the smallest N/20 elements from the list, then taking the final value.
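The steps above can be sketched in a single process. This is a minimal, dependency-free Rust sketch, not Cloudera’s code: it assumes independent standard-normal factors (an identity covariance instead of a fitted multivariate Normal), treats each instrument as a hypothetical linear weight vector, and uses SplitMix64 with the Box-Muller transform as its random number generator. The names `trials_for_seed`, `value_at_risk` and `simulate` are illustrative.

```rust
use std::f64::consts::PI;

/// SplitMix64: a small 64-bit PRNG, used here only to avoid external crates.
pub struct SplitMix64(u64);

impl SplitMix64 {
    pub fn new(seed: u64) -> Self {
        SplitMix64(seed)
    }

    pub fn next_u64(&mut self) -> u64 {
        self.0 = self.0.wrapping_add(0x9E37_79B9_7F4A_7C15);
        let mut z = self.0;
        z = (z ^ (z >> 30)).wrapping_mul(0xBF58_476D_1CE4_E5B9);
        z = (z ^ (z >> 27)).wrapping_mul(0x94D0_49BB_1331_11EB);
        z ^ (z >> 31)
    }

    /// Uniform sample in (0, 1]; excluding 0 keeps ln() finite below.
    fn next_unit(&mut self) -> f64 {
        ((self.next_u64() >> 11) as f64 + 1.0) / (1u64 << 53) as f64
    }

    /// Standard normal sample via the Box-Muller transform.
    pub fn next_normal(&mut self) -> f64 {
        let (u1, u2) = (self.next_unit(), self.next_unit());
        (-2.0 * u1.ln()).sqrt() * (2.0 * PI * u2).cos()
    }
}

/// Step 3 for one seed: each trial samples one factor per dimension and
/// sums the inner products of that factor vector with every instrument.
pub fn trials_for_seed(seed: u64, trials: usize, instruments: &[Vec<f64>]) -> Vec<f64> {
    let dims = instruments.first().map_or(0, |w| w.len());
    let mut rng = SplitMix64::new(seed);
    let mut factors = vec![0.0; dims];
    let mut results = Vec::with_capacity(trials);
    for _ in 0..trials {
        for f in factors.iter_mut() {
            *f = rng.next_normal();
        }
        let value: f64 = instruments
            .iter()
            .map(|w| w.iter().zip(&factors).map(|(a, b)| a * b).sum::<f64>())
            .sum();
        results.push(value);
    }
    results
}

/// Step 4: the k-th smallest result with k = N * alpha (alpha = 0.05 for 5% VaR).
/// Same value as takeOrdered(k) followed by taking the last element, but found
/// with an O(N) selection instead of ordering the k smallest values.
pub fn value_at_risk(mut results: Vec<f64>, alpha: f64) -> f64 {
    assert!(!results.is_empty(), "no trial results");
    let k = ((results.len() as f64 * alpha) as usize).max(1);
    let (_, kth, _) = results.select_nth_unstable_by(k - 1, |a, b| a.total_cmp(b));
    *kth
}

/// Steps 1-4 in one process: N / p trials per seed, flattened, then the 5% VaR.
pub fn simulate(num_trials: usize, seeds: &[u64], instruments: &[Vec<f64>]) -> f64 {
    let per_seed = num_trials / seeds.len();
    let results: Vec<f64> = seeds
        .iter()
        .flat_map(|&s| trials_for_seed(s, per_seed, instruments))
        .collect();
    value_at_risk(results, 0.05)
}
```

With a single instrument of weight 1.0, each trial is just a standard normal sample, so `simulate` should land near the 5% quantile of N(0, 1), about -1.645.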
The Monte Carlo example is ideal for a small-scale test. It’s representative of a real-world scenario, and its size is easy to increase in a way that corresponds directly to both the amount of compute and the amount of data generated. Since each Monte Carlo trial generates one double-precision (8-byte) value, it’s possible to talk about the amount of work in terms of the size of the trial results: one million Monte Carlo trials correspond to 7.6 MiB of data.
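Because each trial yields exactly one 8-byte double, converting a trial count into a data size is simple arithmetic. A small sketch with illustrative function names, using binary units (1 MiB = 2^20 bytes, 1 GiB = 2^30 bytes): 10^6 trials come to about 7.6 MiB, and 1.1e9 trials to about 8392 MiB.

```rust
/// One f64 result per trial.
pub const BYTES_PER_TRIAL: f64 = 8.0;
const MIB: f64 = 1024.0 * 1024.0;
const GIB: f64 = 1024.0 * MIB;

/// Size of the trial results in MiB (2^20 bytes).
pub fn results_mib(trials: f64) -> f64 {
    trials * BYTES_PER_TRIAL / MIB
}

/// Size of the trial results in GiB (2^30 bytes).
pub fn results_gib(trials: f64) -> f64 {
    trials * BYTES_PER_TRIAL / GIB
}
```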
Monte Carlo on Spark
Let’s try this on Spark. We’ll first try with a single worker node, then increase the number. For our master node we choose Amazon’s r4.xlarge (4 vCPUs, 30.5 GiB memory) and a single worker node of type c4.4xlarge (16 vCPUs, 30 GiB memory). Amazon EMR is configured to maximise resource allocation, so we scale up the problem size without tuning Spark’s memory or thread-count settings for the worker nodes.
We observe that very little of the execution time is spent running the trials themselves; the majority is spent taking the first 5% of ordered trial results. We also find that we cannot scale beyond 4.8e8 trials, or a data-set size of about 3.6 GiB. Beyond this, the following errors occurred:
17/11/17 16:17:48 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory on ip-172-31-19-196.us-east-2.compute.internal:34217 (size: 2.1 KB, free: 6.5 GB)
17/11/17 16:28:59 WARN HeartbeatReceiver: Removing executor 1 with no recent heartbeats: 325326 ms exceeds timeout 300000 ms
17/11/17 16:28:59 ERROR YarnScheduler: Lost executor 1 on ip-172-31-19-196.us-east-2.compute.internal: Executor heartbeat timed out after 325326 ms
17/11/17 16:28:59 WARN TaskSetManager: Lost task 0.0 in stage 1.0 (TID 32, ip-172-31-19-196.us-east-2.compute.internal, executor 1): ExecutorLostFailure (executor 1 exited caused by one of the running tasks) Reason: Executor heartbeat timed out after 325326 ms
17/11/17 16:28:59 WARN TaskSetManager: Lost task 9.0 in stage 1.0 (TID 41, ip-172-31-19-196.us-east-2.compute.internal, executor 1): ExecutorLostFailure (executor 1 exited caused by one of the running tasks) Reason: Executor heartbeat timed out after 325326 ms
Increasing the network timeout had no effect, and there was no feedback on why the worker node had stopped responding. We then killed the job, since Spark would otherwise keep retrying it. 3.6 GiB seems surprisingly small given that the worker has 30 GiB of RAM. The Spark logs showed that the main compute phase completed fine and that the problem occurred during the takeOrdered() operation.
We also re-ran the same experiment, but this time chose to control more of the worker settings. Each worker machine runs one or more Spark processes (“executors”), each of which has multiple threads (“cores” in Spark terminology). Based on guides online and our own experimentation, we chose to re-run with 2 executors on the worker with 4 cores and 9 GiB memory per executor (more than 9 GiB caused a reduced number of executors). Since the worker machine is hyperthreaded and therefore only has 8 physical cores, we don’t expect using only 8 threads to significantly reduce throughput.
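The trade-off between executor count and executor size comes from YARN having to fit each executor’s heap plus an off-heap overhead into the node’s memory. A sketch of that arithmetic, assuming Spark’s documented default overhead of 10% of executor memory with a 384 MiB minimum; the node capacity passed in is a hypothetical value rather than the actual EMR setting for a c4.4xlarge, and the sketch ignores YARN’s rounding of container requests.

```rust
/// Default off-heap overhead Spark requests per executor on YARN:
/// max(384 MiB, 10% of the executor heap).
pub fn overhead_mib(executor_mib: u64) -> u64 {
    (executor_mib / 10).max(384)
}

/// Number of executors whose heap plus overhead fit into `node_mib`
/// of YARN-managed memory on one worker.
pub fn executors_per_node(node_mib: u64, executor_mib: u64) -> u64 {
    node_mib / (executor_mib + overhead_mib(executor_mib))
}
```

For example, with a hypothetical 21,000 MiB of YARN memory, `executors_per_node(21_000, 9 * 1024)` is 2 but `executors_per_node(21_000, 10 * 1024)` is 1: past some threshold, a slightly larger heap halves the executor count.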
Manually taking control of memory and thread counts gives us improved performance, but we still cannot scale beyond 4.8e8 trials on a single worker node.
Having established that we can successfully run 4.8e8 trials per worker machine, we scale up to five worker machines. Running more trials per machine would be nice, but it’s even more important that we can handle problems that span multiple machines. Such scaling should be trivial as long as the amount of work performed and data generated on each worker remains constant. We also change the master machine to an r4.2xlarge, which has 61 GiB of RAM, since the larger problem size means more data will be sent to the master node.
Given that we were able to run 4.8e8 trials with a single worker, we expect to be able to run 2.4e9 trials using 5 workers. Instead, we only managed to scale to 8e8 trials, around a third of what we expected. This means we achieved only about a 1.7x scale-up by using 5 machines instead of the desired 5x. Attempts to scale beyond this failed, but it was hard to determine why. The first sign of a problem was a warning:
17/11/22 12:49:06 WARN DFSClient: DFSOutputStream ResponseProcessor exception for block BP-210548905-172.31.14.197-1511354019022:blk_1073741843_1019
java.io.EOFException: Premature EOF: no length prefix available
    at org.apache.hadoop.hdfs.protocolPB.PBHelper.vintPrefixed(PBHelper.java:2282)
    at org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck.readFields(PipelineAck.java:244)
    at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer$ResponseProcessor.run(DFSOutputStream.java:733)
17/11/22 12:49:06 WARN DFSClient: Error Recovery for block BP-210548905-172.31.14.197-1511354019022:blk_1073741843_1019 in pipeline DatanodeInfoWithStorage[172.31.8.218:50010,DS-c6be1038-ef92-4571-aa2f-91139050c357,DISK], DatanodeInfoWithStorage[172.31.6.159:50010,DS-0eb98214-d8b3-4b83-a7f3-7336c54a398e,DISK]: bad datanode DatanodeInfoWithStorage[172.31.8.218:50010,DS-c6be1038-ef92-4571-aa2f-91139050c357,DISK]
Despite this warning, the job apparently continued to execute until a JVM thread dump occurred. This appeared to come from the master node, though it’s unclear why. Spark is used in production, so it can be, and has been, made to work effectively. The author notes that he is not a Spark expert, but has previously developed and run software in an HPC environment.
Spark’s promise is that it makes big data simpler, in particular by avoiding the complexity of HPC-like environments, and the Scala code for the Monte Carlo example is certainly succinct. Yet at first glance, this simplicity has been counterbalanced by a corresponding increase in complexity when attempting to run at scale, mostly in the form of obscure messages from a highly complex system.
Monte Carlo on Hadean
At Hadean, we’ve been building a platform designed to remedy these issues. The full technical details are beyond the scope of this blog post, but in summary:
- We run native code. Our toolchain is currently based around Rust, but we also support C and expect to support other languages that compile with LLVM. We eliminate issues related to garbage collection simply by not having it in the first place.
- We bound memory. When we execute a process, we fix the amount of memory it can allocate, including not just the heap but even the executable image itself. We guarantee that this memory has actually been allocated, so features such as Linux’s opportunistic (overcommit) memory allocation strategy cannot cause unexpected out-of-memory errors. Even dynamically allocated resources such as low-level network buffers come from the memory allocated to the user process, so our process execution mechanism never needs to perform a (possibly failing) dynamic allocation itself.
- We flatten the stack. Rather than try to support containers, Hadean processes run as close to the bare metal as possible. We use lightweight sandboxing mechanisms similar to those in Google Native Client rather than more complex virtualisation mechanisms like Xen. By reducing the number of layers in our stack at which failures can occur, we make errors from user applications much less opaque.
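The “We bound memory” point can be illustrated in user space. This Rust sketch is not Hadean’s implementation, which enforces bounds at the platform level; the hypothetical `BoundedArena` reserves its whole budget up front and writes to every page (assuming 4 KiB pages) so the operating system has to back it immediately, then hands out byte ranges until the budget is gone, returning `None` rather than failing unpredictably later.

```rust
use std::ops::Range;

/// Assumed page size; 4 KiB is common on x86-64 Linux.
const PAGE_SIZE: usize = 4096;

/// A fixed memory budget, committed up front and handed out as byte ranges.
pub struct BoundedArena {
    buf: Vec<u8>,
    next: usize,
}

impl BoundedArena {
    pub fn with_budget(bytes: usize) -> Self {
        let mut buf = vec![0u8; bytes];
        // Touch one byte per page so the OS must back the whole budget now:
        // an overcommit failure then surfaces at start-up, not mid-computation.
        for i in (0..bytes).step_by(PAGE_SIZE) {
            buf[i] = 1;
        }
        BoundedArena { buf, next: 0 }
    }

    /// Reserve `size` bytes; returns None once the fixed budget is exhausted.
    pub fn reserve(&mut self, size: usize) -> Option<Range<usize>> {
        let end = self.next.checked_add(size)?;
        if end > self.buf.len() {
            return None;
        }
        let range = self.next..end;
        self.next = end;
        Some(range)
    }

    /// Mutable access to a previously reserved range.
    pub fn bytes_mut(&mut self, range: Range<usize>) -> &mut [u8] {
        &mut self.buf[range]
    }

    pub fn remaining(&self) -> usize {
        self.buf.len() - self.next
    }
}
```

A request that would exceed the budget is refused outright, and the arena stays usable for smaller requests afterwards.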
We built a small Spark-like framework called “Mesh” on top of the Hadean platform equipped with enough functionality to perform the Monte Carlo simulation. This functionality primarily consisted of spawning a number of worker processes, parallelizing a list, being able to apply map operations and the takeOrdered() operator from Spark. The client code for this is available on GitHub.
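Since takeOrdered() is the step that caused trouble on Spark, it is worth spelling out how it can be distributed. In this sketch each partition reduces to its own k smallest values before anything is sent back, and the caller merges those candidates. Threads stand in for Hadean worker processes, and `parallelize` and `take_ordered` are hypothetical names, not Mesh’s actual API.

```rust
use std::thread;

/// Split `data` into `p` contiguous partitions of near-equal size.
pub fn parallelize(data: Vec<f64>, p: usize) -> Vec<Vec<f64>> {
    let chunk = ((data.len() + p - 1) / p).max(1); // ceiling division
    data.chunks(chunk).map(|c| c.to_vec()).collect()
}

/// The k smallest values of one partition, in ascending order.
fn smallest_k(mut values: Vec<f64>, k: usize) -> Vec<f64> {
    if values.len() > k {
        // After selection, everything before index k is <= the element at k.
        values.select_nth_unstable_by(k, |a, b| a.total_cmp(b));
        values.truncate(k);
    }
    values.sort_unstable_by(|a, b| a.total_cmp(b));
    values
}

/// takeOrdered(k): each partition computes its k smallest values in parallel,
/// then the caller merges the candidates and keeps the overall k smallest.
pub fn take_ordered(partitions: Vec<Vec<f64>>, k: usize) -> Vec<f64> {
    let candidates: Vec<Vec<f64>> = thread::scope(|s| {
        let handles: Vec<_> = partitions
            .into_iter()
            .map(|part| s.spawn(move || smallest_k(part, k)))
            .collect();
        handles.into_iter().map(|h| h.join().unwrap()).collect()
    });
    smallest_k(candidates.into_iter().flatten().collect(), k)
}
```

In this sketch each partition returns min(k, partition size) values. With k = N/20 and more than 20 partitions, every partition is smaller than k, so the merge step still receives all N results.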
On the Oracle cloud, we have access to five machines with 36-core Intel Xeon E5-2699 CPUs, but we restrict the number of cores used per machine to 8 for our runs. The data-centre network infrastructure is also different. This makes direct comparison tricky, but we aim to run experiments that are illustrative of the Hadean platform’s properties. First, we see how far we can scale with a single worker machine. The Hadean process model deliberately avoids threads, so we run with 1 master process and 8 worker processes.
We bound the heap size of the worker and master processes at 2.25GiB. This gives approximately 18 GiB of memory for workers in total, which roughly matches our experiment earlier (2 executors with 9 GiB per executor). Hadean has no notion of master and worker processes and by default would allocate Mesh manager and worker processes to the same machine. To ensure that network traffic between the master and workers isn’t simply over loopback, we forced the master and workers to be allocated to different machines.
We successfully scale to 1.1e9 trials on a single machine (8392 MiB), more than double what we achieved with Spark. The overheads shown are due to the fact that each spawned process must have its executable image sent to the target machine: even though all 8 workers are identical, the image is sent 8 times. We expect to be able to reduce the spawning overhead to less than a second in many cases.
Next we scale up the number of machines. This time we use 5 machines, utilising 8 cores per machine, and again bound the heap to 2.25 GiB. We run 39 worker processes and 1 master process, which is closer to normal execution on the Hadean platform than the previous runs. We do not use a dedicated master machine, but instead allocate one physical core to the master process. This means we use one fewer machine than the Spark run, but it also reduces our potential scale-up to 4.875 (39 workers instead of 8).
We can successfully run up to 4.88e9 trials, a scale-up of 4.44x, which is significantly better than the less-than-2x scale-up we achieved with Spark. In terms of total number of trials, we achieve 6.1x what we managed with Spark with the same amount of memory.
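The scaling figures for the Hadean runs reduce to a few ratios. A small sketch reproducing them from the process counts, heap sizes and trial counts reported above (function names are illustrative): 8 workers at 2.25 GiB give 18 GiB of worker heap, the ideal scale-up from 8 to 39 workers is 4.875, and 4.88e9 / 1.1e9 trials is about 4.44, roughly 91% of that ideal.

```rust
/// Total heap available to worker processes, in GiB.
pub fn worker_heap_gib(workers: u32, heap_gib_each: f64) -> f64 {
    workers as f64 * heap_gib_each
}

/// Ideal scale-up when the work per worker process is held constant.
pub fn ideal_scale_up(workers_after: u32, workers_before: u32) -> f64 {
    workers_after as f64 / workers_before as f64
}

/// Achieved scale-up: ratio of the largest successful problem sizes.
pub fn achieved_scale_up(trials_after: f64, trials_before: f64) -> f64 {
    trials_after / trials_before
}
```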
Our experiments demonstrate that, given a solid underlying platform, it’s possible to build a framework with a relatively simple Spark-like API and achieve more than acceptable performance and scaling properties. Given the relatively simple nature of the example we chose to explore, it’s not surprising that we manage to scale; what is surprising is that Spark does not. It’s possible that with more tuning Spark would perform better, but that is precisely our main gripe with it: experience with HPC environments and Java development, and extensive previous experience with Linux, did not help us make Spark run more effectively.
Spark’s allure is that simple code can run at massive scale. The Spark code for the Monte Carlo problem was simple, yet running it at scale was anything but. Our Mesh implementation has a similar API but runs more predictably and effectively. So, to answer the question posed in the title: definitely yes.
- Rust has good closure support but poor support for sending serialized closures. Until we solve this (most likely by modifying the compiler) we use an effective but less pretty manual closure implementation in our code.
- Since this blog post was not primarily about Spark, we glossed over a number of the issues we encountered while trying to get Spark to scale. Some of these included:
- Memory errors. Errors due to heap exhaustion or over-provisioning are expected when trying to scale. However, many of the errors we received were much more opaque. These included errors suggesting that memory had somehow been overcommitted to the JVM, perhaps because of Linux’s optimistic memory allocation policy, and errors from YARN when container memory limits had been exceeded. In practice, both of these should be avoidable.
- Serialization issues. Kryo, a Java serialization framework, frequently needed its buffer size increased as we scaled the problem size; in the end, we used 512 MiB. It’s not apparent to us why a serialization library would need such large buffers, since fixed-size buffers suffice when data is being written to a stream leaving the machine. In another study on Hadean, we’ve had no issues sending multi-gigabyte objects (an indexed human genome) between processes.
- Odd EMR failure modes. If a Spark job fails to run correctly, it’s not always clear whether the cluster can be re-used. Sometimes a Spark cluster appeared to function correctly, yet then exhibited failures that would not have occurred had we killed and recreated the cluster (which can be extremely time-consuming).
- Other errors. Some errors made no sense at all. These included workers apparently failing to generate heartbeats, presumably having failed. Usually, failed workers would show YARN container errors, but in this case they didn’t.
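The point about serialization buffers can be illustrated with a stream that serializes values through a fixed-size buffer. This is a minimal Rust sketch, not Kryo’s API or Hadean’s wire format: `write_f64s` holds at most `buf_bytes` of pending output however many values pass through it, and `read_f64s` reads them back.

```rust
use std::io::{self, BufWriter, Read, Write};

/// Stream f64 values to `sink` through a buffer of `buf_bytes`: memory use is
/// bounded by the buffer size, not by the number of values written.
pub fn write_f64s<W: Write>(
    values: impl IntoIterator<Item = f64>,
    sink: W,
    buf_bytes: usize,
) -> io::Result<()> {
    let mut w = BufWriter::with_capacity(buf_bytes, sink);
    for v in values {
        w.write_all(&v.to_le_bytes())?;
    }
    w.flush()
}

/// Read little-endian f64 values until end of stream; a trailing partial
/// value (fewer than 8 bytes) is ignored.
pub fn read_f64s<R: Read>(mut src: R) -> io::Result<Vec<f64>> {
    let mut out = Vec::new();
    let mut buf = [0u8; 8];
    loop {
        match src.read_exact(&mut buf) {
            Ok(()) => out.push(f64::from_le_bytes(buf)),
            Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => return Ok(out),
            Err(e) => return Err(e),
        }
    }
}
```

Here the sink is an in-memory `Vec<u8>` for simplicity; with a socket or file as the sink, the sender’s memory use stays at the buffer size regardless of how much data is streamed.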