The Knowledge Engine component allows plugins to provide full knowledge base indexing and retrieval capabilities for LangBot. When users create a knowledge base in LangBot, they can choose a Knowledge Engine provided by a plugin to manage document ingestion, retrieval, and deletion. Plugins can also bridge advanced knowledge base services such as Dify, RAGFlow, FastGPT, and more.

Adding a Knowledge Engine Component

A single plugin can add any number of Knowledge Engines. Execute the command lbp comp KnowledgeEngine in the plugin directory and follow the prompts to enter the Knowledge Engine configuration.
  MyRAGPlugin > lbp comp KnowledgeEngine
Generating component KnowledgeEngine...
Knowledge Engine name: simple_rag
Knowledge Engine description: A simple Knowledge Engine with built-in chunking and retrieval
Component KnowledgeEngine generated successfully.
This will generate simple_rag.yaml and simple_rag.py files in the components/knowledge_engine/ directory. The .yaml file defines the Knowledge Engine’s basic information and configuration schemas, and the .py file is the handler for this engine:
  MyRAGPlugin > tree
...
├── components
│   ├── __init__.py
│   └── knowledge_engine
│       ├── __init__.py
│       ├── simple_rag.py
│       └── simple_rag.yaml
...

Manifest File: Knowledge Engine

apiVersion: v1  # Do not modify
kind: KnowledgeEngine  # Do not modify
metadata:
  name: simple_rag  # Knowledge Engine name, used to identify this engine
  label:
    en_US: Simple Knowledge Engine  # Engine display name, shown in LangBot's UI, supports multiple languages
    zh_Hans: 简易 RAG 引擎
    ja_JP: シンプル RAG エンジン
  description:
    en_US: 'A simple Knowledge Engine with built-in chunking and retrieval'
    zh_Hans: '内置分块和检索能力的简易 RAG 引擎'
    ja_JP: 'チャンキングと検索を内蔵したシンプルなRAGエンジン'
spec:
  creation_schema:  # Configuration parameters users fill in when creating a knowledge base
    - name: chunk_size
      label:
        en_US: Chunk Size
        zh_Hans: 分块大小
      type: integer
      required: false
      default: 500
    - name: chunk_overlap
      label:
        en_US: Chunk Overlap
        zh_Hans: 分块重叠
      type: integer
      required: false
      default: 50
  retrieval_schema:  # Configurable parameters during retrieval
    - name: score_threshold
      label:
        en_US: Score Threshold
        zh_Hans: 分数阈值
      type: float
      required: false
      default: 0.5
execution:
  python:
    path: simple_rag.py  # Engine handler, do not modify
    attr: SimpleRag  # Class name of the engine handler, consistent with the class name in simple_rag.py
For configuration item format reference, see: Plugin Manifest Configuration Format

creation_schema and retrieval_schema

Unlike the old KnowledgeRetriever component, which used a single spec.config, KnowledgeEngine uses two separate schemas:
  • creation_schema: Parameters users fill in when creating a knowledge base. These are passed to the plugin via creation_settings during ingestion and retrieval.
  • retrieval_schema: Parameters users can adjust when querying the knowledge base. These are passed to the plugin via retrieval_settings.
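As a rough illustration of how the two settings dicts might be read inside a handler, the sketch below overlays user-provided values on the defaults from the example manifest. The helper name with_defaults and the two hard-coded default dicts are hypothetical and not part of the LangBot SDK. Whether the host already fills in schema defaults is not stated here, so the fallback is purely defensive.

```python
from typing import Optional

# Hypothetical defaults mirroring the example manifest above.
CREATION_DEFAULTS = {"chunk_size": 500, "chunk_overlap": 50}
RETRIEVAL_DEFAULTS = {"score_threshold": 0.5}


def with_defaults(settings: Optional[dict], defaults: dict) -> dict:
    """Return defaults overridden by any non-None user-provided values."""
    merged = dict(defaults)
    for key, value in (settings or {}).items():
        if value is not None:
            merged[key] = value
    return merged


# creation_settings as it might arrive during ingestion or retrieval
creation = with_defaults({"chunk_size": 800}, CREATION_DEFAULTS)
# retrieval_settings as it might arrive during retrieval
retrieval = with_defaults({}, RETRIEVAL_DEFAULTS)
```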

Capability Declaration

KnowledgeEngine can declare its supported capabilities. LangBot uses these capability declarations to determine UI behavior and available operations:
from langbot_plugin.api.definition.components.knowledge_engine.engine import KnowledgeEngine, KnowledgeEngineCapability

class SimpleRag(KnowledgeEngine):

    @classmethod
    def get_capabilities(cls) -> list[str]:
        return [
            KnowledgeEngineCapability.DOC_INGESTION,    # Supports document upload and processing
        ]
Available capability constants:
  • DOC_INGESTION: Supports document upload and processing. When declared, LangBot shows a “Documents” tab in the knowledge base details. If the plugin is used to bridge other knowledge base services or does not require users to upload documents, this capability declaration can be omitted.
  • DOC_PARSING: Supports native document parsing (file-to-text extraction). When declared, users can choose to use the Knowledge Engine’s built-in parsing when uploading documents, without needing an external Parser plugin. If not declared, an external Parser plugin must be installed to upload documents.
Other retrieval behaviors (such as reranking, hybrid search, etc.) are controlled by the plugin’s retrieval_schema and do not need capability flags.
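The rules described for the two capabilities can be summarized as a small decision function. This is only a sketch of the behavior stated above, not the host's actual logic. The string values of the two constants are stand-ins, and describe_upload_flow is a hypothetical name. In a real plugin, use the KnowledgeEngineCapability constants from the SDK.

```python
# Stand-in values for illustration only; the real constants live on
# KnowledgeEngineCapability and their values are not assumed here.
DOC_INGESTION = "doc_ingestion"
DOC_PARSING = "doc_parsing"


def describe_upload_flow(capabilities: list, has_parser_plugin: bool) -> str:
    """Summarize the upload behavior implied by the declared capabilities."""
    if DOC_INGESTION not in capabilities:
        # Typical for engines that bridge an external knowledge base service.
        return "no document upload"
    if DOC_PARSING in capabilities:
        return "upload with built-in parsing"
    if has_parser_plugin:
        return "upload via external Parser plugin"
    return "upload unavailable until a Parser plugin is installed"
```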

Plugin Handler

The following code is generated by default in components/knowledge_engine/<engine_name>.py. You need to implement the three core methods: ingest, retrieve, and delete_document. The listing calls a _split_text chunking helper that is not defined in it; complete code can be found in the SimpleKnowledgeEngine example in langbot-plugin-demo.
from langbot_plugin.api.definition.components.knowledge_engine.engine import KnowledgeEngine, KnowledgeEngineCapability
from langbot_plugin.api.entities.builtin.rag.models import (
    IngestionContext,
    IngestionResult,
)
from langbot_plugin.api.entities.builtin.rag.context import (
    RetrievalContext,
    RetrievalResponse,
    RetrievalResultEntry,
)
from langbot_plugin.api.entities.builtin.rag.enums import DocumentStatus
from langbot_plugin.api.entities.builtin.provider.message import ContentElement


class SimpleRag(KnowledgeEngine):

    @classmethod
    def get_capabilities(cls) -> list[str]:
        return [KnowledgeEngineCapability.DOC_INGESTION]

    async def on_knowledge_base_create(self, kb_id: str, config: dict) -> None:
        """Callback when a knowledge base is created, can be used to initialize resources"""
        pass

    async def on_knowledge_base_delete(self, kb_id: str) -> None:
        """Callback when a knowledge base is deleted, can be used to clean up resources"""
        pass

    async def ingest(self, context: IngestionContext) -> IngestionResult:
        """Ingest a document into the knowledge base"""

        # 1. Get file content
        file_bytes = await self.plugin.get_knowledge_file_stream(context.file_object.storage_path)

        # 2. Parse document and split into chunks
        text = file_bytes.decode('utf-8')
        chunk_size = context.creation_settings.get('chunk_size', 500)
        chunk_overlap = context.creation_settings.get('chunk_overlap', 50)
        chunks = self._split_text(text, chunk_size, chunk_overlap)

        # 3. Generate embeddings using host embedding model
        embedding_model_uuid = context.creation_settings.get('embedding_model_uuid', '')
        vectors = await self.plugin.invoke_embedding(embedding_model_uuid, chunks)

        # 4. Write to host vector database
        collection_id = context.get_collection_id()
        ids = [f"{context.file_object.metadata.document_id}_{i}" for i in range(len(chunks))]
        metadata = [{"document_id": context.file_object.metadata.document_id, "chunk_index": i, "text": chunks[i]} for i in range(len(chunks))]
        await self.plugin.vector_upsert(collection_id, vectors, ids, metadata)

        return IngestionResult(
            document_id=context.file_object.metadata.document_id,
            status=DocumentStatus.COMPLETED,
            chunks_created=len(chunks),
        )

    async def retrieve(self, context: RetrievalContext) -> RetrievalResponse:
        """Retrieve relevant content from the knowledge base"""

        # 1. Generate query vector
        embedding_model_uuid = context.creation_settings.get('embedding_model_uuid', '')
        query_vectors = await self.plugin.invoke_embedding(embedding_model_uuid, [context.query])
        query_vector = query_vectors[0]

        # 2. Vector search
        collection_id = context.get_collection_id()
        results = await self.plugin.vector_search(collection_id, query_vector, top_k=context.retrieval_settings.get('top_k', 5))

        # 3. Convert to retrieval results
        entries = []
        for r in results:
            entry = RetrievalResultEntry(
                id=r.get('id', ''),
                content=[ContentElement.from_text(r.get('metadata', {}).get('text', ''))],
                metadata=r.get('metadata', {}),
                distance=r.get('score', 0.0),
            )
            entries.append(entry)

        return RetrievalResponse(
            results=entries,
            total_found=len(entries),
            metadata={},
        )

    async def delete_document(self, kb_id: str, document_id: str) -> bool:
        """Delete a document and its associated data from the knowledge base"""
        collection_id = kb_id
        deleted = await self.plugin.vector_delete(collection_id, file_ids=[document_id])
        return deleted > 0
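The handler above relies on self._split_text, which is not shown. A minimal character-based sketch of such a helper follows, written as a standalone function that could be adapted into a method. The function name, the validation rules, and the choice to split on raw character offsets are assumptions. The demo plugin may chunk differently (for example, on sentence or paragraph boundaries).

```python
def split_text(text: str, chunk_size: int = 500, chunk_overlap: int = 50) -> list:
    """Split text into chunks of at most chunk_size characters.

    Consecutive chunks share chunk_overlap characters.
    """
    if chunk_size <= 0:
        raise ValueError("chunk_size must be positive")
    if not 0 <= chunk_overlap < chunk_size:
        raise ValueError("chunk_overlap must be in [0, chunk_size)")

    step = chunk_size - chunk_overlap  # how far the window advances each time
    chunks = []
    for start in range(0, len(text), step):
        chunk = text[start:start + chunk_size]
        if chunk.strip():  # skip whitespace-only chunks
            chunks.append(chunk)
        if start + chunk_size >= len(text):
            break  # the window already reached the end of the text
    return chunks
```

Rejecting an overlap that is not smaller than the chunk size avoids a zero or negative step, which would otherwise make the loop invalid.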

Lifecycle Hooks

KnowledgeEngine provides two lifecycle hooks that are called when knowledge bases are created and deleted:
async def on_knowledge_base_create(self, kb_id: str, config: dict) -> None:
    """Callback when a knowledge base using this engine is created

    Args:
        kb_id: Knowledge base UUID
        config: User-provided configuration (fields from creation_schema)
    """

async def on_knowledge_base_delete(self, kb_id: str) -> None:
    """Callback when a knowledge base using this engine is deleted

    Args:
        kb_id: Knowledge base UUID
    """

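One possible use of these hooks is to keep per-knowledge-base state in step with creation and deletion. The class below is a stand-in that does not inherit from the SDK base class, and its in-memory kb_state dict is hypothetical. State held this way would not survive a plugin restart, so a real engine would likely persist it elsewhere.

```python
import asyncio


class HookDemo:
    """Stand-in for a KnowledgeEngine subclass; only the two hooks are shown."""

    def __init__(self) -> None:
        # kb_id -> settings captured at creation time (hypothetical store)
        self.kb_state = {}

    async def on_knowledge_base_create(self, kb_id: str, config: dict) -> None:
        # Normalize the creation settings once, falling back to manifest defaults.
        self.kb_state[kb_id] = {
            "chunk_size": int(config.get("chunk_size", 500)),
            "chunk_overlap": int(config.get("chunk_overlap", 50)),
        }

    async def on_knowledge_base_delete(self, kb_id: str) -> None:
        # pop() with a default keeps repeated deletion harmless.
        self.kb_state.pop(kb_id, None)
```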
Document Ingestion

The ingest method is called when a user uploads a document to the knowledge base:
async def ingest(self, context: IngestionContext) -> IngestionResult:
IngestionContext contains the following information:
class IngestionContext(pydantic.BaseModel):
    file_object: FileObject        # File object to ingest
    knowledge_base_id: str         # Target knowledge base ID
    collection_id: str | None      # Vector collection ID (falls back to knowledge_base_id)
    creation_settings: dict        # Configuration from knowledge base creation
    parsed_content: ParseResult | None  # Pre-parsed content from external Parser plugin (if any)
FileObject contains file metadata:
class FileObject(pydantic.BaseModel):
    metadata: FileMetadata    # Contains filename, file_size, mime_type, document_id, knowledge_base_id, etc.
    storage_path: str         # File path in the storage system
IngestionResult should return the ingestion result:
class IngestionResult(pydantic.BaseModel):
    document_id: str                  # Document ID
    status: DocumentStatus            # Processing status: COMPLETED / FAILED
    chunks_created: int = 0           # Number of chunks created
    error_message: str | None = None  # Error message on failure
    metadata: dict = {}               # Additional metadata
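Since IngestionResult carries a FAILED status and an error_message, one reasonable pattern is to catch ingestion errors and report them in the result rather than letting them propagate. The sketch below uses stand-in dataclasses that mirror the fields listed above so it can run without the SDK. The enum values and the safe_ingest helper are assumptions. How the host treats an exception raised from ingest is not covered here.

```python
import asyncio
from dataclasses import dataclass, field
from enum import Enum
from typing import Awaitable, Callable, Optional


class DocumentStatus(str, Enum):  # stand-in for the SDK enum; values are assumed
    COMPLETED = "completed"
    FAILED = "failed"


@dataclass
class IngestionResult:  # stand-in mirroring the fields listed above
    document_id: str
    status: DocumentStatus
    chunks_created: int = 0
    error_message: Optional[str] = None
    metadata: dict = field(default_factory=dict)


async def safe_ingest(document_id: str, do_ingest: Callable[[], Awaitable[int]]) -> IngestionResult:
    """Run do_ingest() and turn any exception into a FAILED result."""
    try:
        chunks_created = await do_ingest()
    except Exception as exc:  # report the failure instead of raising
        return IngestionResult(document_id, DocumentStatus.FAILED, error_message=str(exc))
    return IngestionResult(document_id, DocumentStatus.COMPLETED, chunks_created=chunks_created)
```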

Knowledge Retrieval

The retrieve method is called when the knowledge base is queried:
async def retrieve(self, context: RetrievalContext) -> RetrievalResponse:
RetrievalContext contains the following information:
class RetrievalContext(pydantic.BaseModel):
    query: str                           # Query text
    knowledge_base_id: str | None        # Knowledge base ID
    collection_id: str | None            # Vector collection ID
    retrieval_settings: dict             # Retrieval configuration (fields from retrieval_schema)
    creation_settings: dict              # Knowledge base creation configuration
    filters: dict                        # Metadata filter conditions (Chroma-style where syntax)
The LangBot host automatically injects a top_k value (default 5) into retrieval_settings before passing it to the plugin. Plugins can access it via context.retrieval_settings.get('top_k', 5).
The filters field contains Chroma-style where filter conditions extracted from retrieval_settings. When the caller provides filters in the retrieval settings (e.g., filtering by time range, file type, or custom metadata fields), the host populates this field so plugins can apply them during retrieval. If no filters are provided, this field is an empty dict.
RetrievalResponse should return the retrieval results:
class RetrievalResponse(pydantic.BaseModel):
    results: list[RetrievalResultEntry]  # List of retrieval results
    total_found: int                     # Total number of matches
    metadata: dict                       # Additional metadata
RetrievalResultEntry represents a single retrieval result:
class RetrievalResultEntry(pydantic.BaseModel):
    id: str                       # Result ID
    content: list[ContentElement] # Content, create using ContentElement.from_text()
    metadata: dict                # Metadata
    distance: float               # Distance score (smaller is more relevant)
    score: float | None = None    # Similarity score (larger is more relevant)
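Because distance and score point in opposite directions, a score_threshold from retrieval_settings has to be compared against a similarity rather than a raw distance. The sketch below assumes the "score" returned by the vector search is a cosine distance, so that similarity = 1 - distance. That conversion is an assumption that depends on the metric the vector database uses, and filter_by_score is a hypothetical helper.

```python
def filter_by_score(results: list, score_threshold: float) -> list:
    """Keep results whose derived similarity meets the threshold.

    Assumes each result's "score" is a cosine distance, so that
    similarity = 1 - distance. Adjust the conversion for other metrics.
    """
    kept = []
    for r in results:
        distance = float(r.get("score", 0.0))
        similarity = 1.0 - distance
        if similarity >= score_threshold:
            # Copy the result and attach the derived similarity.
            kept.append({**r, "similarity": similarity})
    return kept
```

The derived similarity could then be passed as the score field of RetrievalResultEntry, with the original value kept as distance.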

Document Deletion

The delete_document method is called when a user deletes a document from the knowledge base:
async def delete_document(self, kb_id: str, document_id: str) -> bool:
    """Delete a document and its associated data

    Args:
        kb_id: Knowledge base ID
        document_id: Document ID

    Returns:
        Whether the deletion was successful
    """

Host RAG APIs

KnowledgeEngine components can call LangBot host-provided RAG APIs via self.plugin, including embedding model invocation, vector database operations, and file retrieval.

Invoke Embedding Model

async def invoke_embedding(
    self,
    embedding_model_uuid: str,
    texts: list[str],
) -> list[list[float]]:
    """Generate embeddings using host's embedding model

    Args:
        embedding_model_uuid: Embedding model UUID
        texts: List of texts to embed

    Returns:
        List of embedding vectors, one per input text
    """

# Usage example
vectors = await self.plugin.invoke_embedding("model_uuid", ["Hello", "World"])
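For large documents it may be worth sending chunks to the embedding model in batches. The sketch below wraps any async embedding callable. The name embed_in_batches and the default batch size of 32 are assumptions, and whether the host or the underlying model imposes a batch limit is not stated here.

```python
import asyncio
from typing import Awaitable, Callable

# Any async callable that maps a list of texts to a list of vectors.
EmbedFn = Callable[[list], Awaitable[list]]


async def embed_in_batches(embed: EmbedFn, texts: list, batch_size: int = 32) -> list:
    """Embed texts in fixed-size batches, preserving input order."""
    if batch_size <= 0:
        raise ValueError("batch_size must be positive")
    vectors = []
    for start in range(0, len(texts), batch_size):
        batch = texts[start:start + batch_size]
        batch_vectors = await embed(batch)
        if len(batch_vectors) != len(batch):
            raise RuntimeError("embedding count does not match input count")
        vectors.extend(batch_vectors)
    return vectors
```

Inside a handler, the callable could be a small wrapper such as lambda batch: self.plugin.invoke_embedding(embedding_model_uuid, batch).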

Vector Upsert

async def vector_upsert(
    self,
    collection_id: str,
    vectors: list[list[float]],
    ids: list[str],
    metadata: list[dict] | None = None,
) -> None:
    """Upsert vectors to host's vector database

    Args:
        collection_id: Target collection ID
        vectors: List of vectors
        ids: List of unique IDs for vectors
        metadata: Optional list of metadata dicts
    """

# Usage example
await self.plugin.vector_upsert(
    collection_id="kb_uuid",
    vectors=[[0.1, 0.2, ...], [0.3, 0.4, ...]],
    ids=["chunk_0", "chunk_1"],
    metadata=[{"document_id": "doc1"}, {"document_id": "doc1"}],
)

Vector Search

async def vector_search(
    self,
    collection_id: str,
    query_vector: list[float],
    top_k: int = 5,
    filters: dict | None = None,
) -> list[dict]:
    """Search similar vectors in host's vector database

    Args:
        collection_id: Target collection ID
        query_vector: Query vector for similarity search
        top_k: Number of results to return
        filters: Optional metadata filters

    Returns:
        List of search results (dict with id, score, metadata, etc.)
    """

# Usage example
results = await self.plugin.vector_search(
    collection_id="kb_uuid",
    query_vector=[0.1, 0.2, ...],
    top_k=5,
)
# Return format: [{"id": "chunk_0", "score": 0.123, "metadata": {"document_id": "doc1", ...}}, ...]
Each result returned by vector_search is a dict containing id (vector ID), score (distance score), and metadata (metadata provided during upsert). If you need text content in retrieval results, store the text in metadata during ingestion.

Vector Delete

async def vector_delete(
    self,
    collection_id: str,
    file_ids: list[str] | None = None,
    filters: dict | None = None,
) -> int:
    """Delete vectors from host's vector database

    Args:
        collection_id: Target collection ID
        file_ids: File IDs whose vectors should be deleted
        filters: Optional metadata filters for deletion

    Returns:
        Number of deleted items
    """

# Usage example
deleted = await self.plugin.vector_delete(
    collection_id="kb_uuid",
    file_ids=["doc_001"],
)
The filters parameter supports Chroma-style where syntax for metadata filtering. Multiple top-level keys are AND-ed. Supported operators: $eq, $ne, $gt, $gte, $lt, $lte, $in, $nin. Example: {"file_id": {"$eq": "abc"}}.
Note: Chroma, Qdrant, and SeekDB store full metadata and can filter on any field. Milvus and pgvector only store text, file_id, and chunk_uuid, so filters on other fields will be silently ignored.
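An engine that bridges an external service, or that wants to post-filter results itself, may need to evaluate these conditions in plain Python. The sketch below is one way to do that for the eight operators listed above. The matches function is hypothetical, treating a bare value as shorthand for $eq is an assumption, and the handling of missing fields (compared as None) may differ from what a given vector database does.

```python
_OPS = {
    "$eq": lambda a, b: a == b,
    "$ne": lambda a, b: a != b,
    "$gt": lambda a, b: a is not None and a > b,
    "$gte": lambda a, b: a is not None and a >= b,
    "$lt": lambda a, b: a is not None and a < b,
    "$lte": lambda a, b: a is not None and a <= b,
    "$in": lambda a, b: a in b,
    "$nin": lambda a, b: a not in b,
}


def matches(metadata: dict, filters: dict) -> bool:
    """Return True if metadata satisfies every top-level condition (AND)."""
    for field_name, condition in filters.items():
        if field_name.startswith("$"):
            # Logical operators are outside the scope of this sketch.
            raise ValueError(f"unsupported top-level operator: {field_name}")
        value = metadata.get(field_name)  # missing fields compare as None
        if not isinstance(condition, dict):
            condition = {"$eq": condition}  # assumed shorthand for equality
        for op, expected in condition.items():
            if op not in _OPS:
                raise ValueError(f"unsupported operator: {op}")
            if not _OPS[op](value, expected):
                return False
    return True
```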

Get Knowledge File Stream

async def get_knowledge_file_stream(self, storage_path: str) -> bytes:
    """Get file content from host storage

    Args:
        storage_path: File storage path (from FileObject.storage_path)

    Returns:
        File content as bytes
    """

# Usage example
file_bytes = await self.plugin.get_knowledge_file_stream(context.file_object.storage_path)
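The returned bytes still have to be turned into text when the engine parses files itself. A plain decode('utf-8') raises on files saved in other encodings, so a tolerant decoder can help. The sketch below is one option. The helper name decode_text and the candidate encodings are assumptions chosen for illustration, and binary formats such as PDF need real parsing rather than decoding.

```python
def decode_text(file_bytes: bytes, encodings: tuple = ("utf-8-sig", "gb18030")) -> str:
    """Decode bytes by trying each encoding in order.

    Falls back to UTF-8 with replacement characters if none succeeds.
    """
    for encoding in encodings:
        try:
            # "utf-8-sig" also strips a leading byte order mark if present.
            return file_bytes.decode(encoding)
        except UnicodeDecodeError:
            continue
    return file_bytes.decode("utf-8", errors="replace")
```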

Testing the Knowledge Engine

After creation, execute the command lbp run in the plugin directory to start debugging. Then in LangBot:
  1. Go to the “Knowledge Base” page
  2. Click “Create Knowledge Base”
  3. Select the Knowledge Engine provided by your plugin and fill in the configuration based on the engine’s creation_schema
  4. After creation, upload documents to test ingestion capabilities (if the engine declares DOC_INGESTION capability)
  5. Bind the knowledge base to a pipeline and test retrieval capabilities
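Before testing in LangBot, the engine's own logic can also be exercised in isolation against an in-memory stand-in for self.plugin. The FakeHost class below is a hypothetical test double, not part of the SDK. It assumes that "score" is a cosine distance, that file_ids are matched against a document_id metadata key, and it ignores filters entirely. The real host may behave differently on all three points.

```python
import asyncio
import math


def _cosine_distance(a, b) -> float:
    """Return 1 - cosine similarity, or 1.0 if either vector is all zeros."""
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return 1.0 - dot / norm if norm else 1.0


class FakeHost:
    """In-memory stand-in for self.plugin, intended for unit tests only."""

    def __init__(self) -> None:
        # collection_id -> {vector_id: (vector, metadata)}
        self.collections = {}

    async def vector_upsert(self, collection_id, vectors, ids, metadata=None) -> None:
        store = self.collections.setdefault(collection_id, {})
        metadata = metadata or [{} for _ in ids]
        for vec_id, vector, meta in zip(ids, vectors, metadata):
            store[vec_id] = (vector, meta)  # same ID overwrites the old entry

    async def vector_search(self, collection_id, query_vector, top_k=5, filters=None) -> list:
        # filters are ignored in this sketch
        store = self.collections.get(collection_id, {})
        scored = [
            {"id": vec_id, "score": _cosine_distance(query_vector, vector), "metadata": meta}
            for vec_id, (vector, meta) in store.items()
        ]
        scored.sort(key=lambda r: r["score"])  # smallest distance first
        return scored[:top_k]

    async def vector_delete(self, collection_id, file_ids=None, filters=None) -> int:
        store = self.collections.get(collection_id, {})
        wanted = set(file_ids or [])
        doomed = [k for k, (_, meta) in store.items() if meta.get("document_id") in wanted]
        for k in doomed:
            del store[k]
        return len(doomed)
```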