Data curation plays a crucial role in the development of effective and fair large language models (LLMs). High-quality, diverse training data directly impacts LLM performance, addressing issues like bias, inconsistencies, and redundancy. By curating high-quality datasets, we can ensure that LLMs are accurate, reliable, and generalizable.
When training a localized multilingual LLM, especially for low-resource languages, web-crawled data such as OSCAR is a crucial resource. However, web-crawled data often contains noise, irrelevant content, duplicates, and formatting issues. Effective data curation is essential to address these problems and ensure high-quality LLM performance.
NVIDIA recently announced the open-source release of NVIDIA NeMo Curator, a data curation library designed for scalable and efficient dataset preparation, enhancing LLM training accuracy through GPU-accelerated data curation using Dask and RAPIDS. NeMo Curator offers a customizable and modular interface that simplifies pipeline expansion and accelerates model convergence by preparing high-quality tokens. The modules within NeMo Curator enable you to mine high-quality text at scale from massive uncurated web corpora as well as custom datasets.
This post introduces a data curation pipeline for an open-source multilingual dataset using Thai Wikipedia as an example. We explain how to construct a scalable and GPU-accelerated data curation pipeline with NVIDIA NeMo Curator.
Overview
This tutorial introduces a data curation pipeline using the Thai Wikipedia dataset, a smaller subset of the Wikipedia dataset, which can be processed on a single GPU. Wikipedia is considered high-quality for LLM pretraining due to its accurate, well-structured content, contributed by a large community. NeMo Curator enhances this by detecting and filtering low-quality documents, ensuring that only the best data is used for training.
For the complete code sample for this tutorial, see the /NVIDIA/NeMo-Curator GitHub repo.
Prerequisites
To use GPU-accelerated deduplication, we recommend the following hardware setup:
- NVIDIA GPU: This tutorial is developed using the NVIDIA A10 24GB GPU
- CUDA and NVIDIA Drivers: CUDA 12.2 with Driver 535.154.05
- Ubuntu 22.04
- NVIDIA-container-toolkit version 1.14.6
To install the NeMo Curator library, run the following command:
git clone https://github.com/NVIDIA/NeMo-Curator.git
cd NeMo-Curator
pip install --extra-index-url https://pypi.nvidia.com ".[cuda12x]"
You can also run this tutorial in the NeMo framework container. For more information, see the NeMo Curator README file.
Environment and helper function setup
Run the following code to perform the necessary imports:
!pip install jsonlines
from nemo_curator.utils.distributed_utils import get_client, get_num_workers, read_data, write_to_disk
from nemo_curator.utils.file_utils import get_all_files_paths_under, separate_by_metadata
from nemo_curator.datasets import DocumentDataset
import os
import pandas as pd
import time
import cudf
import dask_cudf
import dask
import numpy as np
import jsonlines
os.environ["CUDA_VISIBLE_DEVICES"] = "0"
cur_dir = os.getcwd()
data_dir = f"{cur_dir}/workspace/"
Run the following code to define the necessary helper functions:
def pre_imports():
    # Ensure cuDF is imported on each Dask worker before GPU deduplication
    import cudf

def check_jsonl_file(file_dir):
    # Print the first record of the first .jsonl file found in a directory
    for file in os.listdir(file_dir):
        if 'jsonl' not in file:
            continue
        with open(os.path.join(file_dir, file), 'r', encoding='utf-8') as f:
            first_line = f.readline()
            print(first_line)
        break

def extract_lines_with_id(file_path, target_list):
    # Yield documents whose 'id' field matches one of the target IDs
    with jsonlines.open(file_path) as reader:
        for obj in reader:
            if obj.get('id') in target_list:
                yield obj
Data curation pipeline for multilingual dataset
The following steps are involved in the data curation pipeline for the Thai Wikipedia dataset:
- Download Thai Wikipedia from archives and extract the dataset to a JSONL file.
- Perform preliminary data cleaning:
- Filter out non-Thai major content in the dataset using a language separator.
- Fix Unicode text in the documents.
- Perform advanced data cleaning:
- Remove identical documents with GPU-accelerated exact deduplication.
- Remove near-identical documents with GPU-accelerated fuzzy deduplication.
- Filter out low-quality documents by applying predefined heuristic filters.
Data download
First, download the Thai Wikipedia data from the archive. The downloading pipeline in NeMo Curator consists of the following classes:
- DocumentDownloader: Abstract class for downloading remote data to disk.
- DocumentIterator: Abstract class for reading raw dataset records from disk.
- DocumentExtractor: Abstract class for extracting text records, as well as any relevant metadata, from the records on disk.
These classes are highly flexible so you can modify the implementation to download any desirable dataset. NeMo Curator also provides the implementation for downloading popular open-source datasets such as CommonCrawl, Wikipedia, and arXiv. For this post, use the predefined downloader to download the Wikipedia dataset.
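If you ever need to ingest a custom corpus instead, a downloader might look roughly like the following sketch. The class names come from NeMo Curator, but the method names, return values, and URL handling here are illustrative assumptions rather than a drop-in implementation; check the library's download module for the exact abstract interface.
import os
import urllib.request

from nemo_curator.download import DocumentDownloader, DocumentIterator, DocumentExtractor

class MyCorpusDownloader(DocumentDownloader):
    """Illustrative sketch: fetch remote files for a hypothetical corpus."""
    def __init__(self, download_dir):
        super().__init__()
        self._download_dir = download_dir

    def download(self, url):
        # Save the remote file to disk and return the local path
        local_path = os.path.join(self._download_dir, os.path.basename(url))
        if not os.path.exists(local_path):
            urllib.request.urlretrieve(url, local_path)
        return local_path

class MyCorpusIterator(DocumentIterator):
    def iterate(self, file_path):
        # Yield one raw record (here, one line) at a time from the downloaded file
        with open(file_path, encoding="utf-8") as f:
            for line in f:
                yield line

class MyCorpusExtractor(DocumentExtractor):
    def extract(self, content):
        # Turn a raw record into text plus any metadata you want to keep
        return {"text": content.strip()}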
Before downloading, run the following code to start a Dask client. This starts a Dask LocalCluster on your CPU. It can be reused for all modules except deduplication, which requires a GPU cluster.
from dask.distributed import Client, LocalCluster
cluster = LocalCluster(n_workers=10, processes=True, memory_limit='16GB')
client = Client(cluster)
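Optionally, you can confirm that the cluster is up by checking the worker count and the Dask dashboard link (both rely only on the utilities already imported above and standard Dask client attributes):
print(f"Number of Dask workers: {get_num_workers(client)}")
print(f"Dask dashboard available at: {client.dashboard_link}")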
Run the following code to download the Thai Wikipedia dataset. This downloads the Thai Wikipedia "20240201" snapshot to your local disk. To download other snapshots, you can replace the dump_date parameter. To download Wikipedia datasets for other languages, you can replace the language parameter. The download process takes approximately 1-2 hours.
from nemo_curator.download import download_wikipedia
download_base_path = os.path.join(data_dir,"wiki_downloads")
download_output_path = os.path.join(download_base_path,"data")
dump_date = "20240201"
language = 'th'
url_limit = None  # Optionally limit the number of dump files downloaded
res = download_wikipedia(download_output_path,
                         language=language,
                         dump_date=dump_date,
                         url_limit=url_limit).df.compute()
Basic cleaning
Large unlabeled text corpora often contain multiple languages. Data curation typically involves language-specific steps, such as using language-tuned heuristics for quality filtering.
Datasets may also have improperly decoded Unicode characters. Tokenizing such text can propagate these issues, potentially leading to inaccurate or nonsensical tokens that affect downstream tasks.
Language separation (optional)
The Thai Wikipedia dataset downloaded might contain documents in other languages. If you want to retain only the Thai documents, you can perform language separation.
To classify and separate documents by language, NeMo Curator provides a predefined filter, FastTextLangId, which computes a language score and language label for each document. The filter is applied to the dataset through the ScoreFilter helper.
Run the following code for language separation:
from nemo_curator import ScoreFilter
from nemo_curator.filters import FastTextLangId
multilingual_data_path = f"{download_output_path}/thwiki-20240201-pages-articles-multistream.xml.bz2.jsonl"
language_base_output_path = os.path.join(data_dir,"language_sep")
language_data_output_path = os.path.join(language_base_output_path,"data")
language_separated_output_path = os.path.join(language_data_output_path,"language")
model_path = language_base_output_path
# Define key in output .jsonl files to store the language information
language_field = "language"
#Download language classification model
!wget https://dl.fbaipublicfiles.com/fasttext/supervised-models/lid.176.bin -P {model_path}
multilingual_dataset = DocumentDataset.read_json(multilingual_data_path,add_filename=True)
lang_filter = FastTextLangId(os.path.join(model_path,'lid.176.bin'))
language_id_pipeline = ScoreFilter(lang_filter, score_field=language_field, score_type='object')
filtered_dataset = language_id_pipeline(multilingual_dataset)
filtered_dataset.df[language_field] = filtered_dataset.df[language_field].apply(lambda score: score[1],meta = (language_field, 'object'))
language_stats = separate_by_metadata(filtered_dataset.df, language_separated_output_path, metadata_field=language_field).compute()
Upon completion, run the following code to print a document that is identified as English. From the output, you can see that the document contains some Thai but the majority of the document is actually written in English.
check_jsonl_file(os.path.join(language_separated_output_path,'EN'))
Unicode reformatter
Another preliminary data-cleaning step is Unicode unification. Data scraped from the Internet often contains various Unicode encodings and special characters that can lead to inconsistencies and errors in further processing. Running unification on the scraped data helps standardize the text into a consistent format, making it cleaner for LLM training.
In NeMo Curator, you can use the DocumentModifier interface to define how documents in the dataset should be modified. The Modify helper takes a DocumentModifier object as well as a DocumentDataset object and modifies the dataset based on the modifier.
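Beyond the predefined modifiers, you can also write your own by subclassing DocumentModifier. The following is a minimal sketch, assuming the interface exposes a modify_document method that takes and returns a document's text; the whitespace-collapsing logic itself is only an illustration, not part of this tutorial's pipeline.
import re

from nemo_curator import Modify
from nemo_curator.modifiers import DocumentModifier

class CollapseWhitespace(DocumentModifier):
    """Illustrative modifier: collapse runs of whitespace into single spaces."""

    def modify_document(self, text):
        return re.sub(r"\s+", " ", text).strip()

# Hypothetical usage, mirroring the UnicodeReformatter example that follows
whitespace_cleaner = Modify(CollapseWhitespace())
# cleaned = whitespace_cleaner(lang_data)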
In the following code example, the Thai subset from the language separation output is used and the predefined UnicodeReformatter modifier is applied.
from nemo_curator import Modify
from nemo_curator.modifiers import UnicodeReformatter
lang_sep_cleaned_data_output_path = os.path.join(language_data_output_path,"cleaned")
target_language = "TH"
lang_data_path = os.path.join(language_separated_output_path, target_language)
lang_data = DocumentDataset.read_json(lang_data_path,add_filename=True)
cleaner = Modify(UnicodeReformatter())
cleaned_data = cleaner(lang_data)
cleaned_data.to_json(lang_sep_cleaned_data_output_path, write_to_filename=True)
Advanced cleaning
Data quality is undoubtedly one of the most important factors for LLM training performance. Advanced data curation techniques such as deduplication and heuristic filtering are often applied to yield better data quality.
This section walks you through how to apply these advanced techniques using NeMo Curator.
Preparation
Before proceeding, we recommend preprocessing the dataset by adding a customized ID to each document. The ID is used as a tracker to identify duplicate or low-quality documents.
When processing multiple datasets, adding customized IDs also becomes important, as the original IDs of each dataset might collide, and customized IDs can be used to distinguish between datasets. NeMo Curator provides an AddId class for inserting customized IDs in the format <prefix>_<id>.
from nemo_curator import AddId
add_id_input_data_dir = lang_sep_cleaned_data_output_path
added_id_output_path = os.path.join(data_dir,"add_id/cleaned")
#Format of output ID will be <prefix>_<id>, Define prefix here
add_ID_id_prefix="TH_wiki"
dataset = DocumentDataset.read_json(add_id_input_data_dir,add_filename=True)
add_id = AddId(id_field='id',id_prefix=add_ID_id_prefix,start_index=0)
id_dataset = add_id(dataset)
id_dataset.to_json(added_id_output_path, write_to_filename=True)
Upon completion, run the following code to check the output. The id field now follows the format TH_wiki-<id>.
check_jsonl_file(added_id_output_path)
Document-level exact deduplication
Web-scraped datasets often contain many verbatim duplicate text sequences across documents. Training on datasets with significant duplication can lead to LLMs that generate memorized text from the training data more frequently, learn less efficiently, and have inflated perplexity scores on held-out data that contains duplicates from training.
In NeMo Curator, the ExactDuplicates class identifies and removes identical documents.
The ExactDuplicates class uses available CUDA devices and GPU-accelerated implementations from the RAPIDS cuDF library to efficiently identify duplicate documents. By using the GPU's parallel processing capabilities to hash each document independently, the compute-intensive deduplication stage is significantly accelerated compared to CPU-based approaches.
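Conceptually, exact deduplication boils down to hashing each document's text and keeping one document per hash. The following minimal pandas sketch (CPU-only, with made-up documents) illustrates the idea that ExactDuplicates performs at scale on the GPU, using the same md5 hash method as the code below:
import hashlib
import pandas as pd

docs = pd.DataFrame({
    "id": ["TH_wiki-0", "TH_wiki-1", "TH_wiki-2"],
    "text": ["สวัสดี", "สวัสดี", "ข้อความอื่น"],  # the first two documents are identical
})

# Hash each document's text
docs["_hashes"] = docs["text"].map(lambda t: hashlib.md5(t.encode("utf-8")).hexdigest())

# Keep the first document per hash; everything else is an exact duplicate
deduped = docs[~docs["_hashes"].duplicated(keep="first")]
print(deduped[["id", "text"]])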
Run the following code to stop the running CPU Dask client and start a GPU Dask client:
client.cluster.close()
client.shutdown()
client = get_client(cluster_type = 'gpu', set_torch_to_use_rmm=False)
print(f"Number of dask worker:{get_num_workers(client)}")
client.run(pre_imports)
Run the following code for exact deduplication:
from nemo_curator.modules import ExactDuplicates
exact_dedup_input_dataset_dir = added_id_output_path
exact_dedup_base_output_path = os.path.join(data_dir,"exact_dedup")
exact_dedup_log_dir = os.path.join(exact_dedup_base_output_path,'log')
exact_dedup_cache_dir = os.path.join(exact_dedup_base_output_path,'cache')
exact_dedup_output_dir = os.path.join(exact_dedup_base_output_path,'data')
id_field="id"
!mkdir -p {exact_dedup_log_dir}
!mkdir -p {exact_dedup_cache_dir}
!mkdir -p {exact_dedup_output_dir}
input_dataset = DocumentDataset.read_json(exact_dedup_input_dataset_dir, backend='cudf')
exact_dup = ExactDuplicates(
    logger=exact_dedup_log_dir,
    id_field="id",
    text_field="text",
    hash_method="md5",
    cache_dir=exact_dedup_cache_dir
)
duplicates = exact_dup(dataset=input_dataset)
# For each group of identical documents, keep the first and mark the rest for removal
exact_docs_to_remove = duplicates.df.map_partitions(
    lambda x: x[x._hashes.duplicated(keep="first")]
)
result = input_dataset.df[
    ~input_dataset.df[id_field].isin(exact_docs_to_remove[id_field].compute())
]
DocumentDataset(result).to_json(exact_dedup_output_dir, write_to_filename=True)
You can also run the following code to look into the duplicate documents identified:
exact_dedup_res = pd.read_parquet(os.path.join(exact_dedup_cache_dir,"_exact_duplicates.parquet"))
print(f"Number of exact duplicated document:{len(exact_dedup_res)}")
exact_dedup_res.groupby('_hashes')['id'].agg(lambda x: ' '.join(x)).reset_index().head()
The previous code example groups duplicate documents by their hash key. You can print out the documents in the same group to verify that they are truly identical.
target_list = [<duplicate_document_ID1>, ..., <duplicate_document_IDX>]
for line in extract_lines_with_id(os.path.join(exact_dedup_input_dataset_dir, 'thwiki-20240201-pages-articles-multistream.xml.bz2.jsonl'), target_list):
    print(line)
Document-level fuzzy deduplication
Exact deduplication only removes identical duplicate documents, but web-scraped datasets often contain many near-duplicate documents with minor differences that exact matching cannot identify. Thus, fuzzy deduplication is necessary to find and remove these near-duplicates to further reduce redundancy in the dataset.
In NeMo Curator, the FuzzyDuplicates class is used to remove near-identical documents. Similar to the ExactDuplicates class, the FuzzyDuplicates class uses GPU-accelerated implementations from the RAPIDS cuDF library to accelerate computation.
The FuzzyDuplicates class is a GPU implementation of the MinHash LSH algorithm, a technique for quickly estimating the similarity between sets, such as documents represented as sets of shingles (character n-grams). It finds pairs of documents with high Jaccard similarity across the corpus far more efficiently than exhaustive pairwise comparison.
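To make the intuition concrete, here is a small, self-contained example of the quantity that fuzzy deduplication approximates: the Jaccard similarity between two documents' sets of character 5-gram shingles (matching char_ngrams=5 and jaccard_threshold=0.8 in the config below). The two sample strings are made up for illustration; MinHash and LSH let FuzzyDuplicates estimate this score without comparing every pair of documents directly.
def char_shingles(text, n=5):
    """Set of character n-grams (shingles) for a document."""
    return {text[i:i + n] for i in range(len(text) - n + 1)}

def jaccard(a, b):
    """Jaccard similarity between two shingle sets: |A & B| / |A | B|."""
    return len(a & b) / len(a | b)

doc_a = "Bangkok is the capital city of Thailand."
doc_b = "Bangkok is the capital of Thailand."

score = jaccard(char_shingles(doc_a), char_shingles(doc_b))
print(f"Jaccard similarity: {score:.2f}")  # near-duplicates score close to 1.0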
The implementation of the MinHash LSH algorithm consists of several intermediate steps. This tutorial provides an example of using the high-level FuzzyDuplicates class. For more information about each intermediate step, see the /NVIDIA/NeMo-Curator GitHub repo.
from nemo_curator import FuzzyDuplicates, FuzzyDuplicatesConfig
fuzzy_dedup_data_path = exact_dedup_output_dir
fuzzy_dedup_base_output_path = os.path.join(data_dir,"fuzzy_wrapper")
fuzzy_dedup_log_dir = os.path.join(fuzzy_dedup_base_output_path,'log')
fuzzy_dedup_cache_dir = os.path.join(fuzzy_dedup_base_output_path,'cache')
fuzzy_dedup_output_dir = os.path.join(fuzzy_dedup_base_output_path,'data')
id_field = 'id'
text_field = 'text'
!mkdir -p {fuzzy_dedup_log_dir}
!mkdir -p {fuzzy_dedup_cache_dir}
!mkdir -p {fuzzy_dedup_output_dir}
with dask.config.set({"dataframe.backend": 'cudf'}):
    input_dataset = DocumentDataset.read_json(fuzzy_dedup_data_path, backend='cudf')

fuzzy_dedup_config = FuzzyDuplicatesConfig(
    cache_dir=fuzzy_dedup_cache_dir,
    id_field=id_field,
    text_field=text_field,
    seed=10,
    char_ngrams=5,
    num_buckets=20,
    hashes_per_bucket=13,
    use_64_bit_hash=False,
    buckets_per_shuffle=5,
    false_positive_check=True,
    num_anchors=2,
    jaccard_threshold=0.8,
)
fuzzy_dup = FuzzyDuplicates(logger=fuzzy_dedup_log_dir, config=fuzzy_dedup_config)
duplicates = fuzzy_dup(dataset=input_dataset)
duplicates.to_parquet(fuzzy_dedup_cache_dir, write_to_filename=False)
# For each group of near-duplicate documents, keep the first and mark the rest for removal
fuzzy_docs_to_remove = duplicates.df.map_partitions(
    lambda x: x[x.group.duplicated(keep="first")]
)
result = input_dataset.df[
    ~input_dataset.df[id_field].isin(fuzzy_docs_to_remove[id_field].compute())
]
DocumentDataset(result).to_json(fuzzy_dedup_output_dir, write_to_filename=True)
You can also run the following code to look into the near-duplicate documents identified:
fuzzy_dedup_res = pd.read_parquet(f"{fuzzy_dedup_cache_dir}/part.0.parquet")
fuzzy_dedup_res['id'] = fuzzy_dedup_res['id'].astype(str)
fuzzy_dedup_res.groupby('group')['id'].agg(lambda x: ', '.join(x)).reset_index()
The previous code example groups duplicate documents by the group field. You can print out the documents in the same group to see whether they are nearly identical.
target_list = [<duplicate_document_ID1>, ..., <duplicate_document_IDX>]
for line in extract_lines_with_id(os.path.join(fuzzy_dedup_data_path, 'thwiki-20240201-pages-articles-multistream.xml.bz2.jsonl'), target_list):
    print(line)
Heuristic filtering
Heuristic filtering helps remove low-quality content from the dataset, using simple, efficient-to-compute rules. By applying well-designed heuristic filters, you can improve the signal-to-noise ratio of the pretraining data. At the time of publication, NeMo Curator provides 24 heuristics for natural languages, as well as eight heuristics for coding languages.
In this tutorial, you use a YAML config file to define which filters are used for heuristic filtering. The config file can be found in the config folder. The build_filter_pipeline helper retrieves the filter settings from the config file and constructs a sequential filter pipeline that applies each filter to the dataset.
#Close the GPU Dask cluster and create a CPU Dask cluster
client.cluster.close()
client.shutdown()
cluster = LocalCluster(n_workers=10, processes=True, memory_limit='16GB')
client = Client(cluster)
from nemo_curator.utils.config_utils import build_filter_pipeline
HF_input_data_dir = fuzzy_dedup_output_dir
HF_base_output_path = os.path.join(data_dir,'heuristic_filtering')
kept_document_dir = os.path.join(HF_base_output_path,'data','hq.parquet')
filter_config_file = './config/heuristic_filter_non-en.yaml'
!mkdir -p {kept_document_dir}
#Load filters from config
filter_pipeline = build_filter_pipeline(filter_config_file)
dataset = DocumentDataset.read_json(HF_input_data_dir, backend='pandas', add_filename=True)
result_data = filter_pipeline(dataset)
result_data.to_parquet(kept_document_dir, write_to_filename=True)
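If you prefer not to use a config file, individual heuristic filters can also be applied directly with the same ScoreFilter helper used earlier for language identification. The sketch below assumes a WordCountFilter heuristic with min_words/max_words parameters and a text_field argument on ScoreFilter; check nemo_curator.filters and the ScoreFilter documentation for the exact names and defaults before using it.
from nemo_curator import ScoreFilter
from nemo_curator.filters import WordCountFilter

# Hypothetical single-filter pipeline: keep documents within a word-count range
word_count_filter = ScoreFilter(
    WordCountFilter(min_words=50, max_words=100000),  # assumed parameter names
    text_field="text",
    score_field="word_count",
)
word_count_filtered = word_count_filter(dataset)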
For more information about inspecting the intermediate result of each filter, for example, inspecting documents filtered out for a particular filter, see the example code in the sample data curation pipeline notebook.
Next steps
This tutorial demonstrated how to construct a sample data curation pipeline for Thai Wikipedia data. For easy access, we uploaded the sample data curation pipeline notebook.
In addition to the resources used in this post, NeMo Curator also provides an interface for other advanced techniques, such as task-based deduplication, task identification and decontamination, domain classification, and PII redaction. For more information, see the collection of data curation examples on GitHub.
Star the GitHub repo to stay up-to-date with the latest developments and receive notifications about new features, bug fixes, and future updates.
You can also request access to the NVIDIA NeMo Curator microservice, which provides the easiest path for enterprises to get started with data curation from anywhere. It offers streamlined performance and scalability to shorten the time to market.