From 944bff22effa7ada0bac7f81877a2430f296c6c7 Mon Sep 17 00:00:00 2001 From: Hoblovski Date: Wed, 3 Dec 2025 03:49:11 +0800 Subject: [PATCH 1/8] fix: regression script output --- script/diffjson.py | 318 ++++++++++++++++++++++++++------------------- 1 file changed, 185 insertions(+), 133 deletions(-) diff --git a/script/diffjson.py b/script/diffjson.py index b929f57..eb85c4c 100755 --- a/script/diffjson.py +++ b/script/diffjson.py @@ -1,5 +1,6 @@ #!/usr/bin/env python3 import argparse +from dataclasses import dataclass import json import os import re @@ -7,13 +8,81 @@ from pathlib import Path from typing import Literal +import IPython from deepdiff import DeepDiff -# Define status types for clarity +ONE_LINER_LEN = 100 + Status = Literal["OK", "BAD", "FILE_ERROR"] -def parse_accessor(accessor_string: str) -> list[str | int]: +@dataclass +class DiffResult: + status: Status + diff: DeepDiff | None + json1: any + json2: any + + def format(self, truncate_items: int) -> str: + output = [] + items_count = 0 + + def add_item(text: str): + nonlocal items_count + if truncate_items == 0 or items_count < truncate_items: + output.append(text) + output.append("--------------------") + items_count += 1 + + # Handle new items (dictionary_item_added and iterable_item_added) + if "dictionary_item_added" in self.diff: + for path in self.diff["dictionary_item_added"]: + one_liner = _format_value_one_liner(_get_accessor(self.json2, path)) + add_item(f"New item: {path}\n Add: {one_liner}") + + if "iterable_item_added" in self.diff: + for path, value in self.diff["iterable_item_added"].items(): + one_liner = _format_value_one_liner(_get_accessor(self.json2, path)) + add_item(f"New item: {path}\n Add: {one_liner}") + + # Handle removed items (dictionary_item_removed and iterable_item_removed) + if "dictionary_item_removed" in self.diff: + for path, value in self.diff["dictionary_item_removed"]: + one_liner = _format_value_one_liner(value) + add_item(f"Removed item: {path}\n Remove: {one_liner}") + + if "iterable_item_removed" in self.diff: + for path, value in self.diff["iterable_item_removed"].items(): + one_liner = _format_value_one_liner(value) + add_item(f"Removed item: {path}\n Remove: {one_liner}") + + # Handle changed values + if "values_changed" in self.diff: + for path, changes in self.diff["values_changed"].items(): + old_one_liner = _format_value_one_liner(changes["old_value"]) + new_one_liner = _format_value_one_liner(changes["new_value"]) + add_item( + f"Changed item: {path}\n Old: {old_one_liner}\n New: {new_one_liner}" + ) + + # Handle items moved (position changes in lists) + if "values_changed" not in self.diff and "iterable_item_moved" in self.diff: + for path, changes in self.diff["iterable_item_moved"].items(): + add_item(f"Moved item: {path}\n Position changed in list") + + # Add truncation notice if needed + if truncate_items > 0 and items_count > truncate_items: + remaining = items_count - truncate_items + output.append(f"...({remaining} more items)") + + # Clean up the last separator for a tidy output + if output and output[-1] == "--------------------": + output.pop() + + return "\n".join(output) + + +def _parse_accessor(accessor_string: str) -> list[str | int]: """ Parses a field accessor string like "['key'][0]" into a list ['key', 0]. This allows for programmatic access to nested JSON elements. @@ -31,7 +100,7 @@ def parse_accessor(accessor_string: str) -> list[str | int]: return keys -def delete_path(data: dict | list, path: list[str | int]): +def _delete_path(data: dict | list, path: list[str | int]): """ Deletes a value from a nested dictionary or list based on a path. This function modifies the data in place. If the path is invalid @@ -63,63 +132,44 @@ def delete_path(data: dict | list, path: list[str | int]): pass -def format_diff_custom(diff: DeepDiff) -> str: +def _get_path(data: dict | list, path: list[str | int]) -> any: """ - Formats a DeepDiff object into a custom human-readable string. - This provides a clear, indented view of changes. + Retrieves a value from a nested dictionary or list based on a path. + Returns None if the path is invalid or doesn't exist. """ - output = [] - - # Helper to format a value for printing. Pretty-prints dicts/lists. - def format_value(value): - if isinstance(value, (dict, list)): - return json.dumps(value, indent=2) - return repr(value) - - # Handle changed values - if "values_changed" in diff: - for path, changes in diff["values_changed"].items(): - output.append(f"Value Changed at: {path}") - output.append(f" - old: {format_value(changes['old_value'])}") - output.append(f" + new: {format_value(changes['new_value'])}") - output.append("--------------------") - - # Handle added items to lists/sets - if "iterable_item_added" in diff: - for path, value in diff["iterable_item_added"].items(): - output.append(f"Item Added at: {path}") - output.append(f" + new: {format_value(value)}") - output.append("--------------------") - - # Handle removed items from lists/sets - if "iterable_item_removed" in diff: - for path, value in diff["iterable_item_removed"].items(): - output.append(f"Item Removed at: {path}") - output.append(f" - old: {format_value(value)}") - output.append("--------------------") - - # Handle added keys in dictionaries - if "dictionary_item_added" in diff: - for path in diff["dictionary_item_added"]: - output.append(f"Dictionary Key Added: {path}") - output.append("--------------------") - - # Handle removed keys in dictionaries - if "dictionary_item_removed" in diff: - for path in diff["dictionary_item_removed"]: - output.append(f"Dictionary Key Removed: {path}") - output.append("--------------------") - - # Clean up the last separator for a tidy output - if output and output[-1] == "--------------------": - output.pop() - - return "\n".join(output) - - -def compare_json_files( + current = data + try: + for key in path: + current = current[key] + return current + except (KeyError, IndexError, TypeError): + return None + + +def _get_accessor(data: dict | list, accessor_string: str) -> any: + if accessor_string.startswith("root"): + accessor_string = accessor_string[4:] # Remove 'root' prefix + path = _parse_accessor(accessor_string) + return _get_path(data, path) + + +def _format_value_one_liner(value) -> str: + res = json.dumps(value) + if len(res) < ONE_LINER_LEN: + return res + if isinstance(value, dict): + keys_str = ", ".join(f'"{key}": ...' for key in value.keys()) + res = f"{{ {keys_str} }}" + elif isinstance(value, list): + res = f"[ ({len(value)} items) ]" + if len(res) < ONE_LINER_LEN: + return res + return res[:ONE_LINER_LEN] + f"... {len(res) - ONE_LINER_LEN} more chars" + + +def compare_files( file1_path: Path, file2_path: Path, ignore_fields: list[str] | None = None -) -> tuple[Status, DeepDiff | None]: +) -> DiffResult: """ Compares two JSON files, optionally ignoring specified fields. @@ -138,47 +188,79 @@ def compare_json_files( # Delete ignored fields from both JSON objects before comparison if ignore_fields: for field_accessor in ignore_fields: - path = parse_accessor(field_accessor) - delete_path(json1, path) - delete_path(json2, path) + path = _parse_accessor(field_accessor) + _delete_path(json1, path) + _delete_path(json2, path) diff = DeepDiff(json1, json2, ignore_order=True) - return ("BAD", diff) if diff else ("OK", None) + return ( + DiffResult("BAD", diff, json1, json2) + if diff + else DiffResult("OK", None, json1, json2) + ) -def process_directory_comparison( - old_dir: Path, new_dir: Path, ignore_fields: list[str] | None = None +def compare_and_report_files( + old_path: Path, + new_path: Path, + ignore_fields: list[str] | None = None, + truncate_items: int = 100, + verbose: bool = False, ) -> bool: - """ - Compares JSON files across two directories and prints results in a list format. - """ - results: dict[str, list[str]] = {"OK": [], "BAD": [], "MISS": [], "NEW": []} + result = compare_files(old_path, new_path, ignore_fields) + if result.status == "FILE_ERROR": + print("Error reading or parsing a file.", file=sys.stderr) + return 1 + + if result.status == "BAD" and result.diff: + print(f"Files {old_path.name} and {new_path.name} differ.", file=sys.stderr) + if verbose: + new_output = result.format(truncate_items) + print(new_output, file=sys.stderr) + return 1 + else: + print(f"Files '{old_path.name}' and '{new_path.name}' are identical.") + return 0 + + +def get_compare_file_list_bothdir( + old_dir: Path, new_dir: Path +) -> tuple[list[str], list[str], list[tuple[Path, Path]]]: old_files = {p.name for p in old_dir.glob("*.json")} new_files = {p.name for p in new_dir.glob("*.json")} - + compare_file = [] + miss_file = [] + new_file = [] for filename in sorted(old_files.intersection(new_files)): - status, _ = compare_json_files( - old_dir / filename, new_dir / filename, ignore_fields - ) - results["BAD" if status != "OK" else "OK"].append(filename) - + compare_file.append((old_dir / filename, new_dir / filename)) for filename in sorted(old_files - new_files): - results["MISS"].append(filename) - + miss_file.append(filename) for filename in sorted(new_files - old_files): - results["NEW"].append(filename) + new_file.append(filename) + return miss_file, new_file, compare_file - for filename in results["OK"]: - print(f"[OK ] {filename}") - for filename in results["NEW"]: - print(f"[NEW ] {filename}") - for filename in results["BAD"]: - print(f"[BAD ] {filename}", file=sys.stderr) - for filename in results["MISS"]: - print(f"[MISS] {filename}", file=sys.stderr) - return bool(results["BAD"] or results["MISS"]) +def get_compare_file_list(path1: Path, path2: Path) -> list[tuple[Path, Path]]: + if not path1.exists() or not path2.exists(): + raise ValueError( + f"Error: Path does not exist: {path1 if not path1.exists() else path2}" + ) + if path1.is_dir() and path2.is_dir(): + miss_files, new_files, compare_files = get_compare_file_list_bothdir( + path1, path2 + ) + for filename in miss_files: + print(f"[MISS] {filename}", file=sys.stderr) + for filename in new_files: + print(f"[NEW ] {filename}") + elif path1.is_file() and path2.is_file(): + compare_files = [(path1, path2)] + else: + raise ValueError( + "Error: Both arguments must be files or both must be directories." + ) + return compare_files def main(): @@ -200,62 +282,32 @@ def main(): "Also reads whitespace-separated values from $DIFFJSON_IGNORE. " "Example: -i \"['metadata']['timestamp']\"", ) + parser.add_argument( + "-t", + "--truncate_items", + type=int, + default=100, + help="Maximum number of items to output. If 0, no truncation. Default: 100", + ) + parser.add_argument( + "-v", + "--verbose", + action="store_true", + help="Enable verbose output for directory comparison.", + ) args = parser.parse_args() # --- Combine ignore fields from CLI and environment variable --- cli_ignore_fields = args.ignore env_ignore_str = os.environ.get("DIFFJSON_IGNORE", "") env_ignore_fields = env_ignore_str.split() if env_ignore_str else [] + ignore_fields = list(set(cli_ignore_fields + env_ignore_fields)) - # Combine both sources and remove duplicates - all_ignore_fields = list(set(cli_ignore_fields + env_ignore_fields)) - - path1, path2 = args.path1, args.path2 - - if not path1.exists() or not path2.exists(): - print( - f"Error: Path does not exist: {path1 if not path1.exists() else path2}", - file=sys.stderr, - ) - return 1 - - # --- Handle Directory Comparison --- - if path1.is_dir() and path2.is_dir(): - print(f"Comparing directories:\n- Old: {path1}\n- New: {path2}\n") - if process_directory_comparison(path1, path2, all_ignore_fields): - print("\nComparison finished with errors.", file=sys.stderr) - return 1 - else: - print("\nComparison finished successfully.") - return 0 - - # --- Handle Single File Comparison --- - elif path1.is_file() and path2.is_file(): - status, diff = compare_json_files(path1, path2, all_ignore_fields) - - if status == "FILE_ERROR": - print("Error reading or parsing a file.", file=sys.stderr) - return 1 - - if status == "BAD" and diff: - print( - f"Differences found between '{path1.name}' and '{path2.name}':\n", - file=sys.stderr, - ) - custom_output = format_diff_custom(diff) - print(custom_output, file=sys.stderr) - return 1 - else: - print(f"Files '{path1.name}' and '{path2.name}' are identical.") - return 0 - - # --- Handle Invalid Input --- - else: - print( - "Error: Both arguments must be files or both must be directories.", - file=sys.stderr, + compare_files = get_compare_file_list(args.path1, args.path2) + for file1, file2 in compare_files: + compare_and_report_files( + file1, file2, ignore_fields, args.truncate_items, args.verbose ) - return 1 if __name__ == "__main__": From 8eef8cf6a450674a335abe8f5fc9f9a954c3bb3b Mon Sep 17 00:00:00 2001 From: Hoblovski Date: Wed, 3 Dec 2025 04:06:28 +0800 Subject: [PATCH 2/8] fix: mypy for diffjson --- script/diffjson.py | 62 ++++++++++++++++++++++++++++++---------------- 1 file changed, 40 insertions(+), 22 deletions(-) diff --git a/script/diffjson.py b/script/diffjson.py index eb85c4c..899903a 100755 --- a/script/diffjson.py +++ b/script/diffjson.py @@ -6,7 +6,7 @@ import re import sys from pathlib import Path -from typing import Literal +from typing import Any, Literal, Sequence import IPython from deepdiff import DeepDiff @@ -20,8 +20,8 @@ class DiffResult: status: Status diff: DeepDiff | None - json1: any - json2: any + json1: Any + json2: Any def format(self, truncate_items: int) -> str: output = [] @@ -35,29 +35,29 @@ def add_item(text: str): items_count += 1 # Handle new items (dictionary_item_added and iterable_item_added) - if "dictionary_item_added" in self.diff: + if self.diff is not None and "dictionary_item_added" in self.diff: for path in self.diff["dictionary_item_added"]: one_liner = _format_value_one_liner(_get_accessor(self.json2, path)) add_item(f"New item: {path}\n Add: {one_liner}") - if "iterable_item_added" in self.diff: + if self.diff is not None and "iterable_item_added" in self.diff: for path, value in self.diff["iterable_item_added"].items(): one_liner = _format_value_one_liner(_get_accessor(self.json2, path)) add_item(f"New item: {path}\n Add: {one_liner}") # Handle removed items (dictionary_item_removed and iterable_item_removed) - if "dictionary_item_removed" in self.diff: + if self.diff is not None and "dictionary_item_removed" in self.diff: for path, value in self.diff["dictionary_item_removed"]: one_liner = _format_value_one_liner(value) add_item(f"Removed item: {path}\n Remove: {one_liner}") - if "iterable_item_removed" in self.diff: + if self.diff is not None and "iterable_item_removed" in self.diff: for path, value in self.diff["iterable_item_removed"].items(): one_liner = _format_value_one_liner(value) add_item(f"Removed item: {path}\n Remove: {one_liner}") # Handle changed values - if "values_changed" in self.diff: + if self.diff is not None and "values_changed" in self.diff: for path, changes in self.diff["values_changed"].items(): old_one_liner = _format_value_one_liner(changes["old_value"]) new_one_liner = _format_value_one_liner(changes["new_value"]) @@ -66,7 +66,11 @@ def add_item(text: str): ) # Handle items moved (position changes in lists) - if "values_changed" not in self.diff and "iterable_item_moved" in self.diff: + if ( + self.diff is not None + and "values_changed" not in self.diff + and "iterable_item_moved" in self.diff + ): for path, changes in self.diff["iterable_item_moved"].items(): add_item(f"Moved item: {path}\n Position changed in list") @@ -89,7 +93,7 @@ def _parse_accessor(accessor_string: str) -> list[str | int]: """ # Regex to find content within brackets, e.g., ['key'] or [0] parts = re.findall(r"\[([^\]]+)\]", accessor_string) - keys = [] + keys: list[str | int] = [] for part in parts: try: # Try to convert to an integer for list indices @@ -100,7 +104,7 @@ def _parse_accessor(accessor_string: str) -> list[str | int]: return keys -def _delete_path(data: dict | list, path: list[str | int]): +def _delete_path(data: dict | list, path: list[str | int]) -> None: """ Deletes a value from a nested dictionary or list based on a path. This function modifies the data in place. If the path is invalid @@ -110,13 +114,18 @@ def _delete_path(data: dict | list, path: list[str | int]): return # Traverse to the parent of the target element to delete it - parent = data + parent: Any = data key_to_delete = path[-1] path_to_parent = path[:-1] try: for key in path_to_parent: - parent = parent[key] + if isinstance(parent, dict) and isinstance(key, str): + parent = parent[key] + elif isinstance(parent, list) and isinstance(key, int): + parent = parent[key] + else: + raise TypeError("Invalid path traversal") # Check if the final key/index exists in the parent before deleting if isinstance(parent, dict) and key_to_delete in parent: @@ -132,28 +141,33 @@ def _delete_path(data: dict | list, path: list[str | int]): pass -def _get_path(data: dict | list, path: list[str | int]) -> any: +def _get_path(data: dict | list, path: list[str | int]) -> Any: """ Retrieves a value from a nested dictionary or list based on a path. Returns None if the path is invalid or doesn't exist. """ - current = data + current: Any = data try: for key in path: - current = current[key] + if isinstance(current, dict) and isinstance(key, str): + current = current[key] + elif isinstance(current, list) and isinstance(key, int): + current = current[key] + else: + raise TypeError("Invalid path traversal") return current except (KeyError, IndexError, TypeError): return None -def _get_accessor(data: dict | list, accessor_string: str) -> any: +def _get_accessor(data: dict | list, accessor_string: str) -> Any: if accessor_string.startswith("root"): accessor_string = accessor_string[4:] # Remove 'root' prefix path = _parse_accessor(accessor_string) return _get_path(data, path) -def _format_value_one_liner(value) -> str: +def _format_value_one_liner(value: Any) -> str: res = json.dumps(value) if len(res) < ONE_LINER_LEN: return res @@ -183,7 +197,7 @@ def compare_files( with open(file2_path, "r", encoding="utf-8") as f2: json2 = json.load(f2) except (FileNotFoundError, json.JSONDecodeError): - return "FILE_ERROR", None + return DiffResult("FILE_ERROR", None, {}, {}) # Delete ignored fields from both JSON objects before comparison if ignore_fields: @@ -207,7 +221,7 @@ def compare_and_report_files( ignore_fields: list[str] | None = None, truncate_items: int = 100, verbose: bool = False, -) -> bool: +) -> int: result = compare_files(old_path, new_path, ignore_fields) if result.status == "FILE_ERROR": print("Error reading or parsing a file.", file=sys.stderr) @@ -263,7 +277,7 @@ def get_compare_file_list(path1: Path, path2: Path) -> list[tuple[Path, Path]]: return compare_files -def main(): +def main() -> int: parser = argparse.ArgumentParser( description="Compare two JSON files or two directories of JSON files." ) @@ -304,10 +318,14 @@ def main(): ignore_fields = list(set(cli_ignore_fields + env_ignore_fields)) compare_files = get_compare_file_list(args.path1, args.path2) + exit_code = 0 for file1, file2 in compare_files: - compare_and_report_files( + result = compare_and_report_files( file1, file2, ignore_fields, args.truncate_items, args.verbose ) + if result != 0: + exit_code = result + return exit_code if __name__ == "__main__": From 53228f35ffc320b6a9b13eb7a496d6f8dd0219ce Mon Sep 17 00:00:00 2001 From: Hoblovski Date: Wed, 3 Dec 2025 04:12:28 +0800 Subject: [PATCH 3/8] fix: colorize diffjson --- script/diffjson.py | 86 ++++++++++++++++++++++++++++------------------ 1 file changed, 52 insertions(+), 34 deletions(-) diff --git a/script/diffjson.py b/script/diffjson.py index 899903a..9d5fe49 100755 --- a/script/diffjson.py +++ b/script/diffjson.py @@ -38,32 +38,32 @@ def add_item(text: str): if self.diff is not None and "dictionary_item_added" in self.diff: for path in self.diff["dictionary_item_added"]: one_liner = _format_value_one_liner(_get_accessor(self.json2, path)) - add_item(f"New item: {path}\n Add: {one_liner}") + add_item(f"{path}\n{format_color('green', '+ ' + one_liner)}") if self.diff is not None and "iterable_item_added" in self.diff: for path, value in self.diff["iterable_item_added"].items(): one_liner = _format_value_one_liner(_get_accessor(self.json2, path)) - add_item(f"New item: {path}\n Add: {one_liner}") + add_item(f"{path}\n{format_color('green', '+ ' + one_liner)}") # Handle removed items (dictionary_item_removed and iterable_item_removed) if self.diff is not None and "dictionary_item_removed" in self.diff: for path, value in self.diff["dictionary_item_removed"]: one_liner = _format_value_one_liner(value) - add_item(f"Removed item: {path}\n Remove: {one_liner}") + add_item(f"{path}\n{format_color('red', '- ' + one_liner)}") if self.diff is not None and "iterable_item_removed" in self.diff: for path, value in self.diff["iterable_item_removed"].items(): one_liner = _format_value_one_liner(value) - add_item(f"Removed item: {path}\n Remove: {one_liner}") + add_item(f"{path}\n{format_color('red', '- ' + one_liner)}") # Handle changed values if self.diff is not None and "values_changed" in self.diff: for path, changes in self.diff["values_changed"].items(): old_one_liner = _format_value_one_liner(changes["old_value"]) new_one_liner = _format_value_one_liner(changes["new_value"]) - add_item( - f"Changed item: {path}\n Old: {old_one_liner}\n New: {new_one_liner}" - ) + oldl = format_color("red", "- " + old_one_liner) + newl = format_color("green", "+ " + new_one_liner) + add_item(f"{path}\n{oldl}\n{newl}") # Handle items moved (position changes in lists) if ( @@ -72,7 +72,8 @@ def add_item(text: str): and "iterable_item_moved" in self.diff ): for path, changes in self.diff["iterable_item_moved"].items(): - add_item(f"Moved item: {path}\n Position changed in list") + movel = format_color("yellow", "~ Position changed in list") + add_item(f"{path}\n{movel}") # Add truncation notice if needed if truncate_items > 0 and items_count > truncate_items: @@ -178,7 +179,34 @@ def _format_value_one_liner(value: Any) -> str: res = f"[ ({len(value)} items) ]" if len(res) < ONE_LINER_LEN: return res - return res[:ONE_LINER_LEN] + f"... {len(res) - ONE_LINER_LEN} more chars" + return res[:ONE_LINER_LEN] + f"...({len(res) - ONE_LINER_LEN} more chars)" + + +_color_codes = {} +_reset_code = "" + + +def init_colors(): + global _color_codes, _reset_code + _color_codes = { + "red": "\033[31m", + "green": "\033[32m", + "yellow": "\033[33m", + } + _reset_code = "\033[0m" + + +def format_color(color: str, text: str) -> str: + code = _color_codes.get(color.lower()) + if code is None: + return text + return code + text + _reset_code + + +def print_color(color: str, *args, **kwargs): + sep = kwargs.get("sep", " ") + s = format_color(color, sep.join(str(arg) for arg in args)) + print(s, **{k: v for k, v in kwargs.items() if k not in ("sep")}) def compare_files( @@ -208,11 +236,7 @@ def compare_files( diff = DeepDiff(json1, json2, ignore_order=True) - return ( - DiffResult("BAD", diff, json1, json2) - if diff - else DiffResult("OK", None, json1, json2) - ) + return DiffResult("BAD", diff, json1, json2) if diff else DiffResult("OK", None, json1, json2) def compare_and_report_files( @@ -224,17 +248,20 @@ def compare_and_report_files( ) -> int: result = compare_files(old_path, new_path, ignore_fields) if result.status == "FILE_ERROR": - print("Error reading or parsing a file.", file=sys.stderr) + print_color( + "red", f"❌ [ERROR] reading or parsing {old_path} or {new_path}.", file=sys.stderr + ) return 1 if result.status == "BAD" and result.diff: - print(f"Files {old_path.name} and {new_path.name} differ.", file=sys.stderr) + print_color("red", f"❌ [DIFF] {str(old_path):<40} <-> {new_path}", file=sys.stderr) if verbose: new_output = result.format(truncate_items) + new_output = "\n ".join([""] + new_output.splitlines()) print(new_output, file=sys.stderr) return 1 else: - print(f"Files '{old_path.name}' and '{new_path.name}' are identical.") + print_color("green", f"✅ [IDENTICAL] {str(old_path):<40} <-> {new_path}") return 0 @@ -257,23 +284,17 @@ def get_compare_file_list_bothdir( def get_compare_file_list(path1: Path, path2: Path) -> list[tuple[Path, Path]]: if not path1.exists() or not path2.exists(): - raise ValueError( - f"Error: Path does not exist: {path1 if not path1.exists() else path2}" - ) + raise ValueError(f"Error: Path does not exist: {path1 if not path1.exists() else path2}") if path1.is_dir() and path2.is_dir(): - miss_files, new_files, compare_files = get_compare_file_list_bothdir( - path1, path2 - ) + miss_files, new_files, compare_files = get_compare_file_list_bothdir(path1, path2) for filename in miss_files: - print(f"[MISS] {filename}", file=sys.stderr) + print_color("red", f"❌ [MISS] {filename}", file=sys.stderr) for filename in new_files: - print(f"[NEW ] {filename}") + print_color("red", f"❌ [NEW ] {filename}") elif path1.is_file() and path2.is_file(): compare_files = [(path1, path2)] else: - raise ValueError( - "Error: Both arguments must be files or both must be directories." - ) + raise ValueError("Error: Both arguments must be files or both must be directories.") return compare_files @@ -281,12 +302,8 @@ def main() -> int: parser = argparse.ArgumentParser( description="Compare two JSON files or two directories of JSON files." ) - parser.add_argument( - "path1", type=Path, help="Path to the first file or 'old' directory." - ) - parser.add_argument( - "path2", type=Path, help="Path to the second file or 'new' directory." - ) + parser.add_argument("path1", type=Path, help="Path to the first file or 'old' directory.") + parser.add_argument("path2", type=Path, help="Path to the second file or 'new' directory.") parser.add_argument( "-i", "--ignore", @@ -317,6 +334,7 @@ def main() -> int: env_ignore_fields = env_ignore_str.split() if env_ignore_str else [] ignore_fields = list(set(cli_ignore_fields + env_ignore_fields)) + init_colors() compare_files = get_compare_file_list(args.path1, args.path2) exit_code = 0 for file1, file2 in compare_files: From 7557c58fedc2376e6f52aaecce750fe12cde4606 Mon Sep 17 00:00:00 2001 From: Hoblovski Date: Wed, 3 Dec 2025 04:16:13 +0800 Subject: [PATCH 4/8] fix: regression flags, wrong dep --- .github/workflows/regression.yml | 2 +- script/diffjson.py | 1 - 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/.github/workflows/regression.yml b/.github/workflows/regression.yml index 9d80631..700455e 100644 --- a/.github/workflows/regression.yml +++ b/.github/workflows/regression.yml @@ -108,4 +108,4 @@ jobs: retention-days: 3 - name: Compare outputs and check for regression - run: ./pr_repo/script/diffjson.py out_old out_new $COMPARE_IGNORE + run: ./pr_repo/script/diffjson.py out_old out_new -v diff --git a/script/diffjson.py b/script/diffjson.py index 9d5fe49..3423be4 100755 --- a/script/diffjson.py +++ b/script/diffjson.py @@ -8,7 +8,6 @@ from pathlib import Path from typing import Any, Literal, Sequence -import IPython from deepdiff import DeepDiff ONE_LINER_LEN = 100 From df36cf7d61a02daa16457ea780b8eeb729556350 Mon Sep 17 00:00:00 2001 From: Hoblovski Date: Wed, 3 Dec 2025 05:23:20 +0800 Subject: [PATCH 5/8] ci: better format --- script/diffjson.py | 169 ++++++++++++++++++++++++++++++--------------- 1 file changed, 113 insertions(+), 56 deletions(-) diff --git a/script/diffjson.py b/script/diffjson.py index 3423be4..8fe9e6b 100755 --- a/script/diffjson.py +++ b/script/diffjson.py @@ -3,6 +3,7 @@ from dataclasses import dataclass import json import os +import pprint import re import sys from pathlib import Path @@ -10,9 +11,10 @@ from deepdiff import DeepDiff -ONE_LINER_LEN = 100 +oneliner_LEN = 100 Status = Literal["OK", "BAD", "FILE_ERROR"] +path_ty = list[str | int] @dataclass @@ -23,46 +25,38 @@ class DiffResult: json2: Any def format(self, truncate_items: int) -> str: + return self.format_nested(truncate_items) + + def collect_flat_raw(self, truncate_items: int) -> list[tuple[path_ty, Any]]: output = [] - items_count = 0 - def add_item(text: str): - nonlocal items_count - if truncate_items == 0 or items_count < truncate_items: - output.append(text) - output.append("--------------------") - items_count += 1 + def add_item(accessor: str, value: Any) -> None: + if truncate_items == 0 or len(output) < truncate_items: + output.append((_parse_accessor(accessor), value)) # Handle new items (dictionary_item_added and iterable_item_added) if self.diff is not None and "dictionary_item_added" in self.diff: - for path in self.diff["dictionary_item_added"]: - one_liner = _format_value_one_liner(_get_accessor(self.json2, path)) - add_item(f"{path}\n{format_color('green', '+ ' + one_liner)}") + for accessor in self.diff["dictionary_item_added"]: + add_item(accessor, ("new", _get_accessor(self.json2, accessor))) if self.diff is not None and "iterable_item_added" in self.diff: - for path, value in self.diff["iterable_item_added"].items(): - one_liner = _format_value_one_liner(_get_accessor(self.json2, path)) - add_item(f"{path}\n{format_color('green', '+ ' + one_liner)}") + for accessor, value in self.diff["iterable_item_added"].items(): + add_item(accessor, ("new", value)) # Handle removed items (dictionary_item_removed and iterable_item_removed) if self.diff is not None and "dictionary_item_removed" in self.diff: - for path, value in self.diff["dictionary_item_removed"]: - one_liner = _format_value_one_liner(value) - add_item(f"{path}\n{format_color('red', '- ' + one_liner)}") + for accessor, value in self.diff["dictionary_item_removed"]: + add_item(accessor, ("removed", _get_accessor(self.json1, accessor))) if self.diff is not None and "iterable_item_removed" in self.diff: - for path, value in self.diff["iterable_item_removed"].items(): - one_liner = _format_value_one_liner(value) - add_item(f"{path}\n{format_color('red', '- ' + one_liner)}") + for accessor, value in self.diff["iterable_item_removed"].items(): + add_item(accessor, ("removed", value)) # Handle changed values if self.diff is not None and "values_changed" in self.diff: - for path, changes in self.diff["values_changed"].items(): - old_one_liner = _format_value_one_liner(changes["old_value"]) - new_one_liner = _format_value_one_liner(changes["new_value"]) - oldl = format_color("red", "- " + old_one_liner) - newl = format_color("green", "+ " + new_one_liner) - add_item(f"{path}\n{oldl}\n{newl}") + for accessor, changes in self.diff["values_changed"].items(): + add_item(accessor, ("removed", changes["old_value"])) + add_item(accessor, ("new", changes["new_value"])) # Handle items moved (position changes in lists) if ( @@ -70,30 +64,63 @@ def add_item(text: str): and "values_changed" not in self.diff and "iterable_item_moved" in self.diff ): - for path, changes in self.diff["iterable_item_moved"].items(): - movel = format_color("yellow", "~ Position changed in list") - add_item(f"{path}\n{movel}") + for accessor, changes in self.diff["iterable_item_moved"].items(): + add_item(accessor, ("moved", "Moved location in list.")) # Add truncation notice if needed - if truncate_items > 0 and items_count > truncate_items: - remaining = items_count - truncate_items - output.append(f"...({remaining} more items)") - - # Clean up the last separator for a tidy output - if output and output[-1] == "--------------------": - output.pop() - - return "\n".join(output) - - -def _parse_accessor(accessor_string: str) -> list[str | int]: + if truncate_items > 0 and len(output) > truncate_items: + remaining = len(output) - truncate_items + add_item([], ("info", f"...({remaining} more items)")) + + pprint.pprint(output) + return output + + def make_nested_oneliner(flat: list[tuple[path_ty, Any]]): + output = {} + for path, value in flat: + color, msg = value[0], value[1] + _set_with_ensure_strpath( + output, [str(p) for p in path], (color, _format_value_oneliner(msg)) + ) + return output + + def format_nested(self, truncate_items: int) -> str: + flat = self.collect_flat_raw(truncate_items) + nested = DiffResult.make_nested_oneliner(flat) + INDENT = " " + + def _dump(obj: dict, indent: int = 0, path: str = "") -> str: + if isinstance(obj, tuple): + color, msg = obj[0], obj[1] + color_map = { + "new": "green", + "removed": "red", + "moved": "yellow", + "info": "none", + } + color = color_map[color] + leader = {"green": "+ ", "red": "- "}.get(color, " ") + leader += INDENT * indent + l1 = format_color(color, leader + str(msg)) + l2 = format_color(color, leader + f"# {path=}") + return l1 + "\n" + l2 + output = "" + for key, value in obj.items(): + output += " " + INDENT * indent + f"{key}\n" + output += _dump(value, indent + 1, path + f"[{key}]") + "\n" + return output.rstrip() + + return _dump(nested) + + +def _parse_accessor(accessor_string: str) -> path_ty: """ Parses a field accessor string like "['key'][0]" into a list ['key', 0]. This allows for programmatic access to nested JSON elements. """ # Regex to find content within brackets, e.g., ['key'] or [0] parts = re.findall(r"\[([^\]]+)\]", accessor_string) - keys: list[str | int] = [] + keys: path_ty = [] for part in parts: try: # Try to convert to an integer for list indices @@ -104,7 +131,7 @@ def _parse_accessor(accessor_string: str) -> list[str | int]: return keys -def _delete_path(data: dict | list, path: list[str | int]) -> None: +def _delete_path(data: dict | list, path: path_ty) -> None: """ Deletes a value from a nested dictionary or list based on a path. This function modifies the data in place. If the path is invalid @@ -141,7 +168,7 @@ def _delete_path(data: dict | list, path: list[str | int]) -> None: pass -def _get_path(data: dict | list, path: list[str | int]) -> Any: +def _get_path(data: dict | list, path: path_ty) -> Any: """ Retrieves a value from a nested dictionary or list based on a path. Returns None if the path is invalid or doesn't exist. @@ -167,18 +194,30 @@ def _get_accessor(data: dict | list, accessor_string: str) -> Any: return _get_path(data, path) -def _format_value_one_liner(value: Any) -> str: +def _set_with_ensure_strpath(data: dict, str_path: list[str], value: Any) -> bool: + try: + current = data + for key in str_path[:-1]: + current = current.setdefault(key, {}) + final_key = str_path[-1] + current[final_key] = value + return True + except (KeyError, IndexError, TypeError): + return False + + +def _format_value_oneliner(value: Any) -> str: res = json.dumps(value) - if len(res) < ONE_LINER_LEN: + if len(res) < oneliner_LEN: return res if isinstance(value, dict): keys_str = ", ".join(f'"{key}": ...' for key in value.keys()) res = f"{{ {keys_str} }}" elif isinstance(value, list): res = f"[ ({len(value)} items) ]" - if len(res) < ONE_LINER_LEN: + if len(res) < oneliner_LEN: return res - return res[:ONE_LINER_LEN] + f"...({len(res) - ONE_LINER_LEN} more chars)" + return res[:oneliner_LEN] + f"...({len(res) - oneliner_LEN} more chars)" _color_codes = {} @@ -235,7 +274,11 @@ def compare_files( diff = DeepDiff(json1, json2, ignore_order=True) - return DiffResult("BAD", diff, json1, json2) if diff else DiffResult("OK", None, json1, json2) + return ( + DiffResult("BAD", diff, json1, json2) + if diff + else DiffResult("OK", None, json1, json2) + ) def compare_and_report_files( @@ -248,12 +291,16 @@ def compare_and_report_files( result = compare_files(old_path, new_path, ignore_fields) if result.status == "FILE_ERROR": print_color( - "red", f"❌ [ERROR] reading or parsing {old_path} or {new_path}.", file=sys.stderr + "red", + f"❌ [ERROR] reading or parsing {old_path} or {new_path}.", + file=sys.stderr, ) return 1 if result.status == "BAD" and result.diff: - print_color("red", f"❌ [DIFF] {str(old_path):<40} <-> {new_path}", file=sys.stderr) + print_color( + "red", f"❌ [DIFF] {str(old_path):<40} <-> {new_path}", file=sys.stderr + ) if verbose: new_output = result.format(truncate_items) new_output = "\n ".join([""] + new_output.splitlines()) @@ -283,9 +330,13 @@ def get_compare_file_list_bothdir( def get_compare_file_list(path1: Path, path2: Path) -> list[tuple[Path, Path]]: if not path1.exists() or not path2.exists(): - raise ValueError(f"Error: Path does not exist: {path1 if not path1.exists() else path2}") + raise ValueError( + f"Error: Path does not exist: {path1 if not path1.exists() else path2}" + ) if path1.is_dir() and path2.is_dir(): - miss_files, new_files, compare_files = get_compare_file_list_bothdir(path1, path2) + miss_files, new_files, compare_files = get_compare_file_list_bothdir( + path1, path2 + ) for filename in miss_files: print_color("red", f"❌ [MISS] {filename}", file=sys.stderr) for filename in new_files: @@ -293,7 +344,9 @@ def get_compare_file_list(path1: Path, path2: Path) -> list[tuple[Path, Path]]: elif path1.is_file() and path2.is_file(): compare_files = [(path1, path2)] else: - raise ValueError("Error: Both arguments must be files or both must be directories.") + raise ValueError( + "Error: Both arguments must be files or both must be directories." + ) return compare_files @@ -301,8 +354,12 @@ def main() -> int: parser = argparse.ArgumentParser( description="Compare two JSON files or two directories of JSON files." ) - parser.add_argument("path1", type=Path, help="Path to the first file or 'old' directory.") - parser.add_argument("path2", type=Path, help="Path to the second file or 'new' directory.") + parser.add_argument( + "path1", type=Path, help="Path to the first file or 'old' directory." + ) + parser.add_argument( + "path2", type=Path, help="Path to the second file or 'new' directory." + ) parser.add_argument( "-i", "--ignore", From ab57db3cac3531d6be2cf745b842b328ea01bcf6 Mon Sep 17 00:00:00 2001 From: Hoblovski Date: Wed, 3 Dec 2025 05:35:44 +0800 Subject: [PATCH 6/8] ci: collapse regression info --- script/diffjson.py | 41 ++++++++++++++++++++++------------------- 1 file changed, 22 insertions(+), 19 deletions(-) diff --git a/script/diffjson.py b/script/diffjson.py index 8fe9e6b..5b9e53b 100755 --- a/script/diffjson.py +++ b/script/diffjson.py @@ -3,7 +3,6 @@ from dataclasses import dataclass import json import os -import pprint import re import sys from pathlib import Path @@ -27,12 +26,11 @@ class DiffResult: def format(self, truncate_items: int) -> str: return self.format_nested(truncate_items) - def collect_flat_raw(self, truncate_items: int) -> list[tuple[path_ty, Any]]: - output = [] + def collect_flat_raw(self, truncate_items: int) -> tuple[list[tuple[path_ty, Any]], str]: + output: list[tuple[path_ty, Any]] = [] def add_item(accessor: str, value: Any) -> None: - if truncate_items == 0 or len(output) < truncate_items: - output.append((_parse_accessor(accessor), value)) + output.append((_parse_accessor(accessor), value)) # Handle new items (dictionary_item_added and iterable_item_added) if self.diff is not None and "dictionary_item_added" in self.diff: @@ -69,14 +67,15 @@ def add_item(accessor: str, value: Any) -> None: # Add truncation notice if needed if truncate_items > 0 and len(output) > truncate_items: - remaining = len(output) - truncate_items - add_item([], ("info", f"...({remaining} more items)")) - - pprint.pprint(output) - return output - - def make_nested_oneliner(flat: list[tuple[path_ty, Any]]): - output = {} + remaining_msg = f"...({len(output) - truncate_items} more items)" + output = output[:truncate_items] + else: + remaining_msg = "" + return output, remaining_msg + + @staticmethod + def make_nested_oneliner(flat: list[tuple[path_ty, Any]]) -> dict: + output: dict = {} for path, value in flat: color, msg = value[0], value[1] _set_with_ensure_strpath( @@ -85,10 +84,9 @@ def make_nested_oneliner(flat: list[tuple[path_ty, Any]]): return output def format_nested(self, truncate_items: int) -> str: - flat = self.collect_flat_raw(truncate_items) + flat, remaining_msg = self.collect_flat_raw(truncate_items) nested = DiffResult.make_nested_oneliner(flat) INDENT = " " - def _dump(obj: dict, indent: int = 0, path: str = "") -> str: if isinstance(obj, tuple): color, msg = obj[0], obj[1] @@ -106,11 +104,16 @@ def _dump(obj: dict, indent: int = 0, path: str = "") -> str: return l1 + "\n" + l2 output = "" for key, value in obj.items(): - output += " " + INDENT * indent + f"{key}\n" - output += _dump(value, indent + 1, path + f"[{key}]") + "\n" + output += " " + INDENT * indent + f"[{key}]" + v = value + while isinstance(v, dict) and len(v) == 1: + k, v = next(iter(v.items())) + output += f"[{k}]" + output += "\n" + output += _dump(v, indent + 1, path + f"[{key}]") + "\n" return output.rstrip() - return _dump(nested) + return _dump(nested) + "\n" + remaining_msg def _parse_accessor(accessor_string: str) -> path_ty: @@ -303,7 +306,7 @@ def compare_and_report_files( ) if verbose: new_output = result.format(truncate_items) - new_output = "\n ".join([""] + new_output.splitlines()) + new_output = "\n[details] ".join([""] + new_output.splitlines() + [""]) print(new_output, file=sys.stderr) return 1 else: From b31a218990828a0507f61c3473bc752e8e9db95c Mon Sep 17 00:00:00 2001 From: Hoblovski Date: Wed, 3 Dec 2025 06:06:12 +0800 Subject: [PATCH 7/8] ci: improve regression info --- script/diffjson.py | 73 ++++++++++++++++++++++++++++++++++------------ 1 file changed, 55 insertions(+), 18 deletions(-) diff --git a/script/diffjson.py b/script/diffjson.py index 5b9e53b..2cddc80 100755 --- a/script/diffjson.py +++ b/script/diffjson.py @@ -23,10 +23,34 @@ class DiffResult: json1: Any json2: Any + color_map = { + "new": "green", + "removed": "red", + "moved": "yellow", + "info": "none", + } + def format(self, truncate_items: int) -> str: return self.format_nested(truncate_items) - def collect_flat_raw(self, truncate_items: int) -> tuple[list[tuple[path_ty, Any]], str]: + def format_flat(self, truncate_items: int) -> str: + flat, remaining_msg = self.collect_flat_raw(truncate_items) + output_lines = [] + for path, value in flat: + color, msg = value[0], value[1] + color = DiffResult.color_map[color] + leader = {"green": "+ ", "red": "- "}.get(color, " ") + line = format_color( + color, leader + f"{''.join(f'[{repr(p)}]' for p in path)}: {msg}" + ) + output_lines.append(line) + if remaining_msg: + output_lines.append(remaining_msg) + return "\n".join(output_lines) + + def collect_flat_raw( + self, truncate_items: int + ) -> tuple[list[tuple[path_ty, Any]], str]: output: list[tuple[path_ty, Any]] = [] def add_item(accessor: str, value: Any) -> None: @@ -87,29 +111,40 @@ def format_nested(self, truncate_items: int) -> str: flat, remaining_msg = self.collect_flat_raw(truncate_items) nested = DiffResult.make_nested_oneliner(flat) INDENT = " " - def _dump(obj: dict, indent: int = 0, path: str = "") -> str: - if isinstance(obj, tuple): - color, msg = obj[0], obj[1] - color_map = { - "new": "green", - "removed": "red", - "moved": "yellow", - "info": "none", - } - color = color_map[color] + + def isleaf(obj: Any) -> bool: + return isinstance(obj, list) + + def _dump_leaf(obj: list, indent: int, path: str) -> str: + output = "" + for index, subobj in enumerate(obj): + color, msg = subobj[0], subobj[1] + color = DiffResult.color_map[color] leader = {"green": "+ ", "red": "- "}.get(color, " ") leader += INDENT * indent l1 = format_color(color, leader + str(msg)) - l2 = format_color(color, leader + f"# {path=}") - return l1 + "\n" + l2 + # l2 = format_color(color, leader + f"# {path=}") + output += l1 # + "\n" + l2 + if index != len(obj) - 1: + output += "\n" + return output + + def _dump(obj: dict, indent: int = 0, path: str = "") -> str: + if isinstance(obj, list): + return _dump_leaf(obj, indent, path) output = "" for key, value in obj.items(): - output += " " + INDENT * indent + f"[{key}]" + kline = " " + INDENT * indent + f"[{key}]" v = value - while isinstance(v, dict) and len(v) == 1: + while not isleaf(v): + if len(v) > 1: + break k, v = next(iter(v.items())) - output += f"[{k}]" - output += "\n" + kline += f"[{k}]" + if len(v) == 1: # parent of only one leaf, colorize it same like leaf + color = v[0][0] + kline = format_color(DiffResult.color_map[color], kline) + output += kline + "\n" output += _dump(v, indent + 1, path + f"[{key}]") + "\n" return output.rstrip() @@ -203,7 +238,9 @@ def _set_with_ensure_strpath(data: dict, str_path: list[str], value: Any) -> boo for key in str_path[:-1]: current = current.setdefault(key, {}) final_key = str_path[-1] - current[final_key] = value + if final_key not in current: + current[final_key] = [] + current[final_key].append(value) return True except (KeyError, IndexError, TypeError): return False From 33021ba056285e89039af9762d91a44321be767f Mon Sep 17 00:00:00 2001 From: Hoblovski Date: Tue, 30 Dec 2025 04:20:32 +0800 Subject: [PATCH 8/8] fix: diffjson.py --- script/diffjson.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/script/diffjson.py b/script/diffjson.py index 2cddc80..8ac7031 100755 --- a/script/diffjson.py +++ b/script/diffjson.py @@ -67,7 +67,7 @@ def add_item(accessor: str, value: Any) -> None: # Handle removed items (dictionary_item_removed and iterable_item_removed) if self.diff is not None and "dictionary_item_removed" in self.diff: - for accessor, value in self.diff["dictionary_item_removed"]: + for accessor in self.diff["dictionary_item_removed"]: add_item(accessor, ("removed", _get_accessor(self.json1, accessor))) if self.diff is not None and "iterable_item_removed" in self.diff: