Generative AI

NeMo Curator を使った日本語データのキュレーション

Reading Time: 7 minutes

本記事では、NeMo Curator を使用して、日本語データセットを作成する方法を説明します。

データ キュレーションとは

データ キュレーションとは、データのダウンロードやテキストの抽出、クリーニング、重複排除、フィルタリングなどを通じて、機械学習モデルの開発に必要なデータセットを構築するプロセスです。

図 1. NeMo Curator チームが、Common Crawl データセットで 357M パラメーターの GPT スタイルのモデルを学習するアブレーション(一部の要素だけを追加、削除して比較)実験を実行し、それぞれのキュレーションステップがモデルのパフォーマンスに与える影響を評価。

データ キュレーションは、大規模言語モデル (LLM) の事前学習、カスタマイズにおける最初の、そしておそらく最も重要なステップでもあります。しかし、この重要性にもかかわらず、LLM を学習するための大規模なデータセットを作成するために開発されたソフトウェアやツールのほとんどは、公開されておらず、拡張性もありません。そのため、LLM 開発者は、大規模な言語データセットをキュレートするための独自ツールを構築する必要があります。

このニーズの高まりに応えるため、NVIDIA は生成 AI モデルのための大規模で高品質なデータセットを準備するデータ キュレーション フレームワークである NeMo Curator をオープン ソース でリリースしました。

NeMo Curator とは

NeMo Curator は、基盤モデルの事前学習、ドメイン適応型事前学習 (DAPT)、教師ありファインチューニング (SFT)、Parameter-Efficient Fine-Tuning (PEFT) などの大規模言語モデル (LLM) のユース ケース向けに、高速でスケーラブルなデータセットの準備とキュレーションを行うために設計された Python ライブラリです。

DaskRAPIDS で GPU を活用することでデータ キュレーションを大幅に高速化し、大幅な時間の短縮を実現します。このライブラリはカスタマイズ可能なモジュール式インターフェイスを提供し、パイプラインの拡張を簡素化し、高品質のトークンを準備することでモデルの収束を加速します。

NVIDIA NeMo の一部である NeMo Curator は、Common Crawl、Wikipedia、arXiv などのパブリック ソースからすぐにデータをダウンロードしてキュレートするためのワークフローを提供します。また、開発者の独自の要件にカスタマイズされたデータ キュレーション パイプラインにより、簡単にカスタム データセットを作成できる柔軟性も提供します。

現在、NeMo Curator はすぐに使用できる機能として以下を提供しています。

  • データのダウンロードとテキストの抽出
  • 言語の識別と分離
  • テキストの再フォーマット化とクリーニング
  • 品質フィルタリング
  • ドキュメントレベルの重複排除
  • 多言語ダウンストリーム タスクの除染
  • 分散データ分類 (ドメイン、品質分類等)
  • 個人識別情報 (PII) の削除

「図 2. NeMo Curator が提供する機能」には、それぞれのステップとそこで使われている主なテクノロジが記載されています。緑で塗りつぶされたステップでは、GPU を使って処理を大幅に高速化することが可能です。

図 2. NeMo Curator が提供する機能

NeMo Curator は GitHub 上にあるリポジトリ から、または NeMo Framework のコンテナーからすぐに始めることができます。

NeMo Curator チュートリアル

本記事では、日本語 Wikipedia からデータをダウンロードして、いくつかのキュレーション ステップについて、その使用方法を紹介します。

本チュートリアルでの手順は以下の通りです。

  • NeMo Framework コンテナー を起動
  • データのダウンロードとテキストの抽出
  • 言語の検出と分離、テキストの再フォーマット化
  • ID の付与
  • ドキュメント レベルの重複排除
  • 合成データの生成

チュートアルを開始する前に、NeMo Curator の核となっているデータセット クラス DocumentDataset に関する Working with DocumentDataset と CPU、GPU 上で処理を大規模にスケーリングする Dask に関する CPU and GPU Modules with Dask を一読することをお勧めします。

また、今回のチュートリアルの検証環境は以下の条件で行っております。こちらの構成は一例であり、NeMo Curator は Volta 以降の任意の NVIDIA GPU、任意の計算ノードでジョブを実行することが可能です。Kubernetes クラスター上で NeMo Curator を実行する際のサンプルはこちらにあります。

  • ハードウェア
    • DGX Cloud A100
    • GPU: 1 x NVIDIA A100 80 GB GPUs (driver version: 535.183.08)
    • CPU: AMD EPYC 7V12 64-Core Processor
    • システム メモリ: 128 GB
  • ソフトウェア
    • OS: Ubuntu 22.04.4 LTS
    • Container: nvcr.io/nvidia/nemo:24.07

事前準備

以下のコマンドで作業用のディレクトリを作成し、移動します。

mkdir curator-example
cd curator-example

Docker コンテナーの起動

以下のコマンドでコンテナーを起動します。

sudo docker run --rm -it --gpus all --ulimit memlock=-1 --network=host -v ${PWD}:/workspace -w /workspace  nvcr.io/nvidia/nemo:24.07 bash

データのダウンロードとテキストの抽出

前述したように、NeMo Curator では、Common Crawl、Wikipedia、arXiv について、それぞれ専用の関数が用意されており、引数を与えて実行することですぐにダウンロードおよびテキストの抽出を開始できます。

(オプション): 独自のデータ ソースをダウンロード、テキスト抽出することも可能です。この処理には、Common Crawl 用などの専用関数でも使われている NeMo Curator の DocumentDownloader を継承したダウンロード用のクラス、DocumentIterator を継承した反復処理用のクラス、DocumentExtractor を継承したテキスト抽出用のクラスを定義する必要があります。これらの例は Curating Custom Datasets for LLM Parameter-Efficient Fine-Tuning with NVIDIA NeMo CuratorCurating Custom Datasets for LLM Training with NVIDIA NeMo Curator に記載がありますのでぜひ参考にしてください。

このチュートリアルでは、日本語 Wikipedia のデータをダウンロードして、そこからテキストを抽出します。

日本語 Wikipedia を対象にする際は、download_wikipedia() というあらかじめ用意された関数を使用し、language='ja' によって日本語を対象に指定、dump_date にいつの時点のスナップショットをダウンロードするか指定します。ここでは時間およびリソースを節約するため、ダウンロードするファイル数を制限する url_limit を設定します (全てのファイルを対象にしたい場合はコメントアウトしてください)。

LocalCluster の引数にあるワーカー数やメモリ制限は実行する環境に合わせて変更してください。今回の検証環境と以下の設定ではこの処理に 2 時間ほど要しました(このステップは本チュートリアルで最も時間がかかるパートになります)。

以下、コンテナー起動後のスクリプトは Jupyter Notebook 上でのセル実行を想定しています。また、それぞれのステップは入力に使用するデータセットがすでに存在する状況であればパスを変更することで個々に実行することが可能です。

import os
from nemo_curator.download import download_wikipedia
from dask.distributed import Client, LocalCluster


cur_dir = os.getcwd()
print(cur_dir)
data_dir = f"{cur_dir}/"

cluster = LocalCluster(n_workers=48, processes=True, memory_limit='24GB')
client = Client(cluster)

# Output
download_base_directory= os.path.join(data_dir, "wiki_downloads")
download_output_directory = os.path.join(download_base_directory, "data")

# Relevant parameters
dump_date = "20240801"
language = 'ja'
url_limit = 1  # 1 file (jawiki-20240801-pages-articles-multistream1.xml-p1p114794.bz2)

res = download_wikipedia(
    download_output_directory,
    language=language, 
    dump_date=dump_date,
    url_limit=url_limit
).df.compute()

#client.cluster.close()
#client.shutdown()

処理が完了すると wiki_downloads/data/ というディレクトリに jawiki-20240801-pages-articles-multistream1.xml-p1p114794.bz2.jsonl というファイルが出力されます。このファイルのドキュメント数は 59,652 です。

言語の検出と分離、テキストの再フォーマット化

このセクションでは、先ほど抽出したドキュメントを fasttext の言語識別モデルを使用して、言語ごとに分類します。これによって、ドキュメントが言語ごとに作成されるサブフォルダーへ振り分けられます。

import os
import time
from dask.distributed import Client, LocalCluster

from nemo_curator import ScoreFilter,Modify
from nemo_curator.datasets import DocumentDataset
from nemo_curator.filters import FastTextLangId
from nemo_curator.modifiers import UnicodeReformatter
from nemo_curator.utils.file_utils import separate_by_metadata


cur_dir = os.getcwd()
print(cur_dir)
data_dir = f"{cur_dir}/"

# 前の処理でclusterを落としている場合は以下をアンコメントして再度起動してください
#cluster = LocalCluster(n_workers=48, processes=True, memory_limit='24GB')
#client = Client(cluster)

# Input path
multilingual_data_path = "./wiki_downloads/data/jawiki-20240801-pages-articles-multistream1.xml-p1p114794.bz2.jsonl"

# Output path
language_base_output_path = os.path.join(data_dir,"language_sep")
language_data_output_path = os.path.join(language_base_output_path,"data")
language_separated_output_path = os.path.join(language_data_output_path,"language")

# Fasttext model path
model_path = language_base_output_path

# Define key in output .jsonl files to store the language information
language_field = "language"

!wget https://dl.fbaipublicfiles.com/fasttext/supervised-models/lid.176.bin -P {model_path}
t0 = time.time()

# Load dataset 
multilingual_dataset = DocumentDataset.read_json(multilingual_data_path, add_filename=True)

# Define Language separation pipeline
lang_filter = FastTextLangId(os.path.join(model_path,'lid.176.bin'))
language_id_pipeline = ScoreFilter(lang_filter, score_field=language_field, score_type='object')
filtered_dataset = language_id_pipeline(multilingual_dataset)

# The language separation pipeline will produce a result looks like ['JA',0.96873], we only want to keep the 'JA' label and drop the detailed classifier score
filtered_dataset.df[language_field] = filtered_dataset.df[language_field].apply(lambda score: score[1],meta = (language_field, 'object'))

# Split the dataset to corresponding language sub-folders
language_stats = separate_by_metadata(filtered_dataset.df, language_separated_output_path, metadata_field=language_field).compute()

print(f"Time taken for splitting language:{time.time()-t0}")

この処理が完了 (検証環境では 1-2 分) すると language_sep/data/language/ の下に分類された言語ごとにドキュメントが保存されています。

次に、先ほど言語ごとに振り分けられたドキュメントの日本語を対象に、UnicodeReformatter (内部で ftfy を実行) を使用して、ドキュメント内のユニコードを再フォーマットします。

t0 = time.time()

# Define desired language
target_language = "JA"

# Output path
lang_sep_cleaned_data_output_path = os.path.join(language_data_output_path, "cleaned")

# Read the language specific data and fix the unicode in it
lang_data_path = os.path.join(language_separated_output_path, target_language)
lang_data = DocumentDataset.read_json(lang_data_path,add_filename=True)

cleaner = Modify(UnicodeReformatter())
cleaned_data = cleaner(lang_data)

# Write the cleaned_data
cleaned_data.to_json(lang_sep_cleaned_data_output_path, write_to_filename=True)

print(f"Time taken for fixing unicode:{time.time()-t0}")

この処理が完了 (検証環境では 7 分ほど) すると language_sep/data/cleaned/jawiki-20240801-pages-articles-multistream1.xml-p1p114794.bz2.jsonl というファイルが出力されます。このファイルのドキュメント数は 59,603 です。

ID の付与

日本語 Wikipedia データにはすでに id がありますが、<prefix>_<id> フォーマットのように統一した id を付与すると複数のデータセットを扱う際に、どのデータセットのどのドキュメントが削除されたのかがわかりやすくなります。これは重複排除やフィルタリングなどを行う際に役に立ちます。

この処理を実行する関数が AddID() です。この関数の引数は以下の通りです。

  • id_field: フィールドが入力 json ファイルに追加されます。キーが jsonl にすでに存在する場合は、その値が置き換えられます。
  • id_prefix: ID で使用される接頭辞。デフォルトは「doc_id」です。
  • start_index: ID の開始インデックス。デフォルトは「None」です。「None」に設定すると、高速な計算のために順序のない ID スキームが使用されます。ここでは、参照を容易にするために「0」に設定しています。
import os
import time

from nemo_curator import AddId
from nemo_curator.datasets import DocumentDataset


cur_dir = os.getcwd()
data_dir = f"{cur_dir}/"

# Input
add_id_input_data_dir = "./language_sep/data/cleaned"

# Output
added_id_output_path = os.path.join(data_dir, "add_id/cleaned")

# Format of output ID will be <prefix>_<id>, Define prefix here
add_ID_id_prefix="JA_wiki"

t0 = time.time()
# Read input files
dataset = DocumentDataset.read_json(add_id_input_data_dir,add_filename=True)

# Run AddID() on the input dataset
add_id = AddId(id_field='id',id_prefix=add_ID_id_prefix,start_index=0)
id_dataset = add_id(dataset)

# Output files
id_dataset.to_json(added_id_output_path, write_to_filename=True)

print(f"Time taken for add ID:{time.time()-t0}")

この処理が完了 (検証環境では数十秒ほど) すると、add_id/cleaned/ jawiki-20240801-pages-articles-multistream1.xml-p1p114794.bz2.jsonl というファイルが出力されます。結果を確認すると、id の value が “JA_wiki-0000000000” といった形式に変更されています。

もし、ここまでで Dask クラスター が起動しているようであれば、次のセクションでは、GPUの Dask クラスター を起動するため、以下のコマンドで閉じてください (何かエラーが出た際は再度、実行してください)。

client.cluster.close()
client.shutdown()

重複排除

重複排除では、Exact Deduplication, Fuzzy Deduplication, 埋め込みを使用した Semantic Deduplication がサポートされています。ここでは、Exact Deduplication と Fuzzy Deduplication を扱います。Semantic Deduplication については、こちらを参照してください。

Note: これらの処理を実行するには、前セクションの AddID() などを使用して、コーパス内の id が一意の状態になっている必要があります。

Exact Deduplication

Exact Deduplication では、ドキュメントのテキストは「md5」などの特定のハッシュ アルゴリズムを使用して一意の文字列にハッシュ化されます。厳密に同一ハッシュ値を持つドキュメントは、同一のテキストを持っています。

ここで使用する関数は ExactDuplicates() です。この関数の引数には、以下のものがあります。

  • id_field: ドキュメント ID を識別するための入力ファイル内のキー
  • text_field: ドキュメントのテキストを含む入力ファイル内のキー
  • hash_method: 使用されるハッシュ アルゴリズム。デフォルトは md5
  • cache_dir: 指定された場合、重複したドキュメント ID は cache_dir に出力されます。指定されていない場合は、ID は保存されません

また、GPU の dask クラスターを使用して、重複排除の計算を高速化します。

%env DASK_DATAFRAME__QUERY_PLANNING=False

import os
import time
import pandas as pd

from nemo_curator.datasets import DocumentDataset
from nemo_curator.modules import ExactDuplicates
from nemo_curator.utils.distributed_utils import get_client, get_num_workers

def pre_imports():
    import cudf 


cur_dir = os.getcwd()
data_dir = f"{cur_dir}/"


client = get_client(cluster_type='gpu', set_torch_to_use_rmm=False)
print(f"Number of dask worker:{get_num_workers(client)}")
client.run(pre_imports)

# Input
exact_dedup_input_dataset_dir = "./add_id/cleaned"

# Output
exact_dedup_base_output_path = os.path.join(data_dir, "exact_dedup")
exact_dedup_log_dir = os.path.join(exact_dedup_base_output_path, 'log')
exact_dedup_output_dir = os.path.join(exact_dedup_base_output_path, 'data')

# Parameters for ExactDuplicates()
exact_dedup_dataset_id_field = "id"
exact_dedup_dataset_text_field = "text"

!mkdir -p {exact_dedup_log_dir}
!mkdir -p {exact_dedup_output_dir}

t0 = time.time()
# Read input dataset
input_dataset = DocumentDataset.read_json(exact_dedup_input_dataset_dir, backend='cudf')

# Run exact deduplication to the input
exact_dup = ExactDuplicates(
    logger=exact_dedup_log_dir,
    id_field=exact_dedup_dataset_id_field,
    text_field=exact_dedup_dataset_text_field,
    hash_method="md5",
    cache_dir=exact_dedup_output_dir  # Duplicated document ID list is output to the cache_dir
)
duplicates = exact_dup(dataset=input_dataset)

print(f"Number of exact duplicated file:{len(duplicates)}")
print(f"Time taken for exact duplicate:{time.time()-t0}")

exact_dedup_res = pd.read_parquet(os.path.join(exact_dedup_output_dir, "_exact_duplicates.parquet"))
print(f"Number of exact duplicated document:{len(exact_dedup_res)}")
exact_dedup_res.head()

exact_dedup_res.groupby('_hashes')['id'].agg(lambda x: ' '.join(x)).reset_index().head()

この処理が完了すると、exact_dedup/data/ に重複のあったドキュメントの id が保存されます。重複したドキュメントは 2 件ありました。検証環境では CPU を使用した場合に 11.2 秒ほど処理に時間がかかりましたが、GPU を使用した場合、3.8 秒と約 3 倍処理が高速化しました。

Fuzzy Deduplication

Fuzzy Deduplication は、Exact Deduplication とは異なり、厳密な重複を見つけ出すのではなく、GPU 実装された MinhashLSH アルゴリズムによって、テキストの統計に基づき類似テキストを抽出します (意味的な類似性とは異なります)。この重複を抽出するには、複数の中間ステップがありますが、詳細はこちらを参照してください。

Fuzzy Deduplication は複数の手順を個々にステップバイステップで実行することも可能ですが、FuzzyDuplicates() を使用すると簡単に実行できます。FuzzyDuplicatesConfig() 内の引数として、n-gram の長さ、バケットやバケット内の hash の数、重複と判断する Jaccard 類似度の閾値などを設定し、FuzzyDuplicates() に渡します。

Note: Fuzzy Deduplication は、AddID() を使用した id 形式、もしくは整数 id の形式でのみ機能します。

%env DASK_DATAFRAME__QUERY_PLANNING=False

import os
import time
import pandas as pd

from nemo_curator import FuzzyDuplicates, FuzzyDuplicatesConfig
from nemo_curator.datasets import DocumentDataset
from nemo_curator.utils.distributed_utils import get_client, get_num_workers

import dask

def pre_imports():
    import cudf 

    
cur_dir = os.getcwd()
data_dir = f"{cur_dir}/"

client = get_client(cluster_type='gpu', set_torch_to_use_rmm=False)
print(f"Number of dask worker:{get_num_workers(client)}")
client.run(pre_imports)

# Input
fuzzy_dedup_data_path = "./add_id/cleaned"

# Output
fuzzy_dedup_base_output_path = os.path.join(data_dir, "fuzzy_wrapper")
fuzzy_dedup_log_dir = os.path.join(fuzzy_dedup_base_output_path, 'log')
fuzzy_dedup_cache_dir = os.path.join(fuzzy_dedup_base_output_path, 'cache')
fuzzy_dedup_output_dir = os.path.join(fuzzy_dedup_base_output_path, 'data')

# Relevant parameters
id_field = 'id'
text_field = 'text'
filetype = "parquet"

!mkdir -p {fuzzy_dedup_base_output_path}
!mkdir -p {fuzzy_dedup_log_dir}
!mkdir -p {fuzzy_dedup_cache_dir}
!mkdir -p {fuzzy_dedup_output_dir}

#!rm -r {fuzzy_dedup_cache_dir}

with dask.config.set({"dataframe.backend": 'cudf'}):
        
    t0 = time.time()
        
    input_dataset = DocumentDataset.read_json(fuzzy_dedup_data_path, backend='cudf')
    fuzzy_dedup_config = FuzzyDuplicatesConfig(
        cache_dir=fuzzy_dedup_cache_dir,
        id_field=id_field,
        text_field=text_field,
        seed=42,  # Use the seed set in Minhash section for consistency
        char_ngrams=5,
        num_buckets=20,
        hashes_per_bucket=13,
        use_64_bit_hash=False,
        buckets_per_shuffle=5,
        false_positive_check=True,
        num_anchors=2,
        jaccard_threshold=0.8,
    )
    fuzzy_dup = FuzzyDuplicates(logger=fuzzy_dedup_log_dir, config=fuzzy_dedup_config)
    duplicates = fuzzy_dup(dataset=input_dataset)
        
    duplicates.to_parquet(fuzzy_dedup_output_dir, write_to_filename=False)
       
    print(f"Time taken for Connected Component: {time.time()-t0} s")    
        
fuzzy_dedup_res = pd.read_parquet(fuzzy_dedup_output_dir)
fuzzy_dedup_res.head()

この処理が完了 (検証環境では数十秒ほど) すると、fuzzy_wrapper/data/ にパラメーターで指定した重複条件に合致したドキュメントの id が保存されます。

Exact Deduplication と Fuzzy Deduplication によって、重複ドキュメントの id が出力されました。ここで重複しているドキュメントをデータセットから削除します。

import os
import pandas as pd

from nemo_curator.datasets import DocumentDataset


cur_dir = os.getcwd()
data_dir = f"{cur_dir}/"

# Input
dataset_dir = "./add_id/cleaned"
exact_dedup_output_dir="./exact_dedup/data"
fuzzy_dedup_output_dir="./fuzzy_wrapper/data"

# Output
dudped_output_dir = os.path.join(data_dir, "remove_duplicate/result.parquet")

# Relevant parameters
input_id_field = 'id'
id_prefix = "JA_wiki"

!mkdir -p {dudped_output_dir}

#Load .jsonl dataset (GPUメモリが足りない場合はbackend='pandas'へ変更してください)
input_dataset = DocumentDataset.read_json(dataset_dir, backend='cudf')

# Load exact deduplicate result and extract list of duplicated document ID (GPUメモリが足りない場合はbackend='pandas'へ変更してください)
exact_duplicates = DocumentDataset.read_parquet(os.path.join(exact_dedup_output_dir, "_exact_duplicates.parquet"), backend='cudf')
exact_docs_to_remove = exact_duplicates.df.map_partitions(
    lambda x: x[x._hashes.duplicated(keep="first")]
)

# Remove the duplicated document from input dataset
result = input_dataset.df[
    ~input_dataset.df[input_id_field].isin(exact_docs_to_remove[input_id_field].compute())
]

# Loads result from fuzzy dedup wrapper
fuzzy_duplicates = pd.read_parquet(fuzzy_dedup_output_dir)

# Generate list of near duplicate document ID
fuzzy_docs_to_remove = fuzzy_duplicates.drop_duplicates(subset=['group'], keep='first')

# Remove near duplicates
result = result[~result[input_id_field].isin(fuzzy_docs_to_remove[input_id_field])]

# Save final result to local (backend='pandas'の場合は、write_to_filename=Trueをコメントアウトしてください)
result.to_parquet(dudped_output_dir, write_to_filename=True)

res = pd.read_parquet(dudped_output_dir)
print(f"Length of duplicate removed dataset:{len(res)}")

この処理が完了 (検証環境では数秒) すると、remove_duplicate/result.parquet/ に重複が排除されたデータセットが保存されます。処理前のドキュメント数は 59,603 でしたが、Exact Deduplication と Fuzzy Deduplication で重複を排除した結果、59,508 のドキュメントが保存されました。

合成データの生成

LLM を活用して、事前学習やカスタマイズに使用するデータセットを生成することもできます。合成データは、LLM を低リソース言語/ドメインに適応させたり、他のモデルから知識を抽出したりする際などに有用です。ここでは、build.nvidia.com の API エンドポイント にアクセスして合成データを生成するパイプラインを実行してみましょう (このセクションを始めるには build.nvidia.com のアカウントを作成し、APIキーを取得する必要があります)。

このセクションはこれまでのチュートリアルとは依存関係なく実行できます。

まず、以下を実行してAPI エンドポイント へアクセスするための準備と相互の処理に必要なパラメーター、トピック、テンプレートを用意します。

import asyncio
import nest_asyncio
from openai import AsyncOpenAI

client = AsyncOpenAI(
    base_url="https://integrate.api.nvidia.com/v1",
    api_key="nvapi-"  # 取得したAPIキーを入力してください
)

n_subtopics = 2
n_questions = 2
topic = "機械学習"

TOPIC_GENERATION_PROMPT_TEMPLATE = """\
トピックが与えられた場合、そのトピックに関連する {n_subtopics} のサブトピックのリストを生成してください。
トピックは:{topic}
リストは番号なしで、サブトピックの説明なしでなければなりません。サブトピックはコンマで区切られる必要があります。リスト以外のテキストは存在してはなりません。
"""

QUESTION_PROMPT_TEMPLATE = """\
トピックが与えられた場合、そのトピックに関して{n_questions}個の質問を生成してください。
トピックは:{sub_topic}
リスト形式で、質問は改行文字で区切られる必要があります。リスト以外のテキストは存在してはなりません。
"""

RESPONSE_PROMPT_TEMPLATE = """\
質問が与えられた場合、その質問に対して考えられる2つの回答を生成してください。
質問は:{question}
リスト形式は以下の形式である必要があります:

RESPONSE A: ここに回答Aのテキストを入力
RESPONSE B: ここに回答Bのテキストを入力
"""

次に、meta/llama-3.1-405b-instruct を呼び出して先ほど指定した「機械学習」というトピックから、より小さなサブトピックを生成します。

# generate sub topics
async def generate_subtopics(client, topic, n_subtopics):
    prompt = TOPIC_GENERATION_PROMPT_TEMPLATE.format(topic=topic, n_subtopics=n_subtopics)
    response = await client.chat.completions.create(
        model="meta/llama-3.1-405b-instruct",
        messages=[
            {"role" : "user",
             "content" : prompt}
        ],
        temperature=0.2,
        top_p=0.7,
        max_tokens=1024,
    )
    return response
subtopics = await generate_subtopics(client, topic, n_subtopics)
subtopic_list = subtopics.choices[0].message.content.split(",")
print(subtopic_list)

結果として、以下のようなサブトピックが生成できました (乱数の影響で異なる結果が得られることもあります)。

[‘ディープラーニング、強化学習’]

次にサブトピックから質問文を生成してみましょう。

# generate questions of sub topics
async def generate_questions(client, sub_topic, n_questions):
    prompt = QUESTION_PROMPT_TEMPLATE.format(sub_topic=sub_topic, n_questions=n_questions)
    response = await client.chat.completions.create(
        model="meta/llama-3.1-405b-instruct",
        messages=[
            {"role" : "user",
             "content" : prompt}
        ],
        temperature=0.2,
        top_p=0.7,
        max_tokens=1024,
    )
    if hasattr(response, 'choices') and response.choices:
        return response.choices[0].message.content
    else:
        print(f"Unexpected response structure: {response}")
        return None

async def question_generator(client, subtopic_list, n_question):
    tasks = [generate_questions(client, subtopic, n_question) for subtopic in subtopic_list]
    question_list = await asyncio.gather(*tasks)
    return question_list

nest_asyncio.apply()
question_list = asyncio.run(question_generator(client, subtopic_list, n_questions))
print(question_list)

# format questions
question_list_formatted = []
for question_set in question_list:
    question_list_formatted += question_set.split("\n\n")

実行が完了すると以下のような質問文が生成されます。

[‘ディープラーニングと強化学習の違いは何か?\n\nディープラーニングと強化学習はどのように組み合わせて使用できるのか?’]

次に先ほど生成した質問文から応答文を生成してみましょう。

# generate response of each question
async def generate_responses(client, question):
    prompt = RESPONSE_PROMPT_TEMPLATE.format(question=question)
    response = await client.chat.completions.create(
        model="meta/llama-3.1-405b-instruct",
        messages=[
            {"role" : "user",
             "content" : prompt}
        ],
        temperature=0.2,
        top_p=0.7,
        max_tokens=1024,
    )
    if hasattr(response, 'choices') and response.choices:
        return response.choices[0].message.content
    else:
        print(f"Unexpected response structure: {response}")
        return None

async def response_generator(client, question_list):
    tasks = [generate_responses(client, question) for question in question_list]
    response_list = await asyncio.gather(*tasks)
    return response_list

question_response_list = asyncio.run(response_generator(client, question_list_formatted))
print(question_response_list)

生成された応答文の例は以下のようなものになります。各質問に対し、応答候補が 2 つずつ生成されます。これは DPO (Direct Preference Optimization) の学習にも利用できるフォーマットです。

[‘RESPONSE A: ディープラーニングは、主に大規模なデータセットを使用してパターンを学習し、画像や音声の認識などのタスクに適用される一方、強化学習は、エージェントが環境とやり取りすることで、報酬を最大化する行動を学習する手法です。\n\nRESPONSE B: ディープラーニングは、ニューラルネットワークを使用してデータから特徴を抽出することに重点を置いており、強化学習は、エージェントが環境からのフィードバックに基づいて決定を下す方法を学習することに重点を置いています。’,

 ‘RESPONSE A: ディープラーニングと強化学習は、ディープラーニングを使用して強化学習のエージェントの状態と行動を表現することで組み合わせることができます。ディープラーニングのネットワークは、強化学習のエージェントが環境を理解し、最適な行動を選択するのに役立ちます。\n\nRESPONSE B: 強化学習のエージェントは、ディープラーニングのネットワークを使用して、環境から得られた経験を学習し、最適な行動を選択することができます。ディープラーニングのネットワークは、強化学習のエージェントが環境を理解し、最適な行動を選択するのに役立ちます。また、強化学習のエージェントは、ディープラーニングのネットワークを使用して、環境から得られた経験を学習し、最適な行動を選択することができます。’]

以下は先ほど生成したデータを jsonl 形式で出力するためのスクリプトです。

# prepare question:response pair set list
question_response_pair_list = []
for question, response_set in zip(question_list_formatted, question_response_list):
    question_response_pair_list.append(
        {
            "question" : question, 
            "responses" : {
                "response_a" : {"response" : response_set.split("RESPONSE B:")[0].replace("RESPONSE A:", "").strip().split("\n\n")[-1].strip()},
                "response_b" : {"response" : response_set.split("RESPONSE B:")[-1].split("\n\n")[0].strip()}
            },
        }
    )

import json

# export to jsonl file
with open('synthetic_data.jsonl', 'w') as f:
    for item in question_response_pair_list:
        f.write(json.dumps(item, ensure_ascii=False))
        f.write('\n')

(オプション): 合成データの検証

ここでは先ほど生成したデータを報酬モデル (Nemotron-4-340B-Reward) を使用して、品質ベースのフィルタリングを行う手順を紹介します。

Nemotron-4-340B-Reward」は、研究者や開発者が独自の LLM を構築するためのトレーニング データを生成する合成データ生成パイプラインの一部として利用できる多次元報酬モデルです。Nemotron-4-340B-Reward は、Nemotron-4-340B-Baseモデルと応答の最終トークンの表現を5つの HelpSteer2 属性に対応するスカラー値に変換する線形レイヤーで構成されています。

このモデルは、最大 4,096 トークンのコンテキストをサポートしており、以下の属性を (通常 0 から 4 の範囲で) 評価します。

  • 有用性 (Helpfulness): プロンプトに対する応答の全体的な有用性。
  • 正確性 (Correctness): 関連する事実がエラーなく含まれているかどうか。
  • 一貫性 (Coherence): 表現の一貫性と明瞭さ。
  • 複雑性 (Complexity): 応答を書くために必要な知的深さ (基本的な言語能力を持つ誰でも書けるものか、深い専門知識が必要かどうか)。
  • 冗長性 (Verbosity): プロンプトで要求される詳細の量に対して、応答に含まれる詳細の量。

以下で、先ほど生成したデータの応答文に対して、報酬モデルでスコアリングを実行してみましょう。

# running reward scoring model to evaluate the responses 
def get_scores_from_response(openai_response_template):
    logprobs = openai_response_template.choices[0].logprobs.content
    score_dict = {}
    for score in logprobs:
        score_dict[score.token] = score.logprob
    return score_dict

async def get_response_and_scores(client, question, response_content):
    messages = [
        {"role": "user","content": question},
        {"role": "assistant","content": response_content},]
    response = await client.chat.completions.create(
        model="nvidia/nemotron-4-340b-reward",
        messages=messages,
    )
    scores = get_scores_from_response(response)
    return scores

# scoring for question:response pair set
async def process_question_response_pairs(client,question_response_score_list):
    tasks = []
    for question_response_pair in question_response_score_list:
        question = question_response_pair["question"]
        
        task_a = get_response_and_scores(client, question, question_response_pair["responses"]["response_a"]["response"])
        task_b = get_response_and_scores(client, question, question_response_pair["responses"]["response_b"]["response"])
        
        tasks.append((task_a, question_response_pair, "response_a"))
        tasks.append((task_b, question_response_pair, "response_b"))
    results = await asyncio.gather(*[task[0] for task in tasks])
    
    for i, (result, task_info) in enumerate(zip(results, tasks)):
        _, question_response_pair, response_key = task_info
        question_response_pair["responses"][response_key].update(result)
question_response_score_list = question_response_pair_list.copy()
await process_question_response_pairs(client, question_response_score_list)
print(question_response_score_list)

出力は以下のようになります。

[{'question': 'ディープラーニングと強化学習の違いは何か?',
  'responses': {'response_a': {'response': 'ディープラーニングは、主に大規模なデータセットを使用してパターンを学習し、画像や音声の認識などのタスクに適用される一方、強化学習は、エージェントが環境とやり取りすることで、報酬を最大化する行動を学習する手法です。',
    'helpfulness': 2.796875,
    'correctness': 2.875,
    'coherence': 3.59375,
    'complexity': 1.53125,
    'verbosity': 1.3359375,
    'question': 'ディープラーニングと強化学習の違いは何か?'},
   'response_b': {'response': 'ディープラーニングは、ニューラルネットワークを使用してデータから特徴を抽出することに重点を置いており、強化学習は、エージェントが環境からのフィードバックに基づいて決定を下す方法を学習することに重点を置いています。',
    'helpfulness': 2.671875,
    'correctness': 2.796875,
    'coherence': 3.65625,
    'complexity': 1.46875,
    'verbosity': 1.3515625,
    'question': 'ディープラーニングと強化学習の違いは何か?'}}},
 {'question': 'ディープラーニングと強化学習はどのように組み合わせて使用できるのか?',
  'responses': {'response_a': {'response': 'ディープラーニングと強化学習は、ディープラーニングを使用して強化学習のエージェントの状態と行動を表現することで組み合わせることができます。ディープラーニングのネットワークは、強化学習のエージェントが環境を理解し、最適な行動を選択するのに役立ちます。',
    'helpfulness': 2.609375,
    'correctness': 2.84375,
    'coherence': 3.65625,
    'complexity': 1.6953125,
    'verbosity': 1.3828125,
    'question': 'ディープラーニングと強化学習はどのように組み合わせて使用できるのか?'},
   'response_b': {'response': '強化学習のエージェントは、ディープラーニングのネットワークを使用して、環境から得られた経験を学習し、最適な行動を選択することができます。ディープラーニングのネットワークは、強化学習のエージェントが環境を理解し、最適な行動を選択するのに役立ちます。また、強化学習のエージェントは、ディープラーニングのネットワークを使用して、環境から得られた経験を学習し、最適な行動を選択することができます。',
    'helpfulness': 2.671875,
    'correctness': 2.703125,
    'coherence': 3.390625,
    'complexity': 1.7109375,
    'verbosity': 1.6640625,
    'question': 'ディープラーニングと強化学習はどのように組み合わせて使用できるのか?'}}}]

「有用性」のスコアを元にサンプルをフィルタリングし、データセットの品質を高めることや、各サンプルの「有用性」が高い方の応答を「選ばれた応答」とすることで、DPO のデータセットとしても活用できます。

本記事で紹介できなかった機能

  • ルールベースもしくは分類器ベースの品質フィルタリング
    大規模なデータセット内には、多くの場合、低品質とみなされるドキュメントが多数含まれています。品質を定義する指標はさまざまありますが、ドキュメントに含まれる句読点の数、長さ、繰り返しの頻度などのシンプルな統計情報で測定してフィルタリングするアプローチや高品質なデータが存在する場合はそれを使用して単純な分類器を学習し、フィルタリングするアプローチなどがあります。NeMo Curator ではこれらのアプローチに対応したモジュールを提供しています。詳細はこちらを参照してください。
  • PyTorch モデルを使ったデータ分類
    RAPIDS AI の CrossFit を活用して、マルチノード、マルチ GPU へスケーリング可能な PyTorch のモデルを使用したドキュメントの分類 (例: ドメインや品質) を実行可能です。NVIDIA では、ドキュメントのドメインを分類する nvidia/domain-classifier、品質を分類する nvidia/quality-classifier-deberta をオープン ソースでリリースしています。実行の手順はこちらを参照してください。
  • 個人識別情報 (PII) の削除
    データセットには意図せずに個人情報 (氏名、電話番号、メール アドレス、住所など) が含まれてしまっている可能性があります。PII の削除、匿名化によって、データセットから機密データを除去することが望ましいです。実行の手順はこちらを参照してください。
  • 指示プロンプトの追加
    指示チューニング用にデータセット内の各レコードへ指示プロンプトを追加することも可能です。実行の手順はこちらを参照してください。
  • ダウンストリーム タスクの除染、重複排除
    大規模なデータセット内には、ダウンストリーム タスクで LLM を評価するために使用するテスト データが紛れてしまっている可能性があります。NeMo Curator は OpenAI GPT-3Microsoft Turing NLG 530B のアプローチに従い 、ダウンストリーム タスクに存在するデータを大規模データセットから削除する機能を提供しています。詳細はこちらを参照してください。

テキストの品質に関する言及が複数ありましたが、NeMo Curator はテキストの品質を判断する複数の方法を提供しています。ドキュメントには、ベスト プラクティスとして紹介した各手法の長所、短所がまとめられています。

まとめ

本記事では、NeMo Curator を使用したデータセットの構築方法を紹介しました。NeMo Curator を使用して、開発者の方のデータセット構築がより効率的に進むと嬉しいです。


関連情報

Tags