@xing-cg xing-cg commented Dec 26, 2025

Pull Request: Fix CSVReader chunk/thread orchestration and mmap error handling

This PR fixes two user-visible failure modes when parsing large CSV files (10MB+) on Linux:

  1. Process abort / std::terminate during file parsing, due to std::error_code being thrown from a worker thread (mmap path).
  2. Stream parsing corruption around the ~10MB chunk boundary where row["ets"] becomes a multi-line, non-numeric string (stream path), often seen downstream as “ets not a number”.

Note: The stream corruption can be reproduced even with a seekable stream (std::istringstream), so this is not about forward-only streams.


Related issues


User impact

A) mmap / filename path

Typical user code:

#include "csv.hpp"

int main() {
  csv::CSVReader reader("large.csv");
  for (const auto& row : reader) {
    (void)row;
  }
}

Before this PR:

  • The process can abort with:
    • terminate called after throwing an instance of 'std::error_code'
  • The exception occurs on a worker std::thread, so user try/catch cannot intercept it.

After this PR:

  • mmap failures are reported as a catchable std::system_error with context:
    • file / offset / length / error_code

B) stream path (CSVReader(std::istream&, fmt))

Typical user code:

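#include <sstream>
#include "csv.hpp"

// csv_data (the CSV text) and fmt (a csv::CSVFormat) are set up by the caller.
std::istringstream iss(csv_data);
csv::CSVReader reader(iss, fmt);

for (csv::CSVRow& row : reader) {
  auto sv = row["ets"].get_sv();
  // downstream numeric conversion may fail if `sv` is corrupted
}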

Before this PR:

  • Around the 10MB chunk boundary (deterministic on our dataset), row["ets"].get_sv() becomes a multi-line string containing commas and \n, even embedding the next record.
  • Downstream numeric conversion fails (application logs it as “ets not a number”).

After this PR:

  • The same reproducer returns a normal numeric ets string at the problematic index.

Root causes

1) Throwing std::error_code directly in MmapParser::next()

In include/internal/basic_csv_parser.cpp, mmap errors were reported via a raw std::error_code and thrown directly.
This is problematic because:

  • std::error_code is not derived from std::exception
  • any exception that escapes a std::thread entry point, including a thrown std::error_code, makes the program call std::terminate
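
To illustrate the first bullet, here is a minimal standalone sketch (not code from this library; `classify_throw` and `demo` are hypothetical names used only for illustration). It shows that a thrown std::error_code bypasses a `catch (const std::exception&)` handler, while the same code wrapped in std::system_error is caught by it.

```cpp
#include <cassert>
#include <cerrno>
#include <exception>
#include <string>
#include <system_error>

// Hypothetical helper: reports which handler caught the thrown object.
std::string classify_throw(bool wrap_in_system_error) {
    const std::error_code ec(EINVAL, std::generic_category());
    try {
        if (wrap_in_system_error) {
            // std::system_error derives from std::runtime_error / std::exception.
            throw std::system_error(ec, "mmap failed");
        }
        throw ec;  // legal C++, but the thrown type is not a std::exception
    } catch (const std::exception&) {
        return "std::exception";
    } catch (const std::error_code&) {
        return "std::error_code";
    }
}

// Small self-check of the behaviour described above.
void demo() {
    assert(classify_throw(true) == "std::exception");
    assert(classify_throw(false) == "std::error_code");
}
```

A caller that only writes `catch (const std::exception&)` would therefore never see the raw error code, even on the calling thread.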

2) Exceptions escaping the CSVReader worker thread

CSVReader performs chunk reads in a worker thread (CSVReader::read_csv). Previously it did not catch exceptions, so any parsing/mmap failure could escape the thread and trigger std::terminate.

3) Chunk transition ordering / scheduling race in CSVReader::read_row()

When records becomes empty, read_row() decides whether to wait, join the worker, or start a new worker based on records->is_waitable() and parser->eof().
The previous ordering allowed a window where:

  • a worker thread had ended (or was about to start), but is_waitable() did not reflect the correct state
  • eof() could be checked before join() (missing a strict happens-before), resulting in an incorrect decision and a non-deterministic call sequence into the parser

This manifested as corrupted field boundaries in the stream path around the chunk boundary (e.g., ets swallowing the remainder of a row plus the next line).


What changed

A) MmapParser: throw std::system_error with context

  • Convert mmap std::error_code to std::system_error
  • Include file / offset / length in the error message to make EINVAL(22) and similar failures diagnosable
  • Avoid advancing mmap_pos on failure

Files:

  • include/internal/basic_csv_parser.cpp
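
The three bullets in change A can be sketched roughly as follows. This is an illustration under stated assumptions, not the actual MmapParser code: `make_mmap_error`, `ChunkCursor`, and `map_next` are hypothetical names, and the real mapping call is replaced by an error code passed in by the caller.

```cpp
#include <cassert>
#include <cerrno>
#include <cstddef>
#include <string>
#include <system_error>

// Hypothetical helper: wraps an error_code together with file / offset / length.
std::system_error make_mmap_error(const std::error_code& ec,
                                  const std::string& filename,
                                  std::size_t offset,
                                  std::size_t length) {
    const std::string what = "mmap failed (file='" + filename +
                             "', offset=" + std::to_string(offset) +
                             ", length=" + std::to_string(length) + ")";
    return std::system_error(ec, what);
}

// Hypothetical stand-in for the parser's chunk position bookkeeping.
struct ChunkCursor {
    std::size_t mmap_pos = 0;

    // `ec` simulates the result of the mapping call for this chunk.
    void map_next(const std::string& filename, std::size_t length,
                  const std::error_code& ec) {
        if (ec) {
            // Throw before touching mmap_pos so a failure leaves it unchanged.
            throw make_mmap_error(ec, filename, mmap_pos, length);
        }
        mmap_pos += length;  // advance only after a successful mapping
    }
};

// Small self-check of the behaviour described above.
void demo() {
    ChunkCursor cursor;
    bool caught = false;
    try {
        cursor.map_next("large.csv", 4096,
                        std::error_code(EINVAL, std::generic_category()));
    } catch (const std::system_error& e) {
        caught = e.code() == std::errc::invalid_argument &&
                 std::string(e.what()).find("large.csv") != std::string::npos;
    }
    assert(caught);
    assert(cursor.mmap_pos == 0);     // position not advanced on failure
    cursor.map_next("large.csv", 4096, std::error_code());
    assert(cursor.mmap_pos == 4096);  // advanced on success
}
```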

B) CSVReader: propagate worker-thread exceptions to the caller

  • Catch all exceptions in CSVReader::read_csv()
  • Store std::exception_ptr
  • Rethrow on the consumer thread after join() (in read_row(), begin(), initial_read())

Files:

  • include/internal/csv_reader.cpp
  • include/internal/csv_reader.hpp
  • include/internal/csv_reader_iterator.cpp
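
The catch / store / rethrow pattern in change B can be sketched in isolation as below. This is a simplified stand-in rather than the CSVReader implementation: `WorkerOwner`, `start`, and `join_and_rethrow` are hypothetical names chosen for this example.

```cpp
#include <cassert>
#include <exception>
#include <stdexcept>
#include <string>
#include <thread>

// Hypothetical owner of a single worker thread.
class WorkerOwner {
public:
    ~WorkerOwner() {
        if (worker_.joinable()) worker_.join();
    }

    template <typename Fn>
    void start(Fn fn) {
        assert(!worker_.joinable());  // one worker at a time in this sketch
        worker_ = std::thread([this, fn]() mutable {
            try {
                fn();
            } catch (...) {
                // Nothing may escape the thread entry point, so keep the
                // exception for the consumer instead.
                error_ = std::current_exception();
            }
        });
    }

    // join() orders the worker's write to error_ before the read below.
    void join_and_rethrow() {
        if (worker_.joinable()) worker_.join();
        if (error_) {
            std::exception_ptr pending = error_;
            error_ = nullptr;
            std::rethrow_exception(pending);
        }
    }

private:
    std::thread worker_;
    std::exception_ptr error_;
};

// Small self-check: the worker's exception surfaces on the calling thread.
void demo() {
    WorkerOwner owner;
    owner.start([] { throw std::runtime_error("parse failed in worker"); });
    bool caught = false;
    try {
        owner.join_and_rethrow();
    } catch (const std::runtime_error& e) {
        caught = std::string(e.what()) == "parse failed in worker";
    }
    assert(caught);
}
```

Because the stored pointer is only read after join(), no extra locking is needed around it in this sketch.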

C) CSVReader: fix chunk transition ordering / race

  • In the records.empty() branch: join → rethrow → eof-check → start next worker
  • Mark records as waitable before starting the worker to close the “thread created but waitable still false” window

Files:

  • include/internal/csv_reader.cpp
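
The join → rethrow → eof-check → start ordering in change C can be sketched as below. This is a deliberately reduced model, not the library's read_row(): `ChunkedReader` is a hypothetical class, chunks are pre-split vectors of strings, and the waitable-queue overlap between consumer and worker is left out so that the ordering itself stays visible.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <exception>
#include <string>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical reader that loads one chunk per worker thread.
class ChunkedReader {
public:
    explicit ChunkedReader(std::vector<std::vector<std::string>> chunks)
        : chunks_(std::move(chunks)) {}

    ~ChunkedReader() {
        if (worker_.joinable()) worker_.join();
    }

    bool read_row(std::string& out) {
        for (;;) {
            // 1. join: after this, everything the worker wrote is visible.
            if (worker_.joinable()) worker_.join();
            // 2. rethrow: surface a worker failure on the consumer thread.
            if (error_) {
                std::exception_ptr pending = error_;
                error_ = nullptr;
                std::rethrow_exception(pending);
            }
            if (!records_.empty()) {
                out = records_.front();
                records_.pop_front();
                return true;
            }
            // 3. eof-check: only now is the chunk position safe to inspect.
            if (next_chunk_ >= chunks_.size()) return false;
            // 4. start the next worker.
            worker_ = std::thread([this] { read_chunk(); });
        }
    }

private:
    void read_chunk() {
        try {
            for (const std::string& row : chunks_[next_chunk_]) {
                records_.push_back(row);
            }
            ++next_chunk_;
        } catch (...) {
            error_ = std::current_exception();
        }
    }

    std::vector<std::vector<std::string>> chunks_;
    std::size_t next_chunk_ = 0;
    std::deque<std::string> records_;
    std::exception_ptr error_;
    std::thread worker_;
};

// Small self-check: rows arrive in order across chunk boundaries.
void demo() {
    std::vector<std::vector<std::string>> chunks;
    chunks.push_back({"r1", "r2"});
    chunks.push_back({});  // an empty chunk must not end iteration early
    chunks.push_back({"r3"});
    ChunkedReader reader(std::move(chunks));

    std::vector<std::string> seen;
    std::string row;
    while (reader.read_row(row)) seen.push_back(row);
    assert((seen == std::vector<std::string>{"r1", "r2", "r3"}));
}
```

In this model the shared state is only touched by the consumer after join(), which is the happens-before relationship the reordered branch relies on.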

D) Regenerate single-include headers

This project ships both:

  • modular headers/sources under include/
  • generated single-header distributions under single_include/ and single_include_test/

After changing internal sources, the single-header outputs must be regenerated so that users including single_include/csv.hpp get the same fixes.

Files:

  • single_include/csv.hpp
  • single_include_test/csv.hpp

Generation command:

python3 single_header.py > single_include/csv.hpp
python3 single_header.py > single_include_test/csv.hpp

Verification (local)

On a ~13MB / ~79k-line / 32-column CSV dataset:

  • mmap path no longer aborts; mmap failures are reported as std::system_error (catchable).
  • stream path reproducer that previously showed corrupted ets around n≈61640 now shows numeric ets (no newline embedded).

TA2601.csv

- MmapParser::next(): throw std::system_error (with file/offset/length context) instead of throwing std::error_code directly
- CSVReader: catch exceptions in worker thread, store exception_ptr, and rethrow on the consumer thread (avoid std::terminate)
- CSVReader: fix chunk transition ordering/race (join before checking eof; mark records waitable before starting worker; rethrow after join in begin()/initial_read())
- Regenerate single_include headers