rntviewer/src/rntuple.cpp

231 lines
9.3 KiB
C++
Raw Normal View History

2024-07-11 12:00:43 +00:00
internal
2024-07-12 09:58:55 +00:00
String8 rntuple_description(Arena *arena, const RNTuple_Data &ntuple)
2024-07-11 12:00:43 +00:00
{
String8 desc = push_str8f(arena, "version %u.%u.%u.%u",
ntuple.version.epoch,
ntuple.version.major,
ntuple.version.minor,
ntuple.version.patch);
2024-07-11 12:00:43 +00:00
return desc;
}
internal
// Builds the RNTuple descriptor for the ntuple located by `info`:
//   1. reads and decompresses the header and footer described by `info.anchor`,
//   2. deserializes them into a descriptor,
//   3. for every cluster group, reads, decompresses and deserializes its page list
//      into that same descriptor.
// All intermediate byte buffers are taken from a scratch region of `arena` that is
// released when this function returns (temp_end via defer).
ROOT::Experimental::RNTupleDescriptor create_descriptor(Arena *arena, RMicroFileReader &reader, const RNTuple_File_Info &info)
{
    using namespace ROOT::Experimental;
    using namespace ROOT::Experimental::Internal;

    Temp scratch = temp_begin(arena);
    defer { temp_end(scratch); };

    // Reads `zip_len` on-disk bytes at offset `seek`, inflates them to `len` bytes and
    // returns the inflated buffer (both buffers live in the scratch arena).
    auto load_blob = [&](u64 zip_len, u64 len, u64 seek) -> u8 * {
        u8 *zipped = arena_push_contiguous(scratch.arena, zip_len);
        u8 *unzipped = arena_push_contiguous(scratch.arena, len);
        reader.ReadBuffer(zipped, zip_len, seek);
        RNTupleDecompressor::Unzip(zipped, zip_len, len, unzipped);
        return unzipped;
    };

    const RNTuple_Anchor &anchor = info.anchor;
    u8 *header = load_blob(anchor.fNBytesHeader, anchor.fLenHeader, anchor.fSeekHeader);
    u8 *footer = load_blob(anchor.fNBytesFooter, anchor.fLenFooter, anchor.fSeekFooter);

    RNTupleDescriptorBuilder builder;
    RNTupleSerializer::DeserializeHeader(header, anchor.fLenHeader, builder);
    RNTupleSerializer::DeserializeFooter(footer, anchor.fLenFooter, builder);
    RNTupleDescriptor descriptor = builder.MoveDescriptor();

    for (const RClusterGroupDescriptor &group : descriptor.GetClusterGroupIterable()) {
        // Page-list buffers are only needed during deserialization: rewind the scratch
        // arena after each group so memory use stays bounded by the largest page list.
        const u64 mark = arena_pos(scratch.arena);
        const auto &locator = group.GetPageListLocator();
        const u64 list_len = group.GetPageListLength();
        u8 *page_list = load_blob(locator.fBytesOnStorage, list_len, locator.GetPosition<u64>());
        RNTupleSerializer::DeserializePageList(page_list, list_len, group.GetId(), descriptor);
        arena_pop_to(scratch.arena, mark);
    }

    return descriptor;
}
internal
// Builds the page index stored in `rndata` from the ntuple's descriptor:
//   - rndata.pages: singly-linked list of Page_Info_Node, one per page of every column of
//     every cluster, kept sorted by on-disk start offset;
//   - rndata.page_groups: array of Page_Info_Group, one per GROUP_SIZE consecutive list
//     nodes, each covering the byte span from its first page to the next group's first page;
//   - rndata.page_chunks: list of Page_Info_Chunk, each a maximal run of byte-contiguous pages;
//   - rndata.n_pages / rndata.n_elems: totals over all pages.
// Every node/group/chunk is allocated on `arena`. Progress and timing go to stderr.
void gather_metadata(Arena *arena, RMicroFileReader &reader, const RNTuple_File_Info &info, RNTuple_Data &rndata)
{
    using namespace ROOT::Experimental;
    using namespace ROOT::Experimental::Internal;

    RNTupleDescriptor descriptor = create_descriptor(arena, reader, info);

    // Size of the checksum that follows a page's payload when fHasChecksum is set.
    const u64 checksum_size = sizeof(u64);

    u64 n_pages = 0;
    u64 n_elems = 0;
    Page_Info_Node *pinfo_head = nullptr, *pinfo_tail = nullptr;
    // Node inserted in the previous iteration; pages are often adjacent to it on disk,
    // which makes it the cheapest insertion point after head/tail.
    Page_Info_Node *last_inserted_pinfo = nullptr;

    fprintf(stderr, "Loading pages...\n");

    chr::time_point start_t = chr::high_resolution_clock::now();

    // Number of pages that needed the linear walk from the head of the list.
    u64 n_slow = 0;

    // for all clusters, gather page metadata
    for (const RClusterDescriptor &cluster_desc : descriptor.GetClusterIterable()) {
        // NOTE(review): the column-id iteration order is presumably unspecified, which is
        // why pages may arrive out of disk order and need sorted insertion — confirm.
        for (auto col_id : cluster_desc.GetColumnIds()) {
            const auto &col_range = cluster_desc.GetColumnRange(col_id);

            // insert page infos sorted by byte range
            // @Speed: the fallback walk below is linear in the list length, so heavily
            // out-of-order input degrades to O(n^2).
            const auto &page_range = cluster_desc.GetPageRange(col_range.fPhysicalColumnId);
            for (const auto &page_info : page_range.fPageInfos) {
                Page_Info_Node *pinfo = arena_push<Page_Info_Node>(arena);
                pinfo->range.start = page_info.fLocator.GetPosition<u64>();
                pinfo->range.len = page_info.fLocator.fBytesOnStorage + page_info.fHasChecksum * checksum_size;
                pinfo->n_elems = page_info.fNElements;

                if (!pinfo_head) {
                    // first node inserted
                    assert(!pinfo_tail);
                    pinfo_head = pinfo_tail = pinfo;
                } else if (pinfo->range.start >= pinfo_tail->range.end()) {
                    // after tail
                    pinfo_tail->next = pinfo;
                    pinfo_tail = pinfo;
                } else if (pinfo->range.end() <= pinfo_head->range.start) {
                    // before head
                    pinfo->next = pinfo_head;
                    pinfo_head = pinfo;
                } else if (last_inserted_pinfo && pinfo->range.start == last_inserted_pinfo->range.end()) {
                    // common case: insert after previous
                    pinfo->next = last_inserted_pinfo->next;
                    last_inserted_pinfo->next = pinfo;
                } else {
                    // Slow path: link before the first node that does not start before
                    // `pinfo`. For non-overlapping pages this is the same gap a strict
                    // "fits between prev and node" search finds, but it always succeeds, so
                    // a page overlapping an already-linked one is still kept in the list
                    // instead of being dropped while still being counted.
                    Page_Info_Node **link = &pinfo_head;
                    while (*link && (*link)->range.start < pinfo->range.start)
                        link = &(*link)->next;
                    pinfo->next = *link;
                    *link = pinfo;
                    if (!pinfo->next)
                        pinfo_tail = pinfo;
                    ++n_slow;
                }

                last_inserted_pinfo = pinfo;
                ++n_pages;
                n_elems += page_info.fNElements;
            }
        }
    }

    chr::time_point end_t = chr::high_resolution_clock::now();
    u64 time_spent_ms = chr::duration_cast<chr::milliseconds>(end_t - start_t).count();
    // Arguments must follow the order of the format string (pages, ms, slow count).
    // Cast to unsigned long long so %llu is correct whatever u64 is typedef'd to.
    fprintf(stderr, "Loaded %llu pages in %llu ms (%llu took the slow path).\nGenerating groups...\n",
            static_cast<unsigned long long>(n_pages),
            static_cast<unsigned long long>(time_spent_ms),
            static_cast<unsigned long long>(n_slow));

    if (!pinfo_head) {
        // No pages at all (presumably an RNTuple without entries): there is nothing to
        // group, and everything below dereferences the list head.
        fprintf(stderr, "Generated 0 groups and 0 chunks.\n");
        rndata.pages = nullptr;
        rndata.page_groups = nullptr;
        rndata.n_page_groups = 0;
        rndata.page_chunks = nullptr;
        rndata.n_page_chunks = 0;
        rndata.n_pages = 0;
        rndata.n_elems = 0;
        return;
    }

    // Create page groups and chunks.
    // Each page group is a grouping of GROUP_SIZE page infos whose range is equal to the combined ranges
    // of its components. It is an acceleration structure used to more quickly find the correct page info
    // that an offset belongs to.
    // A page chunk is a grouping of adjacent page groups, used to quickly determine if an offset is part
    // of a page or not.
    const u64 GROUP_SIZE = 500;
    Page_Info_Group *groups = arena_push_array_nozero<Page_Info_Group>(arena, n_pages / GROUP_SIZE + 1);
    u64 n_groups = 1;
    groups->first = pinfo_head;
    groups->range.start = pinfo_head->range.start;

    // arena_push zero-fills (len and next start at 0); the head chunk's first_group is 0.
    Page_Info_Chunk *chunks_head = arena_push<Page_Info_Chunk>(arena);
    Page_Info_Chunk *chunks_tail = chunks_head;
    chunks_head->range = pinfo_head->range;
    u64 n_chunks = 1;

    // `idx` is the position of `pinfo` in the sorted list (the head is position 0), so
    // page `idx` belongs to group `idx / GROUP_SIZE`.
    u64 idx = 1;
    for (Page_Info_Node *pinfo = pinfo_head->next; pinfo; pinfo = pinfo->next, ++idx) {
        if (pinfo->range.start != chunks_tail->range.end()) {
            // close current chunk and open new one
            Page_Info_Chunk *chunk = arena_push<Page_Info_Chunk>(arena);
            chunk->range.start = pinfo->range.start;
            // The group holding this chunk's first page. Usually that group began before
            // the chunk; it is only a brand-new group when idx is a multiple of GROUP_SIZE.
            chunk->first_group = idx / GROUP_SIZE;
            chunks_tail->next = chunk;
            chunks_tail = chunk;
            ++n_chunks;
        }
        chunks_tail->range.len += pinfo->range.len;

        if (idx % GROUP_SIZE != 0)
            continue;

        // Create a new group every GROUP_SIZE page infos and close the previous one at
        // the new group's start.
        Page_Info_Group &cur_group = groups[n_groups];
        cur_group.first = pinfo;
        cur_group.range.start = pinfo->range.start;
        Page_Info_Group &prev_group = groups[n_groups - 1];
        prev_group.range.len = cur_group.range.start - prev_group.range.start;
        ++n_groups;
    }

    // The last group extends to the end of the last page.
    Page_Info_Group &last_group = groups[n_groups - 1];
    last_group.range.len = pinfo_tail->range.end() - last_group.range.start;

    fprintf(stderr, "Generated %llu groups and %llu chunks.\n",
            static_cast<unsigned long long>(n_groups),
            static_cast<unsigned long long>(n_chunks));

    assert(!chunks_tail->next);
    assert(!pinfo_tail->next);

    rndata.pages = pinfo_head;
    rndata.page_groups = groups;
    rndata.n_page_groups = n_groups;
    rndata.page_chunks = chunks_head;
    rndata.n_page_chunks = n_chunks;
    rndata.n_pages = n_pages;
    rndata.n_elems = n_elems;
}
internal
// Opens `fname`, locates the RNTuple named `ntpl_name` and collects everything the viewer
// needs about it: anchor version, byte ranges of the anchor key/anchor/header/footer,
// framing sizes, and (via gather_metadata) the page index.
// If the RNTuple cannot be located, a zero-initialized RNTuple_Data is returned.
RNTuple_Data get_rntuple_data(Arena *arena, const char *fname, const char *ntpl_name)
{
    RNTuple_Data rndata = {};

    // TODO: proper error handling
    RMicroFileReader file_reader { fname };
    RNTuple_File_Info file_info = file_reader.GetNTupleProper(ntpl_name);
    if (file_info.failed)
        return rndata;

    const RNTuple_Anchor &anchor = file_info.anchor;

    // Format version recorded in the anchor.
    rndata.version.epoch = anchor.fVersionEpoch;
    rndata.version.major = anchor.fVersionMajor;
    rndata.version.minor = anchor.fVersionMinor;
    rndata.version.patch = anchor.fVersionPatch;

    // On-disk byte ranges: anchor key, anchor, and the compressed header/footer.
    rndata.rng_anchor_key.start = file_info.anchor_key_seek;
    rndata.rng_anchor_key.len   = file_info.anchor_key_nbytes;
    rndata.rng_anchor.start     = file_info.anchor_seek;
    rndata.rng_anchor.len       = file_info.anchor_nbytes;
    rndata.rng_header.start     = anchor.fSeekHeader;
    rndata.rng_header.len       = anchor.fNBytesHeader;
    rndata.rng_footer.start     = anchor.fSeekFooter;
    rndata.rng_footer.len       = anchor.fNBytesFooter;

    // Sizes of the framing structures reported by the reader.
    rndata.root_file_header_size = file_info.tfile_header_nbytes;
    rndata.rblob_header_size     = file_info.rblob_key_header_nbytes;

    gather_metadata(arena, file_reader, file_info, rndata);
    return rndata;
}