A RAG Pipeline in a Few Dozen Lines of Code: UltraRAG 2.0 + Milvus + MCP
What are the core problems a RAG system has to solve?
In short, there are two:
High cost: how do you support advanced capabilities such as multi-turn reasoning, dynamic retrieval, and adaptive knowledge organization without a massive engineering effort?
Complex engineering: LLM frameworks and vector databases each come with a real learning curve, and time drains away into tuning details and picking indexes.
As an answer to these problems, UltraRAG 2.0, released by Tsinghua University's THUNLP lab together with Northeastern University's NEUIR group, OpenBMB, and AI9Stars, is a good reference point.
Through componentized encapsulation, declarative orchestration, and lightweight scheduling, UltraRAG 2.0 replaces traditional hand-written code with YAML configuration files: a few dozen declarative statements are enough to express sequential, loop, and conditional-branch logic, yielding multi-stage reasoning systems with very little code.
01
RAG Pipeline Architecture
RAG is evolving beyond the early, simple "retrieve + generate" concatenation into complex AI applications that handle multi-turn reasoning and dynamic retrieval, with correspondingly high engineering complexity and a steep technical bar. DeepResearch and Search-o1 are typical examples.
Against this backdrop, UltraRAG 2.0 brings three technical innovations:
First, componentized encapsulation. Building on the MCP architecture, complex RAG components are standardized as independent services, achieving genuinely modular design: developers focus on business logic and never touch the underlying communication details.
Second, declarative orchestration. In a complex multi-stage reasoning flow, traditional approaches make it hard to pinpoint which stage went wrong. UltraRAG 2.0's YAML configuration makes the entire execution flow visible at a glance, with clear input/output tracing for every step, speeding up debugging several-fold.
Third, lightweight scheduling. The built-in Client engine keeps the system efficient and fully decoupled. Traditional RAG systems tend to be monolithic, so adding a feature means modifying core code; with UltraRAG 2.0's microservice-style MCP architecture, new components are developed and deployed independently, and extending the system is as easy as installing a plugin.
02
Integrating Milvus into the RAG Pipeline
In the UltraRAG 2.0 stack, the choice of vector database matters, and we can add Milvus support to the project through integration. Once integrated, you build the index and run queries with the simple ultrarag build and ultrarag run commands; the system loads the configuration automatically and coordinates the service modules to complete the task.
Demo goals:
1. Integrate Milvus into the UltraRAG 2.0 project
2. Define a custom Milvus-based pipeline
3. Run the project end to end
4. Ignore the evaluation scores from the run (they are not the goal of this local demo)
Data preparation
Note: the dataset is the FAQ from the Milvus repository, in JSONL format.
{"id": "faq_0", "contents": "If you failed to pull the Milvus Docker image from Docker Hub, try adding other registry mirrors. Users from the Chinese mainland can add the URL https://registry.docker-cn.com to the registry-mirrors array in /etc.docker/daemon.json."}
{"id": "faq_1", "contents": "Docker is an efficient way to deploy Milvus, but not the only way. You can also deploy Milvus from source code. This requires Ubuntu (18.04 or higher) or CentOS (7 or higher). See Building Milvus from Source Code for more information."}
{"id": "faq_2", "contents": "Recall is affected mainly by index type and search parameters. For FLAT index, Milvus takes an exhaustive scan within a collection, with a 100% return. For IVF indexes, the nprobe parameter determines the scope of a search within the collection. Increasing nprobe increases the proportion of vectors searched and recall, but diminishes query performance."}
{"id": "faq_3", "contents": "Milvus does not support modification to configuration files during runtime. You must restart Milvus Docker for configuration file changes to take effect."}
{"id": "faq_4", "contents": "If Milvus is started using Docker Compose, run docker ps to observe how many Docker containers are running and check if Milvus services started correctly. For Milvus standalone, you should be able to observe at least three running Docker containers, one being the Milvus service and the other two being etcd management and storage service."}
{"id": "faq_5", "contents": "The time difference is usually due to the fact that the host machine does not use Coordinated Universal Time (UTC). The log files inside the Docker image use UTC by default. If your host machine does not use UTC, this issue may occur."}
{"id": "faq_6", "contents": "Milvus requires your CPU to support a SIMD instruction set: SSE4.2, AVX, AVX2, or AVX512. CPU must support at least one of these to ensure that Milvus operates normally."}
{"id": "faq_7", "contents": "Milvus requires your CPU to support a SIMD instruction set: SSE4.2, AVX, AVX2, or AVX512. CPU must support at least one of these to ensure that Milvus operates normally. An illegal instruction error returned during startup suggests that your CPU does not support any of the above four instruction sets."}
{"id": "faq_8", "contents": "Yes. You can install Milvus on Windows either by compiling from source code or from a binary package. See Run Milvus on Windows to learn how to install Milvus on Windows."}
{"id": "faq_9", "contents": "It is not recommended to install PyMilvus on Windows. But if you have to install PyMilvus on Windows but got an error, try installing it in a Conda environment. See Install Milvus SDK for more information about how to install PyMilvus in the Conda environment."}
Step 1: Deploy the Milvus vector database
1.1 Download the deployment file
wget https://github.com/milvus-io/milvus/releases/download/v2.5.12/milvus-standalone-docker-compose.yml -O docker-compose.yml
1.2 Start the Milvus service
docker-compose up -d
docker-compose ps -a
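With the containers up, a short Python check confirms that Milvus is reachable before going further (assumes pymilvus is installed; host and port are the ones used throughout this demo):
# check_milvus.py -- verify the standalone deployment is reachable
from pymilvus import connections, utility

connections.connect(alias="default", host="192.168.8.130", port=19530)
print("Milvus server version:", utility.get_server_version())
print("Existing collections:", utility.list_collections())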
Step 2: Clone the project
git clone https://github.com/OpenBMB/UltraRAG.git
Step 3: Pipeline implementation
3.1 Integrate the Milvus vector database
Note: add Milvus alongside the retriever backends the project already supports.
vim ultraRAG/UltraRAG/servers/retriever/src/retriever.py
import os
from urllib.parse import urlparse, urlunparse
from typing import Any, Dict, List, Optional
import aiohttp
import asyncio
import jsonlines
import numpy as np
import pandas as pd
from tqdm import tqdm
from flask import Flask, jsonify, request
from openai import AsyncOpenAI, OpenAIError
from fastmcp.exceptions import NotFoundError, ToolError, ValidationError
from ultrarag.server import UltraRAG_MCP_Server
app = UltraRAG_MCP_Server("retriever")
retriever_app = Flask(__name__)
class Retriever:
def __init__(self, mcp_inst: UltraRAG_MCP_Server):
mcp_inst.tool(
self.retriever_init,
output="retriever_path,corpus_path,index_path,faiss_use_gpu,infinity_kwargs,cuda_devices->None",
)
mcp_inst.tool(
self.retriever_init_openai,
output="corpus_path,openai_model,api_base,api_key->None",
)
mcp_inst.tool(
self.retriever_init_Milvus,
output="corpus_path,Milvus_host,Milvus_port,collection_name,embedding_dim->None",
)
mcp_inst.tool(
self.retriever_embed,
output="embedding_path,overwrite,use_alibaba_cloud,alibaba_api_key,alibaba_model,alibaba_endpoint->None",
)
mcp_inst.tool(
self.retriever_embed_openai,
output="embedding_path,overwrite->None",
)
mcp_inst.tool(
self.retriever_index,
output="embedding_path,index_path,overwrite,index_chunk_size->None",
)
mcp_inst.tool(
self.retriever_index_lancedb,
output="embedding_path,lancedb_path,table_name,overwrite->None",
)
# Note: retriever_index_Milvus has been removed
# Use setup_Milvus_collection.py for collection creation and indexing
mcp_inst.tool(
self.retriever_search,
output="q_ls,top_k,query_instruction,use_openai->ret_psg",
)
mcp_inst.tool(
self.retriever_search_lancedb,
output="q_ls,top_k,query_instruction,use_openai,lancedb_path,table_name,filter_expr->ret_psg",
)
mcp_inst.tool(
self.retriever_search_Milvus,
output="q_ls,top_k,query_instruction,use_openai->ret_psg",
)
mcp_inst.tool(
self.retriever_deploy_service,
output="retriever_url->None",
)
mcp_inst.tool(
self.retriever_deploy_search,
output="retriever_url,q_ls,top_k,query_instruction->ret_psg",
)
mcp_inst.tool(
self.retriever_exa_search,
output="q_ls,top_k->ret_psg",
)
mcp_inst.tool(
self.retriever_tavily_search,
output="q_ls,top_k->ret_psg",
)
def retriever_init(
self,
retriever_path: str,
corpus_path: str,
index_path: Optional[str] = None,
faiss_use_gpu: bool = False,
infinity_kwargs: Optional[Dict[str, Any]] = None,
cuda_devices: Optional[str] = None,
):
try:
import faiss
except ImportError:
err_msg = "faiss is not installed. Please install it with `conda install -c pytorch faiss-cpu` or `conda install -c pytorch faiss-gpu`."
app.logger.error(err_msg)
raise ImportError(err_msg)
try:
from infinity_emb.log_handler import LOG_LEVELS
from infinity_emb import AsyncEngineArray, EngineArgs
except ImportError:
err_msg = "infinity_emb is not installed. Please install it with `pip install infinity-emb`."
app.logger.error(err_msg)
raise ImportError(err_msg)
self.faiss_use_gpu = faiss_use_gpu
app.logger.setLevel(LOG_LEVELS["warning"])
if cuda_devices is not None:
assert isinstance(cuda_devices, str), "cuda_devices should be a string"
os.environ["CUDA_VISIBLE_DEVICES"] = cuda_devices
infinity_kwargs = infinity_kwargs or {}
self.model = AsyncEngineArray.from_args(
[EngineArgs(model_name_or_path=retriever_path, **infinity_kwargs)]
)[0]
self.contents = []
with jsonlines.open(corpus_path, mode="r") as reader:
self.contents = [item["contents"] for item in reader]
self.faiss_index = None
if index_path is not None and os.path.exists(index_path):
cpu_index = faiss.read_index(index_path)
if self.faiss_use_gpu:
co = faiss.GpuMultipleClonerOptions()
co.shard = True
co.useFloat16 = True
try:
self.faiss_index = faiss.index_cpu_to_all_gpus(cpu_index, co)
app.logger.info(f"Loaded index to GPU(s).")
except RuntimeError as e:
app.logger.error(
f"GPU index load failed: {e}. Falling back to CPU."
)
self.faiss_use_gpu = False
self.faiss_index = cpu_index
else:
self.faiss_index = cpu_index
app.logger.info("Loaded index on CPU.")
app.logger.info(f"Retriever index path has already been built")
else:
app.logger.warning(f"Cannot find path: {index_path}")
self.faiss_index = None
app.logger.info(f"Retriever initialized")
def retriever_init_openai(
self,
corpus_path: str,
openai_model: str,
api_base: str,
api_key: str,
):
if not openai_model:
raise ValueError("openai_model must be provided.")
if not api_base or not isinstance(api_base, str):
raise ValueError("api_base must be a non-empty string.")
if not api_key or not isinstance(api_key, str):
raise ValueError("api_key must be a non-empty string.")
self.contents = []
with jsonlines.open(corpus_path, mode="r") as reader:
self.contents = [item["contents"] for item in reader]
try:
self.openai_model = openai_model
self.client = AsyncOpenAI(base_url=api_base, api_key=api_key)
app.logger.info(
f"OpenAI client initialized with model '{openai_model}' and base '{api_base}'"
)
except OpenAIError as e:
app.logger.error(f"Failed to initialize OpenAI client: {e}")
def retriever_init_Milvus(
self,
corpus_path: str,
Milvus_host: str = "192.168.8.130",
Milvus_port: int = 19530,
collection_name: str = "ultrarag_collection_v3",
embedding_dim: int = 1024,
):
"""Initialize Milvus vector database connection.
Args:
corpus_path (str): Path to the corpus JSONL file (for reference only)
Milvus_host (str): Milvus server host
Milvus_port (int): Milvus server port
collection_name (str): Name of the existing collection to use
embedding_dim (int): Dimension of embeddings (for reference only)
Note:
This method assumes the collection already exists and is properly configured.
Use setup_Milvus_collection.py to create and configure collections.
"""
try:
            from pymilvus import connections, Collection, utility
        except ImportError:
            err_msg = "pymilvus is not installed. Please install it with `pip install pymilvus`."
app.logger.error(err_msg)
raise ImportError(err_msg)
# Initialize Alibaba Cloud client for embeddings
try:
from openai import AsyncOpenAI
except ImportError:
err_msg = "openai is not installed. Please install it with `pip install openai`."
app.logger.error(err_msg)
raise ImportError(err_msg)
# Set up Alibaba Cloud client for embeddings
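        # NOTE: "sk-xxxxxx" below is a placeholder -- substitute your own DashScope
        # API key; it is hardcoded here only for the demo (see parameter.yaml).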
self.alibaba_client = AsyncOpenAI(
base_url="https://dashscope.aliyuncs.com/compatible-mode/v1",
api_key="sk-xxxxxx"
)
self.alibaba_model = "text-embedding-v3"
# Load corpus data (for reference, not used in search)
self.contents = []
with jsonlines.open(corpus_path, mode="r") as reader:
self.contents = [item["contents"] for item in reader]
# Connect to Milvus
try:
connections.connect(
alias="default",
host=Milvus_host,
port=Milvus_port
)
app.logger.info(f"Connected to Milvus at {Milvus_host}:{Milvus_port}")
except Exception as e:
app.logger.error(f"Failed to connect to Milvus: {e}")
raise ConnectionError(f"Failed to connect to Milvus: {e}")
# Store Milvus configuration
self.Milvus_host = Milvus_host
self.Milvus_port = Milvus_port
self.collection_name = collection_name
self.embedding_dim = embedding_dim
# Connect to existing collection (must exist and be loaded)
if not utility.has_collection(collection_name):
raise ValueError(f"Collection '{collection_name}' does not exist. Please create it first using setup_Milvus_collection.py")
self.Milvus_collection = Collection(collection_name)
# Verify collection is loaded
load_state = utility.load_state(collection_name)
if load_state != "Loaded":
app.logger.warning(f"Collection '{collection_name}' is not loaded (state: {load_state}). Attempting to load...")
try:
self.Milvus_collection.load()
utility.wait_for_loading_complete(collection_name=collection_name, timeout=60)
app.logger.info(f"Successfully loaded collection '{collection_name}'")
except Exception as e:
raise RuntimeError(f"Failed to load collection '{collection_name}': {e}")
# Verify collection has data and indexes
entity_count = self.Milvus_collection.num_entities
if entity_count == 0:
app.logger.warning(f"Collection '{collection_name}' is empty")
else:
app.logger.info(f"Connected to collection '{collection_name}' with {entity_count} entities")
app.logger.info("Milvus retriever initialized successfully")
async def retriever_embed(
self,
embedding_path: Optional[str] = None,
overwrite: bool = False,
use_alibaba_cloud: bool = False,
alibaba_api_key: Optional[str] = None,
alibaba_model: str = "text-embedding-v3",
alibaba_endpoint: Optional[str] = None,
):
if embedding_path is not None:
if not embedding_path.endswith(".npy"):
err_msg = f"Embedding save path must end with .npy, now the path is {embedding_path}"
app.logger.error(err_msg)
raise ValidationError(err_msg)
output_dir = os.path.dirname(embedding_path)
else:
current_file = os.path.abspath(__file__)
project_root = os.path.dirname(os.path.dirname(current_file))
output_dir = os.path.join(project_root, "output", "embedding")
embedding_path = os.path.join(output_dir, "embedding.npy")
if not overwrite and os.path.exists(embedding_path):
app.logger.info("embedding already exists, skipping")
return
os.makedirs(output_dir, exist_ok=True)
if use_alibaba_cloud:
# Use Alibaba Cloud API for embeddings
if not alibaba_api_key or not alibaba_endpoint:
raise ValueError("Alibaba Cloud API key and endpoint must be provided")
client = AsyncOpenAI(base_url=alibaba_endpoint, api_key=alibaba_api_key)
async def alibaba_embed(texts):
embeddings = []
batch_size = 100 # Process in batches to avoid rate limits
for i in range(0, len(texts), batch_size):
batch = texts[i:i+batch_size]
try:
response = await client.embeddings.create(
input=batch, model=alibaba_model
)
batch_embeddings = [item.embedding for item in response.data]
embeddings.extend(batch_embeddings)
app.logger.info(f"Processed batch {i//batch_size + 1}/{(len(texts)-1)//batch_size + 1}")
except Exception as e:
app.logger.error(f"Error in Alibaba Cloud embedding batch {i//batch_size + 1}: {e}")
raise
return embeddings
embeddings = await alibaba_embed(self.contents)
app.logger.info("Alibaba Cloud embedding completed")
else:
# Use local model for embeddings
async with self.model:
embeddings, usage = await self.model.embed(sentences=self.contents)
embeddings = np.array(embeddings, dtype=np.float16)
np.save(embedding_path, embeddings)
app.logger.info("embedding success")
async def retriever_embed_openai(
self,
embedding_path: Optional[str] = None,
overwrite: bool = False,
):
if embedding_path is not None:
if not embedding_path.endswith(".npy"):
err_msg = f"Embedding save path must end with .npy, now the path is {embedding_path}"
app.logger.error(err_msg)
raise ValidationError(err_msg)
output_dir = os.path.dirname(embedding_path)
else:
current_file = os.path.abspath(__file__)
project_root = os.path.dirname(os.path.dirname(current_file))
output_dir = os.path.join(project_root, "output", "embedding")
embedding_path = os.path.join(output_dir, "embedding.npy")
        if not overwrite and os.path.exists(embedding_path):
            app.logger.info("embedding already exists, skipping")
            return
        os.makedirs(output_dir, exist_ok=True)
async def openai_embed(texts):
embeddings = []
for text in texts:
response = await self.client.embeddings.create(
input=text, model=self.openai_model
)
embeddings.append(response.data[0].embedding)
return embeddings
embeddings = await openai_embed(self.contents)
embeddings = np.array(embeddings, dtype=np.float16)
np.save(embedding_path, embeddings)
app.logger.info("embedding success")
def retriever_index(
self,
embedding_path: str,
index_path: Optional[str] = None,
overwrite: bool = False,
index_chunk_size: int = 50000,
):
"""
Build a Faiss index from an embedding matrix.
Args:
embedding_path (str): .npy file of shape (N, dim), dtype float32.
index_path (str, optional): where to save .index file.
overwrite (bool): overwrite existing index.
index_chunk_size (int): batch size for add_with_ids.
"""
try:
import faiss
except ImportError:
err_msg = "faiss is not installed. Please install it with `conda install -c pytorch faiss-cpu` or `conda install -c pytorch faiss-gpu`."
app.logger.error(err_msg)
raise ImportError(err_msg)
if not os.path.exists(embedding_path):
app.logger.error(f"Embedding file not found: {embedding_path}")
NotFoundError(f"Embedding file not found: {embedding_path}")
if index_path is not None:
            if not index_path.endswith(".index"):
                app.logger.error(
                    f"Parameter index_path must end with .index, got {index_path}"
                )
                raise ValidationError(
                    f"Parameter index_path must end with .index, got {index_path}"
                )
output_dir = os.path.dirname(index_path)
else:
current_file = os.path.abspath(__file__)
project_root = os.path.dirname(os.path.dirname(current_file))
output_dir = os.path.join(project_root, "output", "index")
index_path = os.path.join(output_dir, "index.index")
        if not overwrite and os.path.exists(index_path):
            app.logger.info("Index already exists, skipping")
            return
os.makedirs(output_dir, exist_ok=True)
        embedding = np.load(embedding_path).astype(np.float32)  # faiss expects float32
dim = embedding.shape[1]
vec_ids = np.arange(embedding.shape[0]).astype(np.int64)
# with cpu
cpu_flat = faiss.IndexFlatIP(dim)
cpu_index = faiss.IndexIDMap2(cpu_flat)
# chunk to write
total = embedding.shape[0]
for start in range(0, total, index_chunk_size):
end = min(start + index_chunk_size, total)
cpu_index.add_with_ids(embedding[start:end], vec_ids[start:end])
# with gpu
if self.faiss_use_gpu:
co = faiss.GpuMultipleClonerOptions()
co.shard = True
co.useFloat16 = True
try:
gpu_index = faiss.index_cpu_to_all_gpus(cpu_index, co)
index = gpu_index
app.logger.info("Using GPU for indexing with sharding")
except RuntimeError as e:
app.logger.warning(f"GPU indexing failed ({e}); fall back to CPU")
self.faiss_use_gpu = False
index = cpu_index
else:
index = cpu_index
# save
faiss.write_index(cpu_index, index_path)
if self.faiss_index is None:
self.faiss_index = index
app.logger.info("Indexing success")
def retriever_index_lancedb(
self,
embedding_path: str,
lancedb_path: str,
table_name: str,
overwrite: bool = False,
):
"""
Build a Faiss index from an embedding matrix.
Args:
embedding_path (str): .npy file of shape (N, dim), dtype float32.
lancedb_path (str): directory path to store LanceDB tables.
table_name (str): the name of the LanceDB table.
overwrite (bool): overwrite existing index.
"""
try:
import lancedb
except ImportError:
err_msg = "lancedb is not installed. Please install it with `pip install lancedb`."
app.logger.error(err_msg)
raise ImportError(err_msg)
if not os.path.exists(embedding_path):
app.logger.error(f"Embedding file not found: {embedding_path}")
NotFoundError(f"Embedding file not found: {embedding_path}")
if lancedb_path is None:
current_file = os.path.abspath(__file__)
project_root = os.path.dirname(os.path.dirname(current_file))
lancedb_path = os.path.join(project_root, "output", "lancedb")
os.makedirs(lancedb_path, exist_ok=True)
db = lancedb.connect(lancedb_path)
if table_name in db.table_names() and not overwrite:
info_msg = f"LanceDB table '{table_name}' already exists, skipping"
app.logger.info(info_msg)
return {"status": info_msg}
elif table_name in db.table_names() and overwrite:
import shutil
shutil.rmtree(os.path.join(lancedb_path, table_name))
app.logger.info(f"Overwriting LanceDB table '{table_name}'")
embedding = np.load(embedding_path)
ids = [str(i) for i in range(len(embedding))]
data = [{"id": i, "vector": v} for i, v in zip(ids, embedding)]
df = pd.DataFrame(data)
db.create_table(table_name, data=df)
app.logger.info("LanceDB indexing success")
# Note: retriever_index_Milvus method has been removed
# Collection creation and indexing is now handled by setup_Milvus_collection.py
# This simplifies the retriever logic and separates concerns
async def retriever_search(
self,
query_list: List[str],
top_k: int = 5,
query_instruction: str = "",
use_openai: bool = False,
) -> Dict[str, List[List[str]]]:
if isinstance(query_list, str):
query_list = [query_list]
queries = [f"{query_instruction}{query}" for query in query_list]
if use_openai:
async def openai_embed(texts):
embeddings = []
for text in texts:
response = await self.client.embeddings.create(
input=text, model=self.openai_model
)
embeddings.append(response.data[0].embedding)
return embeddings
query_embedding = await openai_embed(queries)
else:
async with self.model:
query_embedding, usage = await self.model.embed(sentences=queries)
        query_embedding = np.array(query_embedding, dtype=np.float32)  # faiss expects float32
        app.logger.info("query embedding finished")
scores, ids = self.faiss_index.search(query_embedding, top_k)
rets = []
for i, query in enumerate(query_list):
cur_ret = []
for _, id in enumerate(ids[i]):
cur_ret.append(self.contents[id])
rets.append(cur_ret)
app.logger.debug(f"ret_psg: {rets}")
return {"ret_psg": rets}
async def retriever_search_Milvus(
self,
query_list: List[str],
top_k: int = 5,
query_instruction: str = "",
use_openai: bool = False,
) -> Dict[str, List[List[str]]]:
"""
Search in Milvus vector database.
Args:
query_list (List[str]): List of query strings
top_k (int): Number of top results to return
query_instruction (str): Instruction to prepend to queries
use_openai (bool): Whether to use OpenAI for embedding
Returns:
Dict[str, List[List[str]]]: Search results
"""
try:
            from pymilvus import connections, Collection
        except ImportError:
            err_msg = "pymilvus is not installed. Please install it with `pip install pymilvus`."
app.logger.error(err_msg)
raise ImportError(err_msg)
if isinstance(query_list, str):
query_list = [query_list]
queries = [f"{query_instruction}{query}" for query in query_list]
# Generate query embeddings
if use_openai:
async def openai_embed(texts):
embeddings = []
for text in texts:
response = await self.client.embeddings.create(
input=text, model=self.openai_model
)
embeddings.append(response.data[0].embedding)
return embeddings
query_embedding = await openai_embed(queries)
else:
# Use Alibaba Cloud API for embeddings
async def alibaba_embed(texts):
embeddings = []
for text in texts:
response = await self.alibaba_client.embeddings.create(
input=text, model=self.alibaba_model
)
embeddings.append(response.data[0].embedding)
return embeddings
query_embedding = await alibaba_embed(queries)
query_embedding = np.array(query_embedding, dtype=np.float32)
app.logger.info("Query embedding finished")
# Ensure collection is loaded before search
try:
if not self.Milvus_collection.has_index():
app.logger.warning("Collection has no index, search may be slow")
# Always load collection before search to ensure it's available
app.logger.debug("Loading collection for search...")
self.Milvus_collection.load()
app.logger.debug("Collection loaded successfully")
except Exception as load_error:
app.logger.error(f"Failed to load collection: {load_error}")
return {"ret_psg": [[]] * len(query_list)}
# Search in Milvus
search_params = {
"metric_type": "IP",
"params": {"nprobe": 10}
}
rets = []
for i, query_vec in enumerate(query_embedding):
try:
# Perform search with proper error handling
results = self.Milvus_collection.search(
data=[query_vec.tolist()],
anns_field="embedding",
param=search_params,
limit=top_k,
output_fields=["text"],
expr=None # Explicitly set no filter expression
)
# Extract results with null checks
cur_ret = []
for hit in results[0]:
text_content = hit.entity.get("text")
if text_content is not None:
cur_ret.append(text_content)
else:
app.logger.warning(f"Found null text content in search result")
rets.append(cur_ret)
except Exception as e:
app.logger.error(f"Milvus search failed for query {i}: {e}")
# Return empty result for failed query
rets.append([])
app.logger.debug(f"ret_psg: {rets}")
return {"ret_psg": rets}
async def retriever_search_lancedb(
self,
query_list: List[str],
        top_k: Optional[int] = None,
query_instruction: str = "",
use_openai: bool = False,
lancedb_path: str = "",
table_name: str = "",
filter_expr: Optional[str] = None,
) -> Dict[str, List[List[str]]]:
try:
import lancedb
except ImportError:
err_msg = "lancedb is not installed. Please install it with `pip install lancedb`."
app.logger.error(err_msg)
raise ImportError(err_msg)
if isinstance(query_list, str):
query_list = [query_list]
queries = [f"{query_instruction}{query}" for query in query_list]
if use_openai:
async def openai_embed(texts):
embeddings = []
for text in texts:
response = await self.client.embeddings.create(
input=text, model=self.openai_model
)
embeddings.append(response.data[0].embedding)
return embeddings
query_embedding = await openai_embed(queries)
else:
async with self.model:
query_embedding, usage = await self.model.embed(sentences=queries)
query_embedding = np.array(query_embedding, dtype=np.float16)
app.logger.info("query embedding finish")
rets = []
if not lancedb_path:
NotFoundError(f"`lancedb_path` must be provided.")
db = lancedb.connect(lancedb_path)
self.lancedb_table = db.open_table(table_name)
for i, query_vec in enumerate(query_embedding):
q = self.lancedb_table.search(query_vec).limit(top_k)
if filter_expr:
q = q.where(filter_expr)
df = q.to_df()
cur_ret = []
for id_str in df["id"]:
id_int = int(id_str)
cur_ret.append(self.contents[id_int])
rets.append(cur_ret)
app.logger.debug(f"ret_psg: {rets}")
return {"ret_psg": rets}
async def retriever_deploy_service(
self,
retriever_url: str,
):
# Ensure URL is valid, adding "http://" prefix if necessary
retriever_url = retriever_url.strip()
if not retriever_url.startswith("http://") and not retriever_url.startswith(
"https://"
):
retriever_url = f"http://{retriever_url}"
url_obj = urlparse(retriever_url)
retriever_host = url_obj.hostname
retriever_port = (
url_obj.port if url_obj.port else 8080
) # Default port if none provided
@retriever_app.route("/search", methods=["POST"])
async def deploy_retrieval_model():
data = request.get_json()
query_list = data["query_list"]
top_k = data["top_k"]
async with self.model:
query_embedding, _ = await self.model.embed(sentences=query_list)
            query_embedding = np.array(query_embedding, dtype=np.float32)  # faiss expects float32
_, ids = self.faiss_index.search(query_embedding, top_k)
rets = []
for i, _ in enumerate(query_list):
cur_ret = []
for _, id in enumerate(ids[i]):
cur_ret.append(self.contents[id])
rets.append(cur_ret)
return jsonify({"ret_psg": rets})
        app.logger.info(f"Deploying embedding server at {retriever_url}")
        retriever_app.run(host=retriever_host, port=retriever_port)
async def retriever_deploy_search(
self,
retriever_url: str,
query_list: List[str],
        top_k: Optional[int] = None,
query_instruction: str = "",
):
# Validate the URL format
url = retriever_url.strip()
if not url.startswith("http://") and not url.startswith("https://"):
url = f"http://{url}"
url_obj = urlparse(url)
api_url = urlunparse(url_obj._replace(path="/search"))
app.logger.info(f"Calling url: {api_url}")
if isinstance(query_list, str):
query_list = [query_list]
query_list = [f"{query_instruction}{query}" for query in query_list]
payload = {"query_list": query_list}
if top_k is not None:
payload["top_k"] = top_k
async with aiohttp.ClientSession() as session:
async with session.post(
api_url,
json=payload,
) as response:
if response.status == 200:
response_data = await response.json()
app.logger.debug(
f"status_code: {response.status}, response data: {response_data}"
)
return response_data
else:
err_msg = (
f"Failed to call {retriever_url} with code {response.status}"
)
app.logger.error(err_msg)
raise ToolError(err_msg)
async def retriever_exa_search(
self,
query_list: List[str],
        top_k: Optional[int] = None,
) -> dict[str, List[List[str]]]:
try:
from exa_py import AsyncExa
from exa_py.api import Result
except ImportError:
err_msg = (
"exa_py is not installed. Please install it with `pip install exa_py`."
)
app.logger.error(err_msg)
raise ImportError(err_msg)
exa_api_key = os.environ.get("EXA_API_KEY", "")
exa = AsyncExa(api_key=exa_api_key if exa_api_key else "EMPTY")
sem = asyncio.Semaphore(16)
async def call_with_retry(
idx: int, q: str, retries: int = 3, delay: float = 1.0
):
async with sem:
for attempt in range(retries):
try:
resp = await exa.search_and_contents(
q,
num_results=top_k,
text=True,
)
results: List[Result] = getattr(resp, "results", []) or []
psg_ls: List[str] = [(r.text or "") for r in results]
return idx, psg_ls
except Exception as e:
status = getattr(
getattr(e, "response", None), "status_code", None
)
if status == 401 or "401" in str(e):
raise RuntimeError(
"Unauthorized (401): Access denied by Exa API. "
"Invalid or missing EXA_API_KEY."
) from e
app.logger.warning(
f"[Retry {attempt+1}] EXA failed (idx={idx}): {e}"
)
await asyncio.sleep(delay)
return idx, []
tasks = [
asyncio.create_task(call_with_retry(i, q)) for i, q in enumerate(query_list)
]
ret: List[List[str]] = [None] * len(query_list)
iterator = tqdm(
asyncio.as_completed(tasks), total=len(tasks), desc="EXA Searching: "
)
for fut in iterator:
idx, psg_ls = await fut
ret[idx] = psg_ls
return {"ret_psg": ret}
async def retriever_tavily_search(
self,
query_list: List[str],
        top_k: Optional[int] = None,
) -> dict[str, List[List[str]]]:
try:
from tavily import (
AsyncTavilyClient,
BadRequestError,
UsageLimitExceededError,
InvalidAPIKeyError,
MissingAPIKeyError,
)
except ImportError:
err_msg = "tavily is not installed. Please install it with `pip install tavily-python`."
app.logger.error(err_msg)
raise ImportError(err_msg)
tavily_api_key = os.environ.get("TAVILY_API_KEY", "")
if not tavily_api_key:
raise MissingAPIKeyError(
"TAVILY_API_KEY environment variable is not set. Please set it to use Tavily."
)
tavily = AsyncTavilyClient(api_key=tavily_api_key)
sem = asyncio.Semaphore(16)
async def call_with_retry(
idx: int, q: str, retries: int = 3, delay: float = 1.0
):
async with sem:
for attempt in range(retries):
try:
resp = await tavily.search(
query=q,
max_results=top_k,
)
results: List[Dict[str, Any]] = resp["results"]
psg_ls: List[str] = [(r["content"] or "") for r in results]
return idx, psg_ls
except UsageLimitExceededError as e:
app.logger.error(f"Usage limit exceeded: {e}")
raise ToolError(f"Usage limit exceeded: {e}") from e
except InvalidAPIKeyError as e:
app.logger.error(f"Invalid API key: {e}")
raise ToolError(f"Invalid API key: {e}") from e
except (BadRequestError, Exception) as e:
app.logger.warning(
f"[Retry {attempt+1}] Tavily failed (idx={idx}): {e}"
)
await asyncio.sleep(delay)
return idx, []
tasks = [
asyncio.create_task(call_with_retry(i, q)) for i, q in enumerate(query_list)
]
ret: List[List[str]] = [None] * len(query_list)
iterator = tqdm(
asyncio.as_completed(tasks), total=len(tasks), desc="Tavily Searching: "
)
for fut in iterator:
idx, psg_ls = await fut
ret[idx] = psg_ls
return {"ret_psg": ret}
if __name__ == "__main__":
Retriever(app)
app.run(transport="stdio")
3.2 Define the parameter configuration file
Note: the parameter settings file for the retriever server.
vim parameter.yaml
# servers/retriever/parameter.yaml
retriever_path: openbmb/MiniCPM-Embedding-Light
corpus_path: UltraRAG/data/Milvus_faq_corpus.jsonl
embedding_path: embedding/embedding.npy
index_path: index/index.index
# infinity_emb config
infinity_kwargs:
bettertransformer: false
pooling_method: auto
device: cuda
batch_size: 1024
cuda_devices: "0,1"
query_instruction: "Query: "
faiss_use_gpu: True
top_k: 5
overwrite: false
retriever_url: http://localhost:8080
index_chunk_size: 50000
# OpenAI API configuration (if used)
use_openai: false
openai_model: "embedding"
api_base: ""
api_key: ""
# Alibaba Cloud API configuration (alternative to local embedding)
use_alibaba_cloud: true
alibaba_api_key: "sk-xxxxxxx" # Your Alibaba Cloud API key
alibaba_model: "embedding" # Alibaba Cloud embedding model
alibaba_endpoint: "https://dashscope.aliyuncs.com/compatible-mode/v1" # Alibaba Cloud endpoint
# LanceDB configuration (if used)
lancedb_path: "lancedb/"
table_name: "vector_index"
filter_expr: null
# Milvus configuration (if used)
use_Milvus: true
Milvus_host: "192.168.8.130"
Milvus_port: 19530
collection_name: "ultrarag_collection_v3"
embedding_dim: 1024
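Because several backends share this single file, a small loader check helps catch missing keys before launching a run. A hypothetical helper, assuming PyYAML is available:
# check_params.py -- illustrative pre-flight check for parameter.yaml
import yaml

with open("servers/retriever/parameter.yaml") as f:
    params = yaml.safe_load(f)

required = ["corpus_path", "collection_name", "Milvus_host", "Milvus_port", "embedding_dim"]
missing = [k for k in required if k not in params]
assert not missing, f"parameter.yaml is missing keys: {missing}"
print("Milvus target:", params["Milvus_host"], params["Milvus_port"],
      "->", params["collection_name"])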
3.3 Define the server configuration file
Note: wires the servers and tools together and integrates the Alibaba Cloud API. Inputs prefixed with $ are filled from the parameter files, while bare names flow through the pipeline state.
vim rag_Milvus_faq_server.yaml
benchmark:
parameter: /root/ultraRAG/UltraRAG/servers/benchmark/parameter.yaml
path: /root/ultraRAG/UltraRAG/servers/benchmark/src/benchmark.py
tools:
get_data:
input:
benchmark: $benchmark
output:
- q_ls
- gt_ls
custom:
parameter: /root/ultraRAG/UltraRAG/servers/custom/parameter.yaml
path: /root/ultraRAG/UltraRAG/servers/custom/src/custom.py
tools:
output_extract_from_boxed:
input:
ans_ls: ans_ls
output:
- pred_ls
evaluation:
parameter: /root/ultraRAG/UltraRAG/servers/evaluation/parameter.yaml
path: /root/ultraRAG/UltraRAG/servers/evaluation/src/evaluation.py
tools:
evaluate:
input:
gt_ls: gt_ls
metrics: $metrics
pred_ls: pred_ls
save_path: $save_path
output:
- eval_res
generation:
parameter: /root/ultraRAG/UltraRAG/servers/generation/parameter.yaml
path: /root/ultraRAG/UltraRAG/servers/generation/src/generation.py
tools:
generate:
input:
base_url: $base_url
model_name: $model_name
prompt_ls: prompt_ls
sampling_params: $sampling_params
api_key: $api_key
output:
- ans_ls
prompt:
parameter: /root/ultraRAG/UltraRAG/servers/prompt/parameter.yaml
path: /root/ultraRAG/UltraRAG/servers/prompt/src/prompt.py
prompts:
qa_rag_boxed:
input:
q_ls: q_ls
ret_psg: ret_psg
template: $template
output:
- prompt_ls
retriever:
parameter: /root/ultraRAG/UltraRAG/servers/retriever/parameter.yaml
path: /root/ultraRAG/UltraRAG/servers/retriever/src/retriever.py
tools:
retriever_init_Milvus:
input:
collection_name: $collection_name
corpus_path: $corpus_path
embedding_dim: $embedding_dim
Milvus_host: $Milvus_host
Milvus_port: $Milvus_port
retriever_search_Milvus:
input:
query_instruction: $query_instruction
query_list: q_ls
top_k: $top_k
use_openai: $use_openai
output:
- ret_psg
3.4 Define the index build (Build Index)
Note: converts the document corpus into vectors and stores them in the Milvus database. Key parameters to configure:
vim Milvus_index_parameter.yaml
retriever:
alibaba_api_key: sk-xxxxxxx
alibaba_endpoint: https://dashscope.aliyuncs.com/compatible-mode/v1
alibaba_model: text-embedding-v3
collection_name: ultrarag_collection_v3
corpus_path: data/corpus_example.jsonl
embedding_dim: 1024
embedding_path: embedding/embedding.npy
Milvus_host: 192.168.8.130
Milvus_port: 19530
overwrite: false
use_alibaba_cloud: true
vim Milvus_index.yaml
# Milvus Index Building Configuration
# Build vector index using Milvus database
# Note: This configuration is now deprecated. Use setup_Milvus_collection.py instead.
# MCP Server
servers:
retriever: servers/retriever
# Parameter Configuration
parameter_config: examples/parameter/Milvus_index_parameter.yaml
# MCP Client Pipeline
# Updated pipeline for new architecture
pipeline:
- retriever.retriever_init_Milvus # Connect to existing Milvus collection
- retriever.retriever_embed # Generate embeddings (if needed)
# Note: Index building is now handled by setup_Milvus_collection.py
# The collection ultrarag_collection_v3 should already exist with proper indexing
3.5 Define the RAG query (Run RAG)
Note: runs the complete RAG flow, including retrieval, answer generation, and evaluation. It reuses the rag_Milvus_faq_server.yaml defined in section 3.3; the pipeline file below ties everything together. Key parameters to configure:
vim rag_Milvus_faq.yaml
# Milvus RAG FAQ Demo
# Complete RAG pipeline using Milvus vector database with FAQ dataset
# MCP Server Configuration
servers:
benchmark: UltraRAG/servers/benchmark
retriever: UltraRAG/servers/retriever
prompt: UltraRAG/servers/prompt
generation: UltraRAG/servers/generation
evaluation: UltraRAG/servers/evaluation
custom: UltraRAG/servers/custom
# Parameter Configuration
parameter_config: examples/parameter/rag_Milvus_faq_parameter.yaml
# MCP Client Pipeline
# Sequential execution: data -> init -> search -> prompt -> generate -> extract -> evaluate
pipeline:
- benchmark.get_data
- retriever.retriever_init_Milvus
- retriever.retriever_search_Milvus
- prompt.qa_rag_boxed
- generation.generate
- custom.output_extract_from_boxed
- evaluation.evaluate
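Conceptually, the Client engine walks this pipeline list top to bottom, invoking each server.tool over MCP and threading a shared state (q_ls, ret_psg, prompt_ls, ...) through the steps. The sketch below illustrates that execution model only; it is not UltraRAG's actual client code:
# Illustrative only: a declarative pipeline reduced to sequential tool calls.
pipeline = [
    "benchmark.get_data",
    "retriever.retriever_init_Milvus",
    "retriever.retriever_search_Milvus",
    "prompt.qa_rag_boxed",
    "generation.generate",
    "custom.output_extract_from_boxed",
    "evaluation.evaluate",
]

state = {}  # shared variables: q_ls, gt_ls, ret_psg, prompt_ls, ans_ls, ...

def call_tool(step, state):
    """Stand-in for an MCP tool call; the real client dispatches to the servers."""
    server, tool = step.split(".")
    print(f"calling {tool} on '{server}' with state keys {sorted(state)}")
    return {}  # a real call returns the tool's declared outputs

for step in pipeline:
    state.update(call_tool(step, state))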
3.6 Run the index build
Note: once this completes successfully, the corpus vectors and index files are in place, and the subsequent RAG pipeline uses them directly for retrieval.
ultrarag build examples/Milvus_index.yaml
ultrarag run examples/Milvus_index.yaml
3.7 Run the RAG query
Note: build and run the complete RAG pipeline.
ultrarag build examples/rag_Milvus_faq.yaml
ultrarag run examples/rag_Milvus_faq.yaml
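After a run, retrieval quality can be sanity-checked outside the pipeline with a direct query against the collection. A standalone sketch, assuming the DashScope key, host, and collection from the configs above:
# verify_search.py -- standalone retrieval check against the Milvus collection
from openai import OpenAI
from pymilvus import connections, Collection

client = OpenAI(
    base_url="https://dashscope.aliyuncs.com/compatible-mode/v1",
    api_key="sk-xxxxxx",  # your DashScope API key
)
resp = client.embeddings.create(
    input="How do I install Milvus on Windows?", model="text-embedding-v3"
)
query_vec = resp.data[0].embedding

connections.connect(alias="default", host="192.168.8.130", port=19530)
collection = Collection("ultrarag_collection_v3")
collection.load()
results = collection.search(
    data=[query_vec],
    anns_field="embedding",
    param={"metric_type": "IP", "params": {"nprobe": 10}},
    limit=3,
    output_fields=["text"],
)
for hit in results[0]:
    print(f"{hit.distance:.3f}  {hit.entity.get('text')[:80]}")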
03
Closing Thoughts
A traditional RAG pipeline takes at least several hundred, sometimes over a thousand, lines of code. UltraRAG 2.0 delivers comparable functionality in a few dozen lines, roughly half of which is declarative YAML orchestration, dramatically lowering the bar for enterprise RAG adoption.
Project repository:
https://github.com/OpenBMB/UltraRAG