Conversation

@BlueCrescent (Collaborator)

  • Included an example configuration file.
  • Added datatrove and pydantic-settings to requirements.
  • Note that modalities is also required for the pipeline to work, but it is not included in the requirements file.

@BlueCrescent requested a review from Copilot on July 25, 2025, 08:39

Copilot AI left a comment

Pull Request Overview

This PR implements a data filtering pipeline using datatrove for filtering tokenized data based on scores. The pipeline processes JSONL files containing scores for data samples and filters corresponding tokenized datasets based on configurable thresholds.

  • Adds a complete datatrove-based filtering pipeline with score parsing and data filtering components
  • Introduces configuration management using pydantic-settings for both local and Slurm execution environments
  • Updates dependencies to include datatrove and pydantic-settings
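
A hedged sketch of the core thresholding predicate (function and key names are illustrative assumptions, not the PR's actual code): a sample passes only if every configured score meets its threshold.

def passes_thresholds(scores: dict[str, float], thresholds: dict[str, float]) -> bool:
    """Keep a sample only if every configured score reaches its threshold.
    Treating a missing score as failing is an assumption; the PR may handle
    missing scores differently."""
    return all(
        scores.get(name, float("-inf")) >= cutoff
        for name, cutoff in thresholds.items()
    )

# Example: keep samples whose quality score is at least 0.8.
assert passes_thresholds({"quality": 0.9}, {"quality": 0.8})
assert not passes_thresholds({"quality": 0.7}, {"quality": 0.8})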

Reviewed Changes

Copilot reviewed 5 out of 6 changed files in this pull request and generated 6 comments.

  • src/ml_filter/data_processing/score_based_filtering/step_score_parsing.py: Implements the ScoresParser class for reading JSONL score files and mapping them to tokenized data
  • src/ml_filter/data_processing/score_based_filtering/step_data_filtering.py: Implements the DataFiltering class for filtering datasets based on score thresholds
  • src/ml_filter/data_processing/score_based_filtering/filter_pipeline.py: Main pipeline orchestration with configuration management and execution settings
  • pyproject.toml: Adds the datatrove and pydantic-settings dependencies
  • configs/data_processing/example_filter_pipeline_config.yaml: Example configuration file for the filtering pipeline

Comments suppressed due to low confidence (1)

src/ml_filter/data_processing/score_based_filtering/filter_pipeline.py:241

  • [nitpick] The error message could be more helpful by providing an example of how to use the FilterPipelineBuilder class directly or where to find documentation.
            "and use the FilterPipelineBuilder class directly."

@ajude2s requested a review from AbasKhan on October 29, 2025, 20:47
@ajude2s self-assigned this on Nov 2, 2025
        document = self.get_document_from_dict(doc_content, filepath, 0)
        return [document]

    def _parse_scores_jsonl_file(self, filepath: str) -> tuple[str, list[dict[str, float]]]:

Collaborator: The scores are emitted in lexicographic order of the document IDs. IDs such as sample1, sample2, sample10 will be reordered to sample1, sample10, sample2, so the thresholds get applied to the wrong rows in the packed dataset. Please preserve the original file order (e.g., rely on insertion order or track the original line index when deduplicating).
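
A minimal sketch of order-preserving deduplication, assuming JSONL records with hypothetical "id" and "scores" keys (not necessarily the PR's schema): a plain dict keyed by document ID keeps insertion order in Python 3.7+, so no lexicographic re-sorting occurs.

import json

def parse_scores_preserving_order(filepath: str) -> list[dict[str, float]]:
    """Parse a scores JSONL file, deduplicating by document ID while
    preserving the original line order (dict insertion order)."""
    scores_by_id: dict[str, dict[str, float]] = {}
    with open(filepath, encoding="utf-8") as f:
        for line in f:
            record = json.loads(line)
            doc_id = record["id"]  # hypothetical key; first occurrence wins
            if doc_id not in scores_by_id:
                scores_by_id[doc_id] = record["scores"]  # hypothetical key
    # values() iterates in insertion order, matching the packed dataset rows
    return list(scores_by_id.values())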


Collaborator: Fixed in a0698c2

output_folder (Path): The folder where the filtered datasets will be saved.
thresholds (dict[str, float]): A dictionary where keys are score names and values are the
    thresholds to filter samples.
hash_to_base_file_mapping_csv (Path): A CSV file mapping base file hashes to their corresponding paths.

Collaborator: Seems like an artifact.

Collaborator: Removed in f2e8f24

@ajude2s requested a review from AbasKhan on December 10, 2025, 11:47

sbatch_args = values.get("sbatch_args") or {}
if isinstance(sbatch_args, _DictConfig):
    sbatch_args = OmegaConf.to_container(sbatch_args, resolve=True)  # type: ignore[arg-type]

Collaborator: Won't this throw an error unless you import OmegaConf?

Collaborator: OmegaConf is imported.

Collaborator: Hmmm, from omegaconf import DictConfig as _DictConfig does not import OmegaConf. I am not sure why the code is not throwing an error here; from omegaconf import DictConfig as _DictConfig, OmegaConf should be the way.
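
For reference, a minimal sketch of the suggested fix (the surrounding validator and its values argument are assumed from the snippet above, not taken from the PR):

from omegaconf import DictConfig as _DictConfig, OmegaConf

def normalize_sbatch_args(values: dict) -> dict:
    """Convert an OmegaConf DictConfig into a plain dict so downstream
    validation only sees built-in types."""
    sbatch_args = values.get("sbatch_args") or {}
    if isinstance(sbatch_args, _DictConfig):
        # resolve=True expands ${...} interpolations before conversion
        sbatch_args = OmegaConf.to_container(sbatch_args, resolve=True)
    return sbatch_args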

Collaborator: Fixed in e791792

@@ -0,0 +1,173 @@
import json
import logging

Collaborator: Unused import, please remove it.

Collaborator: Removed in 85e3f5c

"""
Maps a base file path to the corresponding tokenized data path.
Args:
base_file_path (str): The path of the base file.

Collaborator: Please update the docstrings to reflect the new changes.

Collaborator: Fixed in 1c3656c

_TOKENIZER_CACHE: dict[str, Any] = {}

HEADER_SIZE = 64 # Mimics EmbeddedStreamData.HEADER_SIZE_IN_BYTES (simplified for tests)
DATA_SECTION_LEN_BYTES = 8

Collaborator: Unused constants DATA_SECTION_LEN_BYTES and TOKEN_SIZE_DESC_LEN_BYTES.

Collaborator: Removed in 1c3656c

    from modalities.dataloader.filter_packed_data import filter_dataset
except ImportError:
    logging.error("The filtering pipeline requires the 'modalities' package to be installed.")
    exit(1)

Collaborator: Using exit(1) is not ideal. I would say something like

try:
    from modalities.dataloader.filter_packed_data import filter_dataset
except ImportError as exc:
    raise ImportError(
        "The filtering pipeline requires the optional dependency 'modalities'. "
        "Install it via `pip install modalities` and try again."
    ) from exc

would be better.

Collaborator: Fixed in 379df23

"""

name = "ScoresParser"
# type = "Parser"

Collaborator: Please remove this line altogether.

Collaborator: Fixed in 379df23

@AbasKhan left a comment

Apart from a minor change, the rest looks really good. Well done.



    ]
    return pipeline

if __name__ == "__main__":

Collaborator: Do we need this here? I think we should have an entry point in main.py instead.
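
A hedged sketch of what such an entry point might look like (the module path is from the file list above, but run_filter_pipeline is an assumed name; the actual wiring added in the follow-up commit may differ):

# main.py (hypothetical)
import argparse

from ml_filter.data_processing.score_based_filtering.filter_pipeline import (
    run_filter_pipeline,  # assumed name for the pipeline runner
)

def main() -> None:
    parser = argparse.ArgumentParser(description="Run the score-based filtering pipeline.")
    parser.add_argument("config", help="Path to the pipeline YAML config.")
    args = parser.parse_args()
    run_filter_pipeline(args.config)

if __name__ == "__main__":
    main()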

Collaborator: Added in e19f4a0

@ajude2s requested a review from AbasKhan on December 11, 2025, 14:31

@AbasKhan left a comment

I think we can merge it, but I would suggest adding Mehdi or Max as a second reviewer.
