dlio_benchmark/reader/parquet_reader.py (110 changes: 92 additions & 18 deletions)
@@ -15,6 +15,9 @@
Configuration (under storage_options in the DLIO YAML):
columns: null # list of column names to read (null = all)
row_group_cache_size: 4 # max row groups held in memory per reader thread
metadata_cache: true # cache parquet footer metadata across opens
memory_map: true # use memory-mapped I/O
file_cache: true # keep 1 ParquetFile open across close/open calls

Example YAML snippet:
dataset:
@@ -24,6 +27,8 @@
storage_options:
columns: ["feature1", "label"]
row_group_cache_size: 8
metadata_cache: true
memory_map: true
"""
import bisect

@@ -57,6 +62,15 @@ def __init__(self, dataset_type, thread_index, epoch):

opts = getattr(self._args, "storage_options", {}) or {}

# Configuration flags
self._use_metadata_cache = opts.get("metadata_cache", True)
self._use_memory_map = opts.get("memory_map", True)
self._use_file_cache = opts.get("file_cache", True)

# Metadata cache: filename -> (FileMetaData, cumulative_offsets)
# Caches parquet footer metadata to avoid re-reading it on every open
self._metadata_cache: dict = {}

# Optional column selection (list[str] or None = all columns)
self._columns = opts.get("columns") or None

@@ -65,9 +79,15 @@ def __init__(self, dataset_type, thread_index, epoch):
self._rg_cache: dict = {}
self._rg_lru: list = [] # insertion-order LRU key list

# File cache: keeps at most 1 ParquetFile open across close/open cycles
# Stored as (filename, (pf, offsets)) or None
self._file_cache = None

self.logger.info(
f"{utcnow()} ParquetReader thread={thread_index} epoch={epoch} "
f"columns={self._columns} rg_cache_size={self._rg_cache_size}"
f"columns={self._columns} rg_cache_size={self._rg_cache_size} "
f"metadata_cache={self._use_metadata_cache} memory_map={self._use_memory_map} "
f"file_cache={self._use_file_cache}"
)

# ── Helpers ──────────────────────────────────────────────────────────────
@@ -78,6 +98,14 @@ def _evict_lru(self):
oldest = self._rg_lru.pop(0)
self._rg_cache.pop(oldest, None)

def _evict_rg_for_file(self, filename):
"""Drop all row-group cache entries belonging to ``filename``."""
keys_to_remove = [k for k in self._rg_cache if k[0] == filename]
for k in keys_to_remove:
self._rg_cache.pop(k, None)
if k in self._rg_lru:
self._rg_lru.remove(k)
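# Worked example of the eviction order these two helpers implement, assuming
# keys are appended to _rg_lru as row groups are loaded (a standalone
# illustrative sketch, not part of the class):
#
#     cache, lru, size = {}, [], 2
#     for key in ("a", "b", "c"):
#         if len(lru) >= size:
#             cache.pop(lru.pop(0), None)   # mirrors _evict_lru()
#         cache[key] = object()
#         lru.append(key)
#     assert list(cache) == ["b", "c"]      # "a" evicted first
#
# Since nothing shown here reorders _rg_lru on a cache hit, eviction is
# effectively first-in-first-out rather than strict least-recently-used.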

# ── FormatReader interface ────────────────────────────────────────────────

@dlp.log
@@ -88,31 +116,76 @@ def open(self, filename):
Returns (ParquetFile, cumulative_offsets) stored in open_file_map[filename].
cumulative_offsets[i] is the first row index of row group i;
cumulative_offsets[-1] is the total row count.

With metadata_cache=True, caches parquet metadata (footer) to avoid re-reading.
With memory_map=True, uses memory-mapped I/O for faster access.
With file_cache=True, returns a cached ParquetFile handle if the same
file was the last one closed, avoiding any re-open work.
"""
import pyarrow.parquet as pq

# File cache hit: same file as the last one we kept open
if self._use_file_cache and self._file_cache is not None and self._file_cache[0] == filename:
return self._file_cache[1]

# File cache miss with a different file cached: evict it now
if self._use_file_cache and self._file_cache is not None:
old_filename = self._file_cache[0]
self._evict_rg_for_file(old_filename)
self._file_cache = None

cached_meta = None
cached_offsets = None

# Check if metadata is cached
if self._use_metadata_cache:
cached = self._metadata_cache.get(filename)
if cached is not None:
cached_meta, cached_offsets = cached

# Open the file; pass cached metadata to skip the footer read if available
pf = pq.ParquetFile(
filename,
memory_map=self._use_memory_map,
metadata=cached_meta
)

# pf.metadata resolves from the already-open reader (the FileMetaData
# supplied above on a cache hit), so no second footer read occurs; hoisting
# it here also keeps ``meta`` defined for the debug log on the cached path.
meta = pf.metadata

# Use cached offsets or compute them
if cached_offsets is not None:
offsets = cached_offsets
else:
# Build cumulative row offsets [0, rg0_rows, rg0+rg1_rows, ...]
offsets = [0]
for i in range(meta.num_row_groups):
offsets.append(offsets[-1] + meta.row_group(i).num_rows)

# Cache the metadata and offsets for reuse by later opens
if self._use_metadata_cache:
self._metadata_cache[filename] = (meta, offsets)

self.logger.debug(
f"{utcnow()} ParquetReader.open {filename} "
f"row_groups={meta.num_row_groups} total_rows={offsets[-1]}"
)
handle = (pf, offsets)

# Populate the 1-slot file cache
if self._use_file_cache:
self._file_cache = (filename, handle)

return handle
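# The footer-reuse pattern above, in isolation (illustrative sketch assuming
# pyarrow and a local "sample.parquet"; not part of the class):
#
#     import pyarrow.parquet as pq
#
#     meta = pq.read_metadata("sample.parquet")    # one footer read
#     pf = pq.ParquetFile("sample.parquet", metadata=meta, memory_map=True)
#     # later opens can pass the same FileMetaData and skip the footer I/O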

@dlp.log
def close(self, filename):
"""Evict cached row groups for this file to free memory."""
keys_to_remove = [k for k in self._rg_cache if k[0] == filename]
for k in keys_to_remove:
self._rg_cache.pop(k, None)
if k in self._rg_lru:
self._rg_lru.remove(k)
"""
Close ``filename`` and evict its row-group cache entries.

With ``file_cache`` enabled, the most recently used file is kept open;
the actual close/eviction is deferred until a different file is opened
(handled in :meth:`open`) or until :meth:`finalize` runs.
"""
if self._use_file_cache and self._file_cache is not None and self._file_cache[0] == filename:
# Keep this file open in the 1-slot file cache
return

self._evict_rg_for_file(filename)
super().close(filename)
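# read_index (next hunk, body elided in this excerpt) presumably maps a
# global row index onto the cumulative offsets built in open(); a minimal
# sketch of that lookup, which is what the module-level ``import bisect``
# supports:
#
#     def _locate(offsets, row_idx):
#         rg = bisect.bisect_right(offsets, row_idx) - 1
#         return rg, row_idx - offsets[rg]
#
#     # offsets = [0, 100, 250, 400]  ->  _locate(offsets, 120) == (1, 20)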

@dlp.log
Expand Down Expand Up @@ -169,6 +242,7 @@ def read_index(self, image_idx, step):
def finalize(self):
self._rg_cache.clear()
self._rg_lru.clear()
self._file_cache = None
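# Note: _metadata_cache is not cleared here, so cached footers survive
# finalize; clear it as well if memory must be released completely.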
return super().finalize()
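# Net effect of the 1-slot file cache over a typical access pattern, where
# ``r`` is a ParquetReader instance (hypothetical usage sketch):
#
#     r.open("a.parquet")    # real open; handle cached
#     r.close("a.parquet")   # deferred: handle stays open
#     r.open("a.parquet")    # cache hit: no reopen, no footer read
#     r.open("b.parquet")    # evicts a.parquet's handle and row groups
#     r.finalize()           # drops row-group and file caches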

def is_index_based(self):