diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 9687dbf..73582d6 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -20,3 +20,10 @@ repos: hooks: - id: trailing-whitespace - id: end-of-file-fixer + +exclude: | + (?x)( + ^tests/ + ^docs/ + ^README.md/ + ) diff --git a/app/config.py b/app/config.py index a2fe6c3..f4227c7 100644 --- a/app/config.py +++ b/app/config.py @@ -30,7 +30,7 @@ class Settings(BaseSettings): CH_DB_NAME: str = "applehealth" CH_TABLE_NAME: str = "data" - DUCKDB_FILENAME: str = "applehealth.parquet" + DUCKDB_FILENAME: str = "applehealth.duckdb" CHUNK_SIZE: int = 50_000 diff --git a/app/mcp/v1/tools/duckdb_reader.py b/app/mcp/v1/tools/duckdb_reader.py index b064cd1..e23fd9a 100644 --- a/app/mcp/v1/tools/duckdb_reader.py +++ b/app/mcp/v1/tools/duckdb_reader.py @@ -2,7 +2,7 @@ from fastmcp import FastMCP -from app.schemas.record import HealthRecordSearchParams, IntervalType, RecordType +from app.schemas.record import HealthRecordSearchParams, IntervalType, RecordType, WorkoutType from app.services.health.duckdb_queries import ( get_health_summary_from_duckdb, get_statistics_by_type_from_duckdb, @@ -11,7 +11,7 @@ search_values_from_duckdb, ) -duckdb_reader_router = FastMCP(name="CH Reader MCP") +duckdb_reader_router = FastMCP(name="DuckDB Reader MCP") @duckdb_reader_router.tool @@ -23,14 +23,16 @@ def get_health_summary_duckdb() -> list[dict[str, Any]]: Notes for LLM: - IMPORTANT - Do not guess, autofill, or assume any missing data. - - If there are multiple databases available (DuckDB, ClickHouse, Elasticsearch): + - Use this tool if you're not certain of the record type that + should be called + - If there are multiple databases available (DuckDB, Elasticsearch): first, ask the user which one he wants to use. DO NOT call any tools before the user specifies his intent. - If the user decides on an option, only use tools from this database, do not switch over to another until the user specifies that he wants to use a different one. You do not have to keep asking whether the user wants to use the same database that he used before. - - If there is only one database available (DuckDB, ClickHouse, Elasticsearch): + - If there is only one database available (DuckDB, Elasticsearch): you can use the tools from this database without the user specifying it. """ try: @@ -46,10 +48,11 @@ def search_health_records_duckdb(params: HealthRecordSearchParams) -> list[dict[ Parameters: - params: HealthRecordSearchParams object containing all search/filter parameters. + (required parameters: record_type) Notes for LLMs: - This function should return a list of health record documents (dicts) - matching the search criteria. + matching the search criteria ordered by date from most to least recent. - Each document in the list should represent a single health record as stored in ClickHouse. - If an error occurs, the function should return a list with a single dict containing an 'error' key and the error message. @@ -58,14 +61,16 @@ def search_health_records_duckdb(params: HealthRecordSearchParams) -> list[dict[ - Example date_from/date_to: "2020-01-01T00:00:00+00:00" - Example value_min/value_max: "10", "100.5" - IMPORTANT - Do not guess, autofill, or assume any missing data. - - If there are multiple databases available (DuckDB, ClickHouse, Elasticsearch): + - This tool can be used to search for most recent records of a given type, + in which case you should use this tool with a limit of 1. 
+ - If there are multiple databases available (DuckDB, Elasticsearch): first, ask the user which one he wants to use. DO NOT call any tools before the user specifies his intent. - If the user decides on an option, only use tools from this database, do not switch over to another until the user specifies that he wants to use a different one. You do not have to keep asking whether the user wants to use the same database that he used before. - - If there is only one database available (DuckDB, ClickHouse, Elasticsearch): + - If there is only one database available (DuckDB, Elasticsearch): you can use the tools from this database without the user specifying it. """ try: @@ -75,7 +80,9 @@ def search_health_records_duckdb(params: HealthRecordSearchParams) -> list[dict[ @duckdb_reader_router.tool -def get_statistics_by_type_duckdb(record_type: RecordType | str) -> list[dict[str, Any]]: +def get_statistics_by_type_duckdb( + record_type: RecordType | WorkoutType | str, +) -> list[dict[str, Any]]: """ Get comprehensive statistics for a specific health record type from DuckDB. @@ -105,17 +112,19 @@ def get_statistics_by_type_duckdb(record_type: RecordType | str) -> list[dict[st specific health metrics. - The function is useful for health analysis, identifying outliers, and understanding data quality. + - This tool can also be used to figure out the value of the record with + the shortest/longest duration or highest/lowest value - date_range key for query is commented, since it contained hardcoded from date, but you can use it anyway if you replace startDate with your data. - IMPORTANT - Do not guess, autofill, or assume any missing data. - - If there are multiple databases available (DuckDB, ClickHouse, Elasticsearch): + - If there are multiple databases available (DuckDB, Elasticsearch): first, ask the user which one he wants to use. DO NOT call any tools before the user specifies his intent. - If the user decides on an option, only use tools from this database, do not switch over to another until the user specifies that he wants to use a different one. You do not have to keep asking whether the user wants to use the same database that he used before. - - If there is only one database available (DuckDB, ClickHouse, Elasticsearch): + - If there is only one database available (DuckDB, Elasticsearch): you can use the tools from this database without the user specifying it. """ try: @@ -126,7 +135,7 @@ def get_statistics_by_type_duckdb(record_type: RecordType | str) -> list[dict[st @duckdb_reader_router.tool def get_trend_data_duckdb( - record_type: RecordType | str, + record_type: RecordType | WorkoutType | str, interval: IntervalType = "month", date_from: str | None = None, date_to: str | None = None, @@ -166,14 +175,14 @@ def get_trend_data_duckdb( - IMPORTANT - interval must be one of: "day", "week", "month", or "year". Do not use other values. - Do not guess, autofill, or assume any missing data. - - If there are multiple databases available (DuckDB, ClickHouse, Elasticsearch): + - If there are multiple databases available (DuckDB, Elasticsearch): first, ask the user which one he wants to use. DO NOT call any tools before the user specifies his intent. - If the user decides on an option, only use tools from this database, do not switch over to another until the user specifies that he wants to use a different one. You do not have to keep asking whether the user wants to use the same database that he used before. 
- - If there is only one database available (DuckDB, ClickHouse, Elasticsearch): + - If there is only one database available (DuckDB, Elasticsearch): you can use the tools from this database without the user specifying it. """ try: @@ -184,7 +193,7 @@ def get_trend_data_duckdb( @duckdb_reader_router.tool def search_values_duckdb( - record_type: RecordType | str | None, + record_type: RecordType | WorkoutType | str | None, value: str, date_from: str | None = None, date_to: str | None = None, @@ -204,14 +213,14 @@ def search_values_duckdb( records with the value of "HKCategoryValueSleepAnalysisAsleepDeep" - The function automatically handles date filtering if date_from/date_to are provided - Do not guess, autofill, or assume any missing data. - - If there are multiple databases available (DuckDB, ClickHouse, Elasticsearch): + - If there are multiple databases available (DuckDB, Elasticsearch): first, ask the user which one he wants to use. DO NOT call any tools before the user specifies his intent. - If the user decides on an option, only use tools from this database, do not switch over to another until the user specifies that he wants to use a different one. You do not have to keep asking whether the user wants to use the same database that he used before. - - If there is only one database available (DuckDB, ClickHouse, Elasticsearch): + - If there is only one database available (DuckDB, Elasticsearch): you can use the tools from this database without the user specifying it. """ try: diff --git a/app/schemas/record.py b/app/schemas/record.py index fe0c940..3a56ea2 100644 --- a/app/schemas/record.py +++ b/app/schemas/record.py @@ -22,14 +22,27 @@ "HKQuantityTypeIdentifierEnvironmentalAudioExposure", ] +WorkoutType = Literal[ + "HKWorkoutActivityTypeRunning", + "HKWorkoutActivityTypeWalking", + "HKWorkoutActivityTypeHiking", + "HKWorkoutActivityTypeTraditionalStrengthTraining", + "HKWorkoutActivityTypeCycling", + "HKWorkoutActivityTypeMixedMetabolicCardioTraining", + "HKWorkoutActivityTypeHighIntensityIntervalTraining", + "HKWorkoutActivityTypeHockey", +] + IntervalType = Literal["day", "week", "month", "year"] class HealthRecordSearchParams(BaseModel): - record_type: RecordType | str | None = None + record_type: RecordType | WorkoutType | str | None = None source_name: str | None = None date_from: str | None = None date_to: str | None = None + min_workout_duration: str | None = None + max_workout_duration: str | None = None value_min: str | None = None value_max: str | None = None limit: int = 10 diff --git a/app/services/duckdb_client.py b/app/services/duckdb_client.py index 5ebb4be..386acfa 100644 --- a/app/services/duckdb_client.py +++ b/app/services/duckdb_client.py @@ -24,9 +24,11 @@ def __post_init__(self): else: self.path = Path(self.path) - if isinstance(self.path, Path) and not self.path.exists(): - raise FileNotFoundError(f"Parquet file not found: {self.path}") - @staticmethod - def format_response(response: DuckDBPyRelation) -> list[dict[str, Any]]: - return response.df().to_dict(orient="records") + def format_response( + response: DuckDBPyRelation | list[DuckDBPyRelation], + ) -> list[dict[str, Any]]: + if isinstance(response, DuckDBPyRelation): + return response.df().to_dict(orient="records") + records = [record.df().to_dict(orient="records") for record in response] + return sum(records, []) diff --git a/app/services/health/duckdb_queries.py b/app/services/health/duckdb_queries.py index 4cd9937..8f3621e 100644 --- a/app/services/health/duckdb_queries.py +++ 
b/app/services/health/duckdb_queries.py @@ -1,72 +1,155 @@ +import logging from typing import Any import duckdb -from app.schemas.record import HealthRecordSearchParams, IntervalType, RecordType +from app.schemas.record import ( + HealthRecordSearchParams, + IntervalType, + RecordType, + WorkoutType, +) from app.services.duckdb_client import DuckDBClient -from app.services.health.sql_helpers import fill_query +from app.services.health.sql_helpers import ( + fill_query, + get_table, + join_query, + join_string, + value_aggregates, +) client = DuckDBClient() +con = duckdb.connect(client.path, read_only=True) def get_health_summary_from_duckdb() -> list[dict[str, Any]]: - response = duckdb.sql( - f"""SELECT type, COUNT(*) AS count FROM read_parquet('{client.path}') - GROUP BY type ORDER BY count DESC""", + records = con.sql( + """SELECT type, COUNT(*) AS count FROM records + GROUP BY type ORDER BY count DESC""", ) - return client.format_response(response) + workouts = con.sql( + f"""SELECT workouts.type, COUNT(*) AS count FROM workouts {join_query} + GROUP BY workouts.type ORDER BY count DESC""", + ) + + return client.format_response([records, workouts]) def search_health_records_from_duckdb( params: HealthRecordSearchParams, ) -> list[dict[str, Any]]: - query: str = f"SELECT * FROM read_parquet('{client.path}')" + query: str = "SELECT * FROM" query += fill_query(params) - response = duckdb.sql(query) + response = con.sql(query) return client.format_response(response) def get_statistics_by_type_from_duckdb( - record_type: RecordType | str, + record_type: RecordType | WorkoutType | str, ) -> list[dict[str, Any]]: - result = duckdb.sql(f""" - SELECT type, COUNT(*) AS count, AVG(value) AS average, - SUM(value) AS sum, MIN(value) AS min, MAX(value) AS max - FROM read_parquet('{client.path}') - WHERE type = '{record_type}' GROUP BY type - """) - return client.format_response(result) + table = get_table(record_type) + complimentary_table = "stats" if table == "workouts" else "records" + join_clause = join_string(table) + values = value_aggregates(table) + results = [] + value = values[0] + for value in values: + results.append( + con.sql(f""" + SELECT {table}.type, {complimentary_table}.type AS stat_type, COUNT(*) AS count, + AVG({value}) AS average, SUM({value}) AS sum, MIN({value}) AS min, + MAX({value}) AS max, unit FROM {table} {join_clause} + WHERE {table}.type = '{record_type}' GROUP BY {table}.type, + {complimentary_table}.type, unit + """), + ) + return client.format_response(results) def get_trend_data_from_duckdb( - record_type: RecordType | str, + record_type: RecordType | WorkoutType | str, interval: IntervalType = "month", date_from: str | None = None, date_to: str | None = None, ) -> list[dict[str, Any]]: - result = duckdb.sql(f""" - SELECT device, time_bucket(INTERVAL '1 {interval}', startDate) AS interval, - AVG(value) AS average, SUM(value) AS sum, - MIN(value) AS min, MAX(value) AS max, COUNT(*) AS count - FROM read_parquet('{client.path}') - WHERE type = '{record_type}' - {f"AND startDate >= '{date_from}'" if date_from else ""} - {f"AND startDate <= '{date_to}'" if date_to else ""} - GROUP BY interval, device ORDER BY interval ASC - """) - return client.format_response(result) + table = get_table(record_type) + join_clause = join_string(table) + values = value_aggregates(table) + results = [] + + for value in values: + results.append( + con.sql(f""" + SELECT {table}.type, sourceName, time_bucket(INTERVAL '1 {interval}', + {table}.startDate) AS interval, + AVG({value}) AS average, 
SUM({value}) AS sum, + MIN({value}) AS min, MAX({value}) AS max, COUNT(*) AS count, + unit FROM {table} {join_clause} + WHERE {table}.type = '{record_type}' + {f"AND {table}.startDate >= '{date_from}'" if date_from else ""} + {f"AND {table}.startDate <= '{date_to}'" if date_to else ""} + GROUP BY interval, {table}.type, sourceName, unit ORDER BY interval ASC + """), + ) + return client.format_response(results) def search_values_from_duckdb( - record_type: RecordType | str | None, + record_type: RecordType | WorkoutType | str | None, value: str, date_from: str | None = None, date_to: str | None = None, ) -> list[dict[str, Any]]: - result = duckdb.sql(f""" - SELECT * FROM read_parquet('{client.path}') WHERE textvalue = '{value}' - {f"AND type = '{record_type}'" if record_type else ""} + table = get_table(record_type) + join_clause = join_string(table) + + result = con.sql(f""" + SELECT * FROM {table} {join_clause} WHERE textValue = '{value}' + {f"AND {table}.type = '{record_type}'" if record_type else ""} {f"AND startDate >= '{date_from}'" if date_from else ""} {f"AND startDate <= '{date_to}'" if date_to else ""} + ORDER BY startDate DESC """) return client.format_response(result) + + +logger = logging.getLogger(__name__) + + +def main() -> None: + logging.basicConfig(level=logging.INFO, filename="duckdb.log", format="%(message)s") + logger.info("Starting logging for duckdb queries") + + logger.info("-----------------") + logger.info(f"records for get_health_summary_from_duckdb: {get_health_summary_from_duckdb()}") + logger.info("-----------------") + logger.info( + f"records for get_statistics_by_type_duckdb:" + f" {get_statistics_by_type_from_duckdb('HKWorkoutActivityTypeRunning')}", + ) + logger.info("-----------------") + logger.info( + f"records for get_trend_data_duckdb: { + get_trend_data_from_duckdb( + 'HKWorkoutActivityTypeRunning', + date_from='2016-01-01T00:00:00+00:00', + date_to='2016-12-31T23:59:59+00:00', + ) + }", + ) + logger.info("-----------------") + pars = HealthRecordSearchParams( + limit=20, + record_type="HKWorkoutActivityTypeRunning", + date_from="2016-01-01T00:00:00+00:00", + date_to="2016-12-31T23:59:59+00:00", + ) + logger.info( + f"records for search_health_records_from_duckdb: {search_health_records_from_duckdb(pars)}", + ) + logger.info("-----------------") + logger.info("Finished logging") + + +if __name__ == "__main__": + main() diff --git a/app/services/health/parquet_queries.py b/app/services/health/parquet_queries.py new file mode 100644 index 0000000..4cd9937 --- /dev/null +++ b/app/services/health/parquet_queries.py @@ -0,0 +1,72 @@ +from typing import Any + +import duckdb + +from app.schemas.record import HealthRecordSearchParams, IntervalType, RecordType +from app.services.duckdb_client import DuckDBClient +from app.services.health.sql_helpers import fill_query + +client = DuckDBClient() + + +def get_health_summary_from_duckdb() -> list[dict[str, Any]]: + response = duckdb.sql( + f"""SELECT type, COUNT(*) AS count FROM read_parquet('{client.path}') + GROUP BY type ORDER BY count DESC""", + ) + return client.format_response(response) + + +def search_health_records_from_duckdb( + params: HealthRecordSearchParams, +) -> list[dict[str, Any]]: + query: str = f"SELECT * FROM read_parquet('{client.path}')" + query += fill_query(params) + response = duckdb.sql(query) + return client.format_response(response) + + +def get_statistics_by_type_from_duckdb( + record_type: RecordType | str, +) -> list[dict[str, Any]]: + result = duckdb.sql(f""" + SELECT type, 
COUNT(*) AS count, AVG(value) AS average, + SUM(value) AS sum, MIN(value) AS min, MAX(value) AS max + FROM read_parquet('{client.path}') + WHERE type = '{record_type}' GROUP BY type + """) + return client.format_response(result) + + +def get_trend_data_from_duckdb( + record_type: RecordType | str, + interval: IntervalType = "month", + date_from: str | None = None, + date_to: str | None = None, +) -> list[dict[str, Any]]: + result = duckdb.sql(f""" + SELECT device, time_bucket(INTERVAL '1 {interval}', startDate) AS interval, + AVG(value) AS average, SUM(value) AS sum, + MIN(value) AS min, MAX(value) AS max, COUNT(*) AS count + FROM read_parquet('{client.path}') + WHERE type = '{record_type}' + {f"AND startDate >= '{date_from}'" if date_from else ""} + {f"AND startDate <= '{date_to}'" if date_to else ""} + GROUP BY interval, device ORDER BY interval ASC + """) + return client.format_response(result) + + +def search_values_from_duckdb( + record_type: RecordType | str | None, + value: str, + date_from: str | None = None, + date_to: str | None = None, +) -> list[dict[str, Any]]: + result = duckdb.sql(f""" + SELECT * FROM read_parquet('{client.path}') WHERE textvalue = '{value}' + {f"AND type = '{record_type}'" if record_type else ""} + {f"AND startDate >= '{date_from}'" if date_from else ""} + {f"AND startDate <= '{date_to}'" if date_to else ""} + """) + return client.format_response(result) diff --git a/app/services/health/sql_helpers.py b/app/services/health/sql_helpers.py index db62ecd..5acf97e 100644 --- a/app/services/health/sql_helpers.py +++ b/app/services/health/sql_helpers.py @@ -1,40 +1,89 @@ +from typing import Any + from app.schemas.record import HealthRecordSearchParams +join_query: str = "INNER JOIN stats ON workouts.startDate = stats.startDate" + + +def join_string(table: str) -> str: + if table == "workouts": + return join_query + return "" + + +def value_aggregates(table: str) -> list[str]: + if table in ["workouts", "stats"]: + return ["duration", "sum"] + return ["value"] + + +def get_table(record_type: str | Any) -> str: + if record_type.startswith("HKWorkout"): + return "workouts" + return "records" -def build_date(date_from: str | None, date_to: str | None) -> str | None: + +def get_value_type(table: str | None) -> str: + match table: + case "records": + return "value" + case "workouts" | "stats": + return "sum" + case _: + return "value" + + +def build_date(date_from: str | None, date_to: str | None, table: str) -> str | None: if date_from and date_to: - return f"startDate >= '{date_from}' and startDate <= '{date_to}'" + return f"{table}.startDate >= '{date_from}' and {table}.startDate <= '{date_to}'" if date_from: - return f"startDate >= '{date_from}'" + return f"{table}.startDate >= '{date_from}'" if date_to: - return f"startDate <= '{date_to}'" + return f"{table}.startDate <= '{date_to}'" return None -def build_value_range(valuemin: str | None, valuemax: str | None) -> str | None: +def build_value_range( + valuemin: str | None, + valuemax: str | None, + value_type: str | None, +) -> str | None: + # value_type: str = get_value_type(table) + if valuemax and valuemin: - return f"value >= '{valuemin}' and value <= '{valuemax}'" + return f"{value_type} >= '{valuemin}' and {value_type} <= '{valuemax}'" if valuemin: - return f"value >= '{valuemin}'" + return f"{value_type} >= '{valuemin}'" if valuemax: - return f"value <= '{valuemax}'" + return f"{value_type} <= '{valuemax}'" return None def fill_query(params: HealthRecordSearchParams) -> str: conditions: list[str] = [] - 
query: str = " WHERE 1=1" + table = get_table(params.record_type) + + if table == "workouts": + query: str = f" workouts {join_query} WHERE 1=1" + else: + query: str = " records WHERE 1=1" + value_type = get_value_type(table) if params.record_type: - conditions.append(f" type = '{params.record_type}'") + conditions.append(f" {table}.type = '{params.record_type}'") if params.source_name: conditions.append(f" source_name = '{params.source_name}'") if params.date_from or params.date_to: - conditions.append(build_date(params.date_from, params.date_to)) + conditions.append(build_date(params.date_from, params.date_to, table)) if params.value_min or params.value_max: - conditions.append(build_value_range(params.value_min, params.value_max)) + conditions.append(build_value_range(params.value_min, params.value_max, value_type)) + if params.min_workout_duration or params.max_workout_duration: + conditions.append(build_value_range(params.value_min, params.value_max, "duration")) + + conditions = [condition for condition in conditions if condition is not None] if conditions: query += " AND " + " AND ".join(conditions) - query += f"LIMIT {params.limit}" + + query += f"ORDER BY {table}.startDate DESC LIMIT {params.limit}" return query diff --git a/config/.env.example b/config/.env.example index 035f920..047d7c8 100644 --- a/config/.env.example +++ b/config/.env.example @@ -1,9 +1,6 @@ ES_USER="elastic" ES_PASSWORD="elastic" ES_HOST="localhost" -CH_DIRNAME="applehealth.chdb" -CH_DB_NAME="applehealth" -CH_TABLE_NAME="data" -DUCKDB_FILENAME="applehealth.parquet" +DUCKDB_FILENAME="applehealth.duckdb" CHUNK_SIZE="50000" RAW_XML_PATH="raw.xml" diff --git a/pyproject.toml b/pyproject.toml index 8d5bcc0..1927864 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -13,7 +13,7 @@ dependencies = [ "httpx>=0.28", "pandas>=2.3.2", "passlib>=1.7", - "polars>=1.33.0", + "polars>=1.34.0", "pydantic>=2.11", "pydantic-settings>=2.10", "testcontainers>=4.10", @@ -31,7 +31,7 @@ module-name = "app" [dependency-groups] dev = [ "fastapi>=0.116.2", - "pytest>=8.4.1", + "pytest>=8.4.2", "pytest-asyncio>=1.0.0", "pytest-cov>=6.2.1", ] @@ -44,9 +44,13 @@ code-quality = [ [tool.pytest.ini_options] asyncio_default_fixture_loop_scope = "session" +[tool.ty.src] +exclude = ["./tests/", "./docs/", "./README.md"] + [tool.ruff] line-length = 100 target-version = "py313" +extend-exclude = ["tests/", "./docs/", "./README.md"] [tool.ruff.lint] select = [ diff --git a/scripts/duckdb_importer.py b/scripts/duckdb_importer.py index 138fabb..723fdc4 100644 --- a/scripts/duckdb_importer.py +++ b/scripts/duckdb_importer.py @@ -1,6 +1,8 @@ import os from pathlib import Path +import duckdb +import pandas as pd import polars as pl from app.services.duckdb_client import DuckDBClient @@ -12,35 +14,148 @@ def __init__(self): XMLExporter.__init__(self) DuckDBClient.__init__(self) - def exportxml(self) -> None: - chunkfiles = [] + chunk_files = [] + + def export_xml(self) -> None: + """ + Export xml data from Apple Health export file + to a .duckdb database with path specified by user + """ + con = duckdb.connect("data.duckdb") + con.sql(""" + CREATE TABLE IF NOT EXISTS records ( + type VARCHAR, + sourceVersion VARCHAR, + sourceName VARCHAR, + device VARCHAR, + startDate TIMESTAMP, + endDate TIMESTAMP, + creationDate TIMESTAMP, + unit VARCHAR, + value DOUBLE, + textValue VARCHAR + ); + """) + con.sql(""" + CREATE TABLE IF NOT EXISTS workouts ( + type VARCHAR, + duration DOUBLE, + durationUnit VARCHAR, + sourceName VARCHAR, + startDate TIMESTAMP, + endDate 
TIMESTAMP, + creationDate TIMESTAMP + ) + """) + con.sql(""" + CREATE TABLE IF NOT EXISTS stats ( + type VARCHAR, + startDate TIMESTAMP, + endDate TIMESTAMP, + sum DOUBLE, + average DOUBLE, + maximum DOUBLE, + minimum DOUBLE, + unit VARCHAR + ) + """) + + docs_count = 0 for i, docs in enumerate(self.parse_xml(), 1): - df: pl.DataFrame = pl.DataFrame(docs) - chunk_file: Path = Path(f"data.chunk_{i}.parquet") - print(f"processed {i * self.chunk_size} docs") + cols = set(docs.columns) + if cols == set(self.RECORD_COLUMNS): + con.sql(""" + INSERT INTO records SELECT * FROM docs; + """) + if cols == set(self.WORKOUT_COLUMNS): + con.sql(""" + INSERT INTO workouts SELECT * FROM docs; + """) + if cols == set(self.WORKOUT_STATS_COLUMNS): + con.sql(""" + INSERT INTO stats SELECT * FROM docs; + """) + print(f"processed {docs_count + len(docs)} docs") + docs_count += len(docs) + + def write_to_file(self, index: int, df: pl.DataFrame) -> None: + chunk_file: str | Path = "" + try: + # check for columns specific for each table + if "workoutActivityType" in df.columns: + chunk_file = Path(f"workouts.chunk_{index}.parquet") + elif "type" in df.columns: + chunk_file = Path(f"records.chunk_{index}.parquet") + elif "sum" in df.columns: + chunk_file = Path(f"workout_stats.chunk_{index}.parquet") + print(f"processed {index * self.chunk_size} docs") df.write_parquet(chunk_file, compression="zstd", compression_level=1) - chunkfiles.append(chunk_file) - print(f"written {i * self.chunk_size} docs") + self.chunk_files.append(chunk_file) - chunk_dfs: list[pl.DataFrame] = [] - reference_columns: list[str] = [] + except Exception: + for file in self.chunk_files: + os.remove(file) + raise RuntimeError(f"Failed to write chunk file to disk: {chunk_file}") - for chunk_file in chunkfiles: - df = pl.read_parquet(chunk_file) + def export_xml_parquet(self) -> None: + """ + Deprecated method for exporting to multiple parquet files + corresponding to each table in the duckdb database + use export_xml instead + """ - if not reference_columns: - reference_columns = df.columns + for i, docs in enumerate(self.parse_xml(), 1): + df: pl.DataFrame = pl.DataFrame(docs) + self.write_to_file(i, df) + + record_chunk_dfs: list[pd.DataFrame] = [] + workout_chunk_dfs: list[pd.DataFrame] = [] + stat_chunk_dfs: list[pd.DataFrame] = [] + + for chunk_file in self.chunk_files: + df = pl.read_parquet(chunk_file) + cols = set(df.columns) + if cols == set(self.RECORD_COLUMNS): + df = df.select(self.RECORD_COLUMNS) + record_chunk_dfs.append(df) + elif cols == set(self.WORKOUT_COLUMNS): + df = df.select(self.WORKOUT_COLUMNS) + workout_chunk_dfs.append(df) + elif cols == set(self.WORKOUT_STATS_COLUMNS): + df = df.select(self.WORKOUT_STATS_COLUMNS) + stat_chunk_dfs.append(df) - df = df.select(reference_columns) - chunk_dfs.append(df) + record_df = None + workout_df = None + stat_df = None - combined_df = pl.concat(chunk_dfs) - combined_df.write_parquet(f"{self.path}", compression="zstd") + try: + if record_chunk_dfs: + record_df = pl.concat(record_chunk_dfs) + if workout_chunk_dfs: + workout_df = pl.concat(workout_chunk_dfs) + if stat_chunk_dfs: + stat_df = pl.concat(stat_chunk_dfs) + except Exception as e: + for f in self.chunk_files: + os.remove(f) + raise RuntimeError(f"Failed to concatenate dataframes: {str(e)}") + try: + if record_df is not None: + record_df.write_parquet(f"{self.path / 'records.parquet'}", compression="zstd") + if workout_df is not None: + workout_df.write_parquet(f"{self.path / 'workouts.parquet'}", compression="zstd") + if 
stat_df is not None: + stat_df.write_parquet(f"{self.path / 'stats.parquet'}", compression="zstd") + except Exception as e: + for f in self.chunk_files: + os.remove(f) + raise RuntimeError(f"Failed to write to path {self.path}: {str(e)}") - for f in chunkfiles: + for f in self.chunk_files: os.remove(f) if __name__ == "__main__": importer = ParquetImporter() - importer.exportxml() + importer.export_xml() diff --git a/scripts/xml_exporter.py b/scripts/xml_exporter.py index b7d0c64..3c9a011 100644 --- a/scripts/xml_exporter.py +++ b/scripts/xml_exporter.py @@ -3,24 +3,30 @@ from typing import Any, Generator from xml.etree import ElementTree as ET -from pandas import DataFrame +import pandas as pd from app.config import settings class XMLExporter: def __init__(self): - self.xmlpath: Path = Path(settings.RAW_XML_PATH) + self.xml_path: Path = Path(settings.RAW_XML_PATH) self.chunk_size: int = settings.CHUNK_SIZE DATE_FIELDS: tuple[str, ...] = ("startDate", "endDate", "creationDate") DEFAULT_VALUES: dict[str, str] = { - "unit": "unknown", - "sourceVersion": "unknown", - "device": "unknown", - "value": "unknown", + "unit": "", + "sourceVersion": "", + "device": "", + "value": "", } - COLUMN_NAMES: tuple[str, ...] = ( + DEFAULT_STATS: dict[str, float] = { + "sum": 0.0, + "average": 0.0, + "maximum": 0.0, + "minimum": 0.0, + } + RECORD_COLUMNS: tuple[str, ...] = ( "type", "sourceVersion", "sourceName", @@ -30,49 +36,107 @@ def __init__(self): "creationDate", "unit", "value", - "textvalue", + "textValue", + ) + WORKOUT_COLUMNS: tuple[str, ...] = ( + "type", + "duration", + "durationUnit", + "sourceName", + "startDate", + "endDate", + "creationDate", + ) + WORKOUT_STATS_COLUMNS: tuple[str, ...] = ( + "type", + "startDate", + "endDate", + "sum", + "average", + "maximum", + "minimum", + "unit", ) - def update_record(self, document: dict[str, Any]) -> dict[str, Any]: + def update_record(self, kind: str, document: dict[str, Any]) -> dict[str, Any]: """ Updates records to fill out columns without specified data: There are 9 columns that need to be filled out, and there are 4 columns that are optional and aren't filled out in every record - Additionally a textvalue field is added for querying text values + Additionally a textValue field is added for querying text values """ for field in self.DATE_FIELDS: - document[field] = datetime.strptime(document[field], "%Y-%m-%d %H:%M:%S %z") + if field in document: + document[field] = datetime.strptime(document[field], "%Y-%m-%d %H:%M:%S %z") + + if kind == "record": + if len(document) != 9: + document.update({k: v for k, v in self.DEFAULT_VALUES.items() if k not in document}) + + document["textValue"] = document["value"] + + try: + document["value"] = float(document["value"]) + except (TypeError, ValueError): + document["value"] = 0.0 - if len(document) != 9: - document.update({k: v for k, v in self.DEFAULT_VALUES.items() if k not in document}) + elif kind == "workout": + document["type"] = document.pop("workoutActivityType") - document["textvalue"] = document["value"] + try: + document["duration"] = float(document["duration"]) + except (TypeError, ValueError): + document["duration"] = 0.0 - try: - document["value"] = float(document["value"]) - except (TypeError, ValueError): - document["value"] = 0.0 + elif kind == "stat": + document.update({k: v for k, v in self.DEFAULT_STATS.items() if k not in document}) return document - def parse_xml(self) -> Generator[DataFrame, Any, None]: + def parse_xml(self) -> Generator[pd.DataFrame, Any, None]: """ Parses the XML file 
and yields pandas dataframes of specified chunk_size. Extracts attributes from each Record element. """ records: list[dict[str, Any]] = [] + workouts: list[dict[str, Any]] = [] + workout_stats: list[dict[str, Any]] = [] - for event, elem in ET.iterparse(self.xmlpath, events=("start",)): + for event, elem in ET.iterparse(self.xml_path, events=("start",)): if elem.tag == "Record" and event == "start": if len(records) >= self.chunk_size: - yield DataFrame(records).reindex(columns=self.COLUMN_NAMES) + yield pd.DataFrame(records).reindex(columns=self.RECORD_COLUMNS) records = [] record: dict[str, Any] = elem.attrib.copy() # fill out empty cells if they exist and convert dates to datetime - self.update_record(record) + self.update_record("record", record) records.append(record) + + elif elem.tag == "Workout" and event == "start": + if len(workouts) >= self.chunk_size: + yield pd.DataFrame(workouts).reindex(columns=self.WORKOUT_COLUMNS) + workouts = [] + workout: dict[str, Any] = elem.attrib.copy() + + for stat in elem: + if stat.tag != "WorkoutStatistics": + continue + statistic = stat.attrib.copy() + self.update_record("stat", statistic) + workout_stats.append(statistic) + if len(workout_stats) >= self.chunk_size: + yield pd.DataFrame(workout_stats).reindex( + columns=self.WORKOUT_STATS_COLUMNS, + ) + workout_stats = [] + + self.update_record("workout", workout) + workouts.append(workout) + elem.clear() # yield remaining records - yield DataFrame(records).reindex(columns=self.COLUMN_NAMES) + yield pd.DataFrame(records).reindex(columns=self.RECORD_COLUMNS) + yield pd.DataFrame(workouts).reindex(columns=self.WORKOUT_COLUMNS) + yield pd.DataFrame(workout_stats).reindex(columns=self.WORKOUT_STATS_COLUMNS) diff --git a/uv.lock b/uv.lock index e2b9d04..33a715d 100644 --- a/uv.lock +++ b/uv.lock @@ -69,7 +69,7 @@ requires-dist = [ { name = "httpx", specifier = ">=0.28" }, { name = "pandas", specifier = ">=2.3.2" }, { name = "passlib", specifier = ">=1.7" }, - { name = "polars", specifier = ">=1.33.0" }, + { name = "polars", specifier = ">=1.34.0" }, { name = "pydantic", specifier = ">=2.11" }, { name = "pydantic-settings", specifier = ">=2.10" }, { name = "testcontainers", specifier = ">=4.10" }, @@ -84,7 +84,7 @@ code-quality = [ ] dev = [ { name = "fastapi", specifier = ">=0.116.2" }, - { name = "pytest", specifier = ">=8.4.1" }, + { name = "pytest", specifier = ">=8.4.2" }, { name = "pytest-asyncio", specifier = ">=1.0.0" }, { name = "pytest-cov", specifier = ">=6.2.1" }, ] @@ -961,16 +961,28 @@ wheels = [ [[package]] name = "polars" -version = "1.33.1" +version = "1.34.0" source = { registry = "https://pypi.org/simple" } -sdist = { url = "https://files.pythonhosted.org/packages/85/da/8246f1d69d7e49f96f0c5529057a19af1536621748ef214bbd4112c83b8e/polars-1.33.1.tar.gz", hash = "sha256:fa3fdc34eab52a71498264d6ff9b0aa6955eb4b0ae8add5d3cb43e4b84644007", size = 4822485, upload-time = "2025-09-09T08:37:49.062Z" } +dependencies = [ + { name = "polars-runtime-32" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/a1/3e/35fcf5bf51404371bb172b289a5065778dc97adca4416e199c294125eb05/polars-1.34.0.tar.gz", hash = "sha256:5de5f871027db4b11bcf39215a2d6b13b4a80baf8a55c5862d4ebedfd5cd4013", size = 684309, upload-time = "2025-10-02T18:31:04.396Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/6b/80/1791ac226bb989bef30fe8fde752b2021b6ec5dfd6e880262596aedf4c05/polars-1.34.0-py3-none-any.whl", hash = "sha256:40d2f357b4d9e447ad28bd2c9923e4318791a7c18eb68f31f1fbf11180f41391", size = 772686, 
upload-time = "2025-10-02T18:29:59.492Z" }, +] + +[[package]] +name = "polars-runtime-32" +version = "1.34.0" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/02/10/1189afb14cc47ed215ccf7fbd00ed21c48edfd89e51c16f8628a33ae4b1b/polars_runtime_32-1.34.0.tar.gz", hash = "sha256:ebe6f865128a0d833f53a3f6828360761ad86d1698bceb22bef9fd999500dc1c", size = 2634491, upload-time = "2025-10-02T18:31:05.502Z" } wheels = [ - { url = "https://files.pythonhosted.org/packages/19/79/c51e7e1d707d8359bcb76e543a8315b7ae14069ecf5e75262a0ecb32e044/polars-1.33.1-cp39-abi3-macosx_10_12_x86_64.whl", hash = "sha256:3881c444b0f14778ba94232f077a709d435977879c1b7d7bd566b55bd1830bb5", size = 39132875, upload-time = "2025-09-09T08:36:38.609Z" }, - { url = "https://files.pythonhosted.org/packages/f8/15/1094099a1b9cb4fbff58cd8ed3af8964f4d22a5b682ea0b7bb72bf4bc3d9/polars-1.33.1-cp39-abi3-macosx_11_0_arm64.whl", hash = "sha256:29200b89c9a461e6f06fc1660bc9c848407640ee30fe0e5ef4947cfd49d55337", size = 35638783, upload-time = "2025-09-09T08:36:43.748Z" }, - { url = "https://files.pythonhosted.org/packages/8d/b9/9ac769e4d8e8f22b0f2e974914a63dd14dec1340cd23093de40f0d67d73b/polars-1.33.1-cp39-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:444940646e76342abaa47f126c70e3e40b56e8e02a9e89e5c5d1c24b086db58a", size = 39742297, upload-time = "2025-09-09T08:36:47.132Z" }, - { url = "https://files.pythonhosted.org/packages/7a/26/4c5da9f42fa067b2302fe62bcbf91faac5506c6513d910fae9548fc78d65/polars-1.33.1-cp39-abi3-manylinux_2_24_aarch64.whl", hash = "sha256:094a37d06789286649f654f229ec4efb9376630645ba8963b70cb9c0b008b3e1", size = 36684940, upload-time = "2025-09-09T08:36:50.561Z" }, - { url = "https://files.pythonhosted.org/packages/06/a6/dc535da476c93b2efac619e04ab81081e004e4b4553352cd10e0d33a015d/polars-1.33.1-cp39-abi3-win_amd64.whl", hash = "sha256:c9781c704432a2276a185ee25898aa427f39a904fbe8fde4ae779596cdbd7a9e", size = 39456676, upload-time = "2025-09-09T08:36:54.612Z" }, - { url = "https://files.pythonhosted.org/packages/cb/4e/a4300d52dd81b58130ccadf3873f11b3c6de54836ad4a8f32bac2bd2ba17/polars-1.33.1-cp39-abi3-win_arm64.whl", hash = "sha256:c3cfddb3b78eae01a218222bdba8048529fef7e14889a71e33a5198644427642", size = 35445171, upload-time = "2025-09-09T08:36:58.043Z" }, + { url = "https://files.pythonhosted.org/packages/97/35/bc4f1a9dcef61845e8e4e5d2318470b002b93a3564026f0643f562761ecb/polars_runtime_32-1.34.0-cp39-abi3-macosx_10_12_x86_64.whl", hash = "sha256:2878f9951e91121afe60c25433ef270b9a221e6ebf3de5f6642346b38cab3f03", size = 39655423, upload-time = "2025-10-02T18:30:02.846Z" }, + { url = "https://files.pythonhosted.org/packages/a6/bb/d655a103e75b7c81c47a3c2d276be0200c0c15cfb6fd47f17932ddcf7519/polars_runtime_32-1.34.0-cp39-abi3-macosx_11_0_arm64.whl", hash = "sha256:fbc329c7d34a924228cc5dcdbbd4696d94411a3a5b15ad8bb868634c204e1951", size = 35986049, upload-time = "2025-10-02T18:30:05.848Z" }, + { url = "https://files.pythonhosted.org/packages/9e/ce/11ca850b7862cb43605e5d86cdf655614376e0a059871cf8305af5406554/polars_runtime_32-1.34.0-cp39-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:93fa51d88a2d12ea996a5747aad5647d22a86cce73c80f208e61f487b10bc448", size = 40261269, upload-time = "2025-10-02T18:30:08.48Z" }, + { url = "https://files.pythonhosted.org/packages/d8/25/77d12018c35489e19f7650b40679714a834effafc25d61e8dcee7c4fafce/polars_runtime_32-1.34.0-cp39-abi3-manylinux_2_24_aarch64.whl", hash = 
"sha256:79e4d696392c6d8d51f4347f0b167c52eef303c9d87093c0c68e8651198735b7", size = 37049077, upload-time = "2025-10-02T18:30:11.162Z" }, + { url = "https://files.pythonhosted.org/packages/e2/75/c30049d45ea1365151f86f650ed5354124ff3209f0abe588664c8eb13a31/polars_runtime_32-1.34.0-cp39-abi3-win_amd64.whl", hash = "sha256:2501d6b29d9001ea5ea2fd9b598787e10ddf45d8c4a87c2bead75159e8a15711", size = 40105782, upload-time = "2025-10-02T18:30:14.597Z" }, + { url = "https://files.pythonhosted.org/packages/a3/31/84efa27aa3478c8670bac1a720c8b1aee5c58c9c657c980e5e5c47fde883/polars_runtime_32-1.34.0-cp39-abi3-win_arm64.whl", hash = "sha256:f9ed1765378dfe0bcd1ac5ec570dd9eab27ea728bbc980cc9a76eebc55586559", size = 35873216, upload-time = "2025-10-02T18:30:17.439Z" }, ] [[package]]