|PyPI| |Python Version| |Codecov| |Tests|
.. |PyPI| image:: https://img.shields.io/pypi/v/datachain.svg :target: https://pypi.org/project/datachain/ :alt: PyPI .. |Python Version| image:: https://img.shields.io/pypi/pyversions/datachain :target: https://pypi.org/project/datachain :alt: Python Version .. |Codecov| image:: https://codecov.io/gh/iterative/datachain/graph/badge.svg?token=byliXGGyGB :target: https://codecov.io/gh/iterative/datachain :alt: Codecov .. |Tests| image:: https://github.com/iterative/datachain/actions/workflows/tests.yml/badge.svg :target: https://github.com/iterative/datachain/actions/workflows/tests.yml :alt: Tests
AI 🔗 DataChain
DataChain 是一个为人工智能设计的现代 Python 数据框架库。 它可以组织你未结构化的数据集,并在本地机器上大规模处理。
主要特性
📂 存储作为事实来源。
- 处理未结构化数据而不产生冗余副本:支持 S3、GCP、Azure 和本地文件系统。
- 多模态数据:图像、视频、文本、PDF、JSON、CSV、parquet。
- 将文件和元数据合并为持久化、版本化的列式数据集。
🐍 Python 友好型数据管道。
- 操作 Python 对象和对象字段。
- 内置并行化和超内存计算,无需 SQL 或 Spark 任务。
🧠 数据丰富和处理。
- 使用本地 AI 模型和 LLM API 生成元数据列。
- 按 AI 元数据进行过滤、连接和分组。向量相似性搜索。
- 将数据集传递给 Pytorch 和 Tensorflow,或导出回存储。
🚀 高效性。
- 并行化、超内存工作负载和数据缓存。
- 在 Python 对象字段上的矢量化操作:求和、计数、平均值等。
- 嵌入向量搜索。
快速开始
.. code:: console
$ pip install datachain
使用本地模型进行数据整理
我们将评估存储在 Google Cloud Storage 中的聊天机器人对话,这些对话以文本文件形式存储 - 本例中共 50 个文件。 这些对话涉及用户与机器人聊天,同时寻找更好的无线计划。 我们的目标是识别成功的对话。
示例中使用的数据是公开可用的
_。示例代码设计用于在本地机器上运行。
首先,我们将展示使用 transformers
库进行批量推理的简单情感模型:
.. code:: shell
pip install transformers
下面的代码下载云中的文件,并对每个文件应用用户定义的函数。所有检测到正面情感的文件将被复制到本地目录。
.. code:: py
from transformers import pipeline
from datachain import DataChain, Column
classifier = pipeline("sentiment-analysis", device="cpu",
model="distilbert/distilbert-base-uncased-finetuned-sst-2-english")
def is_positive_dialogue_ending(file) -> bool:
dialogue_ending = file.read()[-512:]
return classifier(dialogue_ending)[0]["label"] == "POSITIVE"
chain = (
DataChain.from_storage("gs://datachain-demo/chatbot-KiT/",
object_name="file", type="text")
.settings(parallel=8, cache=True)
.map(is_positive=is_positive_dialogue_ending)
.save("file_response")
)
positive_chain = chain.filter(Column("is_positive") == True)
positive_chain.export_files("./output")
print(f"{positive_chain.count()} files were exported")
13 个文件被导出
.. code:: shell
$ ls output/datachain-demo/chatbot-KiT/
15.txt 20.txt 24.txt 27.txt 28.txt 29.txt 33.txt 37.txt 38.txt 43.txt ...
$ ls output/datachain-demo/chatbot-KiT/ | wc -l
13
使用 LLM 判断聊天机器人
LLM 可以作为高效的通用分类器。在下面的示例中,我们使用 Mistral 的免费 API 评估聊天机器人的表现。请在 https://console.mistral.ai 获取免费的 Mistral API 密钥。
.. code:: shell
$ pip install mistralai (必须版本 >=1.0.0)
$ export MISTRAL_API_KEY=_your_key_
DataChain 可以并行化 API 调用;Mistral 免费版支持同时最多 4 个请求。
.. code:: py
from mistralai import Mistral
from datachain import File, DataChain, Column
PROMPT = "这个对话成功了吗?请用一个词回答:成功或失败。"
def eval_dialogue(file: File) -> bool:
client = Mistral()
response = client.chat.complete(
model="open-mixtral-8x22b",
messages=[{"role": "system", "content": PROMPT},
{"role": "user", "content": file.read()}])
result = response.choices[0].message.content
return result.lower().startswith("success")
chain = (
DataChain.from_storage("gs://datachain-demo/chatbot-KiT/", object_name="file")
.settings(parallel=4, cache=True)
.map(is_success=eval_dialogue)
.save("mistral_files")
)
successful_chain = chain.filter(Column("is_success") == True)
successful_chain.export_files("./output_mistral")
print(f"{successful_chain.count()} files were exported")
根据上述指示,Mistral 模型认为 50 个文件中有 31 个文件包含成功的对话:
.. code:: shell
$ ls output_mistral/datachain-demo/chatbot-KiT/
1.txt 15.txt 18.txt 2.txt 22.txt 25.txt 28.txt 33.txt 37.txt 4.txt 41.txt ...
$ ls output_mistral/datachain-demo/chatbot-KiT/ | wc -l
31
序列化 Python 对象
LLM 响应可能包含对分析有价值的信息,例如使用的 token 数量或模型性能参数。
与从 Mistral 响应数据结构(类 ChatCompletionResponse
)中提取这些信息不同,DataChain 可以将整个 LLM 响应序列化到内部数据库:
.. code:: py
from mistralai import Mistral
from mistralai.models import ChatCompletionResponse
from datachain import File, DataChain, Column
PROMPT = "这个对话成功了吗?请用一个词回答:成功或失败。"
def eval_dialog(file: File) -> ChatCompletionResponse:
client = MistralClient()
return client.chat(
model="open-mixtral-8x22b",
messages=[{"role": "system", "content": PROMPT},
{"role": "user", "content": file.read()}])
chain = (
DataChain.from_storage("gs://datachain-demo/chatbot-KiT/", object_name="file")
.settings(parallel=4, cache=True)
.map(response=eval_dialog)
.map(status=lambda response: response.choices[0].message.content.lower()[:7])
.save("response")
)
chain.select("file.name", "status", "response.usage").show(5)
success_rate = chain.filter(Column("status") == "success").count() / chain.count()
print(f"{100*success_rate:.1f}% 的对话是成功的")
输出:
.. code:: shell
file status response response response
name usage usage usage
prompt_tokens total_tokens completion_tokens
0 1.txt success 547 548 1
1 10.txt failure 3576 3578 2
2 11.txt failure 626 628 2
3 12.txt failure 1144 1182 38
4 13.txt success 1100 1101 1
[Limited by 5 rows]
64.0% 的对话是成功的
迭代 Python 数据结构
在之前的示例中,数据集被保存在嵌入式数据库 (SQLite
_ 位于工作目录的 .datachain
文件夹中)。
这些数据集自动进行了版本控制,并且可以使用 DataChain.from_dataset("dataset_name")
访问。
以下是如何检索已保存的数据集并逐个迭代对象:
.. code:: py
chain = DataChain.from_dataset("response")
# 逐个迭代:支持超内存工作流
for file, response in chain.limit(5).collect("file", "response"):
# 验证收集的 Python 对象
assert isinstance(response, ChatCompletionResponse)
status = response.choices[0].message.content[:7]
tokens = response.usage.total_tokens
print(f"{file.get_uri()}: {status}, 文件大小: {file.size}, tokens: {tokens}")
输出:
.. code:: shell
gs://datachain-demo/chatbot-KiT/1.txt: Success, 文件大小: 1776, tokens: 548
gs://datachain-demo/chatbot-KiT/10.txt: Failure, 文件大小: 11576, tokens: 3578
gs://datachain-demo/chatbot-KiT/11.txt: Failure, 文件大小: 2045, tokens: 628
gs://datachain-demo/chatbot-KiT/12.txt: Failure, 文件大小: 3833, tokens: 1207
gs://datachain-demo/chatbot-KiT/13.txt: Success, 文件大小: 3657, tokens: 1101
矢量化分析 Python 对象
某些操作可以在数据库内部进行而无需反序列化。例如,让我们计算使用 LLM API 的总成本,假设 Mixtral 调用每 100 万个输入 tokens 费用为 2 美元,每 100 万个输出 tokens 费用为 6 美元:
.. code:: py
chain = DataChain.from_dataset("mistral_dataset")
cost = chain.sum("response.usage.prompt_tokens")*0.000002 \
+ chain.sum("response.usage.completion_tokens")*0.000006
print(f"在 {chain.count()} 次调用中花费了 ${cost:.2f}")
输出:
.. code:: shell
花费了 $0.08 在 50 次调用中
PyTorch 数据加载器
链结果可以导出或直接传递给 PyTorch 数据加载器。 例如,如果我们有兴趣传递图像和基于文件名后缀的标签,可以用以下代码实现:
.. code:: py
from torch.utils.data import DataLoader
from transformers import CLIPProcessor
from datachain import C, DataChain
processor = CLIPProcessor.from_pretrained("openai/clip-vit-base-patch32")
chain = (
DataChain.from_storage("gs://datachain-demo/dogs-and-cats/", type="image")
.map(label=lambda name: name.split(".")[0], params=["file.name"])
.select("file", "label").to_pytorch(
transform=processor.image_processor,
tokenizer=processor.tokenizer,
)
)
loader = DataLoader(chain, batch_size=1)
教程
入门指南
_多模态 <examples/multimodal/clip_fine_tuning.ipynb>
_(Colab <https://colab.research.google.com/github/iterative/datachain/blob/main/examples/multimodal/clip_fine_tuning.ipynb>
__ 中尝试)LLM 评估 <examples/llm/llm_chatbot_evaluation.ipynb>
_读取 JSON 元数据 <examples/get_started/json-metadata-tutorial.ipynb>
_
贡献
非常欢迎贡献。
了解更多,请参见 贡献者指南
_。
社区和支持
文档 <https://datachain.dvc.ai/>
_- 遇到任何问题,请
提交问题
_ Discord 聊天 <https://dvc.org/chat>
_邮件 <mailto:support@dvc.org>
_Twitter <https://twitter.com/DVCorg>
_
.. _PyPI: https://pypi.org/ .. _file an issue: https://github.com/iterative/datachain/issues .. github-only .. _Contributor Guide: CONTRIBUTING.rst .. _Pydantic: https://github.com/pydantic/pydantic .. _publicly available: https://radar.kit.edu/radar/en/dataset/FdJmclKpjHzLfExE.ExpBot%2B-%2BA%2Bdataset%2Bof%2B79%2Bdialogs%2Bwith%2Ban%2Bexperimental%2Bcustomer%2Bservice%2Bchatbot .. _SQLite: https://www.sqlite.org/ .. _Getting Started: https://datachain.dvc.ai/ .. |Flowchart| image:: https://github.com/iterative/datachain/blob/main/docs/assets/flowchart.png?raw=true :alt: DataChain FlowChart