rntviewer/src/rntuple.cpp
2024-10-31 11:39:59 +01:00

539 lines
20 KiB
C++

// Builds a short human-readable description of the RNTuple anchor,
// currently just its on-disk format version, e.g. "version 1.0.0.0".
// The string is allocated from `arena`.
internal
String8 rntuple_description(Arena *arena, const ROOT::Experimental::RNTuple &ntuple)
{
    return push_str8f(arena, "version %u.%u.%u.%u",
                      ntuple.GetVersionEpoch(), ntuple.GetVersionMajor(),
                      ntuple.GetVersionMinor(), ntuple.GetVersionPatch());
}
// Builds a full RNTupleDescriptor from the on-disk representation:
// reads the compressed header and footer (located via the anchor in
// `tfile_data`), decompresses and deserializes them, then does the same
// for every cluster group's page list. Transient compressed/uncompressed
// buffers live in scratch memory and are released before returning; the
// per-cluster-group buffers are additionally recycled via arena_pop_to so
// peak scratch usage stays bounded by the largest page list.
internal
ROOT::Experimental::RNTupleDescriptor create_descriptor(Arena *arena, const TFile_Data &tfile_data,
ROOT::Experimental::Internal::RMiniFileReader &reader)
{
using namespace ROOT::Experimental;
using namespace ROOT::Experimental::Internal;
Temp scratch = scratch_begin(&arena, 1);
defer { scratch_end(scratch); };
const RNTuple &anchor = tfile_data.rntuple_anchor;
// Read compressed header+footer
u8 *header_zip = arena_push_array_nozero<u8>(scratch.arena, anchor.GetNBytesHeader());
u8 *footer_zip = arena_push_array_nozero<u8>(scratch.arena, anchor.GetNBytesFooter());
reader.ReadBuffer(header_zip, anchor.GetNBytesHeader(), anchor.GetSeekHeader());
reader.ReadBuffer(footer_zip, anchor.GetNBytesFooter(), anchor.GetSeekFooter());
// Decompress header+footer
u8 *header = arena_push_array_nozero<u8>(scratch.arena, anchor.GetLenHeader());
u8 *footer = arena_push_array_nozero<u8>(scratch.arena, anchor.GetLenFooter());
RNTupleDecompressor::Unzip(header_zip, anchor.GetNBytesHeader(), anchor.GetLenHeader(), header);
RNTupleDecompressor::Unzip(footer_zip, anchor.GetNBytesFooter(), anchor.GetLenFooter(), footer);
// Deserialize header+footer
RNTupleDescriptorBuilder desc_builder;
try {
RNTupleSerializer::DeserializeHeader(header, anchor.GetLenHeader(), desc_builder);
RNTupleSerializer::DeserializeFooter(footer, anchor.GetLenFooter(), desc_builder);
} catch (...) {
// NOTE(review): the error is swallowed and we fall through to
// MoveDescriptor() below, so on failure the returned descriptor may be
// partially built -- presumably callers tolerate this; confirm.
fprintf(stderr, "Failed to deserialize header/footer!\n");
}
RNTupleDescriptor descriptor = desc_builder.MoveDescriptor();
// Attach each cluster group's page list to the descriptor.
for (const RClusterGroupDescriptor &cgdesc : descriptor.GetClusterGroupIterable()) {
// Remember the scratch position so both temp buffers below can be
// reclaimed at the end of this iteration.
u64 arena_start = arena_pos(scratch.arena);
// Read page list
u64 page_list_zip_size = cgdesc.GetPageListLocator().fBytesOnStorage;
u64 page_list_seek = cgdesc.GetPageListLocator().GetPosition<u64>();
u8 *page_list_zip = arena_push_array_nozero<u8>(scratch.arena, page_list_zip_size);
reader.ReadBuffer(page_list_zip, page_list_zip_size, page_list_seek);
// Decompress page list
u64 page_list_len = cgdesc.GetPageListLength();
u8 *page_list = arena_push_array_nozero<u8>(scratch.arena, page_list_len);
RNTupleDecompressor::Unzip(page_list_zip, page_list_zip_size, page_list_len, page_list);
// Deserialize page list
DescriptorId_t cluster_grpid = cgdesc.GetId();
try {
// Populates `descriptor` in place with this group's cluster details.
RNTupleSerializer::DeserializePageList(page_list, page_list_len, cluster_grpid, descriptor);
} catch (...) {
fprintf(stderr, "Failed to deserialize page list!\n");
}
arena_pop_to(scratch.arena, arena_start);
}
return descriptor;
}
// Returns the dot-separated fully qualified name of `field_desc`, e.g.
// "ancestor.parent.leaf", allocated from `arena` (NUL-terminated, though
// the terminator is not counted in str.size).
// Works by pushing each name onto a String8_Node list while walking up the
// parent chain (leaf first), then concatenating the list in reverse
// (root first). The walk stops at the first ancestor with an empty name,
// i.e. the nameless root field is not part of the result.
internal
String8 build_fully_qualified_field_name(Arena *arena, const ROOT::Experimental::RNTupleDescriptor &desc,
const ROOT::Experimental::RFieldDescriptor *field_desc)
{
String8_Node *sn = push_str8_node(arena, nullptr, "%s", field_desc->GetFieldName().c_str());
ROOT::Experimental::DescriptorId_t field_id = field_desc->GetParentId();
// `size` accumulates total characters plus one '.' separator per
// additional name pushed after the first.
u64 size = sn->str.size;
while (field_id != ROOT::Experimental::kInvalidDescriptorId) {
field_desc = &desc.GetFieldDescriptor(field_id);
if (field_desc->GetFieldName().empty())
break;
sn = push_str8_node(arena, sn, "%s", field_desc->GetFieldName().c_str());
field_id = field_desc->GetParentId();
size += sn->str.size + 1;
}
String8 str;
str.str = arena_push_array_nozero<u8>(arena, size + 1);
str.str[size] = 0;
str.size = 0;
// `sn` is the outermost ancestor; following `prev` walks back down
// towards the leaf, so names come out in root-to-leaf order. The
// `str.size < size` guard adds separators between names but never a
// trailing dot (after the last name str.size == size exactly).
for (String8_Node *snode = sn; snode; snode = snode->prev) {
memcpy(str.str + str.size, snode->str.str, snode->str.size);
str.size += snode->str.size;
if (str.size < size)
str.str[str.size++] = '.';
}
return str;
}
// Maps an RNTuple on-disk column type tag to its spec name.
// Values follow the ROOT RNTuple binary format specification's
// "column types" table; unrecognized tags yield "Unknown".
internal
const char *get_column_type_name_from_ondisk_type(u16 type)
{
    switch (type) {
    case 0x01: return "Index64";
    case 0x02: return "Index32";
    case 0x03: return "Switch";
    case 0x04: return "Byte";
    case 0x05: return "Char";
    case 0x06: return "Bit";
    case 0x07: return "Real64";
    case 0x08: return "Real32";
    case 0x09: return "Real16";
    case 0x16: return "Int64";
    case 0x0A: return "UInt64";
    case 0x17: return "Int32";
    case 0x0B: return "UInt32";
    case 0x18: return "Int16";
    case 0x0C: return "UInt16";
    case 0x19: return "Int8";
    case 0x0D: return "UInt8";
    case 0x0E: return "SplitIndex64";
    case 0x0F: return "SplitIndex32";
    case 0x10: return "SplitReal64";
    case 0x11: return "SplitReal32";
    // Fix: 0x12 was missing even though the surrounding tags (0x11
    // SplitReal32, 0x13 SplitUInt64) are handled; per the format spec
    // 0x12 is SplitReal16.
    case 0x12: return "SplitReal16";
    case 0x1A: return "SplitInt64";
    case 0x13: return "SplitUInt64";
    case 0x1B: return "SplitInt32";
    case 0x14: return "SplitUInt32";
    case 0x1C: return "SplitInt16";
    case 0x15: return "SplitUInt16";
    case 0x1D: return "Real32Trunc";
    case 0x1E: return "Real32Quant";
    default: return "Unknown";
    }
}
// Walks the TFile's TKey list to populate `tfile_data`. If `ntpl_name` is
// empty and the file contains at least one RNTuple, it is set to the first
// RNTuple name found. Returns whether the TKey walk succeeded.
internal
b8 get_tfile_data(Arena *arena, const Inspected_File &file, String8 &ntpl_name, TFile_Data &tfile_data)
{
b8 success = walk_tkeys(arena, file.mem, file.size, tfile_data);
if (success) {
// If we weren't given a rntuple name, use the first one in the file (if any)
if (!ntpl_name.size && tfile_data.tkeys_data.rntuples) {
ntpl_name = tfile_data.tkeys_data.rntuples->head->str;
// NOTE(review): map_rntuple_rblobs only runs on this auto-pick path;
// when a name was supplied by the caller the RBlobs are never mapped
// here. Verify whether that is intentional or a misplaced brace.
map_rntuple_rblobs(tfile_data.rntuple_anchor, tfile_data.tkeys_data);
}
}
return success;
}
// Sums the uncompressed size of every page in the list starting at
// `page_head`. Each page's bytes are read directly from the mapped file
// `fmem`: bytes 6..8 of the record are interpreted as a 24-bit
// little-endian uncompressed length (presumably ROOT's 9-byte compression
// record header -- confirm against the on-disk format).
internal
u64 calc_page_uncomp_size(const u8 *fmem, const Page_Info_Node *page_head)
{
    TIMED_SCOPE();
    u64 total = 0;
    const Page_Info_Node *cur = page_head;
    while (cur) {
        const u8 *rec = fmem + cur->range.start;
        u32 page_len = rec[6] | (rec[7] << 8) | (rec[8] << 16);
        total += page_len;
        cur = cur->next;
    }
    return total;
}
// Builds the full RNTuple_Data model for the inspected file: reads the
// descriptor, then gathers per-cluster-group, per-cluster and per-page
// metadata. Pages are kept in a doubly-linked list sorted by on-disk byte
// range, with two acceleration structures layered on top:
//   - groups: every GROUP_SIZE-th page, each covering the combined range of
//     its members (fast offset -> page lookup);
//   - chunks: maximal runs of byte-adjacent pages (fast "is this offset in
//     any page?" test).
// If `extended_info` is set, also computes the total uncompressed page size
// by reading every page's compression record from the mapped file.
internal
RNTuple_Data get_rntuple_data(Arena *arena, const Inspected_File &file, const TFile_Data &tfile_data, b8 extended_info)
{
RNTuple_Data rndata {};
// No header bytes means no RNTuple anchor was found: return empty data.
if (!tfile_data.rntuple_anchor.GetNBytesHeader())
return rndata;
using namespace ROOT::Experimental;
using namespace ROOT::Experimental::Internal;
auto raw_file = ROOT::Internal::RRawFile::Create(file.name.c());
RMiniFileReader reader { raw_file.get() };
RNTupleDescriptor descriptor = create_descriptor(arena, tfile_data, reader);
// gather cluster groups metadata
Cluster_Group_Info *cluster_groups = arena_push_array_nozero<Cluster_Group_Info>(arena, descriptor.GetNClusterGroups());
u64 tot_page_list_size = 0;
u64 cg_idx = 0;
for (const RClusterGroupDescriptor &cg_desc : descriptor.GetClusterGroupIterable()) {
Cluster_Group_Info &cg_info = cluster_groups[cg_idx++];
// Page list locator
RNTupleLocator plist_locator = cg_desc.GetPageListLocator();
cg_info.rng_page_list.start = plist_locator.GetPosition<u64>();
cg_info.rng_page_list.len = plist_locator.fBytesOnStorage;
tot_page_list_size += plist_locator.fBytesOnStorage;
}
fprintf(stderr, "Loading pages...\n");
u64 n_pages = 0;
u64 n_duplicate_page_ranges = 0;
u64 n_elems = 0; // total number of page elements
u64 n_entries = 0;
u64 tot_page_comp_size = 0;
Page_Info_Node *pinfo_head = nullptr, *pinfo_tail = nullptr;
Page_Info_Node *last_inserted_pinfo = nullptr;
u64 n_clusters = 0;
chr::time_point start_t = chr::high_resolution_clock::now();
Cluster_Info *clusters = arena_push_array<Cluster_Info>(arena, descriptor.GetNActiveClusters());
// gather clusters and pages metadata
for (const RClusterDescriptor &cluster_desc : descriptor.GetClusterIterable()) {
++n_clusters;
n_entries += cluster_desc.GetNEntries();
for (const RClusterDescriptor::RColumnRange &col_range : cluster_desc.GetColumnRangeIterable()) {
const auto &col_descriptor = descriptor.GetColumnDescriptor(col_range.fPhysicalColumnId);
const char *elem_type_name = RColumnElementBase::GetTypeName(col_descriptor.GetType());
const auto &field_desc = descriptor.GetFieldDescriptor(col_descriptor.GetFieldId());
const String8 owner_field_name = build_fully_qualified_field_name(arena, descriptor, &field_desc);
// insert page infos sorted by byte range
const auto &page_range = cluster_desc.GetPageRange(col_range.fPhysicalColumnId);
for (const auto &page_info : page_range.fPageInfos) {
const u64 checksum_size = sizeof(u64);
Page_Info_Node *pinfo = arena_push<Page_Info_Node>(arena);
// The page's on-disk range includes the trailing checksum, if any.
pinfo->range.start = page_info.fLocator.GetPosition<u64>();
pinfo->range.len = page_info.fLocator.fBytesOnStorage + (page_info.fHasChecksum) * checksum_size;
// Checksum presence is encoded in the sign of n_elems (negative
// means "has checksum"), saving a separate flag field.
pinfo->n_elems = page_info.fHasChecksum ? -page_info.fNElements : page_info.fNElements;
// This assert is just here because we're using u32 for cluster_id to save memory.
// If in the future we get RNTuples with more than 4B clusters we can just change the type to u64.
assert(cluster_desc.GetId() <= UINT_MAX);
pinfo->cluster_id = cluster_desc.GetId();
pinfo->elem_type_name = push_str8f(arena, "%s", elem_type_name);
pinfo->owner_field_name = owner_field_name;
pinfo->bits_per_elem = col_descriptor.GetBitsOnStorage();
// Track the lowest-addressed page of each cluster; only one page
// per cluster may have is_first_in_cluster set.
Cluster_Info &cluster = clusters[pinfo->cluster_id];
if (!cluster.first_page || pinfo->range.start < cluster.first_page->range.start) {
if (cluster.first_page)
cluster.first_page->is_first_in_cluster = false;
cluster.first_page = pinfo;
pinfo->is_first_in_cluster = true;
}
// Sorted insert into the page list. Pages sharing a range.start with
// an existing node are counted as duplicates and dropped.
b8 duplicate = false;
if (UNLIKELY(!pinfo_head)) {
// first node inserted
assert(!pinfo_tail);
pinfo_head = pinfo_tail = pinfo;
} else if (pinfo->range.start >= pinfo_tail->range.end()) {
// after tail
pinfo_tail->next = pinfo;
pinfo->prev = pinfo_tail;
pinfo_tail = pinfo;
} else if (pinfo->range.end() <= pinfo_head->range.start) {
// before head
pinfo->next = pinfo_head;
pinfo_head->prev = pinfo;
pinfo_head = pinfo;
} else {
// Very commonly pages are already sorted either in increasing or decreasing order.
// By starting to look from the last inserted page we are very likely to find the
// proper slot immediately.
[[maybe_unused]] b8 inserted = false;
b8 pinfo_is_after_last = pinfo->range.start >= last_inserted_pinfo->range.end();
if (pinfo_is_after_last) {
// Scan forward from the last insertion point.
for (Page_Info_Node *node = last_inserted_pinfo->next; node; node = node->next) {
if (pinfo->range.start == node->range.start) {
duplicate = true;
break;
}
// check if `pinfo` fits right before the node we're looking at
if (pinfo->range.end() <= node->range.start) {
Page_Info_Node *prev = node->prev;
if (UNLIKELY(!prev) || prev->range.end() <= pinfo->range.start) {
if (LIKELY(prev)) {
prev->next = pinfo;
pinfo->prev = prev;
}
node->prev = pinfo;
pinfo->next = node;
inserted = true;
break;
}
}
}
} else {
// Scan backward from the last insertion point.
for (Page_Info_Node *node = last_inserted_pinfo; node; node = node->prev) {
if (pinfo->range.start == node->range.start) {
duplicate = true;
break;
}
// check if `pinfo` fits right before the node we're looking at
if (pinfo->range.end() <= node->range.start) {
Page_Info_Node *prev = node->prev;
if (UNLIKELY(!prev) || prev->range.end() <= pinfo->range.start) {
if (LIKELY(prev)) {
prev->next = pinfo;
pinfo->prev = prev;
}
node->prev = pinfo;
pinfo->next = node;
inserted = true;
break;
}
}
}
}
// Every interior page must end up either inserted or flagged duplicate.
assert(inserted != duplicate);
}
if (duplicate) {
++n_duplicate_page_ranges;
continue;
}
last_inserted_pinfo = pinfo;
++n_pages;
tot_page_comp_size += pinfo->range.len;
n_elems += page_info.fNElements;
}
}
}
chr::time_point end_t = chr::high_resolution_clock::now();
u64 time_spent_ms = chr::duration_cast<chr::milliseconds>(end_t - start_t).count();
fprintf(stderr, "Loaded %" PRIu64 " pages in %" PRIu64 " ms (%" PRIu64 " duplicates).\nGenerating groups...\n",
n_pages, time_spent_ms, n_duplicate_page_ranges);
// Create page groups and chunks.
// Each page group is a grouping of GROUP_SIZE page infos whose range is equal to the combined ranges
// of its components. It is an acceleration structure used to more quickly find the correct page info
// that an offset belongs to.
// A page chunk is a grouping of adjacent pages, used to quickly determine if an offset is part
// of a page or not.
Page_Info_Group *groups = nullptr;
Page_Info_Chunk *chunks_head = nullptr, *chunks_tail = nullptr;
u64 n_groups = 0;
u64 n_chunks = 0;
u64 idx = 0;
// NOTE: pinfo_head may be null if we failed to load any page (which may happen e.g. if the rntuple
// is corrupted)
if (pinfo_head) {
const u64 GROUP_SIZE = 500;
// Worst case we need one group per GROUP_SIZE pages, plus a partial one.
groups = arena_push_array_nozero<Page_Info_Group>(arena, n_pages / GROUP_SIZE + 1);
n_groups = 1;
groups->first = pinfo_head;
groups->range.start = pinfo_head->range.start;
chunks_head = arena_push<Page_Info_Chunk>(arena);
chunks_tail = chunks_head;
chunks_head->range = pinfo_head->range;
n_chunks = 1;
idx = 1;
[[maybe_unused]] Page_Info_Node *prev = pinfo_head;
for (Page_Info_Node *pinfo = pinfo_head->next; pinfo; pinfo = pinfo->next) {
// The list must be sorted and non-overlapping at this point.
assert(prev->range.end() <= pinfo->range.start);
prev = pinfo;
pinfo->page_id = idx;
if (pinfo->range.start != chunks_tail->range.end()) {
// close current chunk and open new one
Page_Info_Chunk *chunk = arena_push<Page_Info_Chunk>(arena);
chunk->range.start = pinfo->range.start;
chunk->first_group = n_groups - 1;
chunks_tail->next = chunk;
chunks_tail = chunk;
++n_chunks;
}
chunks_tail->range.len += pinfo->range.len;
// while we're at it, set the first_page_idx information on the page's parent cluster
// Note that the first page won't update its cluster's `first_page_idx` (since we loop
// from idx = 1) but that's fine because that idx is by definition 0.
if (pinfo->is_first_in_cluster)
clusters[pinfo->cluster_id].first_page_idx = idx;
if (idx++ % GROUP_SIZE != 0)
continue;
// Create a new group every GROUP_SIZE page infos
Page_Info_Group &cur_group = groups[n_groups];
cur_group.first = pinfo;
cur_group.range.start = pinfo->range.start;
// Close the previous group's range now that we know where this one starts.
Page_Info_Group &prev_group = groups[n_groups - 1];
prev_group.range.len = cur_group.range.start - prev_group.range.start;
++n_groups;
}
}
// verify that we added all pages to chunks
assert(idx == n_pages);
if (n_groups) {
// The last group extends to the end of the last page.
Page_Info_Group &last_group = groups[n_groups - 1];
last_group.range.len = pinfo_tail->range.end() - last_group.range.start;
}
fprintf(stderr, "Generated %" PRIu64 " groups and %" PRIu64 " chunks.\n", n_groups, n_chunks);
assert(!chunks_tail || !chunks_tail->next);
assert(!pinfo_tail || !pinfo_tail->next);
rndata.pages = pinfo_head;
rndata.page_groups = groups;
rndata.n_page_groups = n_groups;
rndata.page_chunks = chunks_head;
rndata.n_page_chunks = n_chunks;
rndata.n_pages = n_pages;
rndata.n_elems = n_elems;
rndata.tot_page_comp_size = tot_page_comp_size;
rndata.cluster_groups = cluster_groups;
rndata.n_cluster_groups = cg_idx;
rndata.tot_page_list_size = tot_page_list_size;
rndata.clusters = clusters;
rndata.n_clusters = n_clusters;
rndata.n_entries = n_entries;
if (extended_info)
rndata.tot_page_uncomp_size = calc_page_uncomp_size(file.mem, rndata.pages);
return rndata;
}
// Returns the byte range recorded for section `sec` in the TFile's
// TKey-derived section table.
internal
Byte_Range get_section_range(const App_State &app, Section_Id sec)
{
    const auto &section = app.tfile_data.tkeys_data.sections[sec];
    return section.range;
}
// Maps a file offset to the Section it belongs to (fixed TFile sections,
// page lists, or pages), for highlighting in the viewer. A section match
// also covers its `pre_size` bytes immediately before `range.start` (e.g.
// the RBlob key header preceding a page). Lookup order: fixed sections,
// then two fast paths around the cached `app.last_pinfo` (same page /
// next page -- exploits mostly-sequential rendering), then page lists,
// then the chunk/group acceleration structures for a full page search.
// Returns a zero Section if the offset belongs to nothing.
// `hilite_cluster`, when >= 0, marks pages of that cluster as highlighted.
internal
Section find_section(App_State &app, u64 off, i64 hilite_cluster = -1)
{
const RNTuple_Data &rdata = app.rndata;
const TKeys_Data &tdata = app.tfile_data.tkeys_data;
Section sec {};
// Fixed sections come first in the table; Sec_Page marks where the
// variable (page) sections begin.
for (u32 i = 1; i < Sec_Page; ++i) {
sec = tdata.sections[i];
if (sec.range.start - sec.pre_size <= off && off < sec.range.end()) {
return sec;
}
}
// Size of the RBlob key header that precedes pages/page lists on disk.
u64 rblob_sz = tdata.sections[Sec_Page].pre_size;
/// Page fast lookup (relative to app.last_pinfo)
{
// fast case: `off` is in the same page info as previous `off`.
// NOTE(review): app.last_pinfo is dereferenced unconditionally --
// presumably it is never null once rendering starts; confirm.
// NOTE(review): this check uses `<` where the equivalent checks below
// (and at the fixed-section loop) use `<=`; the first pre-header byte
// may therefore miss this fast path. Verify intended.
u64 pre_size = app.last_pinfo->is_first_in_cluster ? rblob_sz : 0;
if (app.last_pinfo->range.start - pre_size < off && off < app.last_pinfo->range.end()) {
sec.id = Sec_Page;
sec.range = app.last_pinfo->range;
sec.pre_size = pre_size;
sec.post_size = app.last_pinfo->checksum_size();
sec.highlighted = hilite_cluster >= 0 && app.last_pinfo->cluster_id == (u64)hilite_cluster;
sec.info = app.last_pinfo;
return sec;
}
// still fast case: `off is in the next page info as the previous.
if (app.last_pinfo->next) // don't check if it's checksum, since it's the first byte of the page
app.last_pinfo = app.last_pinfo->next;
if (app.last_pinfo) {
u64 pre_size = app.last_pinfo->is_first_in_cluster ? rblob_sz : 0;
if (app.last_pinfo->range.start - pre_size <= off && off < app.last_pinfo->range.end()) {
sec.id = Sec_Page;
sec.range = app.last_pinfo->range;
sec.pre_size = pre_size;
sec.post_size = app.last_pinfo->checksum_size();
sec.highlighted = hilite_cluster >= 0 && app.last_pinfo->cluster_id == (u64)hilite_cluster;
sec.info = app.last_pinfo;
return sec;
}
}
}
// @Speed
// Linear scan over cluster groups to match page-list ranges.
for (u64 cg_idx = 0; cg_idx < rdata.n_cluster_groups; ++cg_idx) {
Cluster_Group_Info &cg_info = rdata.cluster_groups[cg_idx];
if (cg_info.rng_page_list.start - rblob_sz <= off && off < cg_info.rng_page_list.end()) {
sec.id = Sec_Page_List;
sec.range = cg_info.rng_page_list;
sec.pre_size = rblob_sz;
// Page lists end with an 8-byte trailer (presumably the checksum).
sec.post_size = 8;
return sec;
}
}
// Slow page group lookup, ideally only done once per render when last_pinfo is invalid.
for (Page_Info_Chunk *chunk = rdata.page_chunks; chunk; chunk = chunk->next) {
// If we're at the start of a chunk, return a fake Sec_Page used to highlight the RBlob header bytes.
if (chunk->range.start - rblob_sz <= off && off < chunk->range.start) {
sec.id = Sec_Page;
sec.range = { chunk->range.start, 0 };
sec.pre_size = rblob_sz;
return sec;
}
if (chunk->range.start <= off && off < chunk->range.end()) {
// Narrow down via groups (each covers GROUP_SIZE pages), then scan
// the pages of the matching group.
for (u64 group_idx = chunk->first_group; group_idx < rdata.n_page_groups; ++group_idx) {
const Page_Info_Group &group = rdata.page_groups[group_idx];
if (off < group.range.start || off >= group.range.end())
continue;
for (Page_Info_Node *pinfo = group.first; pinfo; pinfo = pinfo->next) {
u64 pre_size = pinfo->is_first_in_cluster ? rblob_sz : 0;
if (pinfo->range.start - pre_size <= off && off < pinfo->range.end()) {
// Cache the hit so the next lookup takes the fast path.
app.last_pinfo = pinfo;
sec.id = Sec_Page;
sec.range = pinfo->range;
sec.pre_size = pre_size;
sec.post_size = pinfo->checksum_size();
sec.highlighted = hilite_cluster >= 0 && pinfo->cluster_id == (u64)hilite_cluster;
sec.info = pinfo;
return sec;
}
}
}
// A chunk is by construction the union of its pages' ranges, so
// falling through here means the structures are inconsistent.
fprintf(stderr, "Offset 0x%" PRIX64 " is in chunk 0x%" PRIX64 " - 0x%" PRIX64 ", but found in no page_info range!\n",
off, chunk->range.start, chunk->range.end());
assert(false);
}
}
return {};
}