数据科学

借助 NVIDIA NeMo Curator 简化域自适应预训练的数据处理

大语言模型(LLMs)的领域自适应预训练(DAPT)是构建特定领域模型的重要步骤。与现成的开放或商用模型相比,这些模型在特定领域任务中表现出更出色的功能。

最近,NVIDIA 发表了一篇关于 ChipNeMo 的论文,这是一系列面向工业芯片设计应用的基础模型。ChipNeMo 模型是通过在专有数据和公开可用的特定领域数据的语料库上对 Llama 2 系列模型进行持续预训练的结果。

本文将以 ChipNeMo 数据集为例,介绍使用 NVIDIA NeMo Curator 从各种公开来源收集训练数据集的过程。

NeMo Curator

NeMo Curator 是一个 GPU 加速的数据 curation 库,通过准备用于预训练和自定义的大规模、高质量数据集来提高生成式 AI 模型的性能。

NeMo Curator 通过扩展到多节点多 GPU (MNMG) 来缩短数据处理时间,并支持大型预训练数据集的准备。它提供了从 Common Crawl、Wikipedia 和 arXiv 等开箱即用的各种公共来源下载和整理数据的工作流程。

它还为您提供了自定义数据采集流程的灵活性,以满足它们独特的要求,并创建自定义数据集。

有关基本构建块的更多信息,请参阅“使用 NVIDIA NeMo Curator 为 LLM 训练整理自定义数据集”的教程。

ChipNeMo

ChipNeMo 的大部分训练语料库包括来自 Wikipedia、开源 GitHub 资源库以及 arXiv 出版物的数据。

图 1 显示数据 curation 管道涉及以下高级步骤:

  1. Acquiring data:
    • 下载相关的维基百科文章,并将其转换为JSONL文件。
    • 克隆相关的 GitHub 资源库,确定所有相关的源代码文件,并将其转换为 JSONL 文件。
    • 从 arXiv 下载 PDF 格式的论文,并将其转换为 JSONL 文件。
  2. 使用现有工具统一 Unicode 表征和特殊字符。
  3. 定义自定义过滤器以删除过短、过长、重复或不相关的记录。
  4. 从数据集中编辑所有个人身份信息(PII)。
  5. 根据元数据整理数据,并将结果写入磁盘。
  6. (可选) 混合和混洗数据。
Diagram shows the different steps in processing the data for training domain-specific LLMs, including downloading, extracting, cleaning, blending, and shuffling.
图 1. 处理数据以训练特定领域的LLMs

要访问本教程的完整代码,请参阅 NVIDIA/NeMo-Curator GitHub 资源库。

预备知识

开始之前,请按照 NeMo Curator 的 GitHub README 文件中的说明安装 NeMo Curator。

本教程还依赖于 Tesseract 库来启用 PDF 解析功能,您可以通过获取二进制文件或操作系统的包管理器来安装该功能。

之后,在终端运行以下命令以验证安装。此外,安装后续操作所需的依赖项

$ sudo apt install tesseract-ocr  # For Debian-based Linux distros
$ pip install nemo-curator
$ python -c "import nemo_curator; print(nemo_curator);"
$ pip3 install -r requirements.txt

数据采集

我们提供了 ChipNeMo 训练语料库中使用的 Wikipedia 文章、GitHub 资源库和 arXiv 出版物的列表,并演示了如何将这些数据转换为 JSONL。

转换过程因数据源而异:

  • 对于维基百科文章,请解析网页以提取主要内容。
  • 对于 arXiv 出版物,请将 PDF 文件解析为纯文本。
  • 对于 GitHub 资源库,请识别相关的源代码文件,并忽略不相关的数据。

如前文教程所述,整理数据集的第一步是实现可下载并迭代数据集的文档构建器

要使用 Dask 的并行性,请将文档构建器实现插入 NeMo Curator 提供的 download_and_extract 辅助程序。这辅助程序使用 Dask 工作器并行下载和解析数据,从而在处理多个数据源时显著加快该过程。

文档构建器实现

首先,实现 DocumentDownloader 类,该类获取数据集的 URL 并使用 requests 库进行下载。目前,重点关注下载和解析 GitHub 资源库的任务。您稍后也可以类似地获取 Wikipedia 和 arXiv 数据。

要高效获取 GitHub 资源库,请将其下载为 .zip 存档,而不是通过 git 命令进行克隆。此方法速度更快,并且节省了磁盘空间,因为您可以直接处理 .zip 文件。

要下载 .zip 版本的资源库,请确定该资源库的主分支的名称。在生产流水线中,最好直接查询 GitHub API,并为每个资源库找出主分支。由于 API 通常受速率限制并需要身份验证,因此我们展示了如何尝试一些不同的常见分支名称,以查看哪些分支有效:例如,尝试 “main”、”master” 或 “develop” 等。

import requests
from nemo_curator.download.doc_builder import DocumentDownloader

class GitHubDownloader(DocumentDownloader):
    """
    A class for downloading repositories from GitHub.
    """

    def __init__(self, github_root_dir: str):
        """
        Initializes the DocBuilder object.

        Args:
            github_root_dir: The root directory for GitHub repositories.
        """
        super().__init__()
        # The path under which the repositories will be cloned.
        self.clone_root_dir = os.path.join(github_root_dir, "repos")
        os.makedirs(github_root_dir, exist_ok=True)
        os.makedirs(self.clone_root_dir, exist_ok=True)

    def download(self, url: str) -> str:
        """
        Download a repository as a zip file.

        Args:
            url (str): The URL of the repository.

        Returns:
            str: The path to the downloaded zip file, or None if the download failed.
        """
        repo_name = os.path.basename(url)
        zip_file = os.path.join(self.clone_root_dir, repo_name + ".zip")

        if os.path.exists(zip_file):
            print(f"Repository '{repo_name}' already exists, skipping download.")
            return zip_file

        # Try the common branch names first. A better way to do this would be to
        # query the GitHub API to get the default branch, but that is subject to rate limits.
        success = False

        for branch in ["master", "main"]:
            zip_url = f"https://github.com/{url}/archive/refs/heads/{branch}.zip"

            # Send a GET request to the URL
            response = requests.get(zip_url)

            # Check if the request was successful
            if response.status_code == 200:
                # Write the content of the response to a file
                with open(zip_file, "wb") as file:
                    file.write(response.content)

                # No need to try other branches
                success = True
                break

        if not success:
            print(
                f"Failed to clone repository '{repo_name}' from '{url}' (error code {response.status_code})."
            )
            return None

        return zip_file

解析和迭代数据集

实施 DocumentIteratorDocumentExtractor 类,以遍历数据源并解析所有相关源文件。在迭代器实现中,您可以添加任何其他相关元数据或限制所解析的文件。

以下实现将打开每个存储库的 .zip 文件,并遍历所有文件,同时跳过所有隐藏文件和目录。它通过扩展名确定相关文件,并使用 cchardet 库确定每个文件的编码。除了每个文件的内容外,此实现还存储了一些有用的元数据,并将其返回给调用方。

提取器实现将返回文件的解析内容。

import os
from zipfile import ZipFile, ZipInfo
import cchardet as chardet
from nemo_curator.download.doc_builder import DocumentIterator

class GitHubIterator(DocumentIterator):
    """
    GitHub document iterator. Will go through the files and parse the supported ones.
    """

    # Mapping from file extensions to categories.
    # Will also be used to to ignore irrelevant files.
    SUPPORTED_EXTENSIONS_TO_CATEGORY = {
        ".v": "VerilogVHDL",
        ".vh": "VerilogVHDL",
        ".vhdl": "VerilogVHDL",
        ".va": "VerilogAnalog",
        ".c": "CPP",
        ".cpp": "CPP",
        ".h": "CPP",
        ".hpp": "CPP",
        ".py": "Python",
        ".config": "Config",
        ".mk": "Makefile",
        "makefile": "Makefile",
        "makeppfile": "Makefile",
        ".pm": "Perl",
        ".pl": "Perl",
        ".tcl": "Tcl",
        ".spec": "Spec",
        ".yaml": "Yaml",
        ".yml": "Yaml",
        ".sp": "Spice",
        ".cir": "Spice",
        ".cmd": "Spice",
        ".spf": "Spice",
        ".spice": "Spice",
        ".txt": "text",
        ".json": "text",
        ".xml": "text",
        ".html": "text",
        ".pdf": "text",
        ".md": "text",
        "": "text",  # No extension
    }

    def parse_file(self, zip_ref: ZipFile, file_info: ZipInfo):
        """
        Parses a file from a zip archive and extracts its metadata and content.

        Args:
            zip_ref: The zip archive object.
            file_info: Information about the file in the zip archive.

        Returns:
            A tuple containing the metadata and the content of the file. The metadata is a dictionary.
            If the file extension or filename is not supported, or if the file cannot be decoded,
            None is returned.
        """
        zip_path = zip_ref.filename
        input_fp = file_info.filename
        full_path = os.path.join(zip_path, input_fp)
        # Extract the file name and extension in lower case.
        filename = os.path.basename(input_fp)
        filename_no_ext, ext = os.path.splitext(filename)
        filename_no_ext = filename_no_ext.lower()
        ext = ext.lower()

        # If neither the file extension nor the filename is supported, return None
        if ext not in GitHubIterator.SUPPORTED_EXTENSIONS_TO_CATEGORY:
            if filename_no_ext not in GitHubIterator.SUPPORTED_EXTENSIONS_TO_CATEGORY:
                return None

            # The filename is there, but the extension is not. The category is determined by the filename.
            category = GitHubIterator.SUPPORTED_EXTENSIONS_TO_CATEGORY[filename_no_ext]
        else:
            category = GitHubIterator.SUPPORTED_EXTENSIONS_TO_CATEGORY[ext]

        # Open the file and read its content. Determine the encoding using cchardet. Skip over binary files.
        with zip_ref.open(file_info, "r") as file:
            content = file.read()
            # Determine the encoding of the file
            encoding = chardet.detect(content)["encoding"]

            if not encoding:
                return None

            try:
                content = content.decode(encoding)
            except UnicodeDecodeError:
                # If the file cannot be decoded, return None
                return None

        # Extract the metadata
        line_count = content.count("\n") + 1
        size_in_bytes = file_info.file_size

        if category == "text":
            file_type = "text"
        else:
            file_type = "code"

        metadata = {
            # Use the file path as the unique ID
            "id": full_path,
            "file_extension": ext,
            "file_type": file_type,
            "category": category,
            "line_count": line_count,
            "size_in_bytes": size_in_bytes,
            "path": full_path,
        }
        return metadata, content

    def iterate(self, file_path: str):
        """
        Iterates over the files in a zip archive and yields the parsed content of each file.

        Args:
            file_path: The path to the zip archive.

        Yields:
            Parsed content of each file in the zip archive.
        """

        if not file_path:
            return

        with ZipFile(file_path, "r") as zip_ref:
            for file_info in zip_ref.infolist():
                filename = file_info.filename

                # Skip directories and hidden files
                if file_info.is_dir() or any(
                    part.startswith(".") for part in filename.split(os.sep)
                ):
                    continue

                parsed = self.parse_file(zip_ref, file_info)
                if parsed:
                    yield parsed


class GitHubExtractor(DocumentExtractor):
    def extract(self, content: str):
        # Just return the content.
        return {}, content

下载数据集

将上述已实现的组件插入 NeMo Curator 助手程序,以从所有可用来源获取数据。

以下代码示例演示了 GitHub 资源库的此过程。download_and_extract 函数获取数据集源的列表并将其转发给下载器。然后,它会在每个下载的源上运行迭代器和提取器实现,以获取解析后的数据。

output_format 字典用于向底层的 Dask 模块提供有关每个提取字段的类型信息,从而避免类型推理的运行时惩罚。

from nemo_curator.download.doc_builder import download_and_extract

downloader = GitHubDownloader(output_dir)
iterator = GitHubIterator()
extractor = GitHubExtractor()

output_format = {
    "text": str,
    "id": str,
    "file_extension": str,
    "category": str,
    "line_count": int,
    "size_in_bytes": int,
    "path": str,
}

dataset = download_and_extract(
    urls=urls,
    output_paths=[
        os.path.join(output_jsonl_dir, os.path.basename(url)) for url in urls
    ],
    downloader=downloader,
    iterator=iterator,
    extractor=extractor,
    output_format=output_format,
    keep_raw_download=True,
)

download_and_extract 函数需要每个数据集源的输出路径,该路径用于以 JSONL 格式存储已解析的数据集,从而无需多次下载和提取源。

完成后,此函数将返回 DocumentDataset 实例。

使用文档构建器加载数据集

在 NeMo Curator 中,数据集表示为 DocumentDataset 类型的对象。本类提供了从磁盘加载各种格式的数据集的助手。以 JSONL 格式创建数据集后,您可以使用以下代码进行加载并开始使用它:

from nemo_curator.datasets import DocumentDataset

# define `code_files` to be the path to the JSONL file created above.
dataset_code = DocumentDataset.read_json(code_files, add_filename=True)

# define `text_files` to be the path to the JSONL file created from text sources.
dataset_text = DocumentDataset.read_json(text_files, add_filename=True)

考虑到这些数据来自不同来源,因此可能更容易存储两个独立的数据集实例,一个用于存储来自文本来源(例如 Wikipedia 或 arXiv 论文)的数据,另一个用于存储来自代码来源(例如 GitHub 资源库)的数据。这使您能够定义特定于源的处理流程,例如对文本来源应用 PII 编辑,以及为代码来源删除许可文本。

您现在拥有了定义自定义数据集 curation 管道和准备数据所需的一切。

Unicode 格式和文本统一

通常,最好修复数据集中的所有 Unicode 问题,因为从在线来源抓取的文本可能存在不一致或 Unicode 问题。

为修改文档,NeMo Curator 提供了一个 DocumentModifier 接口以及 Modify 辅助程序,用于定义如何修改每个文档中的给定文本。有关实现您自己的自定义文档修饰符的更多信息,请参阅“使用 NVIDIA NeMo Curator 进行 LLM 参数高效微调来管理自定义数据集”教程中的“文本清理和统一”部分。

在这里,只需对数据集应用 NeMo Curator UnicodeReformatter 修饰符即可。

此外,您还可以修改数据集中的所有引号,并确保没有角度引号变体。您可以通过使用所需逻辑实现 DocumentModifier 接口来做到这一点。

考虑到每条记录都有多个字段,请仅将操作应用于数据集中的相关字段(在本例中为“text”)。使用Sequential类将这些操作链式关联在一起:

Sequential([
    Modify(QuotationUnifier(), text_field="text"),
    Modify(UnicodeReformatter(), text_field="text"),
])

数据集过滤

统一数据集中的所有文本后,请为数据集应用一些过滤器,以确保文档符合特定的标准;例如,它们都应有合理的长度,并且 URL 或其他重复文本不应过多。

NeMo Curator 提供了许多此类滤镜。你还可以通过实现 DocumentFilter 接口来创建自己的自定义滤镜。有关更多信息,请参阅“Curating Custom Datasets for LLM Parameter-Efficient Fine-Tuning with NVIDIA NeMo Curator”教程中的“Designing custom dataset filters”部分。

以下代码示例展示了适用于文本数据的各种过滤器的链式连接。

def filter_text(dataset: DocumentDataset) -> DocumentDataset:
    """
    Filters the given dataset based on various criteria.
    Refer to the full list of all filters here:
    https://github.com/NVIDIA/NeMo-Curator/blob/main/config/heuristic_filter_en.yaml
    https://github.com/NVIDIA/NeMo-Curator/blob/main/tutorials/peft-curation/main.py

    Args:
        dataset (DocumentDataset): The dataset to be filtered.

    Returns:
        DocumentDataset: The filtered dataset.
    """
    filters = Sequential(
        [
            # If a document contains a number of words not
            # within a specified range then discard
            ScoreFilter(
                WordCountFilter(min_words=50, max_words=100000),
                text_field="text",
                score_field="word_count",
                score_type=int,
            ),
            # If the document shrinks by > x% in terms of number of characters after
            # removing the top n-grams then discard. Source: Gopher (Rae et al., 2021)
            ScoreFilter(
                RepeatingTopNGramsFilter(n=2, max_repeating_ngram_ratio=0.2),
                text_field="text",
                score_type=float,
            ),
            ScoreFilter(
                RepeatingTopNGramsFilter(n=3, max_repeating_ngram_ratio=0.18),
                text_field="text",
                score_type=float,
            ),
            ScoreFilter(
                RepeatingTopNGramsFilter(n=4, max_repeating_ngram_ratio=0.16),
                text_field="text",
                score_type=float,
            ),
            ScoreFilter(
                RepeatedParagraphsFilter(max_repeated_paragraphs_ratio=0.7),
                text_field="text",
                score_type=float,
            ),
            # If more than 20% of the document is comprised of URLs then discard
            ScoreFilter(
                UrlsFilter(max_url_to_text_ratio=0.2),
                text_field="text",
                score_type=float,
            ),
        ]
    )
    filtered_dataset = filters(dataset)
    return filtered_dataset

PII 编辑

接下来,定义处理步骤,以编辑记录中的所有个人身份信息(PII)。根据数据源(文本或代码),确保将操作应用到适当的数据集和数据字段。此外,定义检测到 PII 时应采取的操作。

以下代码示例定义了两个函数,分别用于文本源和代码源,用于 PII 编辑。

   def redact_pii(dataset: DocumentDataset) -> DocumentDataset:
    redactor = Modify(
        PiiModifier(
            supported_entities=[
                "PERSON",
                "EMAIL_ADDRESS",
            ],
            anonymize_action="replace",
            device="gpu",
        ),
        text_field="extracted_comment",
    )
    return redactor(dataset)

def redact_code(dataset: DocumentDataset) -> DocumentDataset:
    # functions to extract comment lines from each row in a dataframe
    def func(row):
        return row["text"][row["text"].find("/*") : row["text"].find("*/") + 2]

    def func2(row):
        comment = row["text"][row["text"].find("/*") : row["text"].find("*/") + 2]
        return row["text"].replace(comment, str(row["extracted_comment"]))

    dataset.df["extracted_comment"] = dataset.df.apply(func, axis=1, meta=(None, str))
    redacted_dataset = redact_pii(dataset)
    redacted_dataset.df["text"] = redacted_dataset.df.apply(
        func2, axis=1, meta=(None, str)
    )
    redacted_dataset.df = redacted_dataset.df.drop(["extracted_comment"], axis=1)

    return redacted_dataset

重复数据删除

获取的数据可能包含大量重复记录。对于从 GitHub 抓取的代码文件尤其如此。

定义处理步骤,在该步骤中检测并删除包含相同信息的文档。这通常称为精确去重复,适用于许多数据整理流程。

def dedupe(dataset: DocumentDataset) -> DocumentDataset:
    """
    Remove exact duplicates from the given DocumentDataset.

    Args:
        dataset (DocumentDataset): The dataset containing documents.

    Returns:
        DocumentDataset: The deduplicated dataset.
    """
    deduplicator = ExactDuplicates(id_field="id", text_field="text", hash_method="md5")
    # Find the duplicates
    duplicates = deduplicator(dataset)
    docs_to_remove = duplicates.df.map_partitions(
        lambda x: x[x._hashes.duplicated(keep="first")]
    )
    # Remove the duplicates using their IDs.
    duplicate_ids = list(docs_to_remove.compute().id)
    dataset_df = dataset.df
    deduped = dataset_df[~dataset_df.id.isin(duplicate_ids)]
    return DocumentDataset(deduped)

此函数为数据集中的每个文档计算哈希签名,并标记共享相同签名以供删除的那些文档。

整合管护流程

现在,管护流程的每个步骤都已实施,现在可以集成所有内容,并按顺序将每个操作应用到数据集。

使用 Sequential 类将策划操作连接在一起。

# Define data curation steps for text and pdf files
    curation_steps_text = Sequential(
        [
            clean_and_unify,
            ScoreFilter(
                TextLineCountFilter(), text_field="file_type_count", score_type=bool
            ),
            filter_text,
            dedupe,
        ]
    )

    # Define data curation steps for code files
    curation_steps_code = Sequential(
        [
            clean_and_unify,
            ScoreFilter(
                CodeLineCountFilter(), text_field="file_type_count", score_type=bool
            ),
            filter_code,
            dedupe,
            redact_code,
        ]
    )


dataset_text = curation_steps_text(dataset_text).persist()
dataset_code = curation_steps_text(dataset_code).persist()

dataset_text.to_json(out_path, write_to_filename=True)
dataset_code.to_json(out_path, write_to_filename=True)

# Split the dataset by file category and save curated files (optional - to create blended datasets)
separated_data_text = separate_by_metadata(
    dataset_text.df, out_path, "category"
).compute()
separated_data_code = separate_by_metadata(
    dataset_code.df, out_path, "category"
).compute()

在后端,NeMo Curator 使用 Dask 以分布式方式处理数据集。由于 Dask 操作是延迟评估的,因此只有在调用函数(如本例中的 .persist)时才开始计算。

将数据集保存到磁盘,并通过提供 write_to_filename=True 指示框架使用适当的文件名写入每条记录。

最后,如果您计划执行可选的数据集混洗和混合,请按类别分割数据集。

数据集混合和混洗 (可选)

工作流的最后一步是将不同来源的数据集混合在一起,并对其进行混洗。离线混合和混洗通过集成各种数据,防止通过随机数据暴露进行过拟合,增强了基础LLM的泛化。

为此,请按照以下代码示例中所示定义混合函数,并提供每个数据源、混合比率和目标大小,以定义数据集的最终大小。

def blend_and_shuffle(
    args: Any, dataset_paths: list, dataset_weights: list, target_size: int
) -> None:
    """
    Blend and shuffle curated data based on file paths for continued pre-training

    Args:
        args (Any): Command-line arguments.
        dataset_paths (list): List containing directory paths where the different JSONL files are stored.
        dataset_weights (list): List setting weights for each directory path
        target_size (int): Target number of data samples after blending
    """
    root_path = os.path.join(DATA_DIR, "curated")
    output_path = root_path + "/data_blended"
    if os.path.isdir(output_path):
        shutil.rmtree(output_path)
    os.makedirs(output_path)

    # Blend the datasets
    datasets = [DocumentDataset.read_json(path) for path in dataset_paths]
    blended_dataset = nc.blend_datasets(target_size, datasets, dataset_weights)

    shuffle = nc.Shuffle(seed=42)
    blended_dataset = shuffle(blended_dataset)

    # Save the blend
    blended_dataset.to_json(output_path)

# Function call
root_path = os.path.join(DATA_DIR, "curated")
dataset_paths = [
    root_path + "/CPP",
    root_path + "/VerilogVHDL",
    root_path + "/text",
    root_path + "/Python",
]
dataset_weights = [1.0, 4.0, 4.0, 1.0]
blend_and_shuffle(dataset_paths, dataset_weights, target_size=20)

调用后,精选数据集将保存在 output_path 下。

后续步骤

现在,您已学习如何使用 NeMo Curator 处理 DAPT 数据,现在可以开始尝试了。获取本教程的完整源代码,调整代码以根据您的领域定制数据,并开发功能强大的特定领域 LLMs。

您还可以申请抢先体验 NVIDIA NeMo Curator 微服务,该服务为企业随时随地开始数据 curation 提供了最简单的途径,并提供精简的性能和可扩展性,从而缩短上市时间。

如需申请,请访问NeMo Curator微服务早期访问版

 

Tags