Retrieval Augmented Generation (RAG) is one of the most popular use cases for enterprise Generative AI. Most RAG tutorials show how to use the OpenAI API for both the embedding model and large language model (LLM) inference. Why should you pay to access your own data, especially during the development process? You can access your own data and iterate as quickly as you like using open source.
One of the most intriguing discoveries was the remarkable performance boost achieved with Ray Data during the embedding step, where data is transformed into vectors.
Running open-source embeddings using pooled batch inference requests with tools such as Ray Data saved resources and time compared to Pandas. Using just four workers on a Mac M2 laptop with 16GB RAM, Ray Data was 60 times faster, more details later in this blog.
Our Open Source RAG Stack:
New BGM-M3 embedding model (generating 3 types of vectors in one round: sparse, dense, and multi-vector)
Ray Data for fast, distributed embedding inference
AWS S3 for temporarily storing the inference result
Milvus or Zilliz Cloud vector database
Example data downloaded from Kaggle IMDB poster
Powerful Sparse, Dense, and Multi-vector Embeddings. The BGE-M3 embedding model is nicknamed for its “multi” capabilities: Multi-Linguality, Multi-Functionality, and Multi-Granularity. It can work with over 100 languages, simultaneously compute embeddings for the three common retrieval methods: dense, sparse, multi-vector embeddings. It also works with various text lengths from short sentences to long documents (up to 8,192 tokens). You can learn more from this Paper, or from this HuggingFace page about the model.
Since version 2.4, Milvus has built-in support for BGE M3.
Long-running data transformation tasks? Ray Data’s scalable data processing makes it easier and faster to process massive amounts of data in parallel across multiple machines (CPUs, GPUs, etc.). Ray Data is especially helpful when the data can be split into parallel processes, such as many simultaneous chunkin and embedding transformations! Under the hood, Ray Data has a powerful streaming execution engine, to maximize the GPU utilization in the cluster. Compared to running embedding with an online service (such as OpenAI embedding API), running an offline embedding job with Ray Data can save the majority of cost.
Anyscale is a managed platform for Ray. You can easily scale out the embedding jobs on Anyscale to leverage hundreds of GPU machines.
The secret sauce behind a lightning-fast RAG app is a powerful vector database! Milvus is built to handle massive amounts of data for large-scale use by businesses. Unlike some vector databases, Milvus can flexibly grow as your data needs increase – its architectural layers for Storing, Indexing, and Querying are designed to scale up independently and/or out. This makes your RAG app fast since Milvus smartly does offline computations before and while the queries come in real time. Plus, Milvus comes with other bells and whistles important for businesses, like keeping your data secure and organized (multi-tenancy and role-based access control) and ensuring it's always available (high availability).
Zilliz is a managed cloud product and uses open source Milvus.
We will use the Python SDK for Milvus, Ray Data, Amazon S3, and Zilliz.
For Amazon S3, you’ll need to sign up for an AWS account.
In your browser, navigate to console.aws.amazon.com > IAM > My security credentials > Create access key. Copy and securely save locally your key and secret key.
Install libraries and run aws config. This will put the AWS variables in a credentials file.
1pip install boto3
2pip install awscli –force-reinstall –upgrade
3aws config #fill in your key and secret key
4more ~/.aws/credentials #make sure this looks correct
Install Ray Data:
1pip install -U "ray[data]"
Install Pymilvus:
1pip install -U pymilvus "pymilvus[model]" langchain
The BGE-M3 embedding model comes packaged already with Pymilvus since v2.4.
1import ray, os, pprint, time, boto3
2from langchain.text_splitter import RecursiveCharacterTextSplitter
3import numpy as np
4import pymilvus
5print(pymilvus.__version__) # must be >= 2.4.0
6from pymilvus.model.hybrid import BGEM3EmbeddingFunction
Zilliz: To use the Zilliz free tier (up to 2 collections, 1 million vectors each), sign up for an account, and create a Starter cluster.
The code in this blog uses the well-known public Kaggle IMDB poster data. It contains about 48,000 movies, reviews, poster links, and more metadata.
I copied all the text fields (movie name, description, review text) into a new column called ‘text’ and saved it in Parquet format, since it is more efficient than CSV.
The steps to create embeddings are:
1) Chunk the data: Split the input text into chunks, to keep the semantically related pieces of text together.
2) Call an embedding model in inference mode to generate vector representations of the chunks.
Ray Data is able to parallelize these data operations using:
1) flat_map() for chunking the data since the output will have more rows than the input.
2) map_batches() for calling the embedding model from inside a callable Class method.
1# 1. Define a regular python function to chunk data.
2chunk_size = 512
3chunk_overlap = np.round(chunk_size * 0.10, 0)
4
5# Define a LangChain text splitter.
6text_splitter = RecursiveCharacterTextSplitter(
7 chunk_size=chunk_size,
8 chunk_overlap=chunk_overlap,
9 length_function=len) #len is a built-in Python function
10
11# Define a regular python function for chunking.
12def chunk_row(row, splitter=text_splitter):
13
14 # Copy the row columns into metadata.
15 metadata = row.copy()
16 del metadata['text'] # Remove text from metadata
17
18 # Split the text into chunks.
19 chunks = splitter.create_documents(
20 texts=[row["text"]],
21 metadatas=[metadata])
22 chunk_list = [{
23 "text": chunk.page_content,
24 **chunk.metadata} for chunk in chunks]
25
26 return chunk_list
27
28# 2. Define a class with a callable method to compute embeddings.
29class ComputeEmbeddings:
30 def __init__(self):
31 # Initialize a Milvus built-in sparse-dense-late-interaction-reranking encoder.
32 # https://huggingface.co/BAAI/bge-m3
33 self.model = BGEM3EmbeddingFunction(use_fp16=False, device="cpu")
34 print(f"dense_dim: {self.model.dim['dense']}")
35 print(f"sparse_dim: {self.model.dim['sparse']}")
36
37 def __call__(self, batch):
38
39 # Ray data batch is a dictionary where values are array values.
40 # BGEM3EmbeddingFunction input is docs as a list of strings.
41 docs = list(batch['text'])
42
43 # Encode the documents. bge-m3 dense embeddings are already normalized.
44 embeddings = self.model(docs)
45 batch['vector_dense'] = embeddings['dense']
46 return batch
47
48if __name__ == "__main__":
49
50 FILE_PATH = "s3://zilliz/kaggle_imdb.parquet"
51
52 # Read data from local drive.
53 ds = ray.data.read_parquet(FILE_PATH)
54
55 # Chunk the input text
56 chunked_ds = ds.flat_map(chunk_row)
57
58 # Compute embeddings with a class that calls the embeddings model.
59 embeddings_ds = chunked_ds.map_batches(ComputeEmbeddings, concurrency=4)
60
61 # Save the embeddings to S3 in a folder of parquet part files.
62embeddings_ds.write_parquet('s3://zilliz/kaggle_imdb_embeddings')
To run this, you’ll submit it as a Ray job:
Save the code into a Python script file. I called it ray_data_demo.py
To run locally from your laptop, create a clean directory, with only the .py script file and the .parquet data file. Only put the bare minimum in this clean directory. I called mine ‘ray_cluster’.
Run the Python script. This will start a Ray cluster and submit a job automatically.
Navigate over to http://127.0.0.1:8265. View the Cluster and Jobs timings.
Approach | Input data size | Total time | Screenshot |
---|---|---|---|
Pandas | 100 rows | 23 sec | |
Ray Data | 100 rows | 50 sec | |
Pandas | 45K rows | >4 hours | |
Ray Data | 45K rows | 4 min |
Table: Timings for embedding data on a M2 16GB laptop. Ray data batch processing was on a single-node Ray cluster, concurrency= 4 workers. Pandas was slow because it only had one processor, while Ray Data had 4 processors. Both would run faster on a bigger cluster.
Both Milvus and Zilliz offer bulk-insert to import already embedded data directly from AWS, GCP, or Azure. In addition to the web console (shown below), Zilliz also offers a restful API and SDK.
For a large corpus of batch-generated embeddings, using bulk import can significantly save the machine resource and shorten insertion time compared to incremental insertion. More importantly, the vector search index built by bulk import is much more efficient than that from incremental insertion (think of global optimization v.s. local optimization).
Let’s see how to conveniently conduct bulk import with a few simple clicks on the Zilliz Cloud web console. Starting from the Cluster where you want to create the new collection, create a new collection with AutoID, only the “vector” column with correct EMBEDDING_DIMENSION, use the convenient “Dynamic Field” option, and click “Create Collection”.
Next, click “Import Data” and follow the screen instructions to copy the path to the parquet files written by the Ray Data job. (Note that you need to also specify the Access Key and Secret Key if your S3 bucket is private, so that Zilliz Cloud would be able to read the data in it). Any of Amazon S3, Google Cloud Storage, or Azure Blob Storage cloud sources are supported. Click “Import” to start importing all data into the vector database collection.
Once imported, you can optionally click on the build index on the collection to make the vector search more efficient in the Query your data step.
To test the newly imported collection, let’s ask a question and retrieve answers from our movie data.
1def mc_run_search(question, output_fields, top_k=2, filter_expression=""):
2
3 # Embed the question using the same encoder.
4 embeddings = model_bgem3([question])
5 query_embeddings = embeddings['dense']
6
7 # Run semantic vector search using your query and the vector database.
8 results = mc.search(
9 COLLECTION_NAME,
10 data=query_embeddings,
11 search_params=SEARCH_PARAMS,
12 output_fields=output_fields,
13 # Milvus can utilize metadata in boolean expressions to filter search.
14 filter=filter_expression,
15 limit=top_k,
16 consistency_level="Eventually"
17 )
18
19 # Assemble retrieved context and context metadata.
20 # The search result is in the variable `results[0]`, which is type
21 # 'pymilvus.orm.search.SearchResult'.
22 METADATA_FIELDS = [f for f in output_fields if f != 'chunk']
23 formatted_results, context, context_metadata = _utils.client_assemble_retrieved_context(
24 results, metadata_fields=METADATA_FIELDS, num_shot_answers=top_k)
25 return formatted_results, context, context_metadata
26
27SAMPLE_QUESTION = "muybridge horse movie"
28
29# Return top k unique results with HNSW index.
30TOP_K = 2
31
32# Define output fields to return.
33OUTPUT_FIELDS = ["movie_id", "chunk", "PosterLink"]
34
35formatted_results, context, context_metadata = \
36 mc_run_search(SAMPLE_QUESTION, OUTPUT_FIELDS, TOP_K)
Looping through the top 2 unique results, we can see the following content closely returned from the above search query:
The full Ray Data script is available on GitHub.
This blog showed how to use Ray Data and Milvus Bulk Import features to significantly speed up the vector generation and efficiently batch load them into a vector database. For example, Embedding 102K rows of data using Ray Data took 4 minutes compared to 4 hours using a naive Pandas approach! Furthermore, using Bulk Import in Milvus can build a highly efficient vector index and save resources and time compared to regular incremental insertion. Check out Ray Data and Bulk Import features in Milvus and Zilliz Cloud for more details!