[improve][metadata] Replace findByIndex with streaming scanByIndex (point + range)#25731

Merged
lhotari merged 2 commits into apache:master from merlimat:mmerli/metadata-scan-by-index
May 11, 2026

@merlimat merlimat commented May 9, 2026

Motivation

PIP-473 (metadata-driven transactions for scalable topics) needs to scan secondary indexes by range, not just point lookup — the idx:txn-by-deadline sweep finds open txns past their deadline (key <= now), and the GC sweep finds finalized txns past their retention window. The existing MetadataStore.findByIndex is point-only and returns a materialized List<GetResult>, which doesn't fit either use case.

This PR replaces findByIndex with a single streaming method that subsumes both shapes.

Modifications

API change on MetadataStore:

CompletableFuture<Void> scanByIndex(
    String scanPathPrefix, String indexName,
    String fromKeyInclusive, String toKeyInclusive,  // null = unbounded on that side
    Predicate<GetResult> fallbackFilter,
    ScanConsumer consumer,
    Set<Option> opts)
  • Both bounds are inclusive. Point lookup is fromKey == toKey == key. Half-open ranges use null. Listener shape matches scanChildren's existing ScanConsumer.
  • findByIndex is removed in the same PR — one method, no separate point-lookup variant. All call sites are migrated.
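The bound semantics described above can be sketched as a small standalone helper. This is an illustration only, assuming plain lexicographic `String` comparison of secondary keys; the class and method names (`InclusiveKeyRange`, `contains`, `select`) are hypothetical and not part of the PR.

```java
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical helper illustrating the inclusive-bounds contract; not the PR's code.
final class InclusiveKeyRange {
    /** True when key lies in [from, to]; a null bound means unbounded on that side. */
    static boolean contains(String key, String fromKeyInclusive, String toKeyInclusive) {
        boolean aboveFrom = fromKeyInclusive == null || key.compareTo(fromKeyInclusive) >= 0;
        boolean belowTo = toKeyInclusive == null || key.compareTo(toKeyInclusive) <= 0;
        return aboveFrom && belowTo;
    }

    /** Keeps only the keys inside the inclusive range, preserving input order. */
    static List<String> select(List<String> keys, String from, String to) {
        return keys.stream().filter(k -> contains(k, from, to)).collect(Collectors.toList());
    }
}
```

Under this reading, a point lookup is `contains(key, k, k)`, and a sweep such as "everything up to now" passes `null` as the lower bound.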

Oxia (native): client.list(start, end, ListOption.UseIndex(indexName)) returns matching primary keys; results are then fetched via storeGet and streamed to the consumer. Range translation: [scanPathPrefix/fromKey, scanPathPrefix/toKey + "~"). The trailing ~ sentinel widens Oxia's half-open range to include toKey for the secondary-key shapes callers use today (numeric timestamps, fixed-tag enums). The sentinel behavior is documented in code.
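To see why the `~` sentinel turns a half-open range into an inclusive one, the translation can be modelled with a sorted map standing in for the index. This sketch does not call the Oxia client; `TildeSentinelRange`, its methods, and the sample paths are hypothetical, and it assumes keys sort as plain Java strings.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// Hypothetical model of the range translation; a TreeMap stands in for the index.
final class TildeSentinelRange {
    /** Index keys in the half-open range [prefix/from, prefix/to + "~"), in sorted order. */
    static List<String> list(TreeMap<String, String> index, String prefix, String from, String to) {
        String start = prefix + "/" + from;
        String endExclusive = prefix + "/" + to + "~"; // '~' sorts after digits and letters
        return new ArrayList<>(index.subMap(start, true, endExclusive, false).keySet());
    }

    /** Sample index: fixed-width secondary key -> primary key (made-up values). */
    static TreeMap<String, String> sampleIndex() {
        TreeMap<String, String> index = new TreeMap<>();
        index.put("/txn/0100", "/txn/data/a");
        index.put("/txn/0200", "/txn/data/b");
        index.put("/txn/0300", "/txn/data/c");
        return index;
    }
}
```

In this model, a longer key that merely starts with `toKey` (for example `/txn/02001` when `toKey` is `0200`) also falls inside the range, which is consistent with the description restricting the trick to fixed-shape keys such as timestamps and fixed-tag enums.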

ZK / Memory / RocksDB compatibility layer in AbstractMetadataStore: lists children under scanPathPrefix, fetches each, streams those matching fallbackFilter. The fallback ignores fromKey/toKey — callers encode the range in fallbackFilter.
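The fallback path can be pictured with a minimal in-memory sketch: list direct children under the prefix, then stream only those passing the filter. Everything here is an assumption made for illustration: a `Map` stands in for the store, `Map.Entry` stands in for `GetResult`, and the stored value is taken to be the zero-padded deadline itself.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Predicate;

// Hypothetical in-memory model of the compatibility layer; not AbstractMetadataStore.
final class FallbackScanSketch {
    /** Visits direct children of scanPathPrefix and forwards those passing the filter. */
    static void scan(Map<String, String> store, String scanPathPrefix,
                     Predicate<Map.Entry<String, String>> fallbackFilter,
                     Consumer<Map.Entry<String, String>> onNext) {
        String childPrefix = scanPathPrefix + "/";
        for (Map.Entry<String, String> e : store.entrySet()) {
            String path = e.getKey();
            if (!path.startsWith(childPrefix)) {
                continue; // outside the scanned subtree
            }
            if (path.substring(childPrefix.length()).contains("/")) {
                continue; // deeper descendant, not a direct child
            }
            if (fallbackFilter.test(e)) {
                onNext.accept(e);
            }
        }
    }

    /** The caller encodes the range (value <= nowKey) in the filter, not in the scan. */
    static List<String> expiredBefore(Map<String, String> store, String prefix, String nowKey) {
        List<String> paths = new ArrayList<>();
        scan(store, prefix, e -> e.getValue().compareTo(nowKey) <= 0, e -> paths.add(e.getKey()));
        return paths;
    }
}
```

Because the scan itself never looks at the bounds, a caller that forgets to encode them in the filter would receive every child on these backends.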

ScanConsumer.collectInto(List) static helper for callers that prefer the materialized-list shape on top of the streaming API.
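A collecting helper of this kind might look roughly like the following. The callback names `onNext`, `onCompleted`, and `onError` come from the commit message; the interface name `SketchScanConsumer`, its generic parameter, and the exact signatures are assumptions, so the real `ScanConsumer` may differ.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for ScanConsumer; signatures are assumed, not taken from Pulsar.
interface SketchScanConsumer<T> {
    void onNext(T item);
    void onCompleted();
    void onError(Throwable error);

    /** Returns a consumer that appends every streamed item to the given list. */
    static <T> SketchScanConsumer<T> collectInto(List<T> target) {
        return new SketchScanConsumer<T>() {
            @Override public void onNext(T item) { target.add(item); }
            @Override public void onCompleted() { /* nothing to flush */ }
            @Override public void onError(Throwable error) { /* surfaced by the caller's future */ }
        };
    }
}

final class CollectIntoDemo {
    /** Simulates a scan that streams two matches and then completes. */
    static List<String> run() {
        List<String> results = new ArrayList<>();
        SketchScanConsumer<String> consumer = SketchScanConsumer.collectInto(results);
        consumer.onNext("a");
        consumer.onNext("b");
        consumer.onCompleted();
        return results;
    }
}
```

With this shape, the list is only safe to read once the scan's future has completed, since items arrive incrementally.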

Migrated callers:

  • ScalableTopicResources.findScalableTopicsAsync (broker)
  • MetadataStoreSecondaryIndexTest, MetadataCacheSecondaryIndexTest (added a small in-test findByIndex helper that wraps scanByIndex + collectInto)
  • DualMetadataStore wrapper

Verifying this change

  • All existing findByIndex* tests migrated to scanByIndex + collectInto and continue to pass on every backend (ZK, Memory, RocksDB, Oxia, MockZooKeeper).
  • New scanByIndexInclusiveRange and scanByIndexUnboundedFromKey cover the range-query shape end-to-end (zero-padded numeric secondary keys mirroring PIP-473's idx:txn-by-deadline).
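Zero-padding matters because the range comparison operates on strings, so numeric order has to coincide with lexicographic order. A minimal sketch of such an encoding follows; the 19-digit width and the `ZeroPaddedKeys` name are assumptions for illustration, since the PR does not state the width it uses.

```java
import java.util.Locale;

// Hypothetical encoder; the width (19 digits, enough for Long.MAX_VALUE) is an assumption.
final class ZeroPaddedKeys {
    /** Encodes a non-negative timestamp so that string order matches numeric order. */
    static String encode(long millis) {
        if (millis < 0) {
            throw new IllegalArgumentException("negative values would not sort correctly");
        }
        return String.format(Locale.ROOT, "%019d", millis);
    }
}
```

Without padding, `"9"` sorts after `"10"`, so a `key <= now` sweep would skip or include the wrong entries.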

Local results: full :pulsar-metadata:test runs cleanly modulo the recurring BookKeeper.AuditorLedgerCheckerTest.setUp BindException flake (port collision; unrelated to this change).

Does this pull request potentially affect one of the following parts:

  • The public MetadataStore API — findByIndex is removed and replaced with scanByIndex. All in-tree callers are migrated in this PR. External implementations of MetadataStore would need the same migration; the new method has a default that returns a not-supported failure, matching the previous default behavior.

Matching PR in forked repository

PR in forked repository: https://github.com/merlimat/pulsar/pull/new/mmerli/metadata-scan-by-index

merlimat added 2 commits May 9, 2026 13:05
…oint + range)

Drop MetadataStore.findByIndex (point-lookup, materialized List) in favor of
a single streaming scanByIndex that subsumes both point and range queries:

  CompletableFuture<Void> scanByIndex(
      String scanPathPrefix, String indexName,
      String fromKeyInclusive, String toKeyInclusive,  // null = unbounded
      Predicate<GetResult> fallbackFilter,
      ScanConsumer consumer,
      Set<Option> opts)

Both bounds are inclusive; point lookup is fromKey == toKey == key. Listener
shape mirrors scanChildren — onNext per match, then onCompleted or onError.

Oxia native: client.list with UseIndex over [scanPathPrefix/fromKey,
scanPathPrefix/toKey + "~") then storeGet for each primary key, streamed to
the consumer. The trailing "~" sentinel widens Oxia's half-open range to
include toKey for the secondary keys callers actually use today (numeric
timestamps, fixed-tag enums).

ZK / Memory / RocksDB compat layer in AbstractMetadataStore: list children
under scanPathPrefix, fetch each, stream those passing fallbackFilter. The
fallback ignores fromKey/toKey — callers encode the bounds in fallbackFilter.

PIP-473 needs the range-query shape for idx:txn-by-deadline (key <= now)
and idx:txn-by-final-state (key <= now - retention) sweeps.

Also: ScanConsumer.collectInto helper for callers that want a List<GetResult>
on top of the streaming API. Migrated ScalableTopicResources, the two test
classes, and the DualMetadataStore wrapper.

@lhotari lhotari left a comment

LGTM

@lhotari lhotari merged commit 9c5ab38 into apache:master May 11, 2026
81 of 84 checks passed
@merlimat merlimat deleted the mmerli/metadata-scan-by-index branch May 11, 2026 14:20