261 lines
10 KiB
C++
261 lines
10 KiB
C++
// Formats the RNTuple anchor version stored in `ntuple.version` as
// "version <epoch>.<major>.<minor>.<patch>", allocating the string on `arena`.
internal
String8 rntuple_description(Arena *arena, const RNTuple_Data &ntuple)
{
    const auto &ver = ntuple.version;
    return push_str8f(arena, "version %u.%u.%u.%u", ver.epoch, ver.major, ver.minor, ver.patch);
}
|
|
|
|
// Builds the full RNTupleDescriptor for the RNTuple described by `info`:
// reads the compressed header and footer pointed to by the anchor, decompresses and
// deserializes them, then does the same for the page list of every cluster group.
// All intermediate buffers are taken from a scratch region of `arena` that is released
// (via `defer`) before returning; the returned descriptor does not reference them.
internal
ROOT::Experimental::RNTupleDescriptor create_descriptor(Arena *arena, RMicroFileReader &reader, const RNTuple_File_Info &info)
{
    using namespace ROOT::Experimental;
    using namespace ROOT::Experimental::Internal;

    Temp scratch = temp_begin(arena);
    defer { temp_end(scratch); };

    // Reads `n_zipped` bytes at file offset `seek` into scratch memory, then inflates
    // them into a second scratch buffer of `n_unzipped` bytes and returns that buffer.
    auto read_and_unzip = [&](u64 n_zipped, u64 seek, u64 n_unzipped) -> u8 * {
        u8 *zipped = arena_push_contiguous(scratch.arena, n_zipped);
        reader.ReadBuffer(zipped, n_zipped, seek);
        u8 *unzipped = arena_push_contiguous(scratch.arena, n_unzipped);
        RNTupleDecompressor::Unzip(zipped, n_zipped, n_unzipped, unzipped);
        return unzipped;
    };

    const RNTuple_Anchor &anchor = info.anchor;

    // Header and footer locations/sizes come straight from the anchor.
    u8 *header = read_and_unzip(anchor.fNBytesHeader, anchor.fSeekHeader, anchor.fLenHeader);
    u8 *footer = read_and_unzip(anchor.fNBytesFooter, anchor.fSeekFooter, anchor.fLenFooter);

    RNTupleDescriptorBuilder builder;
    RNTupleSerializer::DeserializeHeader(header, anchor.fLenHeader, builder);
    RNTupleSerializer::DeserializeFooter(footer, anchor.fLenFooter, builder);
    RNTupleDescriptor descriptor = builder.MoveDescriptor();

    // Each cluster group's page list is a separate envelope; its buffers are only needed
    // while deserializing it, so scratch memory is rewound after every iteration.
    for (const RClusterGroupDescriptor &group : descriptor.GetClusterGroupIterable()) {
        const u64 mark = arena_pos(scratch.arena);

        const auto &locator = group.GetPageListLocator();
        const u64 unzipped_len = group.GetPageListLength();
        u8 *page_list = read_and_unzip(locator.fBytesOnStorage, locator.GetPosition<u64>(), unzipped_len);
        RNTupleSerializer::DeserializePageList(page_list, unzipped_len, group.GetId(), descriptor);

        arena_pop_to(scratch.arena, mark);
    }

    return descriptor;
}
|
|
|
|
// Collects the on-disk layout metadata of the RNTuple described by `info` into `rndata`:
//   - one Cluster_Group_Info per cluster group, holding the byte range of its page list;
//   - a singly linked list of Cluster_Info_Node, one node per cluster;
//   - a singly linked list of Page_Info_Node sorted by file offset (byte range);
//   - "page groups": every GROUP_SIZE consecutive sorted pages, with their combined range;
//   - "page chunks": maximal runs of byte-adjacent pages.
// All output memory is allocated on `arena`. Progress and timing are logged to stderr.
// If the RNTuple has no pages, no groups/chunks are built and the page-related fields
// of `rndata` are set to null/zero.
internal
void gather_metadata(Arena *arena, RMicroFileReader &reader, const RNTuple_File_Info &info, RNTuple_Data &rndata)
{
    using namespace ROOT::Experimental;
    using namespace ROOT::Experimental::Internal;

    RNTupleDescriptor descriptor = create_descriptor(arena, reader, info);

    // gather cluster groups metadata
    Cluster_Group_Info *cluster_groups = arena_push_array_nozero<Cluster_Group_Info>(arena, descriptor.GetNClusterGroups());
    u64 tot_page_list_size = 0;
    u64 cg_idx = 0;
    for (const RClusterGroupDescriptor &cg_desc : descriptor.GetClusterGroupIterable()) {
        Cluster_Group_Info &cg_info = cluster_groups[cg_idx++];

        // Page list locator
        RNTupleLocator plist_locator = cg_desc.GetPageListLocator();
        cg_info.rng_page_list.start = plist_locator.GetPosition<u64>();
        cg_info.rng_page_list.len = plist_locator.fBytesOnStorage;
        tot_page_list_size += plist_locator.fBytesOnStorage;
    }

    fprintf(stderr, "Loading pages...\n");

    u64 n_pages = 0;
    u64 n_elems = 0;
    u64 tot_page_size = 0;
    Page_Info_Node *pinfo_head = nullptr, *pinfo_tail = nullptr;
    Page_Info_Node *last_inserted_pinfo = nullptr;

    Cluster_Info_Node *clinfo_head = nullptr, *clinfo_tail = nullptr;
    u64 n_clusters = 0;

    chr::time_point start_t = chr::high_resolution_clock::now();
    u64 n_slow = 0;

    // gather clusters and pages metadata
    for (const RClusterDescriptor &cluster_desc : descriptor.GetClusterIterable()) {
        if (!clinfo_head) {
            clinfo_head = clinfo_tail = arena_push<Cluster_Info_Node>(arena);
        } else {
            Cluster_Info_Node *clinfo = arena_push<Cluster_Info_Node>(arena);
            clinfo_tail->next = clinfo;
            clinfo_tail = clinfo;
        }
        ++n_clusters;

        for (const RClusterDescriptor::RColumnRange &col_range : cluster_desc.GetColumnRangeIterable()) {
            // insert page infos sorted by byte range
            // @Speed: this is slow! speed it up!
            const auto &page_range = cluster_desc.GetPageRange(col_range.fPhysicalColumnId);
            for (const auto &page_info : page_range.fPageInfos) {
                const u64 checksum_size = sizeof(u64);
                Page_Info_Node *pinfo = arena_push<Page_Info_Node>(arena);
                pinfo->range.start = page_info.fLocator.GetPosition<u64>();
                // A page with a checksum is followed on disk by an extra u64; include it in the range.
                pinfo->range.len = page_info.fLocator.fBytesOnStorage + (page_info.fHasChecksum) * checksum_size;
                // A negative n_elems marks a page that has a checksum.
                // NOTE(review): if fNElements is unsigned, the negation wraps, so this relies on
                // n_elems' type converting it back to a negative value — confirm.
                pinfo->n_elems = page_info.fHasChecksum ? -page_info.fNElements : page_info.fNElements;

                if (!pinfo_head) {
                    // first node inserted
                    assert(!pinfo_tail);
                    pinfo_head = pinfo_tail = pinfo;
                } else if (pinfo->range.start >= pinfo_tail->range.end()) {
                    // after tail
                    pinfo_tail->next = pinfo;
                    pinfo_tail = pinfo;
                } else if (pinfo->range.end() <= pinfo_head->range.start) {
                    // before head
                    pinfo->next = pinfo_head;
                    pinfo_head = pinfo;
                } else if (last_inserted_pinfo && pinfo->range.start == last_inserted_pinfo->range.end()) {
                    // common case: insert after previous
                    pinfo->next = last_inserted_pinfo->next;
                    last_inserted_pinfo->next = pinfo;
                } else for (Page_Info_Node *node = pinfo_head->next, *prev = pinfo_head; node; prev = node, node = node->next) {
                    // slow path: linear scan for the first gap that fits.
                    // NOTE(review): if no gap fits (e.g. overlapping pages) the node is not linked
                    // into the list but is still counted below — confirm this can't happen.
                    if (pinfo->range.end() <= node->range.start) {
                        prev->next = pinfo;
                        pinfo->next = node;
                        ++n_slow;
                        break;
                    }
                }

                last_inserted_pinfo = pinfo;

                ++n_pages;
                tot_page_size += pinfo->range.len;
                n_elems += page_info.fNElements;
            }
        }
    }

    chr::time_point end_t = chr::high_resolution_clock::now();
    u64 time_spent_ms = chr::duration_cast<chr::milliseconds>(end_t - start_t).count();

    // Arguments must follow the format order: pages, milliseconds, slow-path count.
    fprintf(stderr, "Loaded %lu pages in %lu ms (%lu took the slow path).\nGenerating groups...\n", n_pages, time_spent_ms, n_slow);

    // Create page groups and chunks.
    // Each page group is a grouping of GROUP_SIZE page infos whose range is equal to the combined ranges
    // of its components. It is an acceleration structure used to more quickly find the correct page info
    // that an offset belongs to.
    // A page chunk is a grouping of adjacent pages, used to quickly determine if an offset is part
    // of a page or not.
    // Both structures are seeded from the first page, so they can only be built if there is one.
    Page_Info_Group *groups = nullptr;
    u64 n_groups = 0;
    Page_Info_Chunk *chunks_head = nullptr;
    u64 n_chunks = 0;

    if (pinfo_head) {
        const u64 GROUP_SIZE = 500;
        groups = arena_push_array_nozero<Page_Info_Group>(arena, n_pages / GROUP_SIZE + 1);
        n_groups = 1;
        groups->first = pinfo_head;
        groups->range.start = pinfo_head->range.start;

        chunks_head = arena_push<Page_Info_Chunk>(arena);
        Page_Info_Chunk *chunks_tail = chunks_head;
        chunks_head->range = pinfo_head->range;
        n_chunks = 1;

        u64 idx = 1;

        for (Page_Info_Node *pinfo = pinfo_head->next; pinfo; pinfo = pinfo->next) {
            if (pinfo->range.start != chunks_tail->range.end()) {
                // close current chunk and open new one.
                // first_group is recorded before this page may open a new group below, so it
                // can refer to the group preceding the one that starts with this page.
                Page_Info_Chunk *chunk = arena_push<Page_Info_Chunk>(arena);
                chunk->range.start = pinfo->range.start;
                chunk->first_group = n_groups - 1;
                chunks_tail->next = chunk;
                chunks_tail = chunk;
                ++n_chunks;
            }
            chunks_tail->range.len += pinfo->range.len;

            if (idx++ % GROUP_SIZE != 0)
                continue;

            // Create a new group every GROUP_SIZE page infos
            Page_Info_Group &cur_group = groups[n_groups];
            cur_group.first = pinfo;
            cur_group.range.start = pinfo->range.start;
            Page_Info_Group &prev_group = groups[n_groups - 1];
            prev_group.range.len = cur_group.range.start - prev_group.range.start;

            ++n_groups;
        }

        Page_Info_Group &last_group = groups[n_groups - 1];
        last_group.range.len = pinfo_tail->range.end() - last_group.range.start;

        assert(!chunks_tail->next);
        assert(!pinfo_tail->next);
    }

    fprintf(stderr, "Generated %lu groups and %lu chunks.\n", n_groups, n_chunks);

    // clinfo_tail is null when the RNTuple has no clusters.
    assert(!clinfo_tail || !clinfo_tail->next);

    rndata.pages = pinfo_head;
    rndata.page_groups = groups;
    rndata.n_page_groups = n_groups;
    rndata.page_chunks = chunks_head;
    rndata.n_page_chunks = n_chunks;
    rndata.n_pages = n_pages;
    rndata.n_elems = n_elems;
    rndata.tot_page_size = tot_page_size;
    rndata.cluster_groups = cluster_groups;
    rndata.n_cluster_groups = cg_idx;
    rndata.tot_page_list_size = tot_page_list_size;
    rndata.clusters = clinfo_head;
    rndata.n_clusters = n_clusters;
}
|
|
|
|
// Opens `fname`, looks up the RNTuple named `ntpl_name` via RMicroFileReader::GetNTupleProper
// and fills an RNTuple_Data with its anchor version, the byte ranges of header, footer,
// anchor and anchor key, some ROOT file/key header sizes, and (via gather_metadata) the
// cluster/page layout. If the lookup reports failure, a zero-initialized RNTuple_Data is returned.
internal
RNTuple_Data get_rntuple_data(Arena *arena, const char *fname, const char *ntpl_name)
{
    RNTuple_Data rndata = {};

    // TODO: proper error handling
    RMicroFileReader file_reader { fname };
    RNTuple_File_Info file_info = file_reader.GetNTupleProper(ntpl_name);
    if (file_info.failed)
        return rndata;

    const auto &anchor = file_info.anchor;

    rndata.version.epoch = anchor.fVersionEpoch;
    rndata.version.major = anchor.fVersionMajor;
    rndata.version.minor = anchor.fVersionMinor;
    rndata.version.patch = anchor.fVersionPatch;

    rndata.rng_header = {};
    rndata.rng_header.start = anchor.fSeekHeader;
    rndata.rng_header.len = anchor.fNBytesHeader;

    rndata.rng_footer.start = anchor.fSeekFooter;
    rndata.rng_footer.len = anchor.fNBytesFooter;

    rndata.rng_anchor.start = file_info.anchor_seek;
    rndata.rng_anchor.len = file_info.anchor_nbytes;

    rndata.rng_anchor_key.start = file_info.anchor_key_seek;
    rndata.rng_anchor_key.len = file_info.anchor_key_nbytes;

    rndata.rblob_header_size = file_info.rblob_key_header_nbytes;
    rndata.root_file_header_size = file_info.tfile_header_nbytes;

    gather_metadata(arena, file_reader, file_info, rndata);
    return rndata;
}
|
|
|