commands.py中的migrate_knowledge_vector_database()函数解析

本文使用Dify v1.4.0版本,主要解析了commands.py中的migrate_knowledge_vector_database()函数的执行逻辑。源码位置:dify\api\commands.py

一.migrate_knowledge_vector_database源码

migrate_knowledge_vector_database 函数用于将所有高质量索引技术(indexing_technique == "high_quality")的数据集的向量索引迁移到目标向量数据库。它会根据不同的向量数据库类型生成集合名,删除旧索引,重新创建新索引,并统计迁移、跳过和总数,最后输出迁移结果。

defmigrate_knowledge_vector_database():
"""
    Migrate vector database datas to target vector database .
    """

    click.echo(click.style("Starting vector database migration.", fg="green"))
    create_count = 0
    skipped_count = 0
    total_count = 0
    vector_type = dify_config.VECTOR_STORE
    upper_collection_vector_types = {
        VectorType.MILVUS,
        VectorType.PGVECTOR,
        VectorType.VASTBASE,
        VectorType.RELYT,
        VectorType.WEAVIATE,
        VectorType.ORACLE,
        VectorType.ELASTICSEARCH,
        VectorType.OPENGAUSS,
        VectorType.TABLESTORE,
    }
    lower_collection_vector_types = {
        VectorType.ANALYTICDB,
        VectorType.CHROMA,
        VectorType.MYSCALE,
        VectorType.PGVECTO_RS,
        VectorType.TIDB_VECTOR,
        VectorType.OPENSEARCH,
        VectorType.TENCENT,
        VectorType.BAIDU,
        VectorType.VIKINGDB,
        VectorType.UPSTASH,
        VectorType.COUCHBASE,
        VectorType.OCEANBASE,
    }
    page = 1
whileTrue:
try:
            stmt = (
                select(Dataset).filter(Dataset.indexing_technique == "high_quality").order_by(Dataset.created_at.desc())
            )

            datasets = db.paginate(select=stmt, page=page, per_page=50, max_per_page=50, error_out=False)
except NotFound:
break

        page += 1
for dataset in datasets:
            total_count = total_count + 1
            click.echo(
f"Processing the {total_count} dataset {dataset.id}{create_count} created, {skipped_count} skipped."
            )
try:
                click.echo("Creating dataset vector database index: {}".format(dataset.id))
if dataset.index_struct_dict:
if dataset.index_struct_dict["type"] == vector_type:
                        skipped_count = skipped_count + 1
continue
                collection_name = ""
                dataset_id = dataset.id
if vector_type in upper_collection_vector_types:
                    collection_name = Dataset.gen_collection_name_by_id(dataset_id)
elif vector_type == VectorType.QDRANT:
if dataset.collection_binding_id:
                        dataset_collection_binding = (
                            db.session.query(DatasetCollectionBinding)
                            .filter(DatasetCollectionBinding.id == dataset.collection_binding_id)
                            .one_or_none()
                        )
if dataset_collection_binding:
                            collection_name = dataset_collection_binding.collection_name
else:
raise ValueError("Dataset Collection Binding not found")
else:
                        collection_name = Dataset.gen_collection_name_by_id(dataset_id)

elif vector_type in lower_collection_vector_types:
                    collection_name = Dataset.gen_collection_name_by_id(dataset_id).lower()
else:
raise ValueError(f"Vector store {vector_type} is not supported.")

                index_struct_dict = {"type": vector_type, "vector_store": {"class_prefix": collection_name}}
                dataset.index_struct = json.dumps(index_struct_dict)
                vector = Vector(dataset)
                click.echo(f"Migrating dataset {dataset.id}.")

try:
                    vector.delete()
                    click.echo(
                        click.style(f"Deleted vector index {collection_name} for dataset {dataset.id}.", fg="green")
                    )
except Exception as e:
                    click.echo(
                        click.style(
f"Failed to delete vector index {collection_name} for dataset {dataset.id}.", fg="red"
                        )
                    )
raise e

                dataset_documents = (
                    db.session.query(DatasetDocument)
                    .filter(
                        DatasetDocument.dataset_id == dataset.id,
                        DatasetDocument.indexing_status == "completed",
                        DatasetDocument.enabled == True,
                        DatasetDocument.archived == False,
                    )
                    .all()
                )

                documents = []
                segments_count = 0
for dataset_document in dataset_documents:
                    segments = (
                        db.session.query(DocumentSegment)
                        .filter(
                            DocumentSegment.document_id == dataset_document.id,
                            DocumentSegment.status == "completed",
                            DocumentSegment.enabled == True,
                        )
                        .all()
                    )

for segment in segments:
                        document = Document(
                            page_content=segment.content,
                            metadata={
"doc_id": segment.index_node_id,
"doc_hash": segment.index_node_hash,
"document_id": segment.document_id,
"dataset_id": segment.dataset_id,
                            },
                        )

                        documents.append(document)
                        segments_count = segments_count + 1

if documents:
try:
                        click.echo(
                            click.style(
f"Creating vector index with {len(documents)} documents of {segments_count}"
f" segments for dataset {dataset.id}.",
                                fg="green",
                            )
                        )
                        vector.create(documents)
                        click.echo(click.style(f"Created vector index for dataset {dataset.id}.", fg="green"))
except Exception as e:
                        click.echo(click.style(f"Failed to created vector index for dataset {dataset.id}.", fg="red"))
raise e
                db.session.add(dataset)
                db.session.commit()
                click.echo(f"Successfully migrated dataset {dataset.id}.")
                create_count += 1
except Exception as e:
                db.session.rollback()
                click.echo(
                    click.style("Error creating dataset index: {} {}".format(e.__class__.__name__, str(e)), fg="red")
                )
continue

    click.echo(
        click.style(
f"Migration complete. Created {create_count} dataset indexes. Skipped {skipped_count} datasets.", fg="green"
        )
    )

二.初始化与配置

向量数据库类型及简要解释,如下所示:

向量数据库 解释
AnalyticDB
阿里云提供的云原生数据仓库,支持向量搜索
Chroma
开源向量数据库,为AI应用设计的嵌入式存储解决方案
Milvus
开源向量数据库,专为大规模相似性搜索优化
MyScale
基于ClickHouse的向量搜索数据库
pgvector
PostgreSQL的扩展,为关系型数据库添加向量能力
Vastbase
华为提供的企业级数据库,支持向量检索
pgvecto-rs
用Rust实现的高性能PostgreSQL向量扩展
Qdrant
开源向量相似性搜索引擎,专注于高负载场景
Relyt
基于Redis的向量搜索解决方案
TiDB Vector
TiDB数据库的向量搜索功能
Weaviate 开源向量搜索引擎,带有语义搜索能力
OpenSearch
基于Elasticsearch的开源分布式搜索引擎
Tencent
腾讯云提供的向量数据库服务
Oracle
Oracle数据库的向量搜索功能
Elasticsearch
分布式搜索和分析引擎,支持向量搜索
Elasticsearch-ja
针对日语优化的Elasticsearch版本
Lindorm
阿里云提供的多模数据库,支持向量搜索
Couchbase
分布式NoSQL数据库,具有向量搜索能力
Baidu
百度提供的向量数据库服务
VikingDB
专为AI应用设计的向量数据库
Upstash
提供向量搜索功能的无服务器数据库
TiDB on Qdrant
TiDB与Qdrant集成的解决方案
OceanBase
分布式关系数据库,支持向量搜索
openGauss
华为开源的关系型数据库,支持向量操作
TableStore
阿里云提供的NoSQL数据存储服务
Huawei Cloud
华为云提供的向量数据库服务
click.echo(click.style("Starting vector database migration.", fg="green"))
create_count = 0
skipped_count = 0
total_count = 0
vector_type = dify_config.VECTOR_STORE
upper_collection_vector_types = {
    VectorType.MILVUS,
    VectorType.PGVECTOR,
    VectorType.VASTBASE,
    VectorType.RELYT,
    VectorType.WEAVIATE,
    VectorType.ORACLE,
    VectorType.ELASTICSEARCH,
    VectorType.OPENGAUSS,
    VectorType.TABLESTORE,
}
lower_collection_vector_types = {
    VectorType.ANALYTICDB,
    VectorType.CHROMA,
    VectorType.MYSCALE,
    VectorType.PGVECTO_RS,
    VectorType.TIDB_VECTOR,
    VectorType.OPENSEARCH,
    VectorType.TENCENT,
    VectorType.BAIDU,
    VectorType.VIKINGDB,
    VectorType.UPSTASH,
    VectorType.COUCHBASE,
    VectorType.OCEANBASE,
}

该代码是向量数据库迁移功能的初始化部分,主要做了以下几件事:

1.输出迁移开始提示

首先打印一条绿色提示信息,表示开始向量数据库迁移过程。

2.初始化计数器

  • create_count:用于记录成功创建的新向量索引数量

  • skipped_count:用于记录被跳过的数据集数量

  • total_count:用于记录处理的总数据集数量

3.获取目标vector_type

从配置中获取当前使用的向量存储类型:vector_type = dify_config.VECTOR_STORE

4.定义不同类型向量数据库的集合

  • upper_collection_vector_types:包含使用大写集合名称的向量数据库类型(如MILVUS、PGVECTOR等)

  • lower_collection_vector_types:包含使用小写集合名称的向量数据库类型(如CHROMA、OPENSEARCH等)

这些类型集合在后续代码中用于确定如何生成向量数据库的集合名称,因为不同的向量数据库可能对集合名称的格式有不同的要求。这是迁移过程的预备步骤,为后续实际迁移数据做准备。

三.分页遍历所有高质量数据集

该代码用于分页查询高质量索引技术(indexing_technique 为 high_quality)的 Dataset 数据集,并按创建时间倒序排列。

page = 1
whileTrue:
try:
        stmt = (
            select(Dataset).filter(Dataset.indexing_technique == "high_quality").order_by(Dataset.created_at.desc())
        )
        datasets = db.paginate(select=stmt, page=page, per_page=50, max_per_page=50, error_out=False)
except NotFound:
break

    page += 1
for dataset in datasets:
        ...

1.构建查询语句

stmt = (...):构建一个 SQLAlchemy 的查询语句,筛选 Dataset 表中 indexing_technique 字段为 high_quality 的记录,并按 created_at 字段降序排序。

2.具体分页查询

datasets = db.paginate(...):使用 db 的 paginate 方法对查询结果进行分页,参数含义如下:

  • select=stmt:要分页的查询语句。

  • page=page:当前页码。

  • per_page=50:每页返回 50 条数据。

  • max_per_page=50:每页最大返回 50 条数据。

  • error_out=False:如果页码超出范围不抛出异常,返回空结果。

这样可以高效地分批处理大量数据集,常用于数据迁移或批量操作场景。

四. 处理每个数据集

该代码是向量数据库迁移功能的核心部分,负责处理每个数据集的向量索引迁移。

total_count = total_count + 1
click.echo(
f"Processing the {total_count} dataset {dataset.id}{create_count} created, {skipped_count} skipped."
)
try:
    click.echo("Creating dataset vector database index: {}".format(dataset.id))
if dataset.index_struct_dict:
if dataset.index_struct_dict["type"] == vector_type:
            skipped_count = skipped_count + 1
continue

    collection_name = ""
    dataset_id = dataset.id
if vector_type in upper_collection_vector_types:
        collection_name = Dataset.gen_collection_name_by_id(dataset_id)
elif vector_type == VectorType.QDRANT:
if dataset.collection_binding_id:
            dataset_collection_binding = (
                db.session.query(DatasetCollectionBinding)
                .filter(DatasetCollectionBinding.id == dataset.collection_binding_id)
                .one_or_none()
            )
if dataset_collection_binding:
                collection_name = dataset_collection_binding.collection_name
else:
raise ValueError("Dataset Collection Binding not found")
else:
            collection_name = Dataset.gen_collection_name_by_id(dataset_id)
elif vector_type in lower_collection_vector_types:
        collection_name = Dataset.gen_collection_name_by_id(dataset_id).lower()
else:
raise ValueError(f"Vector store {vector_type} is not supported.")
  • 检查数据集是否已有相同类型的向量索引,如果有则跳过当前数据集

  • 根据不同的向量存储类型确定集合名称:

    • 有集合绑定ID时,查询绑定记录并使用其集合名称

    • 无集合绑定时,使用数据集ID生成名称

    • 对于大写集合类型(如MILVUSPGVECTOR等),直接使用数据集ID生成名称

    • 对于QDRANT类型,检查是否存在集合绑定:

    • 对于小写集合类型(如CHROMAMYSCALE等),使用数据集ID生成小写名称

    • 对于不支持的向量存储类型,抛出错误

这部分代码的主要目的是根据目标向量存储类型确定适当的集合名称,为后续创建和迁移向量索引做准备。集合名称的正确生成对于确保向量数据能被正确存储和检索至关重要。

五.更新数据集索引结构

index_struct_dict = {"type": vector_type, "vector_store": {"class_prefix": collection_name}}
dataset.index_struct = json.dumps(index_struct_dict)
vector = Vector(dataset)
click.echo(f"Migrating dataset {dataset.id}.")

该代码的作用是在迁移知识向量数据库时,为每个数据集(dataset)设置新的索引结构,并初始化向量对象,输出迁移提示:

  • index_struct_dict = {"type": vector_type, "vector_store": {"class_prefix": collection_name}}构建一个索引结构的字典,指定向量存储类型和集合前缀。

  • dataset.index_struct = json.dumps(index_struct_dict)将索引结构字典序列化为 JSON 字符串,赋值给数据集的 index_struct 字段。

  • vector = Vector(dataset)用当前数据集实例化一个向量对象,后续用于向量操作。

  • click.echo(f"Migrating dataset {dataset.id}.")输出当前正在迁移的数据集 ID。

六.删除原有向量索引

vector.delete()代码作用是删除与当前 Vector 实例相关的数据向量索引。Vector 是一个用于操作向量数据库的类,delete() 方法会清除指定数据集在向量数据库中的索引数据。这样做通常是为了在重新创建索引前,先移除旧的索引,避免数据冲突或冗余。

try:
    vector.delete()
    click.echo(
        click.style(f"Deleted vector index {collection_name} for dataset {dataset.id}.", fg="green")
    )
except Exception as e:
    click.echo(
        click.style(
f"Failed to delete vector index {collection_name} for dataset {dataset.id}.", fg="red"
        )
    )
raise e

七.获取数据集下所有文档及分段

简单理解,一个数据集(知识库)由文档(Document)构成,而文档(Document)由段落(Segment)构成。如果由RDBMS类似,数据集(Dataset)相当于数据库,而文档(Document)相当于数据表,段落(Segment)相当于一条记录。

dataset_documents = (
    db.session.query(DatasetDocument)
    .filter(
        DatasetDocument.dataset_id == dataset.id,
        DatasetDocument.indexing_status == "completed",
        DatasetDocument.enabled == True,
        DatasetDocument.archived == False,
    )
    .all()
)

documents = []
segments_count = 0
for dataset_document in dataset_documents:
    segments = (
        db.session.query(DocumentSegment)
        .filter(
            DocumentSegment.document_id == dataset_document.id,
            DocumentSegment.status == "completed",
            DocumentSegment.enabled == True,
        )
        .all()
    )

for segment in segments:
        document = Document(
            page_content=segment.content,
            metadata={
"doc_id": segment.index_node_id,
"doc_hash": segment.index_node_hash,
"document_id": segment.document_id,
"dataset_id": segment.dataset_id,
            },
        )

        documents.append(document)
        segments_count = segments_count + 1

1.查询特定数据集中所有符合条件的文档

  • 属于当前数据集

  • 索引状态为”completed”(完成)

  • 已启用(enabled=True)

  • 未归档(archived=False)

2.初始化两个变量

  • documents:空列表,用于存储所有将要迁移的文档段落

  • segments_count:计数器,记录总段落数

3.对每个文档,再查询其所有符合条件的段落

  • 属于当前文档

  • 状态为”completed”

  • 已启用(enabled=True)

4.对每个段落,创建一个Document对象

  • 将段落内容设置为Document的page_content

  • 设置重要元数据(doc_id、doc_hash、document_id、dataset_id)

5.将每个Document对象添加到列表中并计数

八.创建新向量索引

如果有文档,调用 vector.create(documents) 创建新索引,异常时输出错误。

if documents:
try:
        click.echo(
            click.style(
f"Creating vector index with {len(documents)} documents of {segments_count}"
f" segments for dataset {dataset.id}.",
                fg="green",
            )
        )
        vector.create(documents)
        click.echo(click.style(f"Created vector index for dataset {dataset.id}.", fg="green"))
except Exception as e:
        click.echo(click.style(f"Failed to created vector index for dataset {dataset.id}.", fg="red"))
raise e

九.提交事务、异常处理和迁移完成

1.提交数据库变更

将数据集加入 session 并提交,输出成功信息,计数加一。

db.session.add(dataset)
db.session.commit()
click.echo(f"Successfully migrated dataset {dataset.id}.")
create_count += 1

2.异常处理

如果迁移失败,回滚并输出错误。

except Exception as e:
    db.session.rollback()
    click.echo(
        click.style("Error creating dataset index: {} {}".format(e.__class__.__name__, str(e)), fg="red")
    )
continue

3.迁移完成输出

所有数据集处理完毕后,输出迁移完成信息。

click.echo(
    click.style(
f"Migration complete. Created {create_count} dataset indexes. Skipped {skipped_count} datasets.", fg="green"
    )
)

参考文献

[0] commands.py中的migrate_knowledge_vector_database()函数解析:https://z0yrmerhgi8.feishu.cn/wiki/U0flwBdbRiSBEAkOUR6cOfxVn8b

[1] weaviate github:https://github.com/weaviate/weaviate

[2] Weaviate Docs:https://weaviate.io/developers/weaviate


知识星球服务内容:Dify源码剖析及答疑,Dify对话系统源码,NLP电子书籍报告下载,公众号所有付费资料。加微信buxingtianxia21进NLP工程化资料群

(文:NLP工程化)

发表评论