大语言模型(LLMs)的领域自适应预训练(DAPT)是构建特定领域模型的重要步骤。与现成的开放或商用模型相比,这些模型在特定领域任务中表现出更出色的功能。
最近,NVIDIA 发表了一篇关于 ChipNeMo 的论文,这是一系列面向工业芯片设计应用的基础模型。ChipNeMo 模型是通过在专有数据和公开可用的特定领域数据的语料库上对 Llama 2 系列模型进行持续预训练的结果。
本文将以 ChipNeMo 数据集为例,介绍使用 NVIDIA NeMo Curator 从各种公开来源收集训练数据集的过程。
NeMo Curator
NeMo Curator 是一个 GPU 加速的数据 curation 库,通过准备用于预训练和自定义的大规模、高质量数据集来提高生成式 AI 模型的性能。
NeMo Curator 通过扩展到多节点多 GPU (MNMG) 来缩短数据处理时间,并支持大型预训练数据集的准备。它提供了从 Common Crawl、Wikipedia 和 arXiv 等开箱即用的各种公共来源下载和整理数据的工作流程。
它还为您提供了自定义数据采集流程的灵活性,以满足它们独特的要求,并创建自定义数据集。
有关基本构建块的更多信息,请参阅“使用 NVIDIA NeMo Curator 为 LLM 训练整理自定义数据集”的教程。
ChipNeMo
ChipNeMo 的大部分训练语料库包括来自 Wikipedia、开源 GitHub 资源库以及 arXiv 出版物的数据。
图 1 显示数据 curation 管道涉及以下高级步骤:
- Acquiring data:
- 下载相关的维基百科文章,并将其转换为JSONL文件。
- 克隆相关的 GitHub 资源库,确定所有相关的源代码文件,并将其转换为 JSONL 文件。
- 从 arXiv 下载 PDF 格式的论文,并将其转换为 JSONL 文件。
- 使用现有工具统一 Unicode 表征和特殊字符。
- 定义自定义过滤器以删除过短、过长、重复或不相关的记录。
- 从数据集中编辑所有个人身份信息(PII)。
- 根据元数据整理数据,并将结果写入磁盘。
- (可选) 混合和混洗数据。
要访问本教程的完整代码,请参阅 NVIDIA/NeMo-Curator GitHub 资源库。
预备知识
开始之前,请按照 NeMo Curator 的 GitHub README 文件中的说明安装 NeMo Curator。
本教程还依赖于 Tesseract 库来启用 PDF 解析功能,您可以通过获取二进制文件或操作系统的包管理器来安装该功能。
之后,在终端运行以下命令以验证安装。此外,安装后续操作所需的依赖项
$ sudo apt install tesseract-ocr # For Debian-based Linux distros
$ pip install nemo-curator
$ python -c "import nemo_curator; print(nemo_curator);"
$ pip3 install -r requirements.txt
数据采集
我们提供了 ChipNeMo 训练语料库中使用的 Wikipedia 文章、GitHub 资源库和 arXiv 出版物的列表,并演示了如何将这些数据转换为 JSONL。
转换过程因数据源而异:
- 对于维基百科文章,请解析网页以提取主要内容。
- 对于 arXiv 出版物,请将 PDF 文件解析为纯文本。
- 对于 GitHub 资源库,请识别相关的源代码文件,并忽略不相关的数据。
如前文教程所述,整理数据集的第一步是实现可下载并迭代数据集的文档构建器。
要使用 Dask 的并行性,请将文档构建器实现插入 NeMo Curator 提供的 download_and_extract 辅助程序。这辅助程序使用 Dask 工作器并行下载和解析数据,从而在处理多个数据源时显著加快该过程。
文档构建器实现
首先,实现 DocumentDownloader
类,该类获取数据集的 URL 并使用 requests 库进行下载。目前,重点关注下载和解析 GitHub 资源库的任务。您稍后也可以类似地获取 Wikipedia 和 arXiv 数据。
要高效获取 GitHub 资源库,请将其下载为 .zip 存档,而不是通过 git 命令进行克隆。此方法速度更快,并且节省了磁盘空间,因为您可以直接处理 .zip 文件。
要下载 .zip 版本的资源库,请确定该资源库的主分支的名称。在生产流水线中,最好直接查询 GitHub API,并为每个资源库找出主分支。由于 API 通常受速率限制并需要身份验证,因此我们展示了如何尝试一些不同的常见分支名称,以查看哪些分支有效:例如,尝试 “main”、”master” 或 “develop” 等。
import requests
from nemo_curator.download.doc_builder import DocumentDownloader
class GitHubDownloader(DocumentDownloader):
"""
A class for downloading repositories from GitHub.
"""
def __init__(self, github_root_dir: str):
"""
Initializes the DocBuilder object.
Args:
github_root_dir: The root directory for GitHub repositories.
"""
super().__init__()
# The path under which the repositories will be cloned.
self.clone_root_dir = os.path.join(github_root_dir, "repos")
os.makedirs(github_root_dir, exist_ok=True)
os.makedirs(self.clone_root_dir, exist_ok=True)
def download(self, url: str) -> str:
"""
Download a repository as a zip file.
Args:
url (str): The URL of the repository.
Returns:
str: The path to the downloaded zip file, or None if the download failed.
"""
repo_name = os.path.basename(url)
zip_file = os.path.join(self.clone_root_dir, repo_name + ".zip")
if os.path.exists(zip_file):
print(f"Repository '{repo_name}' already exists, skipping download.")
return zip_file
# Try the common branch names first. A better way to do this would be to
# query the GitHub API to get the default branch, but that is subject to rate limits.
success = False
for branch in ["master", "main"]:
zip_url = f"https://github.com/{url}/archive/refs/heads/{branch}.zip"
# Send a GET request to the URL
response = requests.get(zip_url)
# Check if the request was successful
if response.status_code == 200:
# Write the content of the response to a file
with open(zip_file, "wb") as file:
file.write(response.content)
# No need to try other branches
success = True
break
if not success:
print(
f"Failed to clone repository '{repo_name}' from '{url}' (error code {response.status_code})."
)
return None
return zip_file
解析和迭代数据集
实施 DocumentIterator
和 DocumentExtractor
类,以遍历数据源并解析所有相关源文件。在迭代器实现中,您可以添加任何其他相关元数据或限制所解析的文件。
以下实现将打开每个存储库的 .zip 文件,并遍历所有文件,同时跳过所有隐藏文件和目录。它通过扩展名确定相关文件,并使用 cchardet
库确定每个文件的编码。除了每个文件的内容外,此实现还存储了一些有用的元数据,并将其返回给调用方。
提取器实现将返回文件的解析内容。
import os
from zipfile import ZipFile, ZipInfo
import cchardet as chardet
from nemo_curator.download.doc_builder import DocumentIterator
class GitHubIterator(DocumentIterator):
"""
GitHub document iterator. Will go through the files and parse the supported ones.
"""
# Mapping from file extensions to categories.
# Will also be used to to ignore irrelevant files.
SUPPORTED_EXTENSIONS_TO_CATEGORY = {
".v": "VerilogVHDL",
".vh": "VerilogVHDL",
".vhdl": "VerilogVHDL",
".va": "VerilogAnalog",
".c": "CPP",
".cpp": "CPP",
".h": "CPP",
".hpp": "CPP",
".py": "Python",
".config": "Config",
".mk": "Makefile",
"makefile": "Makefile",
"makeppfile": "Makefile",
".pm": "Perl",
".pl": "Perl",
".tcl": "Tcl",
".spec": "Spec",
".yaml": "Yaml",
".yml": "Yaml",
".sp": "Spice",
".cir": "Spice",
".cmd": "Spice",
".spf": "Spice",
".spice": "Spice",
".txt": "text",
".json": "text",
".xml": "text",
".html": "text",
".pdf": "text",
".md": "text",
"": "text", # No extension
}
def parse_file(self, zip_ref: ZipFile, file_info: ZipInfo):
"""
Parses a file from a zip archive and extracts its metadata and content.
Args:
zip_ref: The zip archive object.
file_info: Information about the file in the zip archive.
Returns:
A tuple containing the metadata and the content of the file. The metadata is a dictionary.
If the file extension or filename is not supported, or if the file cannot be decoded,
None is returned.
"""
zip_path = zip_ref.filename
input_fp = file_info.filename
full_path = os.path.join(zip_path, input_fp)
# Extract the file name and extension in lower case.
filename = os.path.basename(input_fp)
filename_no_ext, ext = os.path.splitext(filename)
filename_no_ext = filename_no_ext.lower()
ext = ext.lower()
# If neither the file extension nor the filename is supported, return None
if ext not in GitHubIterator.SUPPORTED_EXTENSIONS_TO_CATEGORY:
if filename_no_ext not in GitHubIterator.SUPPORTED_EXTENSIONS_TO_CATEGORY:
return None
# The filename is there, but the extension is not. The category is determined by the filename.
category = GitHubIterator.SUPPORTED_EXTENSIONS_TO_CATEGORY[filename_no_ext]
else:
category = GitHubIterator.SUPPORTED_EXTENSIONS_TO_CATEGORY[ext]
# Open the file and read its content. Determine the encoding using cchardet. Skip over binary files.
with zip_ref.open(file_info, "r") as file:
content = file.read()
# Determine the encoding of the file
encoding = chardet.detect(content)["encoding"]
if not encoding:
return None
try:
content = content.decode(encoding)
except UnicodeDecodeError:
# If the file cannot be decoded, return None
return None
# Extract the metadata
line_count = content.count("\n") + 1
size_in_bytes = file_info.file_size
if category == "text":
file_type = "text"
else:
file_type = "code"
metadata = {
# Use the file path as the unique ID
"id": full_path,
"file_extension": ext,
"file_type": file_type,
"category": category,
"line_count": line_count,
"size_in_bytes": size_in_bytes,
"path": full_path,
}
return metadata, content
def iterate(self, file_path: str):
"""
Iterates over the files in a zip archive and yields the parsed content of each file.
Args:
file_path: The path to the zip archive.
Yields:
Parsed content of each file in the zip archive.
"""
if not file_path:
return
with ZipFile(file_path, "r") as zip_ref:
for file_info in zip_ref.infolist():
filename = file_info.filename
# Skip directories and hidden files
if file_info.is_dir() or any(
part.startswith(".") for part in filename.split(os.sep)
):
continue
parsed = self.parse_file(zip_ref, file_info)
if parsed:
yield parsed
class GitHubExtractor(DocumentExtractor):
def extract(self, content: str):
# Just return the content.
return {}, content
下载数据集
将上述已实现的组件插入 NeMo Curator 助手程序,以从所有可用来源获取数据。
以下代码示例演示了 GitHub 资源库的此过程。download_and_extract
函数获取数据集源的列表并将其转发给下载器。然后,它会在每个下载的源上运行迭代器和提取器实现,以获取解析后的数据。
output_format
字典用于向底层的 Dask 模块提供有关每个提取字段的类型信息,从而避免类型推理的运行时惩罚。
from nemo_curator.download.doc_builder import download_and_extract
downloader = GitHubDownloader(output_dir)
iterator = GitHubIterator()
extractor = GitHubExtractor()
output_format = {
"text": str,
"id": str,
"file_extension": str,
"category": str,
"line_count": int,
"size_in_bytes": int,
"path": str,
}
dataset = download_and_extract(
urls=urls,
output_paths=[
os.path.join(output_jsonl_dir, os.path.basename(url)) for url in urls
],
downloader=downloader,
iterator=iterator,
extractor=extractor,
output_format=output_format,
keep_raw_download=True,
)
download_and_extract
函数需要每个数据集源的输出路径,该路径用于以 JSONL 格式存储已解析的数据集,从而无需多次下载和提取源。
完成后,此函数将返回 DocumentDataset
实例。
使用文档构建器加载数据集
在 NeMo Curator 中,数据集表示为 DocumentDataset
类型的对象。本类提供了从磁盘加载各种格式的数据集的助手。以 JSONL 格式创建数据集后,您可以使用以下代码进行加载并开始使用它:
from nemo_curator.datasets import DocumentDataset
# define `code_files` to be the path to the JSONL file created above.
dataset_code = DocumentDataset.read_json(code_files, add_filename=True)
# define `text_files` to be the path to the JSONL file created from text sources.
dataset_text = DocumentDataset.read_json(text_files, add_filename=True)
考虑到这些数据来自不同来源,因此可能更容易存储两个独立的数据集实例,一个用于存储来自文本来源(例如 Wikipedia 或 arXiv 论文)的数据,另一个用于存储来自代码来源(例如 GitHub 资源库)的数据。这使您能够定义特定于源的处理流程,例如对文本来源应用 PII 编辑,以及为代码来源删除许可文本。
您现在拥有了定义自定义数据集 curation 管道和准备数据所需的一切。
Unicode 格式和文本统一
通常,最好修复数据集中的所有 Unicode 问题,因为从在线来源抓取的文本可能存在不一致或 Unicode 问题。
为修改文档,NeMo Curator 提供了一个 DocumentModifier
接口以及 Modify
辅助程序,用于定义如何修改每个文档中的给定文本。有关实现您自己的自定义文档修饰符的更多信息,请参阅“使用 NVIDIA NeMo Curator 进行 LLM 参数高效微调来管理自定义数据集”教程中的“文本清理和统一”部分。
在这里,只需对数据集应用 NeMo Curator UnicodeReformatter
修饰符即可。
此外,您还可以修改数据集中的所有引号,并确保没有角度引号变体。您可以通过使用所需逻辑实现 DocumentModifier
接口来做到这一点。
考虑到每条记录都有多个字段,请仅将操作应用于数据集中的相关字段(在本例中为“text”
)。使用Sequential
类将这些操作链式关联在一起:
Sequential([
Modify(QuotationUnifier(), text_field="text"),
Modify(UnicodeReformatter(), text_field="text"),
])
数据集过滤
统一数据集中的所有文本后,请为数据集应用一些过滤器,以确保文档符合特定的标准;例如,它们都应有合理的长度,并且 URL 或其他重复文本不应过多。
NeMo Curator 提供了许多此类滤镜。你还可以通过实现 DocumentFilter 接口来创建自己的自定义滤镜。有关更多信息,请参阅“Curating Custom Datasets for LLM Parameter-Efficient Fine-Tuning with NVIDIA NeMo Curator”教程中的“Designing custom dataset filters”部分。
以下代码示例展示了适用于文本数据的各种过滤器的链式连接。
def filter_text(dataset: DocumentDataset) -> DocumentDataset:
"""
Filters the given dataset based on various criteria.
Refer to the full list of all filters here:
https://github.com/NVIDIA/NeMo-Curator/blob/main/config/heuristic_filter_en.yaml
https://github.com/NVIDIA/NeMo-Curator/blob/main/tutorials/peft-curation/main.py
Args:
dataset (DocumentDataset): The dataset to be filtered.
Returns:
DocumentDataset: The filtered dataset.
"""
filters = Sequential(
[
# If a document contains a number of words not
# within a specified range then discard
ScoreFilter(
WordCountFilter(min_words=50, max_words=100000),
text_field="text",
score_field="word_count",
score_type=int,
),
# If the document shrinks by > x% in terms of number of characters after
# removing the top n-grams then discard. Source: Gopher (Rae et al., 2021)
ScoreFilter(
RepeatingTopNGramsFilter(n=2, max_repeating_ngram_ratio=0.2),
text_field="text",
score_type=float,
),
ScoreFilter(
RepeatingTopNGramsFilter(n=3, max_repeating_ngram_ratio=0.18),
text_field="text",
score_type=float,
),
ScoreFilter(
RepeatingTopNGramsFilter(n=4, max_repeating_ngram_ratio=0.16),
text_field="text",
score_type=float,
),
ScoreFilter(
RepeatedParagraphsFilter(max_repeated_paragraphs_ratio=0.7),
text_field="text",
score_type=float,
),
# If more than 20% of the document is comprised of URLs then discard
ScoreFilter(
UrlsFilter(max_url_to_text_ratio=0.2),
text_field="text",
score_type=float,
),
]
)
filtered_dataset = filters(dataset)
return filtered_dataset
PII 编辑
接下来,定义处理步骤,以编辑记录中的所有个人身份信息(PII)。根据数据源(文本或代码),确保将操作应用到适当的数据集和数据字段。此外,定义检测到 PII 时应采取的操作。
以下代码示例定义了两个函数,分别用于文本源和代码源,用于 PII 编辑。
def redact_pii(dataset: DocumentDataset) -> DocumentDataset:
redactor = Modify(
PiiModifier(
supported_entities=[
"PERSON",
"EMAIL_ADDRESS",
],
anonymize_action="replace",
device="gpu",
),
text_field="extracted_comment",
)
return redactor(dataset)
def redact_code(dataset: DocumentDataset) -> DocumentDataset:
# functions to extract comment lines from each row in a dataframe
def func(row):
return row["text"][row["text"].find("/*") : row["text"].find("*/") + 2]
def func2(row):
comment = row["text"][row["text"].find("/*") : row["text"].find("*/") + 2]
return row["text"].replace(comment, str(row["extracted_comment"]))
dataset.df["extracted_comment"] = dataset.df.apply(func, axis=1, meta=(None, str))
redacted_dataset = redact_pii(dataset)
redacted_dataset.df["text"] = redacted_dataset.df.apply(
func2, axis=1, meta=(None, str)
)
redacted_dataset.df = redacted_dataset.df.drop(["extracted_comment"], axis=1)
return redacted_dataset
重复数据删除
获取的数据可能包含大量重复记录。对于从 GitHub 抓取的代码文件尤其如此。
定义处理步骤,在该步骤中检测并删除包含相同信息的文档。这通常称为精确去重复,适用于许多数据整理流程。
def dedupe(dataset: DocumentDataset) -> DocumentDataset:
"""
Remove exact duplicates from the given DocumentDataset.
Args:
dataset (DocumentDataset): The dataset containing documents.
Returns:
DocumentDataset: The deduplicated dataset.
"""
deduplicator = ExactDuplicates(id_field="id", text_field="text", hash_method="md5")
# Find the duplicates
duplicates = deduplicator(dataset)
docs_to_remove = duplicates.df.map_partitions(
lambda x: x[x._hashes.duplicated(keep="first")]
)
# Remove the duplicates using their IDs.
duplicate_ids = list(docs_to_remove.compute().id)
dataset_df = dataset.df
deduped = dataset_df[~dataset_df.id.isin(duplicate_ids)]
return DocumentDataset(deduped)
此函数为数据集中的每个文档计算哈希签名,并标记共享相同签名以供删除的那些文档。
整合管护流程
现在,管护流程的每个步骤都已实施,现在可以集成所有内容,并按顺序将每个操作应用到数据集。
使用 Sequential
类将策划操作连接在一起。
# Define data curation steps for text and pdf files
curation_steps_text = Sequential(
[
clean_and_unify,
ScoreFilter(
TextLineCountFilter(), text_field="file_type_count", score_type=bool
),
filter_text,
dedupe,
]
)
# Define data curation steps for code files
curation_steps_code = Sequential(
[
clean_and_unify,
ScoreFilter(
CodeLineCountFilter(), text_field="file_type_count", score_type=bool
),
filter_code,
dedupe,
redact_code,
]
)
dataset_text = curation_steps_text(dataset_text).persist()
dataset_code = curation_steps_text(dataset_code).persist()
dataset_text.to_json(out_path, write_to_filename=True)
dataset_code.to_json(out_path, write_to_filename=True)
# Split the dataset by file category and save curated files (optional - to create blended datasets)
separated_data_text = separate_by_metadata(
dataset_text.df, out_path, "category"
).compute()
separated_data_code = separate_by_metadata(
dataset_code.df, out_path, "category"
).compute()
在后端,NeMo Curator 使用 Dask 以分布式方式处理数据集。由于 Dask 操作是延迟评估的,因此只有在调用函数(如本例中的 .persist
)时才开始计算。
将数据集保存到磁盘,并通过提供 write_to_filename=True
指示框架使用适当的文件名写入每条记录。
最后,如果您计划执行可选的数据集混洗和混合,请按类别分割数据集。
数据集混合和混洗 (可选)
工作流的最后一步是将不同来源的数据集混合在一起,并对其进行混洗。离线混合和混洗通过集成各种数据,防止通过随机数据暴露进行过拟合,增强了基础LLM的泛化。
为此,请按照以下代码示例中所示定义混合函数,并提供每个数据源、混合比率和目标大小,以定义数据集的最终大小。
def blend_and_shuffle(
args: Any, dataset_paths: list, dataset_weights: list, target_size: int
) -> None:
"""
Blend and shuffle curated data based on file paths for continued pre-training
Args:
args (Any): Command-line arguments.
dataset_paths (list): List containing directory paths where the different JSONL files are stored.
dataset_weights (list): List setting weights for each directory path
target_size (int): Target number of data samples after blending
"""
root_path = os.path.join(DATA_DIR, "curated")
output_path = root_path + "/data_blended"
if os.path.isdir(output_path):
shutil.rmtree(output_path)
os.makedirs(output_path)
# Blend the datasets
datasets = [DocumentDataset.read_json(path) for path in dataset_paths]
blended_dataset = nc.blend_datasets(target_size, datasets, dataset_weights)
shuffle = nc.Shuffle(seed=42)
blended_dataset = shuffle(blended_dataset)
# Save the blend
blended_dataset.to_json(output_path)
# Function call
root_path = os.path.join(DATA_DIR, "curated")
dataset_paths = [
root_path + "/CPP",
root_path + "/VerilogVHDL",
root_path + "/text",
root_path + "/Python",
]
dataset_weights = [1.0, 4.0, 4.0, 1.0]
blend_and_shuffle(dataset_paths, dataset_weights, target_size=20)
调用后,精选数据集将保存在 output_path
下。
后续步骤
现在,您已学习如何使用 NeMo Curator 处理 DAPT 数据,现在可以开始尝试了。获取本教程的完整源代码,调整代码以根据您的领域定制数据,并开发功能强大的特定领域 LLMs。
您还可以申请抢先体验 NVIDIA NeMo Curator 微服务,该服务为企业随时随地开始数据 curation 提供了最简单的途径,并提供精简的性能和可扩展性,从而缩短上市时间。
如需申请,请访问NeMo Curator微服务早期访问版。