数据管护是预训练和持续训练模型的第一步,也可以说是最重要的一步,对于 大型语言模型 (LLM) 和小型语言模型 (SLM) 都至关重要。NVIDIA 最近宣布了 NVIDIA NeMo Curator,这是一个数据管护框架,旨在准备大规模、高质量的数据集,以用于预训练生成式 AI 模型。
NeMo Curator 是 NVIDIA NeMo 的一部分,提供了从 Common Crawl、Wikipedia 和 arXiv 等多种公共来源下载和整理数据的开箱即用工作流。此外,它还为开发者提供了自定义数据整理流程的灵活性,以满足其独特需求并创建自定义数据集。
本文将介绍如何使用 NeMo Curator 创建自定义数据管护工作流。这样一来,您可以:
- 根据生成式 AI 项目的特定需求,定制数据管护和定制流程。
- 应用严格的过滤器和重复数据删除技术,使用最佳数据集训练模型,从而确保数据质量。
- 通过识别和删除个人识别信息 (PII) 来保护隐私,并遵守数据保护法规。
- 通过自动管理流程简化开发流程,节省时间和资源,让您专注于解决业务特定问题。
概述
本教程的重点是介绍如何创建一个简单的数据管护流程,用于下载、处理和筛选 TinyStories 数据集。TinyStories 是一个由 GPT-3.5 和 GPT-4 生成的约 220 万个短篇故事组成的数据集,其中包含 3 至 4 岁儿童能够理解的英语单词。该数据集在 Hugging Face 上公开提供。如果您需要详细了解数据集,请参阅 TinyStories:语言模型的规模有多小,并且仍然会讲连贯一致的英语?
此数据集体积小巧,非常适合在本地机器上创建和验证数据管护流程。数据集分为训练和验证文件。本教程主要使用验证文件,其中包含大约 22000 条记录。
定义数据管护管道涉及以下高级步骤:
- 定义自定义文档构建器,以便:
- 从 Web 下载数据集并转换为 JSONL 格式。
- 对数据集进行迭代并提取每个文档。
- 定义自定义修改器以清理和统一文本数据。
- 使用预定义和用户定义的启发式算法过滤数据集。
- 删除重复数据集并删除相同的记录。
- 编辑数据集中的所有个人识别信息 (PII)。
- 将结果输出为 JSONL 格式。
在消费级硬件上执行此策展流程需要不到 5 分钟,且策展数据集在策展后应包含大约 21500 条记录。要访问本教程的完整代码,请访问 NVIDIA/NeMo-Curator 的 GitHub 仓库。
预备知识
在开始之前,必须安装 NeMo Curator 框架。按照项目中的说明操作,按照 NeMo Curator GitHub 自述文件 中的说明安装框架。然后,从终端运行以下命令以验证安装。此外,还需要安装后续操作所需的其他依赖项。
$ python -c "import nemo_curator; print(nemo_curator);"
$ pip3 install requests
定义自定义文档构建器
为支持处理任意数据集,NeMo Curator 提供了一组 文档构建器,以抽象化底层数据集的表示,包括:
DocumentDownloader
:一种用于将远程数据下载到磁盘的抽象类。DocumentIterator
:一种用于从磁盘读取数据集原始记录的抽象基类。DocumentExtractor
:抽象类,用于从磁盘上的记录中提取文本记录及其相关元数据。
用户可在 Omniverse 上使用多种实现,以便将其与 CommonCrawl、Wikipedia 和 arXiv 等数据集结合使用。NVIDIA/NeMo-Curator GitHub 库提供了相关资源。以下部分将展示如何实现每个抽象类,以使用 TinyStories 数据集自定义工作流程。
下载 TinyStories 数据集
首先,实现 DocumentDownloader
类,该类负责获取数据集验证拆分的 URL,并使用 requests
库。
import requests
from nemo_curator.download.doc_builder import DocumentDownloader
class TinyStoriesDownloader(DocumentDownloader):
def __init__(self, download_dir: str):
super().__init__()
if not os.path.isdir(download_dir):
os.makedirs(download_dir)
self._download_dir = download_dir
print("Download directory: ", self._download_dir)
def download(self, url: str) -> str:
filename = os.path.basename(url)
output_file = os.path.join(self._download_dir, filename)
if os.path.exists(output_file):
print(f"File '{output_file}' already exists, skipping download.")
return output_file
print(f"Downloading TinyStories dataset from '{url}'...")
response = requests.get(url)
with open(output_file, "wb") as file:
file.write(response.content)
return output_file
接下来,使用以下代码下载实际数据集:
# Download the TinyStories dataset.
downloader = TinyStoriesDownloader("/path/to/download/")
tinystories_fp = downloader.download(TINY_STORIES_URL)
write_jsonl(tinystories_fp, jsonl_dir)
数据集将作为纯文本文件下载。为解析此数据集,请使用 DocumentIterator
和 DocumentExtractor
类。这将使您能够将其转换为 JSONL 格式,该格式为 NeMo Curator 支持的格式之一。
从数据集中迭代和提取文本
在下载的文件中,每个记录(或故事)跨越几行,并通过<|endoftext|>
令牌分隔。DocumentIterator
类定义了iterate
函数,该函数会获取要迭代的文件的路径,并以记录中的原始文本和(可选)该记录的任何相关元数据的形式生成该文件的每条记录。虽然向每条记录添加元数据并不是强制性的,但是一些数据处理算法(例如重复数据删除)依赖于此类数据来唯一标识每个文档并正确执行其预期功能。
接下来,为 TinyStories 数据集实现迭代器。鉴于每个故事可以跨越多行,请定义迭代器函数,以便它会继续读取 (和存储) 文件中的每行,直到到达分隔符令牌。
到达分隔符后,连接到目前为止看到的所有行,将一些元数据固定到记录中,并生成结果。为确保记录是唯一可识别的,请使用数据集的文件名以及内部计数器创建唯一的 id
和(可选)filename
,每条记录中都包含这些元数据:
from nemo_curator.download.doc_builder import DocumentIterator
class TinyStoriesIterator(DocumentIterator):
SEPARATOR_TOKEN = "<|endoftext|>"
def __init__(self):
super().__init__()
self._counter = -1
def iterate(self, file_path):
self._counter = -1
file_name = os.path.basename(file_path)
with open(file_path, "r") as file:
example = []
def split_meta(example):
if example:
self._counter += 1
content = " ".join(example)
meta = {
"filename": file_name,
"id": f"{file_name}-{self._counter}",
}
return meta, content
for line in file:
if line.strip() == TinyStoriesIterator.SEPARATOR_TOKEN:
if example:
yield split_meta(example)
example = []
else:
example.append(line.strip())
if example:
yield split_meta(example)
最后一个要实现的文档构建器是DocumentExtractor
类,它只需返回每个记录的文本。请注意,您可以选择将提取的文本的一些元数据关联起来,但这种元数据的使用超出了本教程的范围。
from nemo_curator.download.doc_builder import DocumentExtractor
class TinyStoriesExtractor(DocumentExtractor):
def extract(self, content: str) -> Tuple[Set, str]:
# No metadata for the text, just the content.
return {}, content
将数据集写入 JSONL 格式
NeMo Curator 提供帮助程序,可以从磁盘加载 JSONL、Parquet 或 Pickle 格式的数据集。鉴于 JSONL 格式的流行程度,本节演示如何使用之前实现的迭代器和提取器类将原始文本数据集转换为此格式。
要将数据集转换为 JSONL,只需将 TinyStoriesIterator
实例应用于下载的纯文本文件中,对每条记录进行迭代,并使用 TinyStoriesExtractor
实例对其进行处理。然后,从每个记录(故事)创建一个 JSON 对象,并将其写入输出文件中的一行。整个过程非常简单:
import os
import json
def write_jsonl(input_filename: str, output_dir: str, dump_every_n: int = 10000):
basename = os.path.basename(input_filename)
iterator = TinyStoriesIterator()
extractor = TinyStoriesExtractor()
to_dump = []
dump_ctr = 0
def dump_to_file(to_dump, dump_ctr):
"""Helper function to facilitate dumping to file."""
output_filename = f"{basename}-{dump_ctr}.jsonl"
with open(os.path.join(output_dir, output_filename), "w") as output_file:
output_file.writelines(to_dump)
# Empty out the list and increment the counter.
return [], dump_ctr + 1
for item in iterator.iterate(input_filename):
record_meta, content = item
extracted = extractor.extract(content)
if extracted is None:
continue
text_meta, text = extracted
if text is None:
continue
line = {
"text": text,
**text_meta,
**record_meta,
}
json_out = json.dumps(line, ensure_ascii=False)
to_dump.append(json_out + "\n")
# Should we dump what we have so far?
if len(to_dump) == dump_every_n:
to_dump, dump_ctr = dump_to_file(to_dump, dump_ctr)
# Dump the remaining records.
if to_dump:
dump_to_file(to_dump, dump_ctr)
请注意,默认情况下,此函数为每 10000 条记录创建一个 JSONL 文件。虽然完全是可选的,但这是为了确保每个输出文件保持足够小,以便使用文本编辑器轻松进行手动检查,而不会消耗过多内存。
另请注意,每个故事的内容都写入了text
字段,该字段位于每个 JSON 对象中。整个 NeMo Curator 中的许多数据管护操作都需要知道每条记录中的哪个字段包含该记录的文本数据。如果未明确指定,这些操作将假设存在text
字段中的数据集。因此,通常较好的做法是始终填充每个记录的text
字段,以及感兴趣的文本数据。
使用文档构建器加载数据集
在 NeMo Curator 中,数据集表示为类型对象 DocumentDataset
,这为从磁盘加载各种格式的数据集提供了辅助工具。例如,以 JSONL 格式创建数据集后,您可以使用以下代码加载数据集并开始使用:
from nemo_curator.datasets import DocumentDataset
# define `files` to be a list of all the JSONL files to load
dataset = DocumentDataset.read_json(files, add_filename=True)
您现在拥有了定义自定义数据集策管线和为训练 (或验证) 用例准备数据所需的一切。
文本清理和统一
在涉及文本数据的数据管护流程中,文本统一和清理是一项基本操作,因为从在线来源抓取的文本可能存在不一致或 Unicode 问题。为了修改文档,NeMo Curator 提供了一个 DocumentModifier
界面,用于定义如何修改每个文档中的给定文本。实际修改通过 Modify
帮助器实现,它需要 DocumentDataset
对象以及 DocumentModifier
,并将修饰符应用到数据集。
TinyStories 数据集具有不一致的引号,其中一些引号是卷曲的,而另一些则是笔直的。此类不一致 (例如,质量不佳的标记) 可能会给基于此数据训练的模型带来问题。
要解决这些问题,请创建 DocumentModifier
,它通过将所有卷曲引号替换为其直接变体,从而统一文档中的所有单引号和双引号:
from nemo_curator.modifiers import DocumentModifier
class QuotationUnifier(DocumentModifier):
def modify_document(self, text: str) -> str:
text = text.replace("‘", "'").replace("’", "'")
text = text.replace("“", '"').replace("”", '"')
return text
NeMo Curator 提供了各种开箱即用的 DocumentModifier
实现。其中一个修饰符是 UnicodeReformatter
,它使用 ftfy 解决数据集中的所有 Unicode 问题。然后,将这些修改器连接在一起并清理下载的数据集。通过 Sequential
类,它接受要按顺序执行的操作列表,并将这些操作应用到给定的 DocumentDataset
实例中:
from nemo_curator import Sequential
from nemo_curator.modules.modify import Modify
from nemo_curator.modifiers.unicode_reformatter import UnicodeReformatter
def clean_and_unify(dataset: DocumentDataset) -> DocumentDataset:
cleaners = Sequential(
[
# Unify all the quotation marks
Modify(QuotationUnifier()),
# Unify all unicode
Modify(UnicodeReformatter()),
]
)
return cleaners(dataset)
数据集过滤
数据集管理过程中的另一个重要步骤是数据过滤,其中会丢弃一些不符合某些标准的文档。例如,您可能想丢弃过短、过长或不完整的文档。在撰写本文时,NeMo Curator 为自然语言提供了 24 种启发式算法,为编码语言提供了 8 种启发式算法。
NeMo Curator 提供了一个 DocumentFilter
界面,该界面定义了一种基于各种标准对文档进行评分的方法,并提供了一个 ScoreFilter
辅助工具,用于筛选文档。ScoreFilter
辅助工具需要一个 DocumentDataset
和 DocumentFilter
,以确定数据集中的每个文档是否通过筛选条件。
创建一个简单的DocumentFilter
,用于确定故事是否以句子字符结尾。我们的目标是筛选掉所有未以句子字符结尾的故事:
from nemo_curator.filters import DocumentFilter
class IncompleteStoryFilter(DocumentFilter):
def __init__(self):
super().__init__()
self._story_terminators = {".", "!", "?", '"', "”"}
def score_document(self, text: str) -> bool:
ret = text.strip()[-1] in self._story_terminators
return ret
def keep_document(self, score) -> bool:
return score
主要功能体现在score_document
和keep_document
函数中,其中如果文档未以句子结束字符结尾,则返回False
(即不要保留此文档)。
要将此过滤器应用于数据集,请传递一个IncompleteStoryFilter
对象作为ScoreFilter
。NeMo Curator 提供了许多开箱即用的DocumentFilter
实现。这些过滤器可以通过Sequential
类来组合使用。以下代码展示了如何对数据集应用各种过滤器:
def filter_dataset(dataset: DocumentDataset) -> DocumentDataset:
filters = Sequential(
[
ScoreFilter(
WordCountFilter(min_words=80),
text_field="text",
score_field="word_count",
),
ScoreFilter(IncompleteStoryFilter(), text_field="text"),
ScoreFilter(
RepeatingTopNGramsFilter(n=2, max_repeating_ngram_ratio=0.2),
text_field="text",
),
ScoreFilter(
RepeatingTopNGramsFilter(n=3, max_repeating_ngram_ratio=0.18),
text_field="text",
),
ScoreFilter(
RepeatingTopNGramsFilter(n=4, max_repeating_ngram_ratio=0.16),
text_field="text",
),
]
)
return filters(dataset)
此代码将过滤所有短篇(不超过 80 个字)或不完整的故事,以及任何其他具有一定重复 n – 克比率的故事。请注意:text_field=”text”
这告诉我们:ScoreFilter
将传递数据集的内容:text
每个筛选条件的列。
重复数据删除
在处理大量文本数据时,可能会存在彼此相同(或几乎完全相同)的记录。针对此类数据进行训练可能会产生额外的计算和存储开销。NeMo Curator 提供了查找和丢弃此类重复数据的功能。为简单起见,请专注于在数据集中查找精确的重复记录。这可以使用 ExactDuplicates
类,如下所示。
本模块将自动利用现有的 CUDA 设备和 GPU 加速的实现,RAPIDS cuDF 库 识别重复文档,从而大幅缩短处理时间。这是因为重复数据删除阶段需要计算每个文档的哈希值,而这是计算密集型的任务。每个文档都可以独立进行哈希处理,因此此工作负载非常适合在 GPU 上并行运行。
from nemo_curator.modules import ExactDuplicates
def dedupe(dataset: DocumentDataset) -> DocumentDataset:
deduplicator = ExactDuplicates(id_field="id", text_field="text", hash_method="md5")
# Find the duplicates
duplicates = deduplicator(dataset)
docs_to_remove = duplicates.df.map_partitions(
lambda x: x[x._hashes.duplicated(keep="first")]
)
# Remove the duplicates using their IDs.
duplicate_ids = list(docs_to_remove.compute().id)
dataset_df = dataset.df
deduped = dataset_df[~dataset_df.id.isin(duplicate_ids)]
return DocumentDataset(deduped)
这指定了每条记录的唯一标识符和内容,分别是id
和text
。回想一下,在下载和提取阶段,已为每个文档分配了唯一标识符。这使得重复数据删除器能够以唯一方式识别彼此之间的文档。重复数据删除器对象将返回一组已确定为重复的 ID,只需从数据集中删除这些文档即可。
PII 编辑
本教程中讨论的最后一个处理步骤是编辑个人识别信息 (PII)。NeMo Curator 使用 PiiModifier
类,这是对之前讨论过的 DocumentModifier
类的扩展。此修饰符利用了 Presidio 框架,并允许您指定要检测的 PII、对每次检测采取的操作,以及批量处理数据以加速操作。
TinyStories 数据集中的故事包含许多名字的实例。此示例旨在检测所有此类名字,并将其替换为匿名令牌。这可以使用几行代码来完成:
from nemo_curator.modifiers.pii_modifier import PiiModifier
def redact_pii(dataset: DocumentDataset) -> DocumentDataset:
redactor = Modify(
PiiModifier(
supported_entities=["PERSON"],
anonymize_action="replace",
device="cpu",
),
)
return redactor(dataset)
此操作会获取整个数据集,并返回修改后的数据集。
整合管线
在实现管线的每个步骤后,是时候将所有内容放在一起,并按顺序对数据集应用每个操作了。您可以使用Sequential
API 将类链式管理操作结合在一起:
curation_steps = Sequential(
[
clean_and_unify,
filter_dataset,
dedupe,
redact_pii,
]
)
dataset = curation_steps(dataset)
print("Executing the pipeline...")
dataset = dataset.persist()
dataset.to_json("/output/path", write_to_filename=True)
在幕后,NeMo Curator 使用 Dask 以分布式方式处理数据集。由于 Dask 操作是延迟评估的,因此有必要调用.persist
方法,以指示 Dask 应用操作的函数。处理完成后,您可以通过调用.to_json
方法,并提供输出路径,以获取处理结果。
后续步骤
NeMo Curator 支持许多先进的数据处理和过滤技术,例如模糊或基于任务的重复数据删除、任务识别和去污染、域分类等(还有更多内容),本教程中并未涵盖这些技术。欲了解更多信息,请查看 GitHub 上的数据管护示例集合。
此外,您还可以 申请访问 NVIDIA NeMo Curator 微服务,该微服务为企业提供了从任何地方开始数据管护的最简单途径。它提供了精简的性能和可扩展性,以缩短上市时间。