The Knowledge Engine component allows plugins to provide full knowledge base indexing and retrieval capabilities for LangBot. When users create a knowledge base in LangBot, they can choose a Knowledge Engine provided by a plugin to manage document ingestion, retrieval, and deletion. Plugins can also bridge advanced knowledge base services such as Dify, RAGFlow, FastGPT, and more.
## Adding a Knowledge Engine Component
A single plugin can add any number of Knowledge Engines. Run `lbp comp KnowledgeEngine` in the plugin directory and follow the prompts to enter the Knowledge Engine configuration.
```bash
➜ MyRAGPlugin > lbp comp KnowledgeEngine
Generating component KnowledgeEngine...
Knowledge Engine name: simple_rag
Knowledge Engine description: A simple Knowledge Engine with built-in chunking and retrieval
Component KnowledgeEngine generated successfully.
```
This will generate `simple_rag.yaml` and `simple_rag.py` in the `components/knowledge_engine/` directory. The `.yaml` file defines the Knowledge Engine's basic information and configuration schemas, and the `.py` file is the handler for this engine:
```bash
➜ MyRAGPlugin > tree
...
├── components
│   ├── __init__.py
│   └── knowledge_engine
│       ├── __init__.py
│       ├── simple_rag.py
│       └── simple_rag.yaml
...
```
## Manifest File: Knowledge Engine
```yaml
apiVersion: v1 # Do not modify
kind: KnowledgeEngine # Do not modify
metadata:
  name: simple_rag # Knowledge Engine name, used to identify this engine
  label:
    en_US: Simple Knowledge Engine # Engine display name, shown in LangBot's UI, supports multiple languages
    zh_Hans: 简易 RAG 引擎
    ja_JP: シンプル RAG エンジン
  description:
    en_US: 'A simple Knowledge Engine with built-in chunking and retrieval'
    zh_Hans: '内置分块和检索能力的简易 RAG 引擎'
    ja_JP: 'チャンキングと検索を内蔵したシンプルなRAGエンジン'
spec:
  creation_schema: # Configuration parameters users fill in when creating a knowledge base
    - name: chunk_size
      label:
        en_US: Chunk Size
        zh_Hans: 分块大小
      type: integer
      required: false
      default: 500
    - name: chunk_overlap
      label:
        en_US: Chunk Overlap
        zh_Hans: 分块重叠
      type: integer
      required: false
      default: 50
  retrieval_schema: # Configurable parameters during retrieval
    - name: score_threshold
      label:
        en_US: Score Threshold
        zh_Hans: 分数阈值
      type: float
      required: false
      default: 0.5
execution:
  python:
    path: simple_rag.py # Engine handler, do not modify
    attr: SimpleRag # Class name of the engine handler, consistent with the class name in simple_rag.py
```
For configuration item format reference, see: Plugin Manifest Configuration Format
### `creation_schema` and `retrieval_schema`
Unlike the old KnowledgeRetriever component, which used a single `spec.config`, KnowledgeEngine uses two separate schemas:

- `creation_schema`: parameters users fill in when creating a knowledge base. These are passed to the plugin via `creation_settings` during both ingestion and retrieval.
- `retrieval_schema`: parameters users can adjust when querying the knowledge base. These are passed to the plugin via `retrieval_settings`.
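For intuition only, the way schema defaults combine with user input can be sketched as below. `resolve_settings` is a hypothetical helper, not part of the SDK; the host performs this merge itself before your handler runs:

```python
def resolve_settings(schema: list[dict], user_values: dict) -> dict:
    """Merge user-supplied values with schema defaults (sketch of host-side behavior)."""
    settings = {}
    for field in schema:
        settings[field["name"]] = user_values.get(field["name"], field.get("default"))
    return settings


# Mirrors the creation_schema from the manifest above.
creation_schema = [
    {"name": "chunk_size", "type": "integer", "default": 500},
    {"name": "chunk_overlap", "type": "integer", "default": 50},
]

# A user who only overrides chunk_size still gets the chunk_overlap default.
settings = resolve_settings(creation_schema, {"chunk_size": 800})
# settings == {"chunk_size": 800, "chunk_overlap": 50}
```

Your handler then reads these merged values from `context.creation_settings` (or `context.retrieval_settings` for the retrieval schema).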
## Capability Declaration
KnowledgeEngine can declare its supported capabilities. LangBot uses these capability declarations to determine UI behavior and available operations:
```python
from langbot_plugin.api.definition.components.knowledge_engine.engine import KnowledgeEngine, KnowledgeEngineCapability


class SimpleRag(KnowledgeEngine):
    @classmethod
    def get_capabilities(cls) -> list[str]:
        return [
            KnowledgeEngineCapability.DOC_INGESTION,  # Supports document upload and processing
        ]
```
Available capability constants:

| Capability | Description |
|---|---|
| `DOC_INGESTION` | Supports document upload and processing. When declared, LangBot shows a "Documents" tab in the knowledge base details. If the plugin bridges another knowledge base service or does not require users to upload documents, this capability can be omitted. |
| `DOC_PARSING` | Supports native document parsing (file-to-text extraction). When declared, users can choose the Knowledge Engine's built-in parsing when uploading documents, without needing an external Parser plugin. If not declared, an external Parser plugin must be installed before documents can be uploaded. |
Other retrieval behaviors (such as reranking and hybrid search) are controlled by the plugin's `retrieval_schema` and do not need capability flags.
## Plugin Handler
The following code is generated by default (`components/knowledge_engine/<engine_name>.py`). You need to implement the three core methods: `ingest`, `retrieve`, and `delete_document`. Complete code can be found in the `SimpleKnowledgeEngine` example in `langbot-plugin-demo`.
```python
from langbot_plugin.api.definition.components.knowledge_engine.engine import KnowledgeEngine, KnowledgeEngineCapability
from langbot_plugin.api.entities.builtin.rag.models import (
    IngestionContext,
    IngestionResult,
)
from langbot_plugin.api.entities.builtin.rag.context import (
    RetrievalContext,
    RetrievalResponse,
    RetrievalResultEntry,
)
from langbot_plugin.api.entities.builtin.rag.enums import DocumentStatus
from langbot_plugin.api.entities.builtin.provider.message import ContentElement


class SimpleRag(KnowledgeEngine):
    @classmethod
    def get_capabilities(cls) -> list[str]:
        return [KnowledgeEngineCapability.DOC_INGESTION]

    async def on_knowledge_base_create(self, kb_id: str, config: dict) -> None:
        """Callback when a knowledge base is created, can be used to initialize resources"""
        pass

    async def on_knowledge_base_delete(self, kb_id: str) -> None:
        """Callback when a knowledge base is deleted, can be used to clean up resources"""
        pass

    async def ingest(self, context: IngestionContext) -> IngestionResult:
        """Ingest a document into the knowledge base"""
        # 1. Get file content
        file_bytes = await self.plugin.get_knowledge_file_stream(context.file_object.storage_path)

        # 2. Parse document and split into chunks
        text = file_bytes.decode('utf-8')
        chunk_size = context.creation_settings.get('chunk_size', 500)
        chunk_overlap = context.creation_settings.get('chunk_overlap', 50)
        chunks = self._split_text(text, chunk_size, chunk_overlap)

        # 3. Generate embeddings using host embedding model
        embedding_model_uuid = context.creation_settings.get('embedding_model_uuid', '')
        vectors = await self.plugin.invoke_embedding(embedding_model_uuid, chunks)

        # 4. Write to host vector database
        collection_id = context.get_collection_id()
        ids = [f"{context.file_object.metadata.document_id}_{i}" for i in range(len(chunks))]
        metadata = [
            {"document_id": context.file_object.metadata.document_id, "chunk_index": i, "text": chunks[i]}
            for i in range(len(chunks))
        ]
        await self.plugin.vector_upsert(collection_id, vectors, ids, metadata)

        return IngestionResult(
            document_id=context.file_object.metadata.document_id,
            status=DocumentStatus.COMPLETED,
            chunks_created=len(chunks),
        )

    async def retrieve(self, context: RetrievalContext) -> RetrievalResponse:
        """Retrieve relevant content from the knowledge base"""
        # 1. Generate query vector
        embedding_model_uuid = context.creation_settings.get('embedding_model_uuid', '')
        query_vectors = await self.plugin.invoke_embedding(embedding_model_uuid, [context.query])
        query_vector = query_vectors[0]

        # 2. Vector search
        collection_id = context.get_collection_id()
        results = await self.plugin.vector_search(
            collection_id, query_vector, top_k=context.retrieval_settings.get('top_k', 5)
        )

        # 3. Convert to retrieval results
        entries = []
        for r in results:
            entry = RetrievalResultEntry(
                id=r.get('id', ''),
                content=[ContentElement.from_text(r.get('metadata', {}).get('text', ''))],
                metadata=r.get('metadata', {}),
                distance=r.get('score', 0.0),
            )
            entries.append(entry)

        return RetrievalResponse(
            results=entries,
            total_found=len(entries),
            metadata={},
        )

    async def delete_document(self, kb_id: str, document_id: str) -> bool:
        """Delete a document and its associated data from the knowledge base"""
        collection_id = kb_id
        deleted = await self.plugin.vector_delete(collection_id, file_ids=[document_id])
        return deleted > 0
```
## Lifecycle Hooks
KnowledgeEngine provides two lifecycle hooks that are called when knowledge bases are created and deleted:
```python
async def on_knowledge_base_create(self, kb_id: str, config: dict) -> None:
    """Callback when a knowledge base using this engine is created

    Args:
        kb_id: Knowledge base UUID
        config: User-provided configuration (fields from creation_schema)
    """

async def on_knowledge_base_delete(self, kb_id: str) -> None:
    """Callback when a knowledge base using this engine is deleted

    Args:
        kb_id: Knowledge base UUID
    """
```
## Document Ingestion
The `ingest` method is called when a user uploads a document to the knowledge base:

```python
async def ingest(self, context: IngestionContext) -> IngestionResult:
```
`IngestionContext` contains the following information:

```python
class IngestionContext(pydantic.BaseModel):
    file_object: FileObject  # File object to ingest
    knowledge_base_id: str  # Target knowledge base ID
    collection_id: str | None  # Vector collection ID (falls back to knowledge_base_id)
    creation_settings: dict  # Configuration from knowledge base creation
    parsed_content: ParseResult | None  # Pre-parsed content from an external Parser plugin (if any)
```
`FileObject` contains file metadata:

```python
class FileObject(pydantic.BaseModel):
    metadata: FileMetadata  # Contains filename, file_size, mime_type, document_id, knowledge_base_id, etc.
    storage_path: str  # File path in the storage system
```
`IngestionResult` should return the ingestion result:

```python
class IngestionResult(pydantic.BaseModel):
    document_id: str  # Document ID
    status: DocumentStatus  # Processing status: COMPLETED / FAILED
    chunks_created: int = 0  # Number of chunks created
    error_message: str | None = None  # Error message on failure
    metadata: dict = {}  # Additional metadata
```
## Knowledge Retrieval
The `retrieve` method is called when the knowledge base is queried:

```python
async def retrieve(self, context: RetrievalContext) -> RetrievalResponse:
```
`RetrievalContext` contains the following information:

```python
class RetrievalContext(pydantic.BaseModel):
    query: str  # Query text
    knowledge_base_id: str | None  # Knowledge base ID
    collection_id: str | None  # Vector collection ID
    retrieval_settings: dict  # Retrieval configuration (fields from retrieval_schema)
    creation_settings: dict  # Knowledge base creation configuration
    filters: dict  # Metadata filter conditions (Chroma-style where syntax)
```
The LangBot host automatically injects a default `top_k` value (default 5) into `retrieval_settings` before passing it to the plugin; plugins can read it via `context.retrieval_settings.get('top_k', 5)`.

The `filters` field contains Chroma-style `where` filter conditions extracted from `retrieval_settings`. When the caller provides filters in the retrieval settings (e.g., filtering by time range, file type, or custom metadata fields), the host populates this field so plugins can apply them during retrieval. If no filters are provided, this field is an empty dict.
`RetrievalResponse` should return the retrieval results:

```python
class RetrievalResponse(pydantic.BaseModel):
    results: list[RetrievalResultEntry]  # List of retrieval results
    total_found: int  # Total number of matches
    metadata: dict  # Additional metadata
```
`RetrievalResultEntry` represents a single retrieval result:

```python
class RetrievalResultEntry(pydantic.BaseModel):
    id: str  # Result ID
    content: list[ContentElement]  # Content, create using ContentElement.from_text()
    metadata: dict  # Metadata
    distance: float  # Distance score (smaller is more relevant)
    score: float | None = None  # Similarity score (larger is more relevant)
```
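Note that `distance` and `score` point in opposite directions (smaller distance is better, larger score is better). A plugin that honors the `score_threshold` field from the manifest's `retrieval_schema` might filter results as sketched below; `apply_score_threshold` is a hypothetical helper, and the `1 - distance` fallback assumes cosine distance, which may not match your vector store:

```python
def apply_score_threshold(results: list[dict], score_threshold: float) -> list[dict]:
    """Keep results whose similarity score meets the threshold.
    When no explicit score is present, derive one as 1 - distance
    (an illustrative convention for cosine distance)."""
    kept = []
    for r in results:
        score = r.get("score")
        if score is None:
            score = 1.0 - r.get("distance", 1.0)
        if score >= score_threshold:
            kept.append(r)
    return kept


results = [
    {"id": "chunk_0", "distance": 0.2},  # derived score 0.8 -> kept
    {"id": "chunk_1", "distance": 0.7},  # derived score 0.3 -> dropped
    {"id": "chunk_2", "score": 0.6},     # explicit score -> kept
]
kept = apply_score_threshold(results, score_threshold=0.5)
```

In a real `retrieve` implementation the threshold would come from `context.retrieval_settings.get('score_threshold', 0.5)`.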
## Document Deletion
The `delete_document` method is called when a user deletes a document from the knowledge base:

```python
async def delete_document(self, kb_id: str, document_id: str) -> bool:
    """Delete a document and its associated data

    Args:
        kb_id: Knowledge base ID
        document_id: Document ID

    Returns:
        Whether the deletion was successful
    """
```
## Host RAG APIs

KnowledgeEngine components can call LangBot host-provided RAG APIs via `self.plugin`, including embedding model invocation, vector database operations, and file retrieval.
### Invoke Embedding Model
```python
async def invoke_embedding(
    self,
    embedding_model_uuid: str,
    texts: list[str],
) -> list[list[float]]:
    """Generate embeddings using the host's embedding model

    Args:
        embedding_model_uuid: Embedding model UUID
        texts: List of texts to embed

    Returns:
        List of embedding vectors, one per input text
    """

# Usage example
vectors = await self.plugin.invoke_embedding("model_uuid", ["Hello", "World"])
```
### Vector Upsert
```python
async def vector_upsert(
    self,
    collection_id: str,
    vectors: list[list[float]],
    ids: list[str],
    metadata: list[dict] | None = None,
) -> None:
    """Upsert vectors to the host's vector database

    Args:
        collection_id: Target collection ID
        vectors: List of vectors
        ids: List of unique IDs for vectors
        metadata: Optional list of metadata dicts
    """

# Usage example
await self.plugin.vector_upsert(
    collection_id="kb_uuid",
    vectors=[[0.1, 0.2, ...], [0.3, 0.4, ...]],
    ids=["chunk_0", "chunk_1"],
    metadata=[{"document_id": "doc1"}, {"document_id": "doc1"}],
)
```
### Vector Search
```python
async def vector_search(
    self,
    collection_id: str,
    query_vector: list[float],
    top_k: int = 5,
    filters: dict | None = None,
) -> list[dict]:
    """Search similar vectors in the host's vector database

    Args:
        collection_id: Target collection ID
        query_vector: Query vector for similarity search
        top_k: Number of results to return
        filters: Optional metadata filters

    Returns:
        List of search results (dicts with id, score, metadata, etc.)
    """

# Usage example
results = await self.plugin.vector_search(
    collection_id="kb_uuid",
    query_vector=[0.1, 0.2, ...],
    top_k=5,
)
# Return format: [{"id": "chunk_0", "score": 0.123, "metadata": {"document_id": "doc1", ...}}, ...]
```
Each result returned by `vector_search` is a dict containing `id` (vector ID), `score` (distance score), and `metadata` (the metadata provided during upsert). If you need text content in retrieval results, store the text in `metadata` during ingestion.
### Vector Delete
```python
async def vector_delete(
    self,
    collection_id: str,
    file_ids: list[str] | None = None,
    filters: dict | None = None,
) -> int:
    """Delete vectors from the host's vector database

    Args:
        collection_id: Target collection ID
        file_ids: File IDs whose vectors should be deleted
        filters: Optional metadata filters for deletion

    Returns:
        Number of deleted items
    """

# Usage example
deleted = await self.plugin.vector_delete(
    collection_id="kb_uuid",
    file_ids=["doc_001"],
)
```
The `filters` parameter supports Chroma-style `where` syntax for metadata filtering. Multiple top-level keys are AND-ed. Supported operators: `$eq`, `$ne`, `$gt`, `$gte`, `$lt`, `$lte`, `$in`, `$nin`. Example: `{"file_id": {"$eq": "abc"}}`.

Note: Chroma, Qdrant, and SeekDB store full metadata and can filter on any field. Milvus and pgvector only store `text`, `file_id`, and `chunk_uuid`; filters on other fields are silently ignored.
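To make the filter semantics concrete, here is a minimal evaluator for this syntax (a sketch for understanding only; the host applies filters itself, and real vector stores evaluate them server-side):

```python
# Operator table for Chroma-style where filters.
_OPS = {
    "$eq": lambda v, t: v == t,
    "$ne": lambda v, t: v != t,
    "$gt": lambda v, t: v > t,
    "$gte": lambda v, t: v >= t,
    "$lt": lambda v, t: v < t,
    "$lte": lambda v, t: v <= t,
    "$in": lambda v, t: v in t,
    "$nin": lambda v, t: v not in t,
}


def matches(metadata: dict, where: dict) -> bool:
    """Evaluate a where filter against one metadata dict; top-level keys are AND-ed."""
    for key, cond in where.items():
        value = metadata.get(key)
        if isinstance(cond, dict):
            if not all(_OPS[op](value, target) for op, target in cond.items()):
                return False
        elif value != cond:  # a bare value is shorthand for $eq
            return False
    return True


meta = {"file_id": "abc", "chunk_index": 3}
# matches(meta, {"file_id": {"$eq": "abc"}, "chunk_index": {"$gte": 2}}) -> True
# matches(meta, {"file_id": {"$in": ["xyz"]}}) -> False
```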
### Get Knowledge File Stream

```python
async def get_knowledge_file_stream(self, storage_path: str) -> bytes:
    """Get file content from host storage

    Args:
        storage_path: File storage path (from FileObject.storage_path)

    Returns:
        File content as bytes
    """

# Usage example
file_bytes = await self.plugin.get_knowledge_file_stream(context.file_object.storage_path)
```
## Testing the Knowledge Engine
After creation, execute `lbp run` in the plugin directory to start debugging. Then in LangBot:

- Go to the "Knowledge Base" page
- Click "Create Knowledge Base"
- Select the Knowledge Engine provided by your plugin and fill in the configuration defined by the engine's `creation_schema`
- After creation, upload documents to test ingestion (if the engine declares the `DOC_INGESTION` capability)
- Bind the knowledge base to a pipeline and test retrieval