RAGエンジン(KnowledgeEngine)コンポーネントにより、プラグインはLangBotにナレッジベースの完全なインデックス作成と検索機能を提供できます。ユーザーがLangBotでナレッジベースを作成する際、プラグインが提供するRAGエンジンを選択して、ドキュメントの取り込み、検索、削除を管理できます。また、プラグインを通じてDify、RAGFlow、FastGPTなどの業界先進のナレッジベースサービスとブリッジすることも可能です。
RAGエンジンコンポーネントの追加
1つのプラグインには任意の数のRAGエンジンを追加できます。プラグインディレクトリでコマンドlbp comp KnowledgeEngineを実行し、プロンプトに従ってRAGエンジンの設定を入力します。
➜ MyRAGPlugin > lbp comp KnowledgeEngine
Generating component KnowledgeEngine...
Knowledge Engine name: simple_rag
Knowledge Engine description: A simple Knowledge Engine with built-in chunking and retrieval
Component KnowledgeEngine generated successfully.
組件 KnowledgeEngine 生成成功。
これにより、components/knowledge_engine/ディレクトリにsimple_rag.yamlとsimple_rag.pyファイルが生成されます。.yamlファイルはRAGエンジンの基本情報と設定スキーマを定義し、.pyファイルはこのエンジンのハンドラです:
➜ MyRAGPlugin > tree
...
├── components
│ ├── __init__.py
│ └── knowledge_engine
│ ├── __init__.py
│ ├── simple_rag.py
│ └── simple_rag.yaml
...
マニフェストファイル: RAGエンジン
apiVersion: v1 # 変更しないでください
kind: KnowledgeEngine # 変更しないでください
metadata:
name: simple_rag # RAGエンジン名、このエンジンを識別するために使用
label:
en_US: Simple Knowledge Engine # エンジン表示名、LangBotのUIに表示、多言語対応
zh_Hans: 简易 RAG 引擎
ja_JP: シンプル RAG エンジン
description:
en_US: 'A simple Knowledge Engine with built-in chunking and retrieval'
zh_Hans: '内置分块和检索能力的简易 RAG 引擎'
ja_JP: 'チャンキングと検索を内蔵したシンプルなRAGエンジン'
spec:
creation_schema: # ナレッジベース作成時にユーザーが入力する設定パラメータ
- name: chunk_size
label:
en_US: Chunk Size
zh_Hans: 分块大小
type: integer
required: false
default: 500
- name: chunk_overlap
label:
en_US: Chunk Overlap
zh_Hans: 分块重叠
type: integer
required: false
default: 50
retrieval_schema: # 検索時に設定可能なパラメータ
- name: score_threshold
label:
en_US: Score Threshold
zh_Hans: 分数阈值
type: float
required: false
default: 0.5
execution:
python:
path: simple_rag.py # エンジンハンドラ、変更しないでください
attr: SimpleRag # エンジンハンドラのクラス名、simple_rag.pyのクラス名と一致
設定項目フォーマットのリファレンスについては、プラグインマニフェスト設定フォーマットを参照してください。
creation_schema と retrieval_schema
旧バージョンのKnowledgeRetrieverコンポーネントが単一のspec.configを使用していたのとは異なり、KnowledgeEngineは2つの独立したスキーマを使用します:
- creation_schema: ナレッジベース作成時にユーザーが入力するパラメータ。取り込みと検索時に
creation_settingsを通じてプラグインに渡されます。
- retrieval_schema: ナレッジベースのクエリ時にユーザーが調整できるパラメータ。
retrieval_settingsを通じてプラグインに渡されます。
能力宣言
KnowledgeEngineはサポートする能力を宣言でき、LangBotはこれらの能力宣言に基づいてUIの表示と利用可能な操作を決定します:
from langbot_plugin.api.definition.components.knowledge_engine.engine import KnowledgeEngine, KnowledgeEngineCapability
class SimpleRag(KnowledgeEngine):
@classmethod
def get_capabilities(cls) -> list[str]:
return [
KnowledgeEngineCapability.DOC_INGESTION, # ドキュメントのアップロードと処理をサポート
]
利用可能な能力定数:
| 能力 | 説明 |
|---|
DOC_INGESTION | ドキュメントのアップロードと処理をサポート。宣言すると、LangBotはナレッジベースの詳細に「ドキュメント」タブを表示します。プラグインが他のナレッジベースサービスとのブリッジ用である場合や、ユーザーによるドキュメントのアップロードが不要な場合は、この能力宣言を省略できます。 |
DOC_PARSING | ネイティブドキュメント解析(ファイルからテキストへの抽出)をサポート。宣言すると、ユーザーはドキュメントアップロード時にRAGエンジンの内蔵解析機能を選択でき、外部Parserプラグインが不要になります。未宣言の場合、ドキュメントをアップロードするには外部Parserプラグインのインストールが必要です。 |
その他の検索動作(リランキング、ハイブリッド検索など)はプラグインのretrieval_schemaで制御され、能力フラグは不要です。
プラグインハンドラ
以下のコードがデフォルトで生成されます(components/knowledge_engine/<エンジン名>.py)。ingest、retrieve、delete_documentの3つのコアメソッドを実装する必要があります。完全なコードはlangbot-plugin-demoのSimpleKnowledgeEngineで見つけることができます。
from langbot_plugin.api.definition.components.knowledge_engine.engine import KnowledgeEngine, KnowledgeEngineCapability
from langbot_plugin.api.entities.builtin.rag.models import (
IngestionContext,
IngestionResult,
)
from langbot_plugin.api.entities.builtin.rag.context import (
RetrievalContext,
RetrievalResponse,
RetrievalResultEntry,
)
from langbot_plugin.api.entities.builtin.rag.enums import DocumentStatus
from langbot_plugin.api.entities.builtin.provider.message import ContentElement
class SimpleRag(KnowledgeEngine):
@classmethod
def get_capabilities(cls) -> list[str]:
return [KnowledgeEngineCapability.DOC_INGESTION]
async def on_knowledge_base_create(self, kb_id: str, config: dict) -> None:
"""ナレッジベース作成時のコールバック、リソースの初期化に使用できます"""
pass
async def on_knowledge_base_delete(self, kb_id: str) -> None:
"""ナレッジベース削除時のコールバック、リソースのクリーンアップに使用できます"""
pass
async def ingest(self, context: IngestionContext) -> IngestionResult:
"""ドキュメントをナレッジベースに取り込む"""
# 1. ファイル内容を取得
file_bytes = await self.plugin.get_knowledge_file_stream(context.file_object.storage_path)
# 2. ドキュメントを解析してチャンクに分割
text = file_bytes.decode('utf-8')
chunk_size = context.creation_settings.get('chunk_size', 500)
chunk_overlap = context.creation_settings.get('chunk_overlap', 50)
chunks = self._split_text(text, chunk_size, chunk_overlap)
# 3. ホストの埋め込みモデルを使用してベクトルを生成
embedding_model_uuid = context.creation_settings.get('embedding_model_uuid', '')
vectors = await self.plugin.invoke_embedding(embedding_model_uuid, chunks)
# 4. ホストのベクトルデータベースに書き込み
collection_id = context.get_collection_id()
ids = [f"{context.file_object.metadata.document_id}_{i}" for i in range(len(chunks))]
metadata = [{"document_id": context.file_object.metadata.document_id, "chunk_index": i, "text": chunks[i]} for i in range(len(chunks))]
await self.plugin.vector_upsert(collection_id, vectors, ids, metadata)
return IngestionResult(
document_id=context.file_object.metadata.document_id,
status=DocumentStatus.COMPLETED,
chunks_created=len(chunks),
)
async def retrieve(self, context: RetrievalContext) -> RetrievalResponse:
"""ナレッジベースから関連コンテンツを検索"""
# 1. クエリベクトルを生成
embedding_model_uuid = context.creation_settings.get('embedding_model_uuid', '')
query_vectors = await self.plugin.invoke_embedding(embedding_model_uuid, [context.query])
query_vector = query_vectors[0]
# 2. ベクトル検索
collection_id = context.get_collection_id()
results = await self.plugin.vector_search(collection_id, query_vector, top_k=context.retrieval_settings.get('top_k', 5))
# 3. 検索結果に変換
entries = []
for r in results:
entry = RetrievalResultEntry(
id=r.get('id', ''),
content=[ContentElement.from_text(r.get('metadata', {}).get('text', ''))],
metadata=r.get('metadata', {}),
distance=r.get('score', 0.0),
)
entries.append(entry)
return RetrievalResponse(
results=entries,
total_found=len(entries),
metadata={},
)
async def delete_document(self, kb_id: str, document_id: str) -> bool:
"""ナレッジベースからドキュメントと関連データを削除"""
collection_id = kb_id
deleted = await self.plugin.vector_delete(collection_id, file_ids=[document_id])
return deleted > 0
ライフサイクルフック
KnowledgeEngineは、ナレッジベースの作成・削除時に呼び出される2つのライフサイクルフックを提供します:
async def on_knowledge_base_create(self, kb_id: str, config: dict) -> None:
"""このエンジンを使用するナレッジベースが作成された時のコールバック
Args:
kb_id: ナレッジベースUUID
config: ユーザーが作成時に入力した設定(creation_schemaのフィールド)
"""
async def on_knowledge_base_delete(self, kb_id: str) -> None:
"""このエンジンを使用するナレッジベースが削除された時のコールバック
Args:
kb_id: ナレッジベースUUID
"""
ドキュメントの取り込み
ingestメソッドは、ユーザーがナレッジベースにドキュメントをアップロードした時に呼び出されます:
async def ingest(self, context: IngestionContext) -> IngestionResult:
IngestionContextには以下の情報が含まれます:
class IngestionContext(pydantic.BaseModel):
file_object: FileObject # 取り込むファイルオブジェクト
knowledge_base_id: str # ターゲットのナレッジベースID
collection_id: str | None # ベクトルコレクションID(デフォルトはknowledge_base_idにフォールバック)
creation_settings: dict # ナレッジベース作成時の設定パラメータ
parsed_content: ParseResult | None # 外部Parserプラグインの事前解析コンテンツ(存在する場合)
FileObjectにはファイルのメタ情報が含まれます:
class FileObject(pydantic.BaseModel):
metadata: FileMetadata # filename, file_size, mime_type, document_id, knowledge_base_id等を含む
storage_path: str # ストレージシステム内のファイルパス
IngestionResultは取り込み結果を返す必要があります:
class IngestionResult(pydantic.BaseModel):
document_id: str # ドキュメントID
status: DocumentStatus # 処理ステータス: COMPLETED / FAILED
chunks_created: int = 0 # 作成されたチャンク数
error_message: str | None = None # 失敗時のエラーメッセージ
metadata: dict = {} # 追加メタデータ
ナレッジ検索
retrieveメソッドは、ナレッジベースがクエリされた時に呼び出されます:
async def retrieve(self, context: RetrievalContext) -> RetrievalResponse:
RetrievalContextには以下の情報が含まれます:
class RetrievalContext(pydantic.BaseModel):
query: str # クエリテキスト
knowledge_base_id: str | None # ナレッジベースID
collection_id: str | None # ベクトルコレクションID
retrieval_settings: dict # 検索時の設定パラメータ(retrieval_schemaのフィールド)
creation_settings: dict # ナレッジベース作成時の設定パラメータ
filters: dict # メタデータフィルター条件(Chromaスタイルwhere構文)
LangBotホストはretrieval_settingsをプラグインに渡す前にデフォルトのtop_k値(デフォルト5)を自動的に注入します。プラグインはcontext.retrieval_settings.get('top_k', 5)でアクセスできます。filtersフィールドにはretrieval_settingsから抽出されたChromaスタイルのwhereフィルター条件が含まれます。呼び出し元が検索設定でfiltersを指定した場合(例:時間範囲、ファイルタイプ、カスタムメタデータフィールドによるドキュメントフィルタリング)、ホストがこのフィールドを設定し、プラグインは検索時にフィルタリングを適用できます。フィルターが指定されていない場合、このフィールドは空のdictです。
RetrievalResponseは検索結果を返す必要があります:
class RetrievalResponse(pydantic.BaseModel):
results: list[RetrievalResultEntry] # 検索結果リスト
total_found: int # 一致した総数
metadata: dict # 追加メタデータ
RetrievalResultEntryは単一の検索結果を表します:
class RetrievalResultEntry(pydantic.BaseModel):
id: str # 結果ID
content: list[ContentElement] # コンテンツ、ContentElement.from_text()を使用して作成
metadata: dict # メタデータ
distance: float # 距離スコア(小さいほど関連性が高い)
score: float | None = None # 類似度スコア(大きいほど関連性が高い)
ドキュメントの削除
delete_documentメソッドは、ユーザーがナレッジベースからドキュメントを削除した時に呼び出されます:
async def delete_document(self, kb_id: str, document_id: str) -> bool:
"""ドキュメントと関連データを削除
Args:
kb_id: ナレッジベースID
document_id: ドキュメントID
Returns:
削除が成功したかどうか
"""
ホストRAG API
KnowledgeEngineコンポーネントはself.pluginを通じて、LangBotホストが提供する埋め込みモデル呼び出し、ベクトルデータベース操作、ファイル読み取りなどのRAG関連APIを呼び出すことができます。
埋め込みモデルの呼び出し
async def invoke_embedding(
self,
embedding_model_uuid: str,
texts: list[str],
) -> list[list[float]]:
"""ホストの埋め込みモデルを使用してベクトルを生成
Args:
embedding_model_uuid: 埋め込みモデルUUID
texts: 埋め込むテキストのリスト
Returns:
ベクトルのリスト、入力テキストごとに1つ
"""
# 使用例
vectors = await self.plugin.invoke_embedding("model_uuid", ["Hello", "World"])
ベクトルの書き込み
async def vector_upsert(
self,
collection_id: str,
vectors: list[list[float]],
ids: list[str],
metadata: list[dict] | None = None,
) -> None:
"""ホストのベクトルデータベースにベクトルを書き込む
Args:
collection_id: ターゲットコレクションID
vectors: ベクトルのリスト
ids: ベクトルの一意識別子リスト
metadata: オプションのメタデータリスト
"""
# 使用例
await self.plugin.vector_upsert(
collection_id="kb_uuid",
vectors=[[0.1, 0.2, ...], [0.3, 0.4, ...]],
ids=["chunk_0", "chunk_1"],
metadata=[{"document_id": "doc1"}, {"document_id": "doc1"}],
)
ベクトル検索
async def vector_search(
self,
collection_id: str,
query_vector: list[float],
top_k: int = 5,
filters: dict | None = None,
) -> list[dict]:
"""ホストのベクトルデータベースで類似ベクトルを検索
Args:
collection_id: ターゲットコレクションID
query_vector: 類似性検索のクエリベクトル
top_k: 返す結果の数
filters: オプションのメタデータフィルター
Returns:
検索結果のリスト(id, score, metadataなどを含むdict)
"""
# 使用例
results = await self.plugin.vector_search(
collection_id="kb_uuid",
query_vector=[0.1, 0.2, ...],
top_k=5,
)
# 返却フォーマット: [{"id": "chunk_0", "score": 0.123, "metadata": {"document_id": "doc1", ...}}, ...]
vector_searchが返す各結果はdictで、id(ベクトルID)、score(距離スコア)、metadata(upsert時に提供したメタデータ)の3つのフィールドを含みます。検索結果にテキスト内容が必要な場合は、取り込み時にmetadataにテキストを保存してください。
ベクトルの削除
async def vector_delete(
self,
collection_id: str,
file_ids: list[str] | None = None,
filters: dict | None = None,
) -> int:
"""ホストのベクトルデータベースからベクトルを削除
Args:
collection_id: ターゲットコレクションID
file_ids: 削除するファイルIDのリスト
filters: オプションのメタデータフィルター
Returns:
削除されたアイテム数
"""
# 使用例
deleted = await self.plugin.vector_delete(
collection_id="kb_uuid",
file_ids=["doc_001"],
)
filtersパラメータはChromaスタイルのwhere構文によるメタデータフィルタリングをサポートしています。複数のトップレベルキーはAND条件として結合されます。サポートされる演算子:$eq、$ne、$gt、$gte、$lt、$lte、$in、$nin。例:{"file_id": {"$eq": "abc"}}。注意: Chroma、Qdrant、SeekDBは完全なメタデータを保存し、任意のフィールドでフィルタリングできます。MilvusとpgvectorはDB側にtext、file_id、chunk_uuidのみ保存しており、それ以外のフィールドでのフィルタリングは無視されます。
async def get_knowledge_file_stream(self, storage_path: str) -> bytes:
"""ホストストレージからファイル内容を取得
Args:
storage_path: ファイルのストレージパス(FileObject.storage_pathから取得)
Returns:
ファイル内容のバイトデータ
"""
# 使用例
file_bytes = await self.plugin.get_knowledge_file_stream(context.file_object.storage_path)
RAGエンジンのテスト
作成後、プラグインディレクトリでコマンドlbp runを実行してデバッグを開始します。その後、LangBotで:
- 「ナレッジベース」ページに移動
- 「ナレッジベースを作成」をクリック
- プラグインが提供するRAGエンジンを選択し、エンジンの
creation_schemaに基づいて設定を入力
- 作成後、ドキュメントをアップロードして取り込み機能をテスト(エンジンが
DOC_INGESTION能力を宣言している場合)
- パイプラインにナレッジベースをバインドし、検索機能をテスト