Scaling Monte Carlo Simulations On Apache Spark. Can We Do Better?
Monte Carlo Financial Simulations
At Hadean, we’re working to lower the barrier to distributed computing by making it simpler and more reliable. One use-case we decided to investigate was Spark used for Monte Carlo financial simulations, specifically to calculate a “Value at Risk” (VaR) value. Big-data company Cloudera has provided example Spark code to do this in a previous blog post. The logs and scripts used to run the Spark examples in this post have been placed on GitHub. The financial details of the simulation are beyond the scope of this post, but the steps for a simulation of size N can be summarized as follows:
- Construct a list of p random seeds (where p is the number of worker nodes).
- Parallelize the list so that one random seed is present on each worker.
- Apply a flatMap operation to the random seeds. Each random seed is used to seed a multivariate Normal random number generator. Given a random vector sampled from the distribution, inner products are evaluated against different financial “instruments”. These are summed and the final value is considered the result of the trial. N/p trials are run on each worker. Since this operation is a flatMap, rather than a map, the result is a single list of ~N double-precision values, one per trial.
- The 5% VaR value corresponds to the trial value at index N/20 in the list of trial results, were the list sorted. In the Cloudera code, this is implemented by using the takeOrdered() function to retrieve the smallest N/20 elements from the list, then taking the final value.
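The steps above can be sketched on a single machine in plain Python. This is only an illustration under stated assumptions, not the Cloudera code: independent standard normals stand in for the multivariate Normal sample, the instrument weights are made up, and the names run_trials and value_at_risk are hypothetical. heapq.nsmallest plays the role of takeOrdered().

```python
import heapq
import random


def run_trials(seed, num_trials, instruments):
    """Run num_trials trials from one seed and return one float per trial."""
    rng = random.Random(seed)
    num_factors = len(instruments[0])
    results = []
    for _ in range(num_trials):
        # Assumption: independent standard normals stand in for the
        # multivariate Normal sample used in the original example.
        sample = [rng.gauss(0.0, 1.0) for _ in range(num_factors)]
        # Inner product of the sample with each instrument, then summed.
        value = sum(
            sum(w * x for w, x in zip(weights, sample))
            for weights in instruments
        )
        results.append(value)
    return results


def value_at_risk(total_trials, num_workers, instruments, base_seed=0):
    """Return the 5% VaR: the last of the smallest N/20 trial values."""
    # Step 1: one seed per (simulated) worker.
    seeds = [base_seed + i for i in range(num_workers)]
    per_worker = total_trials // num_workers
    # Steps 2-3: flatMap equivalent, one flat list of trial values.
    trial_values = [
        v for seed in seeds for v in run_trials(seed, per_worker, instruments)
    ]
    # Step 4: take the smallest N/20 values (sorted) and keep the last one.
    k = max(1, len(trial_values) // 20)
    return heapq.nsmallest(k, trial_values)[-1]


# Hypothetical instruments: each is a weight vector over two risk factors.
example_instruments = [[1.0, 0.5], [0.2, -0.3]]
print(value_at_risk(1000, 4, example_instruments))
```

Because each worker’s trials depend only on its seed, the trial phase needs no communication; only the final selection step has to look across all results.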
The Monte Carlo example is ideal for a small-scale test. It’s representative of a real-world scenario, and its size can be increased easily, with the amount of compute and the amount of data generated both growing in direct proportion. Since each Monte Carlo trial generates one double-precision value, we can describe the amount of work in terms of the size of the trial results: one million Monte Carlo trials correspond to 7.6MiB of data.
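The conversion from trial count to data size is simple arithmetic, assuming 8 bytes per double-precision value and no container overhead. The function name below is hypothetical.

```python
BYTES_PER_DOUBLE = 8  # one IEEE 754 double-precision value per trial


def result_size_mib(num_trials):
    """Raw size of the trial results in MiB, ignoring any container overhead."""
    return num_trials * BYTES_PER_DOUBLE / 2**20


print(f"{result_size_mib(1_000_000):.1f} MiB")  # prints "7.6 MiB"
```

This is the raw payload only; a JVM collection of boxed values would occupy more memory than this figure suggests.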
Monte Carlo on Spark
Let’s try this on Spark. We’ll first try with a single worker node, then increase the number. For our master node we choose Amazon’s r4.xlarge (4 vCPUs, 30.5 GiB memory) and a single worker node of type c4.4xlarge (16 vCPUs, 30 GiB memory). Amazon EMR is configured to maximise resource allocation, so we try scaling up problem size without trying to tune Spark’s memory/thread count settings for worker nodes.
We observe that very little of the execution time is spent doing the trials themselves. Instead, the majority is spent taking the first 5% of ordered trial results. We also find that we cannot scale beyond 4.8e8 trials, or a data-set size of 3.7GiB. Beyond this, the following errors occurred.
17/11/17 16:17:48 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory on ip-172-31-19-196.us-east-2.compute.internal:34217 (size: 2.1 KB, free: 6.5 GB)
17/11/17 16:28:59 WARN HeartbeatReceiver: Removing executor 1 with no recent heartbeats: 325326 ms exceeds timeout 300000 ms
17/11/17 16:28:59 ERROR YarnScheduler: Lost executor 1 on ip-172-31-19-196.us-east-2.compute.internal: Executor heartbeat timed out after 325326 ms
17/11/17 16:28:59 WARN TaskSetManager: Lost task 0.0 in stage 1.0 (TID 32, ip-172-31-19-196.us-east-2.compute.internal, executor 1): ExecutorLostFailure (executor 1 exited caused by one of the running tasks) Reason: Executor heartbeat timed out after 325326 ms
17/11/17 16:28:59 WARN TaskSetManager: Lost task 9.0 in stage 1.0 (TID 41, ip-172-31-19-196.us-east-2.compute.internal, executor 1): ExecutorLostFailure (executor 1 exited caused by one of the running tasks) Reason: Executor heartbeat timed out after 325326 ms
Increasing the network timeout had no effect, nor was there any feedback on why the worker node had stopped responding. After this we killed the job, since Spark will continue to retry it. 3.7GiB seems surprisingly small given that a worker has 30 GiB of RAM. The Spark logs showed that the main compute phase completed fine, and the issue occurred during the takeOrdered() operation.
We also re-ran the same experiment, but this time chose to control more of the worker settings. Each worker machine runs one or more Spark processes (“executors”), each of which has multiple threads (“cores” in Spark terminology). Based on guides online and our own experimentation, we chose to re-run with 2 executors on the worker, each with 4 cores and 9 GiB of memory (requesting more than 9 GiB reduced the number of executors). Since the worker machine is hyperthreaded and therefore only has 8 physical cores, we don’t expect using only 8 threads to significantly reduce throughput.
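As a rough illustration of the layout described above, the sketch below expresses it as standard Spark properties (spark.executor.instances, spark.executor.cores, spark.executor.memory) and renders them as --conf arguments for spark-submit. The helper names are hypothetical, and this is not the exact configuration used for the runs in this post; other settings on the cluster may interact with or override these values.

```python
def executor_conf(num_executors, cores_per_executor, memory_gib):
    """Return Spark properties describing a fixed executor layout."""
    return {
        # Total executors for the application (here: one worker, two executors).
        "spark.executor.instances": str(num_executors),
        # Threads ("cores" in Spark terminology) per executor.
        "spark.executor.cores": str(cores_per_executor),
        # JVM heap per executor; YARN reserves extra overhead on top of this.
        "spark.executor.memory": f"{memory_gib}g",
    }


def to_submit_args(conf):
    """Render properties as repeated --conf key=value arguments."""
    args = []
    for key, value in conf.items():
        args += ["--conf", f"{key}={value}"]
    return args


conf = executor_conf(num_executors=2, cores_per_executor=4, memory_gib=9)
total_threads = 2 * 4    # 8 threads, matching the 8 physical cores
total_heap_gib = 2 * 9   # 18 GiB of executor heap on a 30 GiB worker
print(" ".join(to_submit_args(conf)))
```

The gap between the 18 GiB of requested heap and the 30 GiB on the machine is not all usable: the per-executor overhead and the operating system also need room, which is consistent with larger requests reducing the executor count.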
Manually taking control of memory and thread counts gives us improved performance, but we still cannot scale beyond 4.8e8 trials on a single worker node.
Having established that we can successfully run 4.8e8 trials per worker machine, we decide to scale up to five worker machines. Running more trials per machine would be nice, but it’s even more important that we can handle problems that span multiple machines. Such scaling should be trivial as long as we ensure that the amount of work performed and data generated on each worker remains constant. We also change the master machine to an r4.2xlarge, which has roughly 60GiB of RAM, since the increase in problem size means that more data will be sent to the master node.
Given that we were able to run 4.8e8 trials with a single worker, we expect to be able to run 2.4e9 trials using 5 workers. Instead, we only managed to scale to 8e8 trials, or around a third of what we expected. This means we achieved only a roughly 1.7x scale-up by using 5 machines instead of the desired 5x. When we attempted to scale beyond this, things failed, but it was hard to determine why. The first sign of a problem was a warning:
17/11/22 12:49:06 WARN DFSClient: DFSOutputStream ResponseProcessor exception for block BP-210548905-172.31.14.197-1511354019022:blk_1073741843_1019
java.io.EOFException: Premature EOF: no length prefix available
    at org.apache.hadoop.hdfs.protocolPB.PBHelper.vintPrefixed(PBHelper.java:2282)
    at org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck.readFields(PipelineAck.java:244)
    at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer$ResponseProcessor.run(DFSOutputStream.java:733)
17/11/22 12:49:06 WARN DFSClient: Error Recovery for block BP-210548905-172.31.14.197-1511354019022:blk_1073741843_1019 in pipeline DatanodeInfoWithStorage[172.31.8.218:50010,DS-c6be1038-ef92-4571-aa2f-91139050c357,DISK], DatanodeInfoWithStorage[172.31.6.159:50010,DS-0eb98214-d8b3-4b83-a7f3-7336c54a398e,DISK]: bad datanode DatanodeInfoWithStorage[172.31.8.218:50010,DS-c6be1038-ef92-4571-aa2f-91139050c357,DISK]
Despite this warning, the job apparently continued to execute until a JVM thread dump appeared. The dump seemed to come from the master node, though it’s unclear why it occurred. Spark is in use in production, so it clearly can be, and has been, made to work effectively. The author notes that he is not a Spark expert, but has previously developed and run software in an HPC environment. Spark’s promise is that it makes big data simpler, in particular by avoiding the complexity of having to deal with HPC-like environments, and the Scala code for the Monte Carlo example is certainly quite succinct. Yet at first glance, this simplicity has been counterbalanced by a corresponding increase in complexity when attempting to run at scale, mostly in the form of obscure messages from a highly complex system.
Monte Carlo on Hadean
At Hadean, we’ve been building a platform designed to remedy these issues. The full technical details are beyond the scope of this blog post, but in summary:
- We run native code. Our toolchain is currently based around Rust, but we also support C and expect to support other languages that compile with LLVM. We eliminate issues related to garbage collection simply by not having it in the first place.
- We bound memory. When we execute a process, we fix the amount of memory it can allocate, including not just the heap but even the executable image itself. We guarantee such memory has been allocated, so features such as the opportunistic memory allocation strategy used in Linux cannot cause strange out-of-memory errors. Even the dynamic allocation of things like low-level network buffers comes from the memory allocated to a user process, ensuring that our process execution mechanism never needs to perform a (possibly failing) dynamic allocation itself.
- We flatten the stack. Rather than try to support containers, Hadean processes run as close to the bare metal as possible. We use lightweight sandboxing mechanisms similar to those in Google Native Client rather than more complex virtualisation mechanisms like Xen. By reducing the number of layers in our stack at which failures can occur, we make errors from user applications much less opaque.
Conclusion
Our experiments demonstrate that, given a solid underlying platform, it’s possible to build a framework with a relatively simple Spark-like API and achieve more than acceptable performance and scaling properties. Given the relatively simple nature of the example we chose to explore, it’s not surprising that we manage to scale. What is confusing is that Spark does not. It’s possible that with more tuning Spark would function more effectively, yet this is our main gripe with Spark: experience with HPC environments and Java development, along with extensive previous experience with Linux, was not helpful in making Spark run more effectively. Spark’s allure is the simplicity of writing simple code yet having it run at massive scale. The Spark code for the Monte Carlo problem was simple, yet running it at scale was anything but. Our Mesh implementation has a similar API but runs more predictably and effectively. To answer the question we posed in the title: definitely yes.
Notes
- Rust has good closure support but poor support for sending serialized closures. Until we solve this (most likely by modifying the compiler) we use an effective but less pretty manual closure implementation in our code.
- Since this blog post was not primarily about Spark, we glossed over a number of the issues we encountered while trying to get Spark to scale. Some of these included:
- Memory errors. Errors due to heap exhaustion or over-provisioning are expected when trying to scale. However, many of the errors we received were much more opaque. These included errors suggesting that memory had somehow been overcommitted to the JVM, perhaps by Linux’s optimistic memory allocation policy, and errors from YARN when container memory limits had been exceeded. In practice, both of these should be avoidable.
- Serialization issues. Kryo, a Java serialization framework, frequently needed its buffer size increased as we scaled problem size. In the end, we ended up using 512MiB. It’s not apparent to us why a serialization library would need such large buffers, since it’s possible to use fixed-size buffers if data is being written to a stream leaving the machine. In another study on Hadean, we’ve had no issues sending multi-gigabyte objects (an indexed human genome) between processes.
- Odd EMR failure modes. If a Spark job fails to run correctly, it’s not always clear whether the cluster can be re-used. Sometimes a Spark cluster appeared to function correctly, yet then exhibited failures that would not have occurred if we had killed and recreated the cluster (which can be extremely time-intensive).
- Other errors. Some errors made no sense at all. These included workers apparently failing to generate heartbeats, presumably having failed. Usually, failed workers would show YARN container errors, but in this case they didn’t.
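The remark above about fixed-size buffers can be illustrated with a small sketch: a list of doubles of any length is written to a stream through a buffer of bounded size, so the buffer never has to grow with the data. This is not how Kryo works internally; the function names are hypothetical, and io.BytesIO stands in for a network stream (a real socket could return partial reads that this reader does not handle).

```python
import io
import struct


def write_doubles(values, stream, buffer_bytes=4096):
    """Write floats as little-endian doubles using a bounded buffer.

    Returns the total number of bytes written.
    """
    per_chunk = max(1, buffer_bytes // 8)  # doubles that fit in the buffer
    chunk = []
    written = 0
    for value in values:
        chunk.append(value)
        if len(chunk) == per_chunk:
            written += stream.write(struct.pack(f"<{len(chunk)}d", *chunk))
            chunk.clear()
    if chunk:  # flush the final, partially filled buffer
        written += stream.write(struct.pack(f"<{len(chunk)}d", *chunk))
    return written


def read_doubles(stream, buffer_bytes=4096):
    """Yield doubles from the stream, reading a bounded buffer at a time."""
    size = max(8, buffer_bytes - buffer_bytes % 8)  # whole doubles only
    while True:
        data = stream.read(size)
        if not data:
            break
        yield from struct.unpack(f"<{len(data) // 8}d", data)


# Round-trip 10,000 values through a 1 KiB buffer.
values = [i * 0.5 for i in range(10_000)]
buf = io.BytesIO()
total = write_doubles(values, buf, buffer_bytes=1024)
buf.seek(0)
print(total, list(read_doubles(buf, buffer_bytes=1024)) == values)
```

The peak buffer size here is set by buffer_bytes rather than by the size of the object being sent, which is the property the note above has in mind.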