Project Icon

datatrove

用于大规模文本数据处理和去重的开源Python库

DataTrove是一个开源Python库,专门用于处理、过滤和去重大规模文本数据。它提供预构建的常用处理模块和自定义功能支持。该库的处理流程可在本地或Slurm集群上运行,具有低内存消耗和多步骤设计,适合处理大型语言模型训练数据等大规模工作负载。DataTrove支持多种文件系统,为数据处理提供灵活解决方案。

DataTrove

DataTrove是一个用于大规模处理、过滤和去重文本数据的库。它提供了一套预建的常用处理模块,并提供了一个框架来轻松添加自定义功能。

DataTrove处理流水线是平台无关的,可以在本地或Slurm集群上直接运行。其(相对)低内存使用和多步骤设计使其非常适合大型工作负载,例如处理LLM的训练数据。

通过fsspec支持本地、远程和其他文件系统。

目录

安装

pip install datatrove[FLAVOUR]

可用的版本(可以用,组合,例如[processing,s3]):

  • all 安装所有内容:pip install datatrove[all]
  • io 用于读取warc/arc/wet文件和arrow/parquet格式的依赖:pip install datatrove[io]
  • processing 用于文本提取、过滤和分词的依赖:pip install datatrove[processing]
  • s3 s3支持:pip install datatrove[s3]
  • cli 命令行工具:pip install datatrove[cli]

快速开始示例

你可以查看以下示例

流水线

DataTrove文档

每个流水线模块处理datatrove Document格式的数据:

  • text 每个样本的实际文本内容
  • id 此样本的唯一标识符(字符串)
  • metadata 可以存储任何额外信息的字典

流水线模块类型

每个流水线模块接受一个Document生成器作为输入,并返回另一个Document生成器。

  • readers 从不同格式读取数据并生成Document
  • writers 以不同格式将Document保存到磁盘/云端
  • extractors 从原始格式(如网页html)中提取文本内容
  • filters 根据特定规则/标准过滤掉(删除)一些Document
  • stats 收集数据集统计信息的模块
  • tokens 用于分词或计算标记数的模块
  • dedup 用于去重的模块

完整流水线

流水线被定义为一系列流水线模块。例如,以下流水线将从磁盘读取数据,随机过滤(删除)一些文档,然后将它们写回磁盘:

from datatrove.pipeline.readers import CSVReader
from datatrove.pipeline.filters import SamplerFilter
from datatrove.pipeline.writers import JsonlWriter

pipeline = [
    CSVReader(
        data_folder="/my/input/path"
    ),
    SamplerFilter(rate=0.5),
    JsonlWriter(
        output_folder="/my/output/path"
    )
]

执行器

流水线是平台无关的,这意味着同一流水线可以在不同的执行环境中无缝运行,而无需更改其步骤。每个环境都有自己的PipelineExecutor。 所有执行器的一些通用选项:

  • pipeline 由应运行的流水线步骤组成的列表
  • logging_dir 用于保存日志文件、统计信息等的数据文件夹。不要为不同的流水线/作业重复使用文件夹,因为这会覆盖你的统计信息、日志和完成情况。
  • skip_completed布尔值,默认为True)datatrove会跟踪已完成的任务,以便在重新启动作业时可以跳过这些任务。设置为False可禁用此行为
  • randomize_start_duration整数,默认为0)延迟每个任务开始的最大秒数,以防止所有任务同时开始并可能使系统过载。

调用执行器的run方法来执行其流水线。

[!TIP] Datatrove通过在${logging_dir}/completions文件夹中创建标记(空文件)来跟踪哪些任务成功完成。一旦作业完成,如果某些任务失败,你可以简单地重新启动完全相同的执行器,datatrove将检查并只运行之前未完成的任务。

[!CAUTION] 如果你因为某些任务失败而重新启动流水线,不要改变总任务数,因为这会影响输入文件/分片的分布。

LocalPipelineExecutor

此执行器将在本地机器上启动流水线。 选项:

  • tasks 要运行的总任务数
  • workers 同时运行多少个任务。如果为-1,则不限制。任何> 1的值都将使用多进程来执行任务。
  • start_method 用于生成多进程Pool的方法。如果workers为1则忽略
执行器示例
from datatrove.executor import LocalPipelineExecutor
executor = LocalPipelineExecutor(
    pipeline=[
        ...
    ],
    logging_dir="logs/",
    tasks=10,
    workers=5
)
executor.run()
多节点并行

你可以让不同的节点/机器处理总任务的不同部分,方法是使用local_taskslocal_rank_offset。对于每个节点/实例/机器,使用以下选项启动:

  • tasks 要执行的总任务数(跨所有机器)。这个值在每台机器上必须相同,否则输入文件分布可能会重叠! 示例:500
  • local_tasks 在这台特定机器上将执行多少个总任务。注意,你可以为每台机器使用不同的值。示例:100
  • local_rank_offset 在这台机器上执行的第一个任务的序号。如果这是你启动作业的第3台机器,而前两台机器分别运行了250和150个作业,那么当前机器的这个值应该是400

要获得最终合并的统计信息,你需要在包含所有机器统计信息的路径上手动调用merge_stats脚本。

SlurmPipelineExecutor

此执行器将在Slurm集群上启动流水线,使用Slurm作业数组来分组和管理任务。 选项:

  • tasks 要运行的总任务数。必需
  • time Slurm时间限制字符串。必需
  • partition Slurm分区。必需
  • workers 同时运行多少个任务。如果为-1,则不限制。Slurm将一次运行workers个任务。(默认:-1
  • job_name Slurm作业名称(默认:"data_processing")
  • depends 另一个SlurmPipelineExecutor实例,它将作为此流水线的依赖项(当前流水线只有在依赖的流水线成功完成后才会开始执行)
  • sbatch_args 包含你想传递给sbatch的任何其他参数的字典
  • slurm_logs_folder 保存Slurm日志文件的位置。如果为logging_dir使用本地路径,它们将保存在logging_dir/slurm_logs中。如果不是,它们将保存在当前目录的子目录中。
其他选项
  • cpus_per_task 给每个任务分配多少CPU(默认:1
  • qos Slurm qos(默认:"normal")
  • mem_per_cpu_gb 每个CPU的内存,单位为GB(默认:2)
  • env_command 激活Python环境的自定义命令(如果需要)
  • condaenv 要激活的conda环境
  • venv_path 要激活的Python环境路径
  • max_array_size $ scontrol show config中的_MaxArraySize_值。如果任务数超过此数字,将分成多个数组作业(默认:1001)
  • max_array_launch_parallel 如果由于max_array_size需要多个作业,是否一次性全部启动(并行)或顺序启动(默认:False
  • stagger_max_array_jobs 当max_array_launch_parallel为True时,这决定了在启动每个并行作业之间等待多少秒(默认:0
  • run_on_dependency_fail 当我们依赖的作业完成时开始执行,即使它失败了(默认:False
  • randomize_start 在约3分钟的窗口内随机化每个任务的开始时间。在大量访问S3存储桶等情况下很有用。(默认:False
执行器示例
from datatrove.executor import SlurmPipelineExecutor
executor1 = SlurmPipelineExecutor(
    pipeline=[
        ...
    ],
    job_name="my_cool_job1",
    logging_dir="logs/job1",
    tasks=500,
    workers=100,  # 省略以一次运行所有任务
    time="10:00:00",  # 10小时
    partition="hopper-cpu"
)
executor2 = SlurmPipelineExecutor(
    pipeline=[
        ...
    ],
    job_name="my_cool_job2",
    logging_dir="logs/job2",
    tasks=1,
    time="5:00:00",  # 5小时
    partition="hopper-cpu",
    depends=executor1  # 此流水线只有在executor1成功完成后才会启动
)
# executor1.run()
executor2.run() # 这实际上会启动executor1,因为它是一个依赖项,所以不需要显式启动它
## 日志记录

对于 logging_dirmylogspath/exp1 的管道,将创建以下文件夹结构:

查看文件夹结构
└── mylogspath/exp1
    │── executor.json ⟵ 执行器选项和管道步骤的 json 转储
    │── launch_script.slurm ⟵ 创建并用于启动此作业的 slurm 配置(如果在 slurm 上运行)
    │── executor.pik ⟵ 创建并用于启动此作业的 slurm 配置(如果在 slurm 上运行)
    │── ranks_to_run.json ⟵ 正在运行的任务列表
    │── logs/
    │   └──[task_00000.log, task_00001.log, task_00002.log, ...] ⟵ 每个任务的单独日志文件
    │── completions/
    │   └──[00004, 00007, 00204, ...] ⟵ 标记任务已完成的空文件。用于重新启动/恢复作业时(只会运行未完成的任务)
    │── stats/
    │   └──[00000.json, 00001.json, 00002.json, ...] ⟵ 每个任务的单独统计信息(处理的样本数量、过滤的、移除的等)
    └── stats.json ⟵ 所有任务的全局统计信息

颜色化

日志消息支持颜色化。默认情况下,控制台消息会自动检测颜色化,而日志文件(logs/task_XXXXX.log)则禁用颜色化。 要明确启用或禁用颜色化,可以设置以下环境变量:

  • DATATROVE_COLORIZE_LOGS 设为 "1" 为控制台日志消息添加 ANSI 颜色,设为 "0" 禁用颜色化。
  • DATATROVE_COLORIZE_LOG_FILES 设为 "1" 为保存到 logs/task_XXXXX.log 的日志消息添加 ANSI 颜色。

DataFolder / 路径

Datatrove 通过 fsspec 支持多种输入/输出源。

有几种方法可以为 datatrove 块提供路径(用于 input_folderlogging_dirdata_folder 等参数):

  • str:最简单的方法是传递单个字符串。例如:/home/user/mydirs3://mybucket/myinputdatahf://datasets/allenai/c4/en/

  • (str, fsspec filesystem instance):一个字符串路径和一个完全初始化的文件系统对象。例如:("s3://mybucket/myinputdata", S3FileSystem(client_kwargs={"endpoint_url": endpoint_uri}))

  • (str, dict):一个字符串路径和一个用于初始化文件系统的选项字典。例如(等同于上一行):("s3://mybucket/myinputdata", {"client_kwargs": {"endpoint_url": endpoint_uri}})

  • DataFolder:你可以直接初始化一个 DataFolder 对象并将其作为参数传递

在底层,这些参数组合由 get_datafolder 解析。

实用指南

读取数据

通常,管道会以 Reader 块开始。 大多数读取器都有一个 data_folder 参数 — 指向包含要读取数据的文件夹的路径。

这些文件将分布在每个任务中。如果你有 N 个任务,排名为 i(从 0 开始)的任务将处理文件 i, i+N, i+2N, i+3N,...

在内部,每个读取器读取数据并将其转换为字典,然后创建一个 Document 对象。

大多数读取器的一些常见选项:

  • text_key 包含每个样本文本内容的字典键。默认:text
  • id_key 包含每个样本 id 的字典键。默认:id
  • default_metadata 一个字典,用于添加任何你想要的默认元数据值(例如它们的来源)
  • recursive 是否递归查找 data_folder 子目录中的文件
  • glob_pattern 用此字段匹配特定文件。例如,glob_pattern="*/warc/*.warc.gz" 将匹配 data_folder 的每个子目录的 warc/ 文件夹中扩展名为 .warc.gz 的文件
  • adapter 此函数接收从读取器获得的原始字典,并返回带有 Document 字段名的字典。如果需要,你可以覆盖此函数(_default_adapter)。
  • limit 只读取一定数量的样本。对测试/调试有用

提取文本

你可以使用 extractors 从原始 html 中提取文本内容。datatrove 中最常用的提取器是 Trafilatura,它使用 trafilatura 库。

过滤数据

Filters 是任何数据处理管道中最重要的块之一。Datatrove 的过滤器块接收一个 Document 并返回一个布尔值(True 保留文档,False 移除它)。被移除的样本不会继续到下一个管道阶段。你还可以通过将 Writer 传递给 excluded_writer 参数来将被移除的样本保存到磁盘。

保存数据

一旦你完成了数据处理,你可能想要将其保存到某个地方。为此,你可以使用 writer。 写入器需要一个 output_folder(数据应该保存的路径)。你可以选择要使用的 compression(默认:gzip)和每个文件保存的文件名。 对于 output_filename,使用以下参数应用模板:

  • ${rank} 替换为当前任务的排名。注意,如果没有这个标签,不同的任务可能会尝试写入相同的位置
  • ${id} 替换为样本 id
  • 元数据:任何其他 ${tag} 将被相应的 document.metadata['tag'] 值替换

一个根据 lang 元数据字段分离样本的示例:

JsonlWriter(
    f"{MAIN_OUTPUT_PATH}/non_english/",
    output_filename="${language}/" + DUMP + "/${rank}.jsonl.gz",  # 文件夹结构:language/dump/file
)

去重数据

对于去重,请查看示例 minhash_deduplication.pysentence_deduplication.pyexact_substrings.py

摘要统计

对于数据的摘要统计,你可以使用 Stats 块。这些块提供了一种简单的方法来以分布式方式收集数据集的数据概况。这是一个两步过程:

  1. 对于每个分片,遍历文档并将统计信息收集到以下分组之一:summary(所有文档计入 "summary" 键)、fqdn(完全限定域名分组)、suffix(URL 路径最后部分分组)或 histogram(基于值的分组)。
  2. 将不同分片的统计信息合并到一个文件中。 有关更多详细信息,请参阅 summary_stats.py

每个结果统计信息保存在具有以下结构的单独文件中:output_folder/{fqdn,suffix,summary,histogram}/{stat_name}/metric.json

每个这样的文件都是一个 MetricStatsDict 对象,你可以使用以下方法轻松加载:

from datatrove.pipeline.stats.summary_stats import MetricStatsDict
import json
stats = MetricStatsDict.from_dict(json.load(open("fqdn/length/metric.json")))

# 例如,nytimes.com 文档的总长度
stats["nytimes.com"].total

# 或 cnn.com 文档的平均值
stats["cnn.com"].mean

可用的统计信息如下:

  • contamination_stats.pyword_contamination_{words[0]}:文档中单词污染的频率。
  • doc_stats.pylength:文档长度,white_space_ratio:空白字符比例,non_alpha_digit_ratio:非字母和非数字字符比例,digit_ratio:数字比例,uppercase_ratio:大写字母比例,elipsis_ratio:省略号字符比例,punctuation_ratio:标点符号比例
  • lang_stats.pyfasttext_{language}:使用 fastText 的文档语言
  • line_stats.pyn_lines:每个文档的行数,avg_line_length:每个文档的平均行长度,long_line_ratio_words:超过 k 个字符的行比例,short_line_ratio_chars:少于 k 个字符的行比例,bullet_point_lines_ratio:项目符号行比例,line_duplicates:重复行比例,line_char_duplicates:重复行中字符的比例
  • paragraph_stats.pyn_paragraphs:段落数,avg_paragraph_length:平均段落长度,short_paragraph_ratio_{chars}:短段落比例(<{chars} 字符),long_paragraph_ratio_{chars}:长段落比例(>{chars} 字符)
  • perplexity_stats.pyccnet_perplexity_{model_dataset}_{language}:使用 CCNet 模型在 {dataset} 上的 {language} 中的文档困惑度
  • sentence_stats.pyn_sentences:句子数,avg_sentence_length:平均句子长度,short_sentence_ratio_{chars}:短句比例(<{chars} 字符),long_sentence_ratio_{chars}:长句比例(>{chars} 字符)
  • token_stats.pytoken_count:文档中的令牌数
  • word_stats.pyn_words:文档中的单词数,avg_word_length:文档中单词的平均长度,avg_words_per_line:文档中每行的平均单词数,short_word_ratio_{chars}:短于 {chars} 字符的单词比例,stop_word_ratio:停用词比例,long_word_ratio_{chars}:长于 {chars} 字符的单词比例,type_token_ratio:唯一单词数 / 令牌数,capitalized_word_ratio:首字母大写单词比例,uppercase_word_ratio:全大写单词比例

自定义块

简单数据

你可以直接将 Document 的可迭代对象作为管道块传递,如下所示:

from datatrove.data import Document
from datatrove.pipeline.filters import SamplerFilter
from datatrove.pipeline.writers import JsonlWriter

pipeline = [
    [
        Document(text="一些数据", id="0"),
        Document(text="更多数据", id="1"),
        Document(text="更多数据", id="2"),
    ],
    SamplerFilter(rate=0.5),
    JsonlWriter(
        output_folder="/my/output/path"
    )
]

但请注意,这个可迭代对象不会被分片(如果你启动多个任务,它们都会获得完整的可迭代对象)。 这通常用于小型工作负载/测试。

自定义函数

对于简单的处理,你可以直接传入具有以下签名的自定义函数:

from datatrove.data import DocumentsPipeline
def uppercase_everything(data: DocumentsPipeline, rank: int = 0, world_size: int = 1) -> DocumentsPipeline:
    """
        `data`是Document生成器。你必须返回一个Document生成器(yield)
        你可以选择使用`rank`和`world_size`进行分片
    """
    for document in data:
        document.text = document.text.upper()
        yield document

pipeline = [
    ...,
    uppercase_everything,
    ...
]

[!提示] 由于导入问题,你可能会遇到一些pickle相关的问题。如果发生这种情况,只需将你需要的导入移到函数体内即可。

自定义块

你也可以定义一个完整的块,继承自PipelineStep或其子类之一:

from datatrove.pipeline.base import PipelineStep
from datatrove.data import DocumentsPipeline
from datatrove.io import DataFolderLike, get_datafolder


class UppercaserBlock(PipelineStep):
    def __init__(self, some_folder: DataFolderLike, some_param: int = 5):
        super().__init__()
        # 你可以接受任何需要的参数并将它们保存在这里
        self.some_param = some_param
        # 使用get_datafolder()加载数据文件夹
        self.some_folder = get_datafolder(some_folder)

    def run(self, data: DocumentsPipeline, rank: int = 0, world_size: int = 1) -> DocumentsPipeline:
        # 你也可以从`some_folder`加载数据:
        for filepath in self.some_folder.get_shard(rank, world_size): # 它也接受glob模式,以及其他选项
            with self.some_folder.open(filepath, "rt") as f:
                # 做一些处理
                ...
                yield doc

        #
        # 或处理来自前一个块的数据(`data`)
        #

        for doc in data:
            with self.track_time():
                # 你可以将主要处理代码包装在`track_time`中,以了解每个文档处理所需的时间
                nr_uppercase_letters = sum(map(lambda c: c.isupper(), doc.text))
                # 你也可以使用stat_update跟踪每个文档的统计信息
                self.stat_update("og_upper_letters", value=nr_uppercase_letters)
                doc.text = doc.text.upper()
            # 确保将yield保持在track_time块之外,否则会影响时间计算
            yield doc

        #
        # 或将数据保存到磁盘
        #

        with self.some_folder.open("myoutput", "wt") as f:
            for doc in data:
                f.write(doc...)
pipeline = [
    ...,
    UppercaserBlock("somepath"),
    ...
]

你也可以继承自BaseExtractorBaseFilterBaseReader/BaseDiskReaderDiskWriter

贡献

git clone git@github.com:huggingface/datatrove.git && cd datatrove
pip install -e ".[dev]"

安装pre-commit代码风格钩子:

pre-commit install

运行测试:

pytest -sv ./tests/

引用

@misc{penedo2024datatrove,
  author = {Penedo, Guilherme and Kydlíček, Hynek and Cappelli, Alessandro and Sasko, Mario and Wolf, Thomas},
  title = {DataTrove: large scale data processing},
  year = {2024},
  publisher = {GitHub},
  journal = {GitHub repository},
  url = {https://github.com/huggingface/datatrove}
}
项目侧边栏1项目侧边栏2
推荐项目
Project Cover

豆包MarsCode

豆包 MarsCode 是一款革命性的编程助手,通过AI技术提供代码补全、单测生成、代码解释和智能问答等功能,支持100+编程语言,与主流编辑器无缝集成,显著提升开发效率和代码质量。

Project Cover

AI写歌

Suno AI是一个革命性的AI音乐创作平台,能在短短30秒内帮助用户创作出一首完整的歌曲。无论是寻找创作灵感还是需要快速制作音乐,Suno AI都是音乐爱好者和专业人士的理想选择。

Project Cover

白日梦AI

白日梦AI提供专注于AI视频生成的多样化功能,包括文生视频、动态画面和形象生成等,帮助用户快速上手,创造专业级内容。

Project Cover

有言AI

有言平台提供一站式AIGC视频创作解决方案,通过智能技术简化视频制作流程。无论是企业宣传还是个人分享,有言都能帮助用户快速、轻松地制作出专业级别的视频内容。

Project Cover

Kimi

Kimi AI助手提供多语言对话支持,能够阅读和理解用户上传的文件内容,解析网页信息,并结合搜索结果为用户提供详尽的答案。无论是日常咨询还是专业问题,Kimi都能以友好、专业的方式提供帮助。

Project Cover

讯飞绘镜

讯飞绘镜是一个支持从创意到完整视频创作的智能平台,用户可以快速生成视频素材并创作独特的音乐视频和故事。平台提供多样化的主题和精选作品,帮助用户探索创意灵感。

Project Cover

讯飞文书

讯飞文书依托讯飞星火大模型,为文书写作者提供从素材筹备到稿件撰写及审稿的全程支持。通过录音智记和以稿写稿等功能,满足事务性工作的高频需求,帮助撰稿人节省精力,提高效率,优化工作与生活。

Project Cover

阿里绘蛙

绘蛙是阿里巴巴集团推出的革命性AI电商营销平台。利用尖端人工智能技术,为商家提供一键生成商品图和营销文案的服务,显著提升内容创作效率和营销效果。适用于淘宝、天猫等电商平台,让商品第一时间被种草。

Project Cover

AIWritePaper论文写作

AIWritePaper论文写作是一站式AI论文写作辅助工具,简化了选题、文献检索至论文撰写的整个过程。通过简单设定,平台可快速生成高质量论文大纲和全文,配合图表、参考文献等一应俱全,同时提供开题报告和答辩PPT等增值服务,保障数据安全,有效提升写作效率和论文质量。

投诉举报邮箱: service@vectorlightyear.com
@2024 懂AI·鲁ICP备2024100362号-6·鲁公网安备37021002001498号