In a previous blog post, we defined a "3rd generation ML platform" as one that offered full programmability for ML workflows. Key to a 3rd generation platform is the concept of a programmable compute layer. In this blog, we report on emerging patterns of distributed compute we see in advanced ML platform workloads. We show how Ray, a leading programmable compute layer, improves performance by 3-9x in relevant production workloads.
Distributed ML workflows are typically composed from a few types of compute patterns: collective (i.e., a set of processes communicating with each other like in distributed SGD), chaining (i.e., a sequential workflow of tasks run one after the other), and nesting (i.e., tasks that kick off other tasks, commonly seen in hyperparameter tuning):
In first generation platforms, ML workflows were implemented as custom-built systems optimized for a specific workflow. In second generation platforms, flexibility was achieved by relying on workflow orchestrators to provide chaining and nesting, gluing together separate systems that internally implement high-optimized collective operations. The following figure illustrates a 2nd generation distributed ML platform. Each step is run as a separate cluster by a workflow orchestrator (e.g., FBLearner Flow, SageMaker Steps, Metaflow, KubeFlow):
However, this 2nd generation architecture imposes performance overheads and limits programmability. This is due to:
Scheduling overheads. These platforms rely on separately scheduled VMs or containers per step to distribute the workload. Each step may launch its own distributed framework (e.g., Spark or Distributed PyTorch). This leads to several seconds to minutes of scheduling overhead per step, and prevents optimizations like pipelining.
Data movement overheads. The overhead between steps can be reduced substantially if data is kept in memory between steps when possible and not materialized to storage.
Programmability overheads. Expressing fine-grained nesting or pipelining can require substantial changes to distributed systems code that are rarely accessible to end-users of existing ML platforms.
In many cases, chaining and nesting are not performance bottlenecks. This is because the data transferred is small, or scheduling overhead is small compared to execution time. However, there are a growing number of scenarios where bottlenecks do arise. Here we overview several use cases that benefit from optimized chaining and nesting.
Use case | 2nd generation platforms | 3rd generation platforms |
---|---|---|
Data Ingest for Model Training (Chaining): Chaining data processing and ML training is simple when the data can fit onto a single machine. It becomes challenging at large scale, when you want to shuffle data globally during training, or pipeline data preparation with training to reduce latency. Although materializing data to cluster storage provides certain benefits, frequent I/O to cluster storage introduces a lot of overhead. | ||
Serving Pipelines (Chaining): Similarly, passing data efficiently is critical to the performance of disaggregated model serving pipelines. In a disaggregated pipeline, large models are split into multiple parts that can be deployed and scaled separately (e.g., fully-connected networks vs memory-intensive embedding lookups). Predictions can be composed of multiple models in a DAG structure (e.g., RoboVision uses a 5-model stack for vehicle detection pipeline). | ||
Batch Scoring (Nesting): Scoring a model on a large dataset can be quite slow without distributed computation, which naturally generates a nested workload during the course of training. Nesting can be difficult to express, let alone efficiently, in traditional workflow orchestrators. | ||
Hyperparameter Tuning (Nesting): Nesting also occurs naturally during hyperparameter tuning, which seeks to explore many variants of existing workloads. Nesting is needed for distributed trials, and efficiency is important for supporting lightweight trials and population based approaches. | ||
Workflow DAGs (Both Chaining and Nesting): Finally, both chaining and nesting occur naturally in higher level workflow DAGs implemented by workflow orchestrators. Here two bottlenecks can occur: (1) if large amounts of data is passed or shared between workflow steps, it is desirable to pass data at memory-speed rather than reading and writing to cluster storage, and (2) if steps are small, scheduling overhead can dominate workflow run time. |
A third generation ML platform eliminates the above performance bottlenecks. Users and builders of third generation platforms are able to:
Implement chaining and nesting of distributed steps with minimal scheduling and data movement overheads.
Programmatically author ML workflows, weaving together steps like ingest, transform, and training without needing to wrangle separate distributed systems.
This is possible with the use of a programmable compute layer such as Ray, which can serve as a replacement to (or accelerator for) workflow orchestrators. In a 3rd gen platform, distributed logic such as data processing is implemented as libraries within the compute layer:
In this section we highlight some of the performance gains users have seen by leveraging Ray's support for efficient chaining and nesting of computations:
In a previous blog post, Uber introduced how they were leveraging Ray for elastic scheduling and training with Horovod. In a follow-up project, another team is using Ray's Dataset Pipeline to implement shuffled ML ingest, where data is globally shuffled across cluster CPU workers per iteration of GPU training. Here we analyze the importance of pipelining and in-memory data exchange. We ran a training workload reading 500GB of data on a cluster of 70 CPU nodes and 16 GPU nodes, and find ingest throughput is 3x higher with the pipelining and in-memory data exchange enabled by Ray:
Mobile gaming giant Wildlife Studios’ legacy system for serving revenue-generating in-game offers was not scaling to meet their latency and cost requirements. After switching to Ray Serve, their Dynamic Offers team was able to serve offers three times faster. The Ray Serve architecture provided support for parallel inference on multiple models in a pipeline, decreasing latency and minimizing idle machines:
Anastasia provides a powerful platform that enables organizations to operate AI capacities at scale with a fraction of the resources and effort traditionally required. They were able to accelerate a demand prediction problem using Ray Tune, and got up to 9x performance gains due to fine-grained re-use of resources compared to a coarse grained orchestrator:
In this blog, we showed how a programmable compute layer such as Ray can provide 3-9x performance improvements for production ML workloads, eliminating bottlenecks found in 2nd generation production architectures. This is in addition to the productivity and operational benefits of having a programmable architecture.
As a final note, the ideas presented here apply generally to distributed programming as well as to ML workloads. If you're interested in the performance and programmability of ML applications and distributed computing, check out the Ray project and consider joining us.
Access Anyscale today to see how companies using Anyscale and Ray benefit from rapid time-to-market and faster iterations across the entire AI lifecycle.