Merged
2 changes: 1 addition & 1 deletion .gitignore
@@ -151,4 +151,4 @@ volumes
*.xml

# ClickHouse database
-*.chdb
+*.chdb
8 changes: 5 additions & 3 deletions app/mcp/v1/tools/ch_reader.py
@@ -11,6 +11,7 @@

ch_reader_router = FastMCP(name="CH Reader MCP")


@ch_reader_router.tool
def get_health_summary_ch() -> dict[str, Any]:
"""
@@ -24,7 +25,8 @@ def get_health_summary_ch() -> dict[str, Any]:
try:
return get_health_summary_from_ch()
except Exception as e:
-return {'error': str(e)}
+return {"error": str(e)}


@ch_reader_router.tool
def search_health_records_ch(params: HealthRecordSearchParams) -> dict[str, Any]:
@@ -48,7 +50,8 @@ def search_health_records_ch(params: HealthRecordSearchParams) -> dict[str, Any]
try:
return search_health_records_from_ch(params)
except Exception as e:
-return {'error': str(e)}
+return {"error": str(e)}


@ch_reader_router.tool
def get_statistics_by_type_ch(record_type: RecordType | str) -> dict[str, Any]:
@@ -122,4 +125,3 @@ def get_trend_data_ch(
return get_trend_data_from_ch(record_type, interval, date_from, date_to)
except Exception as e:
return {"error": f"Failed to get trend data: {str(e)}"}

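All of the reader tools above follow the same pattern: each tool wraps a service call and converts exceptions into an error payload instead of raising, so the MCP client always receives a JSON-serializable result. A minimal sketch of that pattern (the fastmcp import path and the tool body are assumptions for illustration; the diff itself only shows `FastMCP(name=...)` and the bare `@ch_reader_router.tool` decorator):

```python
from typing import Any

from fastmcp import FastMCP  # assumed import path; not shown in the diff

demo_router = FastMCP(name="Demo MCP")


@demo_router.tool
def get_demo_summary() -> dict[str, Any]:
    """Hypothetical tool mirroring the error-handling style used in ch_reader.py."""
    try:
        return {"records": 0}  # stand-in for a real service call
    except Exception as e:
        # Errors are returned as data rather than raised.
        return {"error": str(e)}
```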
4 changes: 2 additions & 2 deletions app/services/ch.py
@@ -28,8 +28,8 @@ def inquire(self, query: str) -> dict[str, Any]:
:return: result of the query
"""
# first call to json.loads() only returns a string, and the second one a dict
-response: str = json.dumps(str(self.session.query(query, fmt='JSON')))
+response: str = json.dumps(str(self.session.query(query, fmt="JSON")))
try:
return json.loads(json.loads(response))
except JSONDecodeError as e:
-return {'error': str(e)}
+return {"error": str(e)}
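The comment about the double `json.loads()` is easier to see with a stand-in value in place of `str(self.session.query(query, fmt="JSON"))` (the real chdb result object isn't shown here); a small sketch of the round trip:

```python
import json

# Stand-in for str(self.session.query(query, fmt="JSON")): a JSON document as text.
raw = '{"data": [{"type": "HeartRate", "numerical": 72.0}]}'

response = json.dumps(raw)    # wrap the text in one more layer of JSON encoding
first = json.loads(response)  # first loads(): back to the original JSON text (a str)
result = json.loads(first)    # second loads(): the actual dict

assert isinstance(first, str)
assert isinstance(result, dict)
```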
12 changes: 7 additions & 5 deletions app/services/health/clickhouse.py
@@ -1,4 +1,3 @@
-from time import time
from typing import Any

from app.services.ch import CHClient
@@ -7,6 +6,7 @@

ch = CHClient()


def build_value_range(valuemin: str | None, valuemax: str | None) -> str | None:
if valuemax and valuemin:
return f"value >= '{valuemin}' and value <= '{valuemax}'"
@@ -32,7 +32,7 @@ def fill_query(params: HealthRecordSearchParams) -> str:

if conditions:
query += " AND " + " AND ".join(conditions)
-query += f'LIMIT {params.limit}'
+query += f"LIMIT {params.limit}"
return query


@@ -46,7 +46,9 @@ def search_health_records_from_ch(params: HealthRecordSearchParams) -> dict[str,


def get_statistics_by_type_from_ch(record_type: RecordType | str) -> dict[str, Any]:
-return ch.inquire(f"SELECT type, COUNT(*), AVG(numerical), SUM(numerical), MIN(numerical), MAX(numerical) FROM {ch.db_name}.{ch.table_name} WHERE type = '{record_type}' GROUP BY type")
+return ch.inquire(
+    f"SELECT type, COUNT(*), AVG(numerical), SUM(numerical), MIN(numerical), MAX(numerical) FROM {ch.db_name}.{ch.table_name} WHERE type = '{record_type}' GROUP BY type"
+)


def get_trend_data_from_ch(
@@ -58,6 +60,6 @@ def get_trend_data_from_ch(
return ch.inquire(f"""
SELECT toStartOfInterval(startDate, INTERVAL 1 {interval}) AS interval,
AVG(numerical), MIN(numerical), MAX(numerical), COUNT(*) FROM {ch.db_name}.{ch.table_name}
-WHERE type = '{record_type}' {f"AND startDate >= '{date_from}'" if date_from else ''} {f"AND startDate <= '{date_to}'" if date_to else ''}
-GROUP BY interval ORDER BY interval ASC
+WHERE type = '{record_type}' {f"AND startDate >= '{date_from}'" if date_from else ""} {f"AND startDate <= '{date_to}'" if date_to else ""}
+GROUP BY interval ORDER BY interval ASC
""")
2 changes: 1 addition & 1 deletion glama.json
@@ -5,4 +5,4 @@
"bartmichalak",
"psobusiak"
]
-}
+}
5 changes: 3 additions & 2 deletions scripts/clickhouse_importer.py
@@ -3,6 +3,7 @@
from app.services.ch import CHClient
from scripts.xml_exporter import XMLExporter


class CHIndexer(XMLExporter, CHClient):
def __init__(self):
XMLExporter.__init__(self)
@@ -31,7 +32,6 @@ def create_table(self) -> None:
ORDER BY startDate
""")


def index_data(self) -> bool:
for docs in self.parse_xml():
try:
@@ -60,6 +60,7 @@ def run(self) -> bool:
print("Error during data indexing")
return False


if __name__ == "__main__":
ch = CHIndexer()
-ch.run()
+ch.run()
19 changes: 8 additions & 11 deletions scripts/xml_exporter.py
@@ -7,12 +7,13 @@

from app.config import settings


class XMLExporter:
def __init__(self):
self.path: Path = Path(settings.RAW_XML_PATH)
self.chunk_size: int = settings.CHUNK_SIZE

-DATE_FIELDS: tuple[str] = ("startDate", "endDate", "creationDate")
+DATE_FIELDS: tuple[str, ...] = ("startDate", "endDate", "creationDate")
DEFAULT_VALUES: dict[str, str] = {
"unit": "unknown",
"sourceVersion": "unknown",
@@ -39,23 +40,19 @@ def update_record(self, document: dict[str, Any]) -> dict[str, Any]:
that are optional and aren't filled out in every record
"""
for field in self.DATE_FIELDS:
-document[field] = datetime.strptime(
-document[field], '%Y-%m-%d %H:%M:%S %z'
-)
+document[field] = datetime.strptime(document[field], "%Y-%m-%d %H:%M:%S %z")

if len(document) != 9:
-document.update(
-{k: v for k, v in self.DEFAULT_VALUES.items() if k not in document}
-)
+document.update({k: v for k, v in self.DEFAULT_VALUES.items() if k not in document})

# making sure there are value field with text values
# and numerical which always contain numbers for the sake
# of aggregation in clickhouse
try:
-val = float(document['value'])
-document['numerical'] = val
+val = float(document["value"])
+document["numerical"] = val
except (TypeError, ValueError):
-document['numerical'] = 0.0
+document["numerical"] = 0.0

return document

@@ -79,4 +76,4 @@ def parse_xml(self) -> Generator[DataFrame, Any, None]:
elem.clear()

# yield remaining records
-yield DataFrame(records).reindex(columns=self.COLUMN_NAMES)
+yield DataFrame(records).reindex(columns=self.COLUMN_NAMES)
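The value/numerical comment in `update_record` above describes the fallback that keeps the `numerical` column aggregatable; a standalone sketch of that behaviour with made-up sample values:

```python
# Numeric strings keep their float value; anything else falls back to 0.0 so
# ClickHouse aggregations over "numerical" never see a non-numeric cell.
for raw in ("72.5", "HKCategoryValueSleepAnalysisAsleep", None):
    document = {"value": raw}
    try:
        document["numerical"] = float(document["value"])
    except (TypeError, ValueError):
        document["numerical"] = 0.0
    print(document)
```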