[draft] wide metrics schema by mattmkim · Pull Request #6233 · quickwit-oss/quickwit

and others added 27 commits

March 27, 2026 10:13
…metrics index at startup
…uilder for dynamic schema discovery

Replace the fixed MetricDataPoint struct (with hardcoded tag_service,
tag_env, tag_datacenter, tag_region, tag_host fields, attributes VARIANT,
and resource_attributes VARIANT) with a fully dynamic representation:

  pub struct MetricDataPoint {
      pub metric_name: String,
      pub metric_type: MetricType,
      pub timestamp_secs: u64,
      pub value: f64,
      pub tags: HashMap<String, String>,
  }

All data-point attributes become string entries in tags. metric_unit and
start_timestamp_secs are stored as tags when present. The resource-level
service.name is stored under "service", with data-point attributes taking
precedence via entry().or_insert_with().

Resource attributes are dropped entirely.

Validation: data points with empty metric_name or zero timestamp_secs are
silently dropped. Cumulative-temporality Sum metrics produce a partial
rejection (incrementing num_rejected) rather than aborting the entire
request, so valid metrics in the same request are still ingested.

ArrowMetricsBatchBuilder is rewritten with a two-pass approach:
1. Schema discovery: scan all data points to collect the union of tag
   keys via BTreeSet<&str> (sorted, no per-key allocation until finish).
2. Array building: create one StringDictionaryBuilder<Int32Type> per
   tag column and populate it, appending null for missing tags.

The resulting RecordBatch has a fully dynamic schema:
  metric_name: Dictionary(Int32, Utf8) not null
  metric_type:  UInt8 not null
  timestamp_secs: UInt64 not null
  value: Float64 not null
  <tag keys alphabetically>: Dictionary(Int32, Utf8) nullable

No VARIANT columns. No fixed column count.
…ants

Remove the hardcoded 14-field ParquetField enum. Replace with:

  REQUIRED_FIELDS: &[&str] = &["metric_name", "metric_type", "timestamp_secs", "value"]
  SORT_ORDER: &[&str] = &["metric_name", "service", "env", "datacenter", "region", "host", "timestamp_secs"]

  fn validate_required_fields(schema: &Schema) -> Result<(), String>
  fn required_field_type(name: &str) -> Option<DataType>

ParquetSchema::new() is replaced by ParquetSchema::from_arrow_schema(SchemaRef),
which wraps any schema rather than building a fixed one.

SORT_ORDER columns not present in a batch are silently skipped at write
time, so batches with arbitrary tag subsets sort correctly on whatever
columns they do have.
…hema

ParquetWriter no longer holds a fixed ParquetSchema. sort_batch() now
iterates SORT_ORDER and filter_maps to only the column indices that exist
in the batch, so any subset of sort columns works without error.

validate_required_fields() is called before writing to ensure the 4
required fields are present with correct types.

ParquetWriterConfig::to_writer_properties() takes &ArrowSchema so it can:
- Enable dictionary encoding on all Dictionary(Int32, Utf8) columns found
  in the batch, rather than a hardcoded list
- Enable bloom filters on metric_name and any SORT_ORDER column present,
  with per-column NDV estimates
- Build SortingColumn metadata from the actual column indices in the batch
…accumulation

ParquetIngestProcessor no longer holds a fixed ParquetSchema. Validation
now checks only that the 4 required fields exist with correct types via
validate_required_fields(), accepting any additional columns.

ParquetBatchAccumulator gains incremental union schema tracking:
- union_fields: BTreeMap<String, (DataType, bool)> updated on each
  add_batch() call (O(fields) per batch, not O(batches) at flush)
- flush_internal() builds a union SchemaRef from union_fields, then
  aligns each pending batch to it via align_batch_to_schema(), which
  inserts new_null_array() for columns missing from a given batch
- union_fields is reset after each flush so each split is independent

new_null_array() is O(1) — a null bitmap, not a copy of data. The dominant
cost remains concat_batches(), which is unavoidable regardless of approach.

BTreeMap ensures deterministic column ordering across flushes.
ParquetSplitWriter::new() no longer takes a ParquetSchema argument.
Column indices are now resolved by name at write time:

- extract_time_range(): schema.index_of("timestamp_secs").unwrap()
  (safe: validate_required_fields() in write_to_file() guarantees presence)
- extract_metric_names(): schema.index_of("metric_name").unwrap()
- extract_service_names(): schema.index_of("service").ok()
  (optional: not all batches carry a service column)
Remove all ParquetSchema::new() and ParquetSplitWriter::new(schema, ...) calls.
Constructors that no longer need a schema:
- ParquetIngestProcessor::new()  (no schema arg)
- ParquetSplitWriter::new(config, path)  (no schema arg)
- ParquetDocProcessor creates an empty Arrow schema for the checkpoint-only
  empty batch path (no data, just advancing the offset)

Test batches are updated from the old 14-column VARIANT schema to simple
4-field dynamic schemas matching the new pipeline output.
11 copies of create_test_batch / create_dict_array / create_nullable_dict_array
were spread across the two crates. Consolidate into two shared modules:

quickwit-parquet-engine/src/test_helpers.rs (#[cfg(test)])
  create_dict_array, create_nullable_dict_array
  create_test_batch_with_tags(num_rows, tags) -- canonical builder
  create_test_batch(num_rows)                  -- delegates with ["service","host"]

quickwit-indexing/src/actors/parquet_test_helpers.rs (#[cfg(test)])
  Same pair for the indexing actor tests (separate crate boundary)

split_writer.rs retains create_test_batch_with_options() since it needs
per-row control of metric names, timestamps, and service values for its
time-range and metadata extraction tests.

writer.rs retains create_nullable_dict_array(Option<&str>) (single-value
variant with a different signature used only in the sort test).

Base automatically changed from matthew.kim/parquet-engine-upstream to main

March 30, 2026 13:51