DataTrove
DataTrove是一个用于大规模处理、过滤和去重文本数据的库。它提供了一套预建的常用处理模块,并提供了一个框架来轻松添加自定义功能。
DataTrove处理流水线是平台无关的,可以在本地或Slurm集群上直接运行。其(相对)低内存使用和多步骤设计使其非常适合大型工作负载,例如处理LLM的训练数据。
通过fsspec支持本地、远程和其他文件系统。
目录
安装
pip install datatrove[FLAVOUR]
可用的版本(可以用,
组合,例如[processing,s3]
):
all
安装所有内容:pip install datatrove[all]
io
用于读取warc/arc/wet
文件和arrow/parquet格式的依赖:pip install datatrove[io]
processing
用于文本提取、过滤和分词的依赖:pip install datatrove[processing]
s3
s3支持:pip install datatrove[s3]
cli
命令行工具:pip install datatrove[cli]
快速开始示例
你可以查看以下示例:
- process_common_crawl_dump.py 完整流水线,用于读取commoncrawl warc文件,提取文本内容,过滤并将结果数据保存到s3。在Slurm上运行
- tokenize_c4.py 直接从huggingface的hub读取数据,使用
gpt2
分词器对C4数据集的英语部分进行分词 - minhash_deduplication.py 完整流水线,用于运行文本数据的minhash去重
- sentence_deduplication.py 运行句子级别精确去重的示例
- exact_substrings.py 运行ExactSubstr的示例(需要这个仓库)
流水线
DataTrove文档
每个流水线模块处理datatrove Document
格式的数据:
text
每个样本的实际文本内容id
此样本的唯一标识符(字符串)metadata
可以存储任何额外信息的字典
流水线模块类型
每个流水线模块接受一个Document
生成器作为输入,并返回另一个Document
生成器。
- readers 从不同格式读取数据并生成
Document
- writers 以不同格式将
Document
保存到磁盘/云端 - extractors 从原始格式(如网页html)中提取文本内容
- filters 根据特定规则/标准过滤掉(删除)一些
Document
- stats 收集数据集统计信息的模块
- tokens 用于分词或计算标记数的模块
- dedup 用于去重的模块
完整流水线
流水线被定义为一系列流水线模块。例如,以下流水线将从磁盘读取数据,随机过滤(删除)一些文档,然后将它们写回磁盘:
from datatrove.pipeline.readers import CSVReader
from datatrove.pipeline.filters import SamplerFilter
from datatrove.pipeline.writers import JsonlWriter
pipeline = [
CSVReader(
data_folder="/my/input/path"
),
SamplerFilter(rate=0.5),
JsonlWriter(
output_folder="/my/output/path"
)
]
执行器
流水线是平台无关的,这意味着同一流水线可以在不同的执行环境中无缝运行,而无需更改其步骤。每个环境都有自己的PipelineExecutor。 所有执行器的一些通用选项:
pipeline
由应运行的流水线步骤组成的列表logging_dir
用于保存日志文件、统计信息等的数据文件夹。不要为不同的流水线/作业重复使用文件夹,因为这会覆盖你的统计信息、日志和完成情况。skip_completed
(布尔值,默认为True
)datatrove会跟踪已完成的任务,以便在重新启动作业时可以跳过这些任务。设置为False
可禁用此行为randomize_start_duration
(整数,默认为0
)延迟每个任务开始的最大秒数,以防止所有任务同时开始并可能使系统过载。
调用执行器的run
方法来执行其流水线。
[!TIP] Datatrove通过在
${logging_dir}/completions
文件夹中创建标记(空文件)来跟踪哪些任务成功完成。一旦作业完成,如果某些任务失败,你可以简单地重新启动完全相同的执行器,datatrove将检查并只运行之前未完成的任务。
[!CAUTION] 如果你因为某些任务失败而重新启动流水线,不要改变总任务数,因为这会影响输入文件/分片的分布。
LocalPipelineExecutor
此执行器将在本地机器上启动流水线。 选项:
tasks
要运行的总任务数workers
同时运行多少个任务。如果为-1
,则不限制。任何> 1
的值都将使用多进程来执行任务。start_method
用于生成多进程Pool的方法。如果workers
为1则忽略
执行器示例
from datatrove.executor import LocalPipelineExecutor
executor = LocalPipelineExecutor(
pipeline=[
...
],
logging_dir="logs/",
tasks=10,
workers=5
)
executor.run()
多节点并行
你可以让不同的节点/机器处理总任务的不同部分,方法是使用local_tasks
和local_rank_offset
。对于每个节点/实例/机器,使用以下选项启动:
tasks
要执行的总任务数(跨所有机器)。这个值在每台机器上必须相同,否则输入文件分布可能会重叠! 示例:500local_tasks
在这台特定机器上将执行多少个总任务。注意,你可以为每台机器使用不同的值。示例:100local_rank_offset
在这台机器上执行的第一个任务的序号。如果这是你启动作业的第3台机器,而前两台机器分别运行了250和150个作业,那么当前机器的这个值应该是400
。
要获得最终合并的统计信息,你需要在包含所有机器统计信息的路径上手动调用merge_stats
脚本。
SlurmPipelineExecutor
此执行器将在Slurm集群上启动流水线,使用Slurm作业数组来分组和管理任务。 选项:
tasks
要运行的总任务数。必需time
Slurm时间限制字符串。必需partition
Slurm分区。必需workers
同时运行多少个任务。如果为-1
,则不限制。Slurm将一次运行workers
个任务。(默认:-1
)job_name
Slurm作业名称(默认:"data_processing")depends
另一个SlurmPipelineExecutor实例,它将作为此流水线的依赖项(当前流水线只有在依赖的流水线成功完成后才会开始执行)sbatch_args
包含你想传递给sbatch的任何其他参数的字典slurm_logs_folder
保存Slurm日志文件的位置。如果为logging_dir
使用本地路径,它们将保存在logging_dir/slurm_logs
中。如果不是,它们将保存在当前目录的子目录中。
其他选项
cpus_per_task
给每个任务分配多少CPU(默认:1
)qos
Slurm qos(默认:"normal")mem_per_cpu_gb
每个CPU的内存,单位为GB(默认:2)env_command
激活Python环境的自定义命令(如果需要)condaenv
要激活的conda环境venv_path
要激活的Python环境路径max_array_size
$ scontrol show config
中的_MaxArraySize_值。如果任务数超过此数字,将分成多个数组作业(默认:1001)max_array_launch_parallel
如果由于max_array_size需要多个作业,是否一次性全部启动(并行)或顺序启动(默认:False
)stagger_max_array_jobs
当max_array_launch_parallel为True时,这决定了在启动每个并行作业之间等待多少秒(默认:0
)run_on_dependency_fail
当我们依赖的作业完成时开始执行,即使它失败了(默认:False
)randomize_start
在约3分钟的窗口内随机化每个任务的开始时间。在大量访问S3存储桶等情况下很有用。(默认:False
)
执行器示例
from datatrove.executor import SlurmPipelineExecutor
executor1 = SlurmPipelineExecutor(
pipeline=[
...
],
job_name="my_cool_job1",
logging_dir="logs/job1",
tasks=500,
workers=100, # 省略以一次运行所有任务
time="10:00:00", # 10小时
partition="hopper-cpu"
)
executor2 = SlurmPipelineExecutor(
pipeline=[
...
],
job_name="my_cool_job2",
logging_dir="logs/job2",
tasks=1,
time="5:00:00", # 5小时
partition="hopper-cpu",
depends=executor1 # 此流水线只有在executor1成功完成后才会启动
)
# executor1.run()
executor2.run() # 这实际上会启动executor1,因为它是一个依赖项,所以不需要显式启动它
对于 logging_dir
为 mylogspath/exp1 的管道,将创建以下文件夹结构:
查看文件夹结构
└── mylogspath/exp1
│── executor.json ⟵ 执行器选项和管道步骤的 json 转储
│── launch_script.slurm ⟵ 创建并用于启动此作业的 slurm 配置(如果在 slurm 上运行)
│── executor.pik ⟵ 创建并用于启动此作业的 slurm 配置(如果在 slurm 上运行)
│── ranks_to_run.json ⟵ 正在运行的任务列表
│── logs/
│ └──[task_00000.log, task_00001.log, task_00002.log, ...] ⟵ 每个任务的单独日志文件
│── completions/
│ └──[00004, 00007, 00204, ...] ⟵ 标记任务已完成的空文件。用于重新启动/恢复作业时(只会运行未完成的任务)
│── stats/
│ └──[00000.json, 00001.json, 00002.json, ...] ⟵ 每个任务的单独统计信息(处理的样本数量、过滤的、移除的等)
└── stats.json ⟵ 所有任务的全局统计信息
颜色化
日志消息支持颜色化。默认情况下,控制台消息会自动检测颜色化,而日志文件(logs/task_XXXXX.log)则禁用颜色化。 要明确启用或禁用颜色化,可以设置以下环境变量:
DATATROVE_COLORIZE_LOGS
设为 "1" 为控制台日志消息添加 ANSI 颜色,设为 "0" 禁用颜色化。DATATROVE_COLORIZE_LOG_FILES
设为 "1" 为保存到 logs/task_XXXXX.log 的日志消息添加 ANSI 颜色。
DataFolder / 路径
Datatrove 通过 fsspec 支持多种输入/输出源。
有几种方法可以为 datatrove 块提供路径(用于 input_folder
、logging_dir
、data_folder
等参数):
-
str
:最简单的方法是传递单个字符串。例如:/home/user/mydir
、s3://mybucket/myinputdata
、hf://datasets/allenai/c4/en/
-
(str, fsspec filesystem instance)
:一个字符串路径和一个完全初始化的文件系统对象。例如:("s3://mybucket/myinputdata", S3FileSystem(client_kwargs={"endpoint_url": endpoint_uri}))
-
(str, dict)
:一个字符串路径和一个用于初始化文件系统的选项字典。例如(等同于上一行):("s3://mybucket/myinputdata", {"client_kwargs": {"endpoint_url": endpoint_uri}})
-
DataFolder
:你可以直接初始化一个 DataFolder 对象并将其作为参数传递
在底层,这些参数组合由 get_datafolder
解析。
实用指南
读取数据
通常,管道会以 Reader 块开始。
大多数读取器都有一个 data_folder
参数 — 指向包含要读取数据的文件夹的路径。
这些文件将分布在每个任务中。如果你有 N
个任务,排名为 i
(从 0 开始)的任务将处理文件 i, i+N, i+2N, i+3N,...
。
在内部,每个读取器读取数据并将其转换为字典,然后创建一个 Document
对象。
大多数读取器的一些常见选项:
text_key
包含每个样本文本内容的字典键。默认:text
id_key
包含每个样本 id 的字典键。默认:id
default_metadata
一个字典,用于添加任何你想要的默认元数据值(例如它们的来源)recursive
是否递归查找data_folder
子目录中的文件glob_pattern
用此字段匹配特定文件。例如,glob_pattern="*/warc/*.warc.gz"
将匹配data_folder
的每个子目录的warc/
文件夹中扩展名为.warc.gz
的文件adapter
此函数接收从读取器获得的原始字典,并返回带有Document
字段名的字典。如果需要,你可以覆盖此函数(_default_adapter)。limit
只读取一定数量的样本。对测试/调试有用
提取文本
你可以使用 extractors 从原始 html 中提取文本内容。datatrove 中最常用的提取器是 Trafilatura,它使用 trafilatura 库。
过滤数据
Filters 是任何数据处理管道中最重要的块之一。Datatrove 的过滤器块接收一个 Document
并返回一个布尔值(True
保留文档,False
移除它)。被移除的样本不会继续到下一个管道阶段。你还可以通过将 Writer 传递给 excluded_writer
参数来将被移除的样本保存到磁盘。
保存数据
一旦你完成了数据处理,你可能想要将其保存到某个地方。为此,你可以使用 writer。
写入器需要一个 output_folder
(数据应该保存的路径)。你可以选择要使用的 compression
(默认:gzip
)和每个文件保存的文件名。
对于 output_filename
,使用以下参数应用模板:
${rank}
替换为当前任务的排名。注意,如果没有这个标签,不同的任务可能会尝试写入相同的位置${id}
替换为样本 id- 元数据:任何其他
${tag}
将被相应的document.metadata['tag']
值替换
一个根据 lang
元数据字段分离样本的示例:
JsonlWriter(
f"{MAIN_OUTPUT_PATH}/non_english/",
output_filename="${language}/" + DUMP + "/${rank}.jsonl.gz", # 文件夹结构:language/dump/file
)
去重数据
对于去重,请查看示例 minhash_deduplication.py、sentence_deduplication.py 和 exact_substrings.py。
摘要统计
对于数据的摘要统计,你可以使用 Stats 块。这些块提供了一种简单的方法来以分布式方式收集数据集的数据概况。这是一个两步过程:
- 对于每个分片,遍历文档并将统计信息收集到以下分组之一:
summary
(所有文档计入 "summary" 键)、fqdn
(完全限定域名分组)、suffix
(URL 路径最后部分分组)或histogram
(基于值的分组)。 - 将不同分片的统计信息合并到一个文件中。 有关更多详细信息,请参阅 summary_stats.py。
每个结果统计信息保存在具有以下结构的单独文件中:output_folder/{fqdn,suffix,summary,histogram}/{stat_name}/metric.json
每个这样的文件都是一个 MetricStatsDict
对象,你可以使用以下方法轻松加载:
from datatrove.pipeline.stats.summary_stats import MetricStatsDict
import json
stats = MetricStatsDict.from_dict(json.load(open("fqdn/length/metric.json")))
# 例如,nytimes.com 文档的总长度
stats["nytimes.com"].total
# 或 cnn.com 文档的平均值
stats["cnn.com"].mean
可用的统计信息如下:
contamination_stats.py
:word_contamination_{words[0]}
:文档中单词污染的频率。doc_stats.py
:length
:文档长度,white_space_ratio
:空白字符比例,non_alpha_digit_ratio
:非字母和非数字字符比例,digit_ratio
:数字比例,uppercase_ratio
:大写字母比例,elipsis_ratio
:省略号字符比例,punctuation_ratio
:标点符号比例lang_stats.py
:fasttext_{language}
:使用 fastText 的文档语言line_stats.py
:n_lines
:每个文档的行数,avg_line_length
:每个文档的平均行长度,long_line_ratio_words
:超过 k 个字符的行比例,short_line_ratio_chars
:少于 k 个字符的行比例,bullet_point_lines_ratio
:项目符号行比例,line_duplicates
:重复行比例,line_char_duplicates
:重复行中字符的比例paragraph_stats.py
:n_paragraphs
:段落数,avg_paragraph_length
:平均段落长度,short_paragraph_ratio_{chars}
:短段落比例(<{chars} 字符),long_paragraph_ratio_{chars}
:长段落比例(>{chars} 字符)perplexity_stats.py
:ccnet_perplexity_{model_dataset}_{language}
:使用 CCNet 模型在 {dataset} 上的 {language} 中的文档困惑度sentence_stats.py
:n_sentences
:句子数,avg_sentence_length
:平均句子长度,short_sentence_ratio_{chars}
:短句比例(<{chars} 字符),long_sentence_ratio_{chars}
:长句比例(>{chars} 字符)token_stats.py
:token_count
:文档中的令牌数word_stats.py
:n_words
:文档中的单词数,avg_word_length
:文档中单词的平均长度,avg_words_per_line
:文档中每行的平均单词数,short_word_ratio_{chars}
:短于 {chars} 字符的单词比例,stop_word_ratio
:停用词比例,long_word_ratio_{chars}
:长于 {chars} 字符的单词比例,type_token_ratio
:唯一单词数 / 令牌数,capitalized_word_ratio
:首字母大写单词比例,uppercase_word_ratio
:全大写单词比例
自定义块
简单数据
你可以直接将 Document
的可迭代对象作为管道块传递,如下所示:
from datatrove.data import Document
from datatrove.pipeline.filters import SamplerFilter
from datatrove.pipeline.writers import JsonlWriter
pipeline = [
[
Document(text="一些数据", id="0"),
Document(text="更多数据", id="1"),
Document(text="更多数据", id="2"),
],
SamplerFilter(rate=0.5),
JsonlWriter(
output_folder="/my/output/path"
)
]
但请注意,这个可迭代对象不会被分片(如果你启动多个任务,它们都会获得完整的可迭代对象)。 这通常用于小型工作负载/测试。
自定义函数
对于简单的处理,你可以直接传入具有以下签名的自定义函数:
from datatrove.data import DocumentsPipeline
def uppercase_everything(data: DocumentsPipeline, rank: int = 0, world_size: int = 1) -> DocumentsPipeline:
"""
`data`是Document生成器。你必须返回一个Document生成器(yield)
你可以选择使用`rank`和`world_size`进行分片
"""
for document in data:
document.text = document.text.upper()
yield document
pipeline = [
...,
uppercase_everything,
...
]
[!提示] 由于导入问题,你可能会遇到一些pickle相关的问题。如果发生这种情况,只需将你需要的导入移到函数体内即可。
自定义块
你也可以定义一个完整的块,继承自PipelineStep
或其子类之一:
from datatrove.pipeline.base import PipelineStep
from datatrove.data import DocumentsPipeline
from datatrove.io import DataFolderLike, get_datafolder
class UppercaserBlock(PipelineStep):
def __init__(self, some_folder: DataFolderLike, some_param: int = 5):
super().__init__()
# 你可以接受任何需要的参数并将它们保存在这里
self.some_param = some_param
# 使用get_datafolder()加载数据文件夹
self.some_folder = get_datafolder(some_folder)
def run(self, data: DocumentsPipeline, rank: int = 0, world_size: int = 1) -> DocumentsPipeline:
# 你也可以从`some_folder`加载数据:
for filepath in self.some_folder.get_shard(rank, world_size): # 它也接受glob模式,以及其他选项
with self.some_folder.open(filepath, "rt") as f:
# 做一些处理
...
yield doc
#
# 或处理来自前一个块的数据(`data`)
#
for doc in data:
with self.track_time():
# 你可以将主要处理代码包装在`track_time`中,以了解每个文档处理所需的时间
nr_uppercase_letters = sum(map(lambda c: c.isupper(), doc.text))
# 你也可以使用stat_update跟踪每个文档的统计信息
self.stat_update("og_upper_letters", value=nr_uppercase_letters)
doc.text = doc.text.upper()
# 确保将yield保持在track_time块之外,否则会影响时间计算
yield doc
#
# 或将数据保存到磁盘
#
with self.some_folder.open("myoutput", "wt") as f:
for doc in data:
f.write(doc...)
pipeline = [
...,
UppercaserBlock("somepath"),
...
]
你也可以继承自BaseExtractor
、BaseFilter
、BaseReader
/BaseDiskReader
或DiskWriter
。
贡献
git clone git@github.com:huggingface/datatrove.git && cd datatrove
pip install -e ".[dev]"
安装pre-commit代码风格钩子:
pre-commit install
运行测试:
pytest -sv ./tests/
引用
@misc{penedo2024datatrove,
author = {Penedo, Guilherme and Kydlíček, Hynek and Cappelli, Alessandro and Sasko, Mario and Wolf, Thomas},
title = {DataTrove: large scale data processing},
year = {2024},
publisher = {GitHub},
journal = {GitHub repository},
url = {https://github.com/huggingface/datatrove}
}