Embedding Inference at Scale for RAG Applications with Ray Data and Milvus

By Christy Bergman and Cheng Su   

Retrieval Augmented Generation (RAG) is one of the most popular use cases for enterprise Generative AI. Most RAG tutorials show how to use the OpenAI API for both the embedding model and large language model (LLM) inference. Why pay to access your own data, especially during development? With open-source tools, you can work with your own data and iterate as quickly as you like.

One of the most intriguing discoveries was the remarkable performance boost achieved with Ray Data during the embedding step, where data is transformed into vectors. 

Running open-source embeddings using pooled batch inference requests with tools such as Ray Data saved resources and time compared to Pandas. Using just four workers on a Mac M2 laptop with 16GB RAM, Ray Data was 60 times faster; more details later in this blog.

Our Open Source RAG Stack:

  • The new BGE-M3 embedding model (generating 3 types of vectors in one pass: sparse, dense, and multi-vector)

  • Ray Data for fast, distributed embedding inference

  • AWS S3 for temporarily storing the inference result

  • Milvus or Zilliz Cloud vector database

  • Example data downloaded from the Kaggle IMDB movie poster dataset

Our Open Source RAG Stack

BGE-M3 Embedding Model

Powerful sparse, dense, and multi-vector embeddings. The BGE-M3 embedding model is named for its “multi” capabilities: multi-linguality, multi-functionality, and multi-granularity. It works with over 100 languages and simultaneously computes embeddings for the three common retrieval methods: dense, sparse, and multi-vector. It also handles text of varying lengths, from short sentences to long documents (up to 8,192 tokens). You can learn more from the BGE-M3 paper or from the model’s Hugging Face page.

Since version 2.4, Milvus has built-in support for BGE M3.
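For example, here is a minimal sketch of calling the built-in wrapper from pymilvus (the sample sentence is made up; the model weights download from Hugging Face on first use):

# Minimal sketch: BGE-M3 via the pymilvus built-in wrapper (pymilvus >= 2.4).
from pymilvus.model.hybrid import BGEM3EmbeddingFunction

bge_m3 = BGEM3EmbeddingFunction(use_fp16=False, device="cpu")
output = bge_m3(["Eadweard Muybridge photographed a galloping horse."])

print(bge_m3.dim)              # embedding dimensions, e.g. a dict with 'dense' and 'sparse' keys
print(output["dense"][0][:5])  # first 5 values of the dense vector for the sample text
print(output["sparse"])        # sparse representation of the same text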

Ray Data

Long-running data transformation tasks? Ray Data’s scalable data processing makes it easier and faster to process massive amounts of data in parallel across multiple machines (CPUs, GPUs, etc.). Ray Data is especially helpful when the work can be split into parallel tasks, such as many simultaneous chunking and embedding transformations. Under the hood, Ray Data has a powerful streaming execution engine that maximizes GPU utilization in the cluster. Compared to running embedding with an online service (such as the OpenAI embedding API), running an offline embedding job with Ray Data can save the majority of the cost.
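To make the pattern concrete, here is a tiny, hypothetical sketch of the Ray Data workflow used later in this post: read a dataset, then apply a transformation in parallel batches across workers.

# Tiny sketch of the Ray Data pattern: read, transform in parallel batches, inspect.
import ray

ds = ray.data.range(1_000)  # stand-in for a real dataset read from Parquet/S3
doubled = ds.map_batches(lambda batch: {"id": batch["id"] * 2})  # runs in parallel across workers
print(doubled.take(3))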

Anyscale is a managed platform for Ray. You can easily scale out the embedding jobs on Anyscale to leverage hundreds of GPU machines.

Milvus and Zilliz

The secret sauce behind a lightning-fast RAG app is a powerful vector database!  Milvus is built to handle massive amounts of data for large-scale enterprise use.  Unlike some vector databases, Milvus can flexibly grow as your data needs increase: its architectural layers for storing, indexing, and querying are designed to scale up and/or out independently.  This keeps your RAG app fast, since Milvus smartly performs computations offline, both ahead of time and while queries arrive in real time.  Plus, Milvus comes with other bells and whistles important for businesses, like keeping your data secure and organized (multi-tenancy and role-based access control) and ensuring it is always available (high availability).

Zilliz Cloud is a managed service built on open-source Milvus.

Set up your RAG tools

We will use the Python SDKs for Milvus, Ray Data, Amazon S3, and Zilliz.

For Amazon S3, you’ll need to sign up for an AWS account.

In your browser, navigate to console.aws.amazon.com > IAM > My security credentials > Create access key.  Copy your access key and secret key and save them securely on your machine.

Install the libraries and run aws configure.  This will put the AWS variables in a credentials file.

pip install boto3
pip install awscli --force-reinstall --upgrade
aws configure  # fill in your access key and secret key
more ~/.aws/credentials  # make sure this looks correct

Install Ray Data:

pip install -U "ray[data]"

Install Pymilvus:

pip install -U pymilvus "pymilvus[model]" langchain

The BGE-M3 embedding model comes already packaged with Pymilvus since v2.4.

import ray, os, pprint, time, boto3
from langchain.text_splitter import RecursiveCharacterTextSplitter
import numpy as np
import pymilvus
print(pymilvus.__version__) # must be >= 2.4.0
from pymilvus.model.hybrid import BGEM3EmbeddingFunction

Zilliz: To use the Zilliz free tier (up to 2 collections, 1 million vectors each), sign up for an account, and create a Starter cluster.
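Once the cluster is up, you can connect to it from Python with the MilvusClient API. A minimal sketch (the URI and token below are placeholders copied from your cluster’s connect details, not real values):

# Minimal sketch: connect to a Zilliz Cloud cluster with pymilvus' MilvusClient.
from pymilvus import MilvusClient

mc = MilvusClient(
    uri="https://in03-xxxxxxxx.api.gcp-us-west1.zillizcloud.com",  # placeholder endpoint
    token="YOUR_API_KEY",                                          # or "user:password"
)
print(mc.list_collections())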

Prepare Data

The code in this blog uses the well-known public Kaggle IMDB movie poster dataset.  It contains about 48,000 movies with reviews, poster links, and other metadata.

I copied all the text fields (movie name, description, review text) into a new column called ‘text’ and saved it in Parquet format, since it is more efficient than CSV.
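A hedged sketch of that preparation step (the CSV filename and column names here are assumptions; the real Kaggle columns may be named differently):

# Sketch: combine the text-like columns into one 'text' column and save as Parquet.
import pandas as pd

df = pd.read_csv("imdb_movies.csv")  # hypothetical filename for the Kaggle CSV
df["text"] = (df["Title"].fillna("") + " "
              + df["Description"].fillna("") + " "
              + df["Reviews"].fillna(""))
df.to_parquet("kaggle_imdb.parquet", index=False)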

Generate embeddings

The steps to create embeddings are:
1) Chunk the data: Split the input text into chunks, to keep the semantically related pieces of text together.
2) Call an embedding model in inference mode to generate vector representations of the chunks.

Ray Data is able to parallelize these data operations using: 
1) flat_map() for chunking the data since the output will have more rows than the input.
2) map_batches() for calling the embedding model from inside a callable Class method.

# 1. Define a regular python function to chunk data.
chunk_size = 512
chunk_overlap = np.round(chunk_size * 0.10, 0)

# Define a LangChain text splitter.
text_splitter = RecursiveCharacterTextSplitter(
   chunk_size=chunk_size,
   chunk_overlap=chunk_overlap,
   length_function=len)  #len is a built-in Python function

# Define a regular python function for chunking.
def chunk_row(row, splitter=text_splitter):

   # Copy the row columns into metadata.
   metadata = row.copy()
   del metadata['text'] # Remove text from metadata

   # Split the text into chunks.
   chunks = splitter.create_documents(
       texts=[row["text"]],
       metadatas=[metadata])
   chunk_list = [{
       "text": chunk.page_content,
       **chunk.metadata} for chunk in chunks]

   return chunk_list

# 2. Define a class with a callable method to compute embeddings.
class ComputeEmbeddings:
     def __init__(self):
           # Initialize a Milvus built-in sparse-dense-late-interaction-reranking encoder.
           # https://huggingface.co/BAAI/bge-m3
           self.model = BGEM3EmbeddingFunction(use_fp16=False, device="cpu")
           print(f"dense_dim: {self.model.dim['dense']}")
           print(f"sparse_dim: {self.model.dim['sparse']}")

     def __call__(self, batch):

           # Ray data batch is a dictionary where values are array values.
           # BGEM3EmbeddingFunction input is docs as a list of strings.
           docs = list(batch['text'])

           # Encode the documents. bge-m3 dense embeddings are already normalized.
           embeddings = self.model(docs)
           batch['vector_dense'] = embeddings['dense']
           return batch

if __name__ == "__main__":

   FILE_PATH = "s3://zilliz/kaggle_imdb.parquet"

   # Read the source data (here from S3; a local file path also works).
   ds = ray.data.read_parquet(FILE_PATH)

   # Chunk the input text
   chunked_ds = ds.flat_map(chunk_row)

   # Compute embeddings with a class that calls the embeddings model.
   embeddings_ds = chunked_ds.map_batches(ComputeEmbeddings, concurrency=4)

   # Save the embeddings to S3 as a folder of Parquet part files.
   embeddings_ds.write_parquet('s3://zilliz/kaggle_imdb_embeddings')

To run this, you’ll submit it as a Ray job:

  1. Save the code into a Python script file.  I called it ray_data_demo.py

  2. To run locally from your laptop, create a clean directory containing only the .py script file and the .parquet data file; keep it to the bare minimum.  I called mine ‘ray_cluster’.

  3. Run the Python script (see the commands after this list).  This will start a Ray cluster and submit a job automatically.

  4. Navigate to http://127.0.0.1:8265 to view the cluster and job timings in the Ray dashboard.
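For reference, here is roughly what steps 3 and 4 look like from a terminal (the Jobs CLI variant assumes you already have a Ray cluster running and RAY_ADDRESS pointing at it):

# Run locally: starts a single-node Ray cluster and the dashboard at http://127.0.0.1:8265.
python ray_data_demo.py

# Or submit the same script to an existing Ray cluster with the Ray Jobs CLI.
ray job submit --working-dir . -- python ray_data_demo.py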

Image: The Ray dashboard showing the cluster and job timings.

Embedding Latency - 60x faster on a laptop

| Approach | Input data size | Total time |
|----------|-----------------|------------|
| Pandas   | 100 rows        | 23 sec     |
| Ray Data | 100 rows        | 50 sec     |
| Pandas   | 45K rows        | >4 hours   |
| Ray Data | 45K rows        | 4 min      |

Table: Timings for embedding data on an M2 16GB laptop.  Ray Data batch processing ran on a single-node Ray cluster with concurrency = 4 workers.  Pandas used a single process, while Ray Data used four; for the tiny 100-row run Ray Data’s startup overhead dominates, but at 45K rows the parallel speedup is dramatic.  Both would run faster on a bigger cluster.

Bulk-insert the embedded data from S3 directly into Milvus or Zilliz

Both Milvus and Zilliz offer bulk insert to import already-embedded data directly from AWS S3, Google Cloud Storage, or Azure Blob Storage.  In addition to the web console (shown below), Zilliz also offers a RESTful API and SDKs.

For a large corpus of batch-generated embeddings, using bulk import can significantly save machine resources and shorten insertion time compared to incremental insertion. More importantly, the vector search index built during bulk import is much more efficient than one built from incremental insertion (think global optimization vs. local optimization).

Let’s see how to conduct a bulk import with a few clicks in the Zilliz Cloud web console.  Starting from the cluster where you want the new collection, create a collection with AutoID enabled and only a “vector” column with the correct EMBEDDING_DIMENSION, turn on the convenient “Dynamic Field” option, and click “Create Collection”.

Next, click “Import Data” and follow the on-screen instructions to paste the path to the Parquet files written by the Ray Data job. (If your S3 bucket is private, you also need to provide the Access Key and Secret Key so that Zilliz Cloud can read the data.) Amazon S3, Google Cloud Storage, and Azure Blob Storage are all supported sources.  Click “Import” to start importing all the data into the vector database collection.

Once imported, you can optionally build an index on the collection to make the vector search in the Query your data step more efficient.

Image: Screenshot of the Zilliz bulk insert screens.
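If you prefer to script the import instead of clicking through the console, here is a hedged sketch using the bulk-insert utility in pymilvus against open-source Milvus (Zilliz Cloud exposes the same capability through its Data Import API instead). The collection name, URI, and file path below are placeholders, and the Parquet files must live in the object-storage bucket your Milvus deployment is configured to read.

# Hedged sketch: programmatic bulk insert and index build with open-source Milvus.
from pymilvus import connections, utility, Collection

connections.connect(uri="http://localhost:19530")  # placeholder Milvus endpoint

# Kick off the import of one Parquet part file written by the Ray Data job.
task_id = utility.do_bulk_insert(
    collection_name="movies",                         # placeholder collection name
    files=["kaggle_imdb_embeddings/part-0.parquet"],  # placeholder path inside the bucket
)
print(utility.get_bulk_insert_state(task_id))  # poll this until the task completes

# Optionally build an HNSW index over the dense vectors to speed up search.
Collection("movies").create_index(
    field_name="vector_dense",
    index_params={"index_type": "HNSW",
                  "metric_type": "IP",
                  "params": {"M": 16, "efConstruction": 200}},
)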

Query your data

To test the newly imported collection, let’s ask a question and retrieve answers from our movie data.
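The search function below assumes a few objects created earlier in the full script: mc (a MilvusClient connected to the cluster), model_bgem3 (the same BGE-M3 encoder used for the embedding job), COLLECTION_NAME, SEARCH_PARAMS, and a small _utils helper from the demo repo that formats the retrieved chunks. A hedged sketch of that setup (all values are placeholders):

# Hedged setup assumed by the search function below; values are placeholders.
from pymilvus import MilvusClient
from pymilvus.model.hybrid import BGEM3EmbeddingFunction

COLLECTION_NAME = "movies"                                    # placeholder collection name
SEARCH_PARAMS = {"metric_type": "IP", "params": {"ef": 64}}   # example HNSW search params

mc = MilvusClient(uri="https://<your-cluster>.zillizcloud.com", token="<api-key>")
model_bgem3 = BGEM3EmbeddingFunction(use_fp16=False, device="cpu")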

def mc_run_search(question, output_fields, top_k=2, filter_expression=""):

   # Embed the question using the same encoder.
   embeddings = model_bgem3([question])
   query_embeddings = embeddings['dense']

   # Run semantic vector search using your query and the vector database.
   results = mc.search(
       COLLECTION_NAME,
       data=query_embeddings,
       search_params=SEARCH_PARAMS,
       output_fields=output_fields,
       # Milvus can utilize metadata in boolean expressions to filter search.
       filter=filter_expression,
       limit=top_k,
       consistency_level="Eventually"
   )

   # Assemble retrieved context and context metadata.
   # The search result is in the variable `results[0]`, which is type
   # 'pymilvus.orm.search.SearchResult'.
   METADATA_FIELDS = [f for f in output_fields if f != 'chunk']
   formatted_results, context, context_metadata = _utils.client_assemble_retrieved_context(
       results, metadata_fields=METADATA_FIELDS, num_shot_answers=top_k)
   return formatted_results, context, context_metadata

SAMPLE_QUESTION = "muybridge horse movie"

# Return top k unique results with HNSW index.
TOP_K = 2

# Define output fields to return.
OUTPUT_FIELDS = ["movie_id", "chunk", "PosterLink"]

formatted_results, context, context_metadata = \
   mc_run_search(SAMPLE_QUESTION, OUTPUT_FIELDS, TOP_K)

Looping through the top 2 unique results, we can see that the following content closely matches the search query:

Image: Retrieved results for the sample query.

The full Ray Data script is available on GitHub.

Conclusion

This blog showed how to use Ray Data and the Milvus Bulk Import feature to significantly speed up vector generation and efficiently batch-load the results into a vector database. For example, embedding 102K rows of data with Ray Data took 4 minutes compared to more than 4 hours with a naive Pandas approach! Furthermore, using Bulk Import in Milvus builds a highly efficient vector index and saves resources and time compared to regular incremental insertion. Check out the Ray Data and Bulk Import features in Milvus and Zilliz Cloud for more details!

Ready to try Anyscale?

Access Anyscale today to see how companies using Anyscale and Ray benefit from rapid time-to-market and faster iterations across the entire AI lifecycle.