diff --git a/src/rntuple.cpp b/src/rntuple.cpp index 7796ec0..263b68f 100644 --- a/src/rntuple.cpp +++ b/src/rntuple.cpp @@ -103,7 +103,6 @@ void gather_ntuple_metadata(Arena *arena, RMicroFileReader &reader, const RNTupl for (const RClusterDescriptor::RColumnRange &col_range : cluster_desc.GetColumnRangeIterable()) { // insert page infos sorted by byte range - // @Speed: this is slow! speed it up! const auto &page_range = cluster_desc.GetPageRange(col_range.fPhysicalColumnId); for (const auto &page_info : page_range.fPageInfos) { const u64 checksum_size = sizeof(u64); @@ -120,22 +119,54 @@ void gather_ntuple_metadata(Arena *arena, RMicroFileReader &reader, const RNTupl } else if (pinfo->range.start >= pinfo_tail->range.end()) { // after tail pinfo_tail->next = pinfo; + pinfo->prev = pinfo_tail; pinfo_tail = pinfo; } else if (pinfo->range.end() <= pinfo_head->range.start) { // before head pinfo->next = pinfo_head; + pinfo_head->prev = pinfo; pinfo_head = pinfo; - } else if (last_inserted_pinfo && pinfo->range.start == last_inserted_pinfo->range.end()) { - // common case: insert after previous - pinfo->next = last_inserted_pinfo->next; - last_inserted_pinfo->next = pinfo; - } else for (Page_Info_Node *node = pinfo_head->next, *prev = pinfo_head; node; prev = node, node = node->next) { - if (pinfo->range.end() <= node->range.start) { - prev->next = pinfo; - pinfo->next = node; - ++n_slow; - break; + } else { + // Very commonly pages are already sorted either in increasing or decreasing order. + // By starting to look from the last inserted page we are very likely to find the + // proper slot immediately. + b8 inserted = false; + b8 pinfo_is_after_last = pinfo->range.start >= last_inserted_pinfo->range.end(); + if (pinfo_is_after_last) { + for (Page_Info_Node *node = last_inserted_pinfo->next; node; node = node->next) { + // check if `pinfo` fits right before the node we're looking at + if (pinfo->range.end() <= node->range.start) { + Page_Info_Node *prev = node->prev; + if (UNLIKELY(!prev) || prev->range.end() <= pinfo->range.start) { + if (LIKELY(prev)) { + prev->next = pinfo; + pinfo->prev = prev; + } + pinfo->next = node; + inserted = true; + break; + } + } + } + } else { + for (Page_Info_Node *node = last_inserted_pinfo; node; node = node->prev) { + // check if `pinfo` fits right before the node we're looking at + if (pinfo->range.end() <= node->range.start) { + Page_Info_Node *prev = node->prev; + if (UNLIKELY(!prev) || prev->range.end() <= pinfo->range.start) { + if (LIKELY(prev)) { + prev->next = pinfo; + pinfo->prev = prev; + } + pinfo->next = node; + inserted = true; + break; + } + } + } } + + assert(inserted); } last_inserted_pinfo = pinfo; @@ -171,8 +202,11 @@ void gather_ntuple_metadata(Arena *arena, RMicroFileReader &reader, const RNTupl u64 n_chunks = 1; u64 idx = 1; - + [[maybe_unused]] Page_Info_Node *prev = pinfo_head; for (Page_Info_Node *pinfo = pinfo_head->next; pinfo; pinfo = pinfo->next) { + assert(prev->range.end() <= pinfo->range.start); + prev = pinfo; + if (pinfo->range.start != chunks_tail->range.end()) { // close current chunk and open new one Page_Info_Chunk *chunk = arena_push(arena); diff --git a/src/rntuple.h b/src/rntuple.h index a6fc00f..b6485aa 100644 --- a/src/rntuple.h +++ b/src/rntuple.h @@ -11,6 +11,7 @@ struct Range_Seq { struct Page_Info_Node { Page_Info_Node *next; + Page_Info_Node *prev; Byte_Range range; // len includes checksum i32 n_elems; // negative = page has checksum