Optimize notes parsers code and remove duplications

This commit is contained in:
t0xa 2025-08-31 14:49:15 +03:00
parent 8faf0dc233
commit 82ef68fb6a
3 changed files with 199 additions and 242 deletions

View file

@ -1,143 +1,72 @@
import os
import re
from typing import List, Tuple
from datetime import datetime
from obsidian.py_models import Approach, Exercise, Training
from obsidian.py_models import Training
from apple.mapper import unique_apple_exercises_mapper
current_directory = os.path.dirname(os.path.abspath(__file__))
PROJECT_ROOT_DIR = os.getcwd()
from parsers.base_parser import BaseNotesParser
def get_current_path():
return os.path.join(PROJECT_ROOT_DIR, "data")
class AppleNotesParser(BaseNotesParser):
"""Parser for Apple Notes format training data."""
def __init__(self):
super().__init__("apple.md")
def get_obsidian_examples_file(example_file_name: str):
return os.path.join(get_current_path(), f"{example_file_name}")
def read_example_file(example_file_name: str):
path_to_example: str = get_obsidian_examples_file(example_file_name)
with open(path_to_example, "r") as f:
content = f.read()
return content
def filter_training_data(training_data: str):
cleaned_text = re.sub(
r"^\|(\s+|-*|\s-*\s)\|(\s+|-*|\s-*\s)\|(\s+|-*|\s-*\s)\|$",
"",
training_data,
flags=re.MULTILINE,
)
cleaned_text = re.sub(r"^\n", "", cleaned_text, flags=re.MULTILINE)
lines = cleaned_text.splitlines()
redundant_lines = [
"| | | |",
"|---|---|---|",
"|**Упражнение**|**Вес**|**Подходы**|",
]
filtered_lines = [line for line in lines if line not in redundant_lines]
return "\n".join(filtered_lines)
def parse_training_header(
training_data_line: str,
) -> Tuple[bool, str, str, str]:
pattern: str = (
r"^\*\*(?P<date>\d+.\d+.\d+)\s\((?P<trainer>.+)(-(?P<year_counter>.+))?\)\*\*"
)
match = re.search(pattern, training_data_line)
if match:
date = match.group("date").strip()
trainer = match.group("trainer").strip()
if match.group("year_counter"):
year_count = match.group("year_counter").strip()
else:
year_count = 0
return True, date, trainer, year_count
return False, "", "", ""
def serialize_exercise(reps: str, weight: str, name: str) -> Exercise:
# Split reps into array of int's
reps_list: List[int] = [int(rep) for rep in reps.split("-")]
weight_splitted: bool = False
weight_list: List[float] = []
if weight:
weight_str_list: List[str] = [weight for weight in weight.split("-")]
# Check if weight is splitted
if any(split_anchor in weight_str_list[0] for split_anchor in ["x", "х"]):
weight_splitted = True
splitter = "x" if "x" in weight_str_list[0] else "х"
weight_list = [float(xweight.split(splitter)[0]) for xweight in weight_str_list]
else:
weight_list = [float(w) for w in weight_str_list]
approaches = []
if not weight:
for rep_index in range(0, len(reps_list)):
approach = Approach(weight=0.0, reps=reps_list[rep_index])
approaches.append(approach)
else:
weight_pointer = 0
for rep_index in range(0, len(reps_list)):
approach = Approach(weight=weight_list[weight_pointer], reps=reps_list[rep_index])
if rep_index < len(weight_list) - 1:
weight_pointer += 1
approaches.append(approach)
exercise = Exercise(
name=name, approaches=approaches, splitted_weight=weight_splitted
)
return exercise
def parse_training_exercises(exercise_line: str) -> Exercise:
stripped: List[str] = [entry.strip() for entry in exercise_line.split("|")][1:-1]
for entry in stripped:
if entry in ["Упражнение", "Вес", "Подходы"]:
raise ValueError
if stripped:
if "---" in stripped[0]:
raise ValueError
if len(stripped) != 3:
raise ValueError
return serialize_exercise(
name=stripped[0], weight=stripped[1], reps=stripped[2]
def filter_training_data(self, training_data: str) -> str:
"""Filter Apple-specific training data format."""
cleaned_text = re.sub(
r"^\|(\s+|-*|\s-*\s)\|(\s+|-*|\s-*\s)\|(\s+|-*|\s-*\s)\|$",
"",
training_data,
flags=re.MULTILINE,
)
raise ValueError("No valid exercise data found")
cleaned_text = re.sub(r"^\n", "", cleaned_text, flags=re.MULTILINE)
lines = cleaned_text.splitlines()
redundant_lines = [
"| | | |",
"|---|---|---|",
"|**Упражнение**|**Вес**|**Подходы**|",
]
filtered_lines = [line for line in lines if line not in redundant_lines]
return "\n".join(filtered_lines)
def parse_training_header(self, training_data_line: str) -> Tuple[bool, str, str, str]:
"""Parse Apple Notes training header format."""
pattern: str = (
r"^\*\*(?P<date>\d+.\d+.\d+)\s\((?P<trainer>.+)(-(?P<year_counter>.+))?\)\*\*"
)
match = re.search(pattern, training_data_line)
if match:
date = match.group("date").strip()
trainer = match.group("trainer").strip()
if match.group("year_counter"):
year_count = match.group("year_counter").strip()
else:
year_count = "0"
return True, date, trainer, year_count
return False, "", "", ""
def parse_training_data():
training_data: str = filter_training_data(read_example_file("apple.md"))
lines = training_data.splitlines()
current_training = None
trains = []
for index, line in enumerate(lines):
header_parsed, date, trainer, year_count = parse_training_header(line)
if index == len(lines) - 1:
trains.append(current_training)
if header_parsed:
trains.append(current_training)
try:
current_training = Training(
date=datetime.strptime(date, "%d.%m.%Y").date(), exercises=[]
)
except ValueError:
current_training = Training(
date=datetime.strptime(date, "%d.%m.%y").date(), exercises=[]
)
continue
def create_training_from_date(self, date_str: str) -> Training:
"""Create Training object from date string with fallback parsing."""
try:
exr = parse_training_exercises(line)
current_training.exercises.append(exr)
return Training(
date=datetime.strptime(date_str, "%d.%m.%Y").date(), exercises=[]
)
except ValueError:
pass
return trains[1:]
return Training(
date=datetime.strptime(date_str, "%d.%m.%y").date(), exercises=[]
)
def parse_training_data() -> List[Training]:
"""Parse Apple Notes training data."""
parser = AppleNotesParser()
return parser.parse_training_data()
def remap_unique_exercises(apple_trainings: List[Training]) -> List[Training]:
"""Remap exercise names using Apple-specific mapping."""
for apple_training in apple_trainings:
if not apple_training or not apple_training.exercises:
continue

View file

@ -1,126 +1,42 @@
import os
import re
from pprint import pprint
from typing import List, Tuple
from datetime import datetime
from obsidian.py_models import Training
from obsidian.mapper import obsidian_unique_exercies_mapping
from obsidian.py_models import Approach, Exercise, Training
from utils.date_refactor import parse_training_date
current_directory = os.path.dirname(os.path.abspath(__file__))
PROJECT_ROOT_DIR = os.getcwd()
from parsers.base_parser import BaseNotesParser
def get_data_path():
notes_data_dir = os.path.join(PROJECT_ROOT_DIR, "data")
return notes_data_dir
class ObsidianNotesParser(BaseNotesParser):
"""Parser for Obsidian Notes format training data."""
def __init__(self):
super().__init__("obsidian.md")
def filter_training_data(self, training_data: str) -> str:
"""Filter Obsidian-specific training data format."""
cleaned_text = re.sub(r"^\s*?\n", "", training_data, flags=re.MULTILINE)
return cleaned_text
def parse_training_header(self, training_data_line: str) -> Tuple[bool, str, str, str]:
"""Parse Obsidian Notes training header format."""
pattern: str = r"#\s(?P<date>\d+.\d+.\d+)\s\((?P<trainer>.+)-(?P<year_counter>.+)\)"
match = re.search(pattern, training_data_line)
if match:
date = match.group("date").strip()
trainer = match.group("trainer").strip()
year_count = match.group("year_counter").strip()
return True, date, trainer, year_count
return False, "", "", ""
def get_obsidian_examples_file(example_file_name: str):
return os.path.join(get_data_path(), f"{example_file_name}")
def read_example_file(example_file_name: str):
path_to_example: str = get_obsidian_examples_file(example_file_name)
with open(path_to_example, "r") as f:
content = f.read()
return content
def serialize_exercise(reps: str, weight: str, name: str) -> Exercise:
# Split reps into array of int's
reps_list: List[int] = [int(rep) for rep in reps.split("-")]
weight_splitted: bool = False
weight_list: List[float] = []
if weight:
weight_str_list: List[str] = [weight for weight in weight.split("-")]
# Check if weight is splitted
if any(split_anchor in weight_str_list[0] for split_anchor in ["x", "х"]):
weight_splitted = True
splitter = "x" if "x" in weight_str_list[0] else "х"
weight_list = [float(xweight.split(splitter)[0]) for xweight in weight_str_list]
else:
weight_list = [float(w) for w in weight_str_list]
approaches = []
if not weight:
for rep_index in range(0, len(reps_list)):
approach = Approach(weight=0.0, reps=reps_list[rep_index])
approaches.append(approach)
else:
weight_pointer = 0
for rep_index in range(0, len(reps_list)):
approach = Approach(weight=weight_list[weight_pointer], reps=reps_list[rep_index])
if rep_index < len(weight_list) - 1:
weight_pointer += 1
approaches.append(approach)
exercise = Exercise(
name=name, approaches=approaches, splitted_weight=weight_splitted
)
return exercise
def parse_training_exercises(exercise_line: str) -> Exercise:
stripped: List[str] = [entry.strip() for entry in exercise_line.split("|")][1:-1]
for entry in stripped:
if entry in ["Упражнение", "Вес", "Подходы"]:
raise ValueError
if stripped:
if "---" in stripped[0]:
raise ValueError
if len(stripped) != 3:
raise ValueError
return serialize_exercise(
name=stripped[0], weight=stripped[1], reps=stripped[2]
)
raise ValueError("No valid exercise data found")
def parse_training_header(
training_data_line: str,
) -> Tuple[bool, str, str, str]:
pattern: str = r"#\s(?P<date>\d+.\d+.\d+)\s\((?P<trainer>.+)-(?P<year_counter>.+)\)"
match = re.search(pattern, training_data_line)
if match:
date = match.group("date").strip()
trainer = match.group("trainer").strip()
year_count = match.group("year_counter").strip()
return True, date, trainer, year_count
return False, "", "", ""
def filter_training_data(training_data: str):
cleaned_text = re.sub(r"^\s*?\n", "", training_data, flags=re.MULTILINE)
return cleaned_text
def parse_training_data():
training_data: str = filter_training_data(read_example_file("obsidian.md"))
lines = training_data.splitlines()
current_training = None
trains = []
for index, line in enumerate(lines):
header_parsed, date, trainer, year_count = parse_training_header(line)
if index == len(lines) - 1:
trains.append(current_training)
if header_parsed:
trains.append(current_training)
current_training = Training(
# date=datetime.strptime(date, "%d.%m.%Y").date(), exercises=[]
date=parse_training_date(date),
exercises=[],
)
continue
try:
exr = parse_training_exercises(line)
current_training.exercises.append(exr)
except ValueError:
pass
return trains[1:]
def parse_training_data() -> List[Training]:
"""Parse Obsidian Notes training data."""
parser = ObsidianNotesParser()
return parser.parse_training_data()
def remap_unique_exercises(obsidian_trainings: List[Training]) -> List[Training]:
"""Remap exercise names using Obsidian-specific mapping."""
for obsidian_training in obsidian_trainings:
if not obsidian_training or not obsidian_training.exercises:
continue

112
parsers/base_parser.py Normal file
View file

@ -0,0 +1,112 @@
import os
import re
from typing import List, Tuple
from datetime import datetime
from obsidian.py_models import Approach, Exercise, Training
from utils.date_refactor import parse_training_date
class BaseNotesParser:
"""Base class for parsing training data from different note formats."""
def __init__(self, data_file_name: str):
self.data_file_name = data_file_name
self.project_root = os.getcwd()
def get_data_path(self) -> str:
return os.path.join(self.project_root, "data")
def get_data_file_path(self, file_name: str) -> str:
return os.path.join(self.get_data_path(), file_name)
def read_data_file(self, file_name: str) -> str:
path_to_file = self.get_data_file_path(file_name)
with open(path_to_file, "r") as f:
content = f.read()
return content
def serialize_exercise(self, reps: str, weight: str, name: str) -> Exercise:
"""Convert raw exercise data into Exercise object with approaches."""
reps_list: List[int] = [int(rep) for rep in reps.split("-")]
weight_splitted: bool = False
weight_list: List[float] = []
if weight:
weight_str_list: List[str] = [weight for weight in weight.split("-")]
if any(split_anchor in weight_str_list[0] for split_anchor in ["x", "х"]):
weight_splitted = True
splitter = "x" if "x" in weight_str_list[0] else "х"
weight_list = [float(xweight.split(splitter)[0]) for xweight in weight_str_list]
else:
weight_list = [float(w) for w in weight_str_list]
approaches = []
if not weight:
for rep_index in range(0, len(reps_list)):
approach = Approach(weight=0.0, reps=reps_list[rep_index])
approaches.append(approach)
else:
weight_pointer = 0
for rep_index in range(0, len(reps_list)):
approach = Approach(weight=weight_list[weight_pointer], reps=reps_list[rep_index])
if rep_index < len(weight_list) - 1:
weight_pointer += 1
approaches.append(approach)
exercise = Exercise(
name=name, approaches=approaches, splitted_weight=weight_splitted
)
return exercise
def parse_training_exercises(self, exercise_line: str) -> Exercise:
"""Parse exercise data from a table row."""
stripped: List[str] = [entry.strip() for entry in exercise_line.split("|")][1:-1]
for entry in stripped:
if entry in ["Упражнение", "Вес", "Подходы"]:
raise ValueError
if stripped:
if "---" in stripped[0]:
raise ValueError
if len(stripped) != 3:
raise ValueError
return self.serialize_exercise(
name=stripped[0], weight=stripped[1], reps=stripped[2]
)
raise ValueError("No valid exercise data found")
def filter_training_data(self, training_data: str) -> str:
"""Filter and clean training data. Override in subclasses for specific formats."""
return training_data
def parse_training_header(self, training_data_line: str) -> Tuple[bool, str, str, str]:
"""Parse training header. Override in subclasses for specific formats."""
raise NotImplementedError("Subclasses must implement parse_training_header")
def create_training_from_date(self, date_str: str) -> Training:
"""Create Training object from date string using utility function."""
return Training(date=parse_training_date(date_str), exercises=[])
def parse_training_data(self) -> List[Training]:
"""Main parsing method. Override for specific parsing logic."""
training_data = self.filter_training_data(self.read_data_file(self.data_file_name))
lines = training_data.splitlines()
current_training = None
trains = []
for index, line in enumerate(lines):
header_parsed, date, trainer, year_count = self.parse_training_header(line)
if index == len(lines) - 1:
trains.append(current_training)
if header_parsed:
trains.append(current_training)
current_training = self.create_training_from_date(date)
continue
try:
exr = self.parse_training_exercises(line)
if current_training:
current_training.exercises.append(exr)
except ValueError:
pass
return [train for train in trains if train is not None]