Hello Techies! I'm Samiksha. I hope you all are doing amazing stuff. Welcome to another blog post about an amazing and trending tool in the market today: NeMo Curator, the GPU-accelerated open-source framework for efficient generative AI model data curation. NeMo Curator is a great one-stop solution for pre-training, fine-tuning, and synthetic data preparation without having to worry about resources and scaling.
Data is the Fuel for Fine-tuning LLMs…
I recently worked on data-scraping pipelines and researched data-preparation strategies for pre-training a Llama-3.2 model. When you have millions of raw documents, pre-processing that much data from scratch is time-consuming and resource-intensive. NeMo Curator helps here as a distributed framework: it uses Dask for distributed workloads and supports massive scaling across both CPUs and GPUs. As we all know, data preparation is a crucial step in any machine learning project lifecycle. Typical data preprocessing involves redundancy removal, filling null values, removing missing data, encoding features, and so on. Likewise, LLM data preprocessing, especially for text, involves stopword removal, stemming/lemmatization, deduplication, making content safe by removing harmful text, PII redaction, and more, all of which are necessary to fine-tune a robust LLM.
Sounds exciting!!
Note: In this article, I've discussed a sample data-preprocessing pipeline over a few samples of Common Crawl data. This article is beginner-friendly for anyone interested in fine-tuning LLMs or exploring data-preparation and generation strategies :)
Code: Please find the practical code used in this article here: https://github.com/kolhesamiksha/Nemo_Curator/blob/main/Data_Preprocessing_Nemo_Curator.ipynb
Here is the official NeMo Curator SDK code; you can analyze each module to understand the framework more deeply.
What you'll learn from this article:
Table of Contents:
Introduction to NVIDIA NeMo Curator
- Overview and Key Pillars: Performance, Scalability, Customizability
Core Functionalities for Data Preparation
Data Acquisition and Preprocessing
Quality and Privacy Filtering
Deduplication and Classification
Scaling and Customization
Streamlined Scalability with Dask
Customizing Pipelines for Business Needs
Practical Applications and Synthetic Data Generation
Sample Code for Data Preparation
Generating Synthetic Data from Preprocessed Inputs
BONUS: Synthetic Data Generation over preprocessed data.
Wait, if you're new to Generative AI or Transformers, first check out these topics in my previous articles, which will help you understand how crucial the data-preprocessing step is for shaping an LLM's knowledge.
What is NeMo Curator?
NVIDIA NeMo Curator is a GPU-accelerated data-curation tool that improves generative AI model accuracy by processing text, image, and video data at scale for training and customization. It also provides pre-built pipelines for generating synthetic data to customize and evaluate generative AI systems.
NeMo Curator supports data curation for model pretraining and was engineered on the following key pillars: performance, scalability, and customizability.
It can seamlessly scale across thousands of compute cores and uses highly optimized CUDA kernels to effortlessly perform a variety of data acquisition, preprocessing, and cleaning tasks, enabling enterprise developers to focus on problem-solving.
Built with extensibility and flexibility, NeMo Curator enables developers to customize data curation pipelines to suit their business needs and address their unique challenges. Each component can be quickly customized via easy-to-use configuration files.
Simultaneously, the framework's Pythonic API offers deeper customization of the data curation pipeline with a few lines of code.
Today, NeMo Curator provides the following functionality out of the box:
Data download and extraction
Text cleaning and language identification
Quality filtering
Privacy filtering
Domain and toxicity classification
Deduplication
Streamlined scalability
Support for model customization tasks
Let's understand each step in more detail.
Data download and extraction
NeMo Curator comes with several helpers for downloading and extracting data from commonly used sources. Out of the box, NeMo Curator can download Common Crawl snapshots, arXiv bulk data from Amazon S3, and Wikipedia dumps. It also provides helpers for text extraction and for preparing the data for subsequent operations by organizing it into the JSON Lines (JSONL) format, a widely used format for working with textual data. Users can also adapt and customize these modules to support data from arbitrary sources.
Text cleaning and language identification
After data acquisition, but before further processing, an important step is to unify all the text into Unicode and identify the languages present throughout the acquired data. NeMo Curator uses the widely used ftfy (fixes text for you) library to resolve Unicode-related issues. NeMo Curator also provides helpers to identify the languages contained in every acquired document and organize them accordingly, which makes it easy to discard documents that are irrelevant for LLM training.
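To see what this step does in isolation, here is a tiny standalone sketch using the ftfy library directly (the broken string below is a made-up mojibake sample):

import ftfy

# A made-up sample that was decoded with the wrong codec: "schÃ¶n" should read "schön".
broken = "Raw web text is often schÃ¶n ... until the encoding breaks."
print(ftfy.fix_text(broken))  # ftfy repairs the classic UTF-8 / Latin-1 mix-up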
Quality filtering
NeMo Curator comes with a set of predefined qualitative criteria that are heuristics-based, as well as ML-based. Use the criteria to categorize documents into high- and low-quality buckets, enabling rapid dataset iteration and ensuring an expected level of quality from the acquired data. Customize these predefined criteria with configuration files to tune them to individual business needs.
Privacy filtering
Compliance with data protection regulations is an important consideration for any enterprise solution. NeMo Curator provides a GPU-accelerated PII detection and redaction module. You can specify the categories to redact and how to redact them. For example, you could detect all names and addresses and replace them with other tokens.
Domain and toxicity classification
Another aspect of ensuring data quality and relevance is to identify and remove out-of-domain, as well as toxic data. You can define custom filters to clean up your datasets and integrate them with external tools and machine learning models to classify the data into relevant and irrelevant categories.
Deduplication
Internet-scale data can contain many identical or near-identical documents, which incur storage and compute costs and can potentially degrade the model's performance. NeMo Curator provides a configurable de-duplication module, which leverages highly optimized CUDA implementations of MinHash and other commonly used algorithms to de-duplicate the documents.
Streamlined scalability
NeMo Curator uses Dask, an open-source and commercially friendly parallel computing library to easily scale across many CPUs and GPUs and accelerate every component of the data curation pipeline. NeMo Curator easily integrates with Dask data structures and supports Dask arrays, as well as RAPIDS cuDF, to offload the processing to the correct resource with minimal intervention from developers.
Support for model customization tasks
In the near future, NeMo Curator will also support data curation for model customization tasks such as supervised fine-tuning (SFT) and parameter-efficient fine-tuning (PEFT) approaches such as LoRA and P-tuning. NeMo Curator enables sampling and blending various datasets for SFT in NeMo Aligner, which enables model customization and alignment with commercially permissible datasets to achieve near state-of-the-art model quality.
Practical Sample Code for NeMo Curator Data Preparation
Please find the curator steps and code here: https://github.com/kolhesamiksha/Nemo_Curator.
Note: Below is practical code I ran on Google Colab with a T4 GPU; alternatively, you can run it in any suitable environment. Make sure to use Python 3.10.12, which is supported by Curator.
Data Curation & Preparation:
The text curator expects your data fields to be in the following format:
'text': the main content field; rename your content column (e.g., 'page_content') to 'text'.
'id': add a unique primary key, which is useful for indexing/shuffling and data removal.
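For illustration, one record in the expected JSONL layout could look like the following (all values here are made up); each line of the .jsonl file holds one such object:

import json

# One illustrative record in the layout the curator expects (values are made up).
record = {
    "text": "The quick brown fox jumps over the lazy dog.",  # mandatory content field
    "id": "cc-2021-04_000001",                               # unique primary key
    "metadata": {"url": "https://example.com/article", "language": "en"},
}
print(json.dumps(record))  # write one such JSON object per line to the .jsonl file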
Dataset download: Common Crawl
First, download the raw text data from Common Crawl. The downloading pipeline in NeMo Curator consists of the following classes:
DocumentDownloader: Abstract class for downloading remote data to disk.
DocumentIterator: Abstract class for reading raw dataset records from the disk.
DocumentExtractor: Abstract class for extracting text records, as well as any relevant metadata, from the records on the disk.
These classes are highly flexible, so you can modify the implementation to download any desirable dataset. NeMo Curator also provides implementations for downloading popular open-source datasets such as Common Crawl, Wikipedia, and arXiv. For this post, use the predefined downloader to download the Common Crawl dataset.
Before downloading, run the following code to start a Dask client. This starts a Dask LocalCluster on your CPU. It can be reused for all modules except deduplication, which requires a GPU cluster.
from dask.distributed import Client, LocalCluster
cluster = LocalCluster(n_workers=10, processes=True, memory_limit='16GB')
client = Client(cluster)
Run the following code to download a sample of Common Crawl data. This fetches WARC files from the snapshots between start_date ("2021-04") and end_date ("2021-05"), extracts the text, and stores the result on your local disk as JSONL. To download other snapshots, change the start_date and end_date parameters; url_limit caps the number of WARC files fetched so the run stays small. Even with a limit, the download and extraction can take a while depending on your connection.
from nemo_curator.download import download_common_crawl

start_date = "2021-04"
end_date = "2021-05"
language = 'en'
url_limit = 10

res = download_common_crawl(
    OUTPUT_FOLDER,
    start_snapshot=start_date,
    end_snapshot=end_date,
    force_download=True,
    url_limit=url_limit,
).df.compute()
Before proceeding, it's recommended that you preprocess the dataset by adding a customized ID for each document. The ID is used as a tracker to identify duplicate or low-quality documents.
When processing multiple datasets, adding customized IDs also becomes important, as the original IDs of different datasets might collide. In this case, customized IDs can be used to distinguish between datasets. NeMo Curator provides an AddId class for inserting customized IDs in the format <prefix>_<id>. Alternatively, write your own function that renames your content column to text (mandatory) and adds an id field, as done below.
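If you prefer the built-in route, here is a minimal sketch of AddId; it assumes dataset is a DocumentDataset already loaded from the downloaded JSONL files, and the parameter names follow the NeMo Curator docs at the time of writing, so verify them against your installed version:

from nemo_curator import AddId

# Stamp every document with a customized ID of the form <prefix>_<id>.
# `dataset` is assumed to be a DocumentDataset loaded from the downloaded JSONL files.
add_id = AddId(id_field="id", id_prefix="cc-2021")
dataset_with_ids = add_id(dataset)
dataset_with_ids.to_json(OUTPUT_FOLDER, write_to_filename=True)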
import os
import json

def process_jsonl_files(input_folder, output_folder):
    """
    Process all JSONL files in the input folder, applying the data preparation logic,
    and save the results to the output folder.

    :param input_folder: Path to the folder containing JSONL input files.
    :param output_folder: Path to the folder where modified JSONL files will be saved.
    """
    # Create the output folder if it doesn't exist
    os.makedirs(output_folder, exist_ok=True)

    # Iterate through all files in the input folder
    for filename in os.listdir(input_folder):
        if filename.endswith('.jsonl'):
            input_file_path = os.path.join(input_folder, filename)
            output_file_path = os.path.join(output_folder, f"modified-{filename}")
            try:
                # Process the file
                with open(input_file_path, 'r') as infile, open(output_file_path, 'w') as outfile:
                    for line in infile:
                        try:
                            # Load the line as a JSON object
                            data = json.loads(line.strip())
                            # Replace 'page_content' with 'text' if it exists
                            # if 'page_content' in data:
                            #     data['text'] = data.pop('page_content')
                            # Create the inline format string
                            formatted_line = json.dumps({
                                "text": data.get("text", ""),
                                'id': data['metadata'].get("document_id", ""),
                                'metadata': data['metadata'],
                            })
                            # Write the formatted line to the output file
                            outfile.write(formatted_line + '\n')
                        except json.JSONDecodeError as e:
                            print(f"Error decoding JSON in file {filename}: {e}")
                        except Exception as e:
                            print(f"An error occurred in file {filename}: {e}")
            except Exception as e:
                print(f"Failed to process file {filename}: {e}")

process_jsonl_files(INPUT_FOLDER, OUTPUT_FOLDER)
QuotationUnifier reformatter:
In NeMo Curator, we can use the DocumentModifier interface to define how documents in the dataset should be modified. The helper Modify takes a DocumentModifier object as well as a DocumentDataset object and modifies the dataset based on the modifier.
QuotationUnifier is a custom class that extends DocumentModifier, which lets us plug a custom modifier into the curator pipeline's modifier component. This functionality allows us to define our own modifiers based on the intricacies of our own data.
from nemo_curator.modifiers import DocumentModifier

class QuotationUnifier(DocumentModifier):
    """
    A simple modifier that unifies the quotation marks in the documents.
    """

    def modify_document(self, text: str) -> str:
        """
        Modifies the given text by replacing left and right single quotes with normal single quotes,
        and replacing left and right double quotes with normal double quotes.

        Args:
            text (str): The text to be modified.

        Returns:
            str: The modified text.
        """
        text = text.replace("‘", "'").replace("’", "'")
        text = text.replace("“", '"').replace("”", '"')
        return text
Unicode reformatter
Another preliminary data-cleaning process used is unification. Data scraped from the Internet often contains various Unicode encodings and special characters that can lead to inconsistencies and errors in further processing. Running unification on the scraped data helps standardize the text into a consistent format, making it cleaner for LLM training.
A fundamental operation in data curation pipelines involving text data is text unification and cleaning, as text scraped from online sources may contain inconsistencies or Unicode issues. Such inconsistencies (poor quality tokens, for example) may cause problems for models that are trained on this data.
import re

# Imports follow the NeMo Curator tutorial layout; adjust paths if your version differs.
from nemo_curator import Sequential
from nemo_curator.datasets import DocumentDataset
from nemo_curator.modifiers.unicode_reformatter import UnicodeReformatter
from nemo_curator.modules.modify import Modify

def clean_and_unify(dataset: DocumentDataset) -> DocumentDataset:
    """
    Cleans and unifies the given dataset using a set of predefined cleaners.

    Args:
        dataset (DocumentDataset): The dataset to be cleaned and unified.

    Returns:
        DocumentDataset: The cleaned and unified dataset.
    """
    cleaners = Sequential(
        [
            # Unify all the quotation marks
            Modify(QuotationUnifier()),
            # Unify all Unicode
            Modify(UnicodeReformatter()),
        ]
    )
    cleaned_dataset = cleaners(dataset)

    def sanitize_text(text):
        # Remove control characters
        text = re.sub(r'[\x00-\x1F\x7F-\x9F]', '', text)
        # Replace empty strings with a space
        if not text.strip():
            text = " "
        return text

    cleaned_dataset.df['text'] = cleaned_dataset.df['text'].apply(sanitize_text)
    return cleaned_dataset
I applied the sanitize_text function because the dataset contains some entries that don't hold any real text, just some JavaScript.
Data quality is undoubtedly one of the most important factors regarding LLM training performance. Advanced data curation techniques such as deduplication and heuristic filtering are often applied to yield better data quality.
Note: The Curator pipeline works on a dataset of type DocumentDataset, much like LangChain works on its Document type for its operations.
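For reference, wrapping the prepared JSONL files into a DocumentDataset (the same call the full pipeline makes later) looks roughly like this, where files is assumed to be a list of the prepared .jsonl paths:

from nemo_curator.datasets import DocumentDataset

# `files` is a list of prepared .jsonl paths; each curation step operates on a DocumentDataset.
dataset = DocumentDataset.read_json(files, add_filename=True)
print(len(dataset.df))  # number of documents across all Dask partitions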
Heuristic Filtering:
Another important step in the dataset curation process is data filtering, where some documents that do not fit certain criteria are discarded. For instance, you might want to discard documents that are too short, too long, or incomplete. At the time of writing, NeMo Curator provides 24 heuristics for natural languages, as well as eight heuristics for coding languages.
NeMo Curator provides a DocumentFilter interface, which defines a way to score documents based on various criteria, and a ScoreFilter helper to filter the documents. The ScoreFilter helper takes a DocumentDataset along with a DocumentFilter and determines whether each document in the dataset passes the filtering criteria.
In my case, I don't want to keep documents with too little content, i.e., fewer than 80 words, and I use the repeating n-gram filters, a frequency-based, more heuristic approach that looks at how often consecutive word sequences repeat, to keep only text that stays meaningful.
from nemo_curator import ScoreFilter, Sequential
from nemo_curator.filters import RepeatingTopNGramsFilter, WordCountFilter

def filter_dataset(dataset: DocumentDataset) -> DocumentDataset:
    """
    Filters the given dataset based on various criteria.

    Args:
        dataset (DocumentDataset): The dataset to be filtered.

    Returns:
        DocumentDataset: The filtered dataset.
    """
    filters = Sequential(
        [
            ScoreFilter(
                WordCountFilter(min_words=80),
                text_field="text",
                score_field="word_count",
                score_type=int,
            ),
            ScoreFilter(
                RepeatingTopNGramsFilter(n=2, max_repeating_ngram_ratio=0.2),
                text_field="text",
                score_type=float,
            ),
            ScoreFilter(
                RepeatingTopNGramsFilter(n=3, max_repeating_ngram_ratio=0.18),
                text_field="text",
                score_type=float,
            ),
            ScoreFilter(
                RepeatingTopNGramsFilter(n=4, max_repeating_ngram_ratio=0.16),
                text_field="text",
                score_type=float,
            ),
        ]
    )
    filtered_dataset = filters(dataset)
    return filtered_dataset
Distributed Data Classification
The Domain Classifier is useful because it helps the LLM understand the context and specific domain of the input text. Because different domains have different linguistic characteristics and terminologies, an LLMās ability to generate contextually relevant responses can be improved by tailoring training data to a specific domain. Overall, this helps provide more accurate and specialized information.
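The domain classifier is not used in the pipeline below, but a minimal sketch of how it could slot in looks like this; it needs a GPU, and the filter_by labels are illustrative placeholders, so check the classifier's actual label set before using them:

# Optional, GPU-only sketch: keep only documents whose predicted domain is in a whitelist.
from nemo_curator.classifiers import DomainClassifier

def domain_classifier(dataset: DocumentDataset) -> DocumentDataset:
    # The filter_by values must match labels from the classifier's own taxonomy
    # (the two below are illustrative placeholders).
    classifier = DomainClassifier(filter_by=["Finance", "Health"])
    return classifier(dataset=dataset)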
The Quality Classifier is useful for filtering out noisy or low quality data. This allows the model to focus on learning from high quality and informative examples, which contributes to the LLMās robustness and enhances its ability to generate reliable and meaningful outputs. Additionally, quality classification helps mitigate biases and inaccuracies that may arise from poorly curated training data.
The AEGIS Safety Models are essential for filtering harmful or risky content, which is critical for training models that should avoid learning from unsafe data. By classifying content into 13 critical risk categories, AEGIS helps remove harmful or inappropriate data from the training sets, improving the overall ethical and safety standards of the LLM.
Quality Classifier:
The Quality Classifier is designed to assess the quality of text documents, helping to filter out low-quality or noisy data from your dataset by labeling each document with a quality class such as "High" or "Medium".
# Optional step to run
# Runs on GPU only (specific CUDA version) and requires sufficient RAM (> 15 GB)
from nemo_curator.classifiers import QualityClassifier

def quality_classifier(dataset: DocumentDataset) -> DocumentDataset:
    quality_classifier = QualityClassifier(filter_by=["High", "Medium"])
    result_dataset = quality_classifier(dataset=dataset)
    return result_dataset

# result_dataset.to_json(OUTPUT_FOLDER)
The AEGIS Safety Models: Data Safety
AEGIS is a family of content-safety LLMs used for detecting whether a piece of text contains content belonging to any of 13 critical risk categories. There are two variants, defensive and permissive, that are useful for filtering harmful data out of your training set. The models are parameter-efficient, instruction-tuned versions of Llama Guard (based on Llama 2-7B) trained on the NVIDIA content-safety dataset.
"safe" means that the document is considered safe by the model.
"O1" through "O13" mean the document is unsafe according to the model. Each number corresponds to a different category of safety from the safety taxonomy defined in the paper and listed on the model cards.
"unknown" means that the LLM output a non-standard response. To view the raw response of the LLM, you can set keep_raw_pred=True and raw_pred_column="raw_predictions".
# Optional step to run
# Runs on a specific version of CUDA GPU and requires sufficient RAM (> 15 GB)
from nemo_curator.classifiers import AegisClassifier

def Aegis_classifier(dataset: DocumentDataset) -> DocumentDataset:
    token = ""  # Replace with your Hugging Face access token
    safety_classifier = AegisClassifier(
        aegis_variant="nvidia/Aegis-AI-Content-Safety-LlamaGuard-Defensive-1.0",
        token=token,
        filter_by=["safe", "O13"],
        keep_raw_pred=True,
        raw_pred_column="raw_predictions",
        device_type=DEVICE,
    )
    result_dataset = safety_classifier(dataset=dataset)
    return result_dataset

# result_dataset.to_json(OUTPUT_FOLDER)
Data Deduplication
Deduplication works at the document level: it checks the dataset for duplicate documents. Duplicate/redundant data increases memory and compute costs but doesn't add any value during fine-tuning/pre-training.
Fuzzy Deduplication
When removing near-duplicates within the corpus, we perform fuzzy deduplication at the document level in order to remove documents with high Jaccard similarity scores.
How It Works
Compute Minhashes: The first stage involves computing MinHash Signatures on documents. NeMo Curator currently only supports character-based n-grams for MinHashing. An approximate metric of ~4.5 characters per word can be used to determine the n-gram size for users familiar with word-based ngrams.
LSH (Locality Sensitive Hashing): Perform LSH to find candidate duplicates.
Buckets to Edgelist: If not using the false positive check, we directly convert the LSH buckets to edges for the connected components computation.
False Positive Check (optional alternative to Buckets to Edgelist): Due to the approximate nature of the bucketing via MinHash + LSH (Leskovec et al., 2020), NeMo Curator provides the option to further process each of the buckets by computing some pairwise Jaccard similarity scores between documents in each bucket and filter out false positives that might have been hashed into the same bucket.
Connected Components: Due to the approximate nature of LSH, documents that are near duplicates may be assigned into different buckets with a few overlapping documents between these buckets. We use a GPU accelerated connected components algorithm to find all connected components in the graph formed by the edges between documents in the same bucket.
The result from the connected components step is a list of document IDs and the group they belong to. All documents in the same group are considered near duplicates. These results can be used to remove the near duplicates from the corpus.
from nemo_curator import FuzzyDuplicates, FuzzyDuplicatesConfig
from nemo_curator.datasets import DocumentDataset

# Optional step to run, only if running on a larger set of documents
def fuzzy_dedupe(dataset: DocumentDataset) -> DocumentDataset:
    """
    Remove near-duplicate documents from the given DocumentDataset.

    Args:
        dataset (DocumentDataset): The dataset containing documents.

    Returns:
        DocumentDataset: The deduplicated dataset.
    """
    config = FuzzyDuplicatesConfig(
        # cache_dir="/path/to/dedup_outputs",  # must be cleared between runs
        id_field="id",
        text_field="text",
        seed=42,
        char_ngrams=24,
        num_buckets=20,
        hashes_per_bucket=13,
        use_64_bit_hash=False,
        buckets_per_shuffle=2,
        false_positive_check=False,
    )
    fuzzy_dups = FuzzyDuplicates(config=config, logger="./")
    # Find the duplicates: the result has one row per document with its duplicate group.
    duplicates = fuzzy_dups(dataset)
    docs_to_remove = duplicates.df.map_partitions(
        lambda x: x[x.group.duplicated(keep="first")]
    )
    # Remove the duplicates using their IDs.
    duplicate_ids = list(docs_to_remove.compute().id)
    dataset_df = dataset.df
    deduped = dataset_df[~dataset_df.id.isin(duplicate_ids)]
    return DocumentDataset(deduped)
Exact Deduplication
Exact deduplication refers to removing identical documents (i.e., document strings that are equal) from the dataset.
As exact deduplication requires significantly less compute, we typically will run exact deduplication before fuzzy deduplication. Also, from our experience in deduplicating Common Crawl snapshots, a significant portion (as high as ~40%) of the duplicates can be exact duplicates.
How It Works
- Exact deduplication works by hashing each document and keeping only one document per hash. It runs on both CPU- and GPU-based backends.
from nemo_curator.modules import ExactDuplicates

def dedupe(dataset: DocumentDataset) -> DocumentDataset:
    """
    Remove exact duplicates from the given DocumentDataset.

    Args:
        dataset (DocumentDataset): The dataset containing documents.

    Returns:
        DocumentDataset: The deduplicated dataset.
    """
    deduplicator = ExactDuplicates(text_field="text", hash_method="md5")
    # Find the duplicates
    duplicates = deduplicator(dataset)
    docs_to_remove = duplicates.df.map_partitions(
        lambda x: x[x._hashes.duplicated(keep="first")]
    )
    # Remove the duplicates using their IDs.
    duplicate_ids = list(docs_to_remove.compute().id)
    dataset_df = dataset.df
    deduped = dataset_df[~dataset_df.id.isin(duplicate_ids)]
    return DocumentDataset(deduped)
PII Redaction
NeMo Curator's PII redaction module leverages the Presidio framework and enables you to specify which PII to detect, what action to take for each detection, and to process the data in batches to accelerate the operation.
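The curation pipeline below calls a redact_pii step; a minimal sketch of such a helper, built on NeMo Curator's Presidio-backed PiiModifier, might look like this (the entity list and action are illustrative choices, so consult the PII module docs for the full set of options):

from nemo_curator.modifiers.pii_modifier import PiiModifier
from nemo_curator.modules.modify import Modify

def redact_pii(dataset: DocumentDataset) -> DocumentDataset:
    # Detect a couple of PII entity types and replace them with placeholder tokens.
    redactor = Modify(
        PiiModifier(
            language="en",
            supported_entities=["PERSON", "EMAIL_ADDRESS"],  # which PII types to detect
            anonymize_action="replace",                      # how to redact each detection
            device="cpu",                                    # or "gpu" to accelerate
        ),
    )
    return redactor(dataset)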
Data Curation Pipeline
import os
import shutil

from nemo_curator import Sequential
from nemo_curator.datasets import DocumentDataset
from nemo_curator.utils.distributed_utils import get_client
from nemo_curator.utils.file_utils import get_all_files_paths_under

def run_curation_pipeline(jsonl_dir: str) -> None:
    """
    Run the curation pipeline on the prepared Common Crawl JSONL files.

    Args:
        jsonl_dir (str): Directory path where the JSONL files are stored.
    """
    # Initialize the Dask cluster ("cpu" or "gpu", depending on the DEVICE constant).
    client = get_client(cluster_type=DEVICE)
    print(f"Running curation pipeline on '{jsonl_dir}'...")
    files = [
        fp
        for fp in get_all_files_paths_under(jsonl_dir, recurse_subdirectories=False)
        if fp.endswith(".jsonl")
    ]
    print(files)
    print("Reading the data...")
    orig_dataset = DocumentDataset.read_json(files, add_filename=True)
    dataset = orig_dataset

    curation_steps = Sequential(
        [
            clean_and_unify,
            filter_dataset,  # or quality_classifier / Aegis_classifier
            dedupe,          # or fuzzy_dedupe
            redact_pii,
        ]
    )
    dataset = curation_steps(dataset)
    print("Executing the pipeline...")
    dataset = dataset.persist()

    print(dataset)
    print(f"Original dataset length: {len(orig_dataset.df)}")
    print(f"After dataprep: {len(dataset.df)}")
    print("Writing the results to disk...")

    # Overwrite existing files in the curated directory.
    out_path = os.path.join(jsonl_dir, "curated")
    print(f"OUT PATH: {out_path}")
    if os.path.isdir(out_path):
        shutil.rmtree(out_path)
    os.makedirs(out_path)
    dataset.to_json(out_path, write_to_filename=True)
    client.close()

run_curation_pipeline(OUTPUT_FOLDER)
Synthetic Data Generation
NeMo Curator has a simple, easy-to-use set of tools that let you use prebuilt synthetic data generation pipelines or build your own. Any model inference service that uses the OpenAI API is compatible with the synthetic data generation module, allowing you to generate your data from any model.
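As a taste of the bonus topic, here is a hedged sketch of generating synthetic Q&A prompts from one of the curated documents; the module and method names follow the NeMo Curator synthetic-data docs at the time of writing, and the endpoint, API key, and model name are placeholders you would replace:

from openai import OpenAI
from nemo_curator import OpenAIClient
from nemo_curator.synthetic import NemotronGenerator

# Any OpenAI-compatible inference service works; the endpoint and key below are placeholders.
openai_client = OpenAI(
    base_url="https://integrate.api.nvidia.com/v1",
    api_key="<your API key>",
)
client = OpenAIClient(openai_client)
generator = NemotronGenerator(client)

# Generate a handful of closed-book QA prompts grounded in one curated document.
responses = generator.generate_closed_qa_instructions(
    document="<paste the 'text' field of a curated document here>",
    n_openlines=5,
    model="mistralai/mixtral-8x7b-instruct-v0.1",
)
print(responses)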
And!!! Here we come to the end. In this article, we've discussed NeMo Curator for data preprocessing and preparation, with a practical example of a curation pipeline over Common Crawl data plus synthetic dataset generation.
In the next article, I'll focus on how to fine-tune LLM models and how to deploy them.
Please feel free to contribute to this article in the comments, and share your insights and experience in building and improving RAG systems. This will help everyone learn from each other's experience!!!
Till then, stay tuned and follow our newsletter to get daily updates and build projects end to end!! Connect with me on LinkedIn, GitHub, and Kaggle.
Let's learn and grow together :) Stay healthy, stay happy. Happy learning!!