/// Returns a short human-readable description of the RNTuple, currently its
/// "version epoch.major.minor.patch" string, allocated in `arena`.
internal String8 rntuple_description(Arena *arena, const RNTuple_Data &ntuple)
{
  String8 desc = push_str8f(arena, "version %u.%u.%u.%u",
                            ntuple.version.epoch, ntuple.version.major,
                            ntuple.version.minor, ntuple.version.patch);
  return desc;
}

/// Builds the full RNTupleDescriptor of the ntuple described by `info`:
/// reads, decompresses and deserializes the header and footer located by the
/// anchor, then does the same for the page list of every cluster group.
/// All raw/decompressed buffers are allocated in a scratch region of `arena`
/// that is released (via `defer`) before returning.
internal ROOT::Experimental::RNTupleDescriptor
create_descriptor(Arena *arena, RMicroFileReader &reader, const RNTuple_File_Info &info)
{
  using namespace ROOT::Experimental;
  using namespace ROOT::Experimental::Internal;

  Temp scratch = temp_begin(arena);
  defer { temp_end(scratch); };

  const RNTuple_Anchor &anchor = info.anchor;

  // Read compressed header+footer
  u8 *header_zip = arena_push_contiguous(scratch.arena, anchor.fNBytesHeader);
  u8 *footer_zip = arena_push_contiguous(scratch.arena, anchor.fNBytesFooter);
  reader.ReadBuffer(header_zip, anchor.fNBytesHeader, anchor.fSeekHeader);
  reader.ReadBuffer(footer_zip, anchor.fNBytesFooter, anchor.fSeekFooter);

  // Decompress header+footer
  u8 *header = arena_push_contiguous(scratch.arena, anchor.fLenHeader);
  u8 *footer = arena_push_contiguous(scratch.arena, anchor.fLenFooter);
  RNTupleDecompressor::Unzip(header_zip, anchor.fNBytesHeader, anchor.fLenHeader, header);
  RNTupleDecompressor::Unzip(footer_zip, anchor.fNBytesFooter, anchor.fLenFooter, footer);

  // Deserialize header+footer.
  // NOTE(review): the Deserialize* return values are ignored here; see the
  // error-handling TODO in get_rntuple_data.
  RNTupleDescriptorBuilder desc_builder;
  RNTupleSerializer::DeserializeHeader(header, anchor.fLenHeader, desc_builder);
  RNTupleSerializer::DeserializeFooter(footer, anchor.fLenFooter, desc_builder);
  RNTupleDescriptor descriptor = desc_builder.MoveDescriptor();

  // Each cluster group stores its own page list: load them all into the descriptor.
  for (const RClusterGroupDescriptor &cgdesc : descriptor.GetClusterGroupIterable()) {
    // Per-iteration buffers are popped at the end of the iteration, so scratch
    // usage does not grow with the number of cluster groups.
    u64 arena_start = arena_pos(scratch.arena);

    // Read page list.
    // RNTupleLocator::GetPosition is a template over the stored position type;
    // this reader expects a plain 64-bit file offset.
    u64 page_list_zip_size = cgdesc.GetPageListLocator().fBytesOnStorage;
    u64 page_list_seek = cgdesc.GetPageListLocator().GetPosition<std::uint64_t>();
    u8 *page_list_zip = arena_push_contiguous(scratch.arena, page_list_zip_size);
    reader.ReadBuffer(page_list_zip, page_list_zip_size, page_list_seek);

    // Decompress page list
    u64 page_list_len = cgdesc.GetPageListLength();
    u8 *page_list = arena_push_contiguous(scratch.arena, page_list_len);
    RNTupleDecompressor::Unzip(page_list_zip, page_list_zip_size, page_list_len, page_list);

    // Deserialize page list
    DescriptorId_t cluster_grpid = cgdesc.GetId();
    RNTupleSerializer::DeserializePageList(page_list, page_list_len, cluster_grpid, descriptor);

    arena_pop_to(scratch.arena, arena_start);
  }

  return descriptor;
}

/// Links `pinfo` into the singly-linked list [`head`, `tail`], keeping the list
/// sorted by byte range (page ranges are assumed not to overlap).
/// `last_inserted` (may be null) is the most recently linked node; it is used as
/// a fast-path hint for the common case of a page starting exactly where the
/// previously inserted one ended.
/// Returns false if no slot could be found, in which case `pinfo` is left unlinked.
internal bool page_info_insert_sorted(Page_Info_Node *&head, Page_Info_Node *&tail,
                                      Page_Info_Node *last_inserted, Page_Info_Node *pinfo)
{
  if (!head) {
    // first node inserted
    assert(!tail);
    head = tail = pinfo;
    return true;
  }

  if (pinfo->range.start >= tail->range.end()) {
    // after tail
    tail->next = pinfo;
    tail = pinfo;
    return true;
  }

  if (pinfo->range.end() <= head->range.start) {
    // before head
    pinfo->next = head;
    head = pinfo;
    return true;
  }

  if (last_inserted && pinfo->range.start == last_inserted->range.end()) {
    // common case: insert right after the previously inserted node
    pinfo->next = last_inserted->next;
    last_inserted->next = pinfo;
    return true;
  }

  // @Speed: this linear scan is slow! speed it up!
  // `pinfo` lies strictly between head and tail here: insert it before the
  // first node that starts at or after its end.
  for (Page_Info_Node *prev = head, *node = head->next; node; prev = node, node = node->next) {
    if (pinfo->range.end() <= node->range.start) {
      prev->next = pinfo;
      pinfo->next = node;
      return true;
    }
  }

  return false;
}

/// Collects the page metadata of every cluster of the ntuple and stores it in `rndata`:
///  - `pages`: all page infos as a linked list sorted by on-disk byte range;
///  - `page_groups` / `n_page_groups`: one group per GROUP_SIZE consecutive pages,
///    spanning from its first page's start to its last page's end;
///  - `page_chunks` / `n_page_chunks`: runs of groups whose ranges are contiguous;
///  - `n_pages` / `n_elems`: totals over the pages that made it into the list.
/// Nodes, groups and chunks are allocated in `arena`. Progress is logged to stderr.
internal void gather_metadata(Arena *arena, RMicroFileReader &reader, const RNTuple_File_Info &info, RNTuple_Data &rndata)
{
  using namespace ROOT::Experimental;
  using namespace ROOT::Experimental::Internal;

  RNTupleDescriptor descriptor = create_descriptor(arena, reader, info);

  u64 n_pages = 0;
  u64 n_elems = 0;
  u64 n_skipped = 0;
  Page_Info_Node *pinfo_head = nullptr, *pinfo_tail = nullptr;
  Page_Info_Node *last_inserted_pinfo = nullptr;

  fprintf(stderr, "Loading pages...\n");

  // steady_clock: monotonic, appropriate for measuring elapsed time.
  auto start_t = chr::steady_clock::now();

  // for all clusters, gather page metadata
  for (const RClusterDescriptor &cluster_desc : descriptor.GetClusterIterable()) {
    for (const RClusterDescriptor::RColumnRange &col_range : cluster_desc.GetColumnRangeIterable()) {
      const auto &page_range = cluster_desc.GetPageRange(col_range.fPhysicalColumnId);
      for (const auto &page_info : page_range.fPageInfos) {
        // NOTE(review): relies on arena_push returning zeroed memory
        // (pinfo->next == nullptr), as the original code did — confirm.
        Page_Info_Node *pinfo = arena_push<Page_Info_Node>(arena);
        pinfo->range.start = page_info.fLocator.GetPosition<std::uint64_t>();
        pinfo->range.len = page_info.fLocator.fBytesOnStorage;
        pinfo->n_elems = page_info.fNElements;

        // insert page infos sorted by byte range
        if (!page_info_insert_sorted(pinfo_head, pinfo_tail, last_inserted_pinfo, pinfo)) {
          // No slot found (its range overlaps other pages). Keep it out of the
          // list and never use it as the insertion hint: otherwise pages chained
          // after it would be unreachable from pinfo_head.
          ++n_skipped;
          continue;
        }
        last_inserted_pinfo = pinfo;
        ++n_pages;
        n_elems += page_info.fNElements;
      }
    }
  }

  auto end_t = chr::steady_clock::now();
  u64 time_spent_ms = chr::duration_cast<chr::milliseconds>(end_t - start_t).count();
  if (n_skipped)
    fprintf(stderr, "Warning: skipped %llu pages whose byte range overlaps other pages.\n",
            static_cast<unsigned long long>(n_skipped));
  fprintf(stderr, "Loaded %llu pages in %llu ms.\nGenerating groups...\n",
          static_cast<unsigned long long>(n_pages), static_cast<unsigned long long>(time_spent_ms));

  // Create page groups and chunks.
  // Each page group is a grouping of GROUP_SIZE page infos whose range is equal to the combined ranges
  // of its components (from its first page's start to its last page's end). It is an acceleration
  // structure used to more quickly find the correct page info that an offset belongs to.
  // A page chunk is a grouping of adjacent page groups, used to quickly determine if an offset is part
  // of a page or not.
  const u64 GROUP_SIZE = 100;
  Page_Info_Group *groups = arena_push_array_nozero<Page_Info_Group>(arena, n_pages / GROUP_SIZE + 1);

  // Sentinel "previous page" so the ordering assert also holds for the first node.
  static const Page_Info_Node invalid_last = { nullptr, { 0, 0 }, 0 };
  const Page_Info_Node *prev_pinfo = &invalid_last;
  Page_Info_Chunk *chunks_head = nullptr, *chunks_tail = nullptr;
  u64 idx = 0;
  u64 n_groups = 0;
  u64 n_chunks = 0;
  for (Page_Info_Node *pinfo = pinfo_head; pinfo; prev_pinfo = pinfo, pinfo = pinfo->next, ++idx) {
    assert(prev_pinfo->range.end() <= pinfo->range.start);

    // Create a new group every GROUP_SIZE page infos
    if (idx % GROUP_SIZE != 0)
      continue;

    Page_Info_Group &cur_group = groups[n_groups];
    cur_group.first = pinfo;
    cur_group.range.start = pinfo->range.start;

    if (n_groups > 0) {
      // Close the previous group at the end of its last page (the one right
      // before `pinfo`). Stretching it up to cur_group.range.start instead would
      // make every group adjacent to the next and the chunk split below could
      // never trigger.
      Page_Info_Group &prev_group = groups[n_groups - 1];
      prev_group.range.len = prev_pinfo->range.end() - prev_group.range.start;
    }

    assert((!chunks_head) == (n_groups == 0));
    if (!chunks_head) {
      assert(!chunks_tail);
      chunks_head = chunks_tail = arena_push<Page_Info_Chunk>(arena);
      chunks_head->range.start = cur_group.range.start;
      chunks_head->first_group = n_groups;
      ++n_chunks;
    } else if (groups[n_groups - 1].range.end() != cur_group.range.start) {
      // There is a gap before this group: close current chunk...
      Page_Info_Group &prev_group = groups[n_groups - 1];
      chunks_tail->range.len = prev_group.range.end() - chunks_tail->range.start;
      // ...and open a new one
      Page_Info_Chunk *chunk = arena_push<Page_Info_Chunk>(arena);
      chunk->range.start = cur_group.range.start;
      chunk->first_group = n_groups;
      chunks_tail->next = chunk;
      chunks_tail = chunk;
      ++n_chunks;
    }

    ++n_groups;
  }

  if (n_groups) {
    // After the loop prev_pinfo is the last page of the list.
    Page_Info_Group &last_group = groups[n_groups - 1];
    last_group.range.len = prev_pinfo->range.end() - last_group.range.start;
    chunks_tail->range.len = prev_pinfo->range.end() - chunks_tail->range.start;
  }

  fprintf(stderr, "Generated %llu groups and %llu chunks.\n",
          static_cast<unsigned long long>(n_groups), static_cast<unsigned long long>(n_chunks));

  // Both lists are empty (null tails) when the ntuple has no pages.
  assert(!chunks_tail || !chunks_tail->next);
  assert(!pinfo_tail || !pinfo_tail->next);

  rndata.pages = pinfo_head;
  rndata.page_groups = groups;
  rndata.n_page_groups = n_groups;
  rndata.page_chunks = chunks_head;
  rndata.n_page_chunks = n_chunks;
  rndata.n_pages = n_pages;
  rndata.n_elems = n_elems;
}

/// Opens `fname`, looks up the RNTuple named `ntpl_name` and returns its anchor
/// information (version, header/footer/anchor/anchor-key byte ranges, RBlob key
/// header size, TFile header size) plus the page metadata from gather_metadata.
/// If the lookup fails (`file_info.failed`), the returned struct is all-zero.
internal RNTuple_Data get_rntuple_data(Arena *arena, const char *fname, const char *ntpl_name)
{
  RNTuple_Data rndata = {};

  // TODO: proper error handling
  RMicroFileReader file_reader { fname };
  RNTuple_File_Info file_info = file_reader.GetNTupleProper(ntpl_name);
  if (!file_info.failed) {
    rndata.version.epoch = file_info.anchor.fVersionEpoch;
    rndata.version.major = file_info.anchor.fVersionMajor;
    rndata.version.minor = file_info.anchor.fVersionMinor;
    rndata.version.patch = file_info.anchor.fVersionPatch;
    rndata.rng_header.start = file_info.anchor.fSeekHeader;
    rndata.rng_header.len = file_info.anchor.fNBytesHeader;
    rndata.rng_footer.start = file_info.anchor.fSeekFooter;
    rndata.rng_footer.len = file_info.anchor.fNBytesFooter;
    rndata.rng_anchor.start = file_info.anchor_seek;
    rndata.rng_anchor.len = file_info.anchor_nbytes;
    rndata.rng_anchor_key.start = file_info.anchor_key_seek;
    rndata.rng_anchor_key.len = file_info.anchor_key_nbytes;
    rndata.rblob_header_size = file_info.rblob_key_header_nbytes;
    rndata.root_file_header_size = file_info.tfile_header_nbytes;

    gather_metadata(arena, file_reader, file_info, rndata);
  }

  return rndata;
}