[improve][metadata] Replace findByIndex with streaming scanByIndex (point + range) #25731
Merged
…oint + range)
Drop MetadataStore.findByIndex (point-lookup, materialized List) in favor of
a single streaming scanByIndex that subsumes both point and range queries:
    CompletableFuture<Void> scanByIndex(
            String scanPathPrefix, String indexName,
            String fromKeyInclusive, String toKeyInclusive,  // null = unbounded
            Predicate<GetResult> fallbackFilter,
            ScanConsumer consumer,
            Set<Option> opts)
Both bounds are inclusive; point lookup is fromKey == toKey == key. Listener
shape mirrors scanChildren — onNext per match, then onCompleted or onError.
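The contract above (inclusive bounds, null meaning unbounded, onNext per match followed by onCompleted or onError) can be illustrated with a minimal sketch. SketchListener and the TreeMap-backed index are hypothetical stand-ins, not Pulsar's ScanConsumer or MetadataStore, and the sketch maps each secondary key to a single primary key for brevity.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical stand-in for the listener shape; not Pulsar's ScanConsumer.
interface SketchListener {
    void onNext(String primaryKey);
    void onCompleted();
    void onError(Throwable t);
}

final class InclusiveScanSketch {
    // Secondary key -> primary key, sorted so that a range is a contiguous view.
    private final NavigableMap<String, String> index = new TreeMap<>();

    void put(String secondaryKey, String primaryKey) {
        index.put(secondaryKey, primaryKey);
    }

    // Both bounds are inclusive; a null bound leaves that side open.
    void scan(String fromKeyInclusive, String toKeyInclusive, SketchListener listener) {
        try {
            NavigableMap<String, String> view = index;
            if (fromKeyInclusive != null) {
                view = view.tailMap(fromKeyInclusive, true);
            }
            if (toKeyInclusive != null) {
                view = view.headMap(toKeyInclusive, true);
            }
            for (String primaryKey : view.values()) {
                listener.onNext(primaryKey);
            }
        } catch (RuntimeException e) {
            listener.onError(e);
            return;
        }
        listener.onCompleted();
    }

    // Convenience for the examples: run a scan and return the matches in key order.
    static List<String> matches(InclusiveScanSketch store, String from, String to) {
        List<String> out = new ArrayList<>();
        store.scan(from, to, new SketchListener() {
            @Override public void onNext(String primaryKey) { out.add(primaryKey); }
            @Override public void onCompleted() { }
            @Override public void onError(Throwable t) { throw new IllegalStateException(t); }
        });
        return out;
    }
}
```

With this shape a point lookup is simply scan(key, key, listener), and scan(null, key, listener) covers everything up to and including key.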
Oxia native: client.list with UseIndex over [scanPathPrefix/fromKey,
scanPathPrefix/toKey + "~") then storeGet for each primary key, streamed to
the consumer. The trailing "~" sentinel widens Oxia's half-open range to
include toKey for the secondary keys callers actually use today (numeric
timestamps, fixed-tag enums).
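A small sketch of why the "~" sentinel works for fixed-width keys, and where it would over-include. It assumes the index orders keys like plain Java String comparison, which is an assumption made for illustration and not a statement about Oxia's actual key ordering; TildeSentinelSketch is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

final class TildeSentinelSketch {
    // Keys in the half-open range [from, to + "~"), using String ordering
    // as a stand-in for the index's sort order.
    static List<String> halfOpenWithSentinel(TreeSet<String> keys, String from, String to) {
        return new ArrayList<>(keys.subSet(from, true, to + "~", false));
    }

    // The same range without the sentinel: [from, to), which drops `to` itself.
    static List<String> halfOpenPlain(TreeSet<String> keys, String from, String to) {
        return new ArrayList<>(keys.subSet(from, true, to, false));
    }
}
```

For fixed-width keys such as "0001", "0002", "0003", the range from "0001" to "0002" yields both keys with the sentinel and only "0001" without it. For variable-width keys the sentinel also admits any key that extends toKey with characters sorting below "~": with keys "10", "100", "11", a point lookup on "10" would return "10" and "100". That is presumably why the commit message limits the claim to the key shapes in use today.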
ZK / Memory / RocksDB compat layer in AbstractMetadataStore: list children
under scanPathPrefix, fetch each, stream those passing fallbackFilter. The
fallback ignores fromKey/toKey — callers encode the bounds in fallbackFilter.
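Since the fallback path ignores the bounds, a caller would need a filter that re-applies them. The following is a hedged sketch of one way to build such a filter; BoundsFilterSketch and the keyOf extractor are hypothetical, and T stands in for GetResult, whose secondary key has to be derived from the stored value in a way this sketch does not model.

```java
import java.util.function.Function;
import java.util.function.Predicate;

final class BoundsFilterSketch {
    // Builds a predicate accepting items whose secondary key lies in
    // [fromInclusive, toInclusive]; a null bound leaves that side open.
    static <T> Predicate<T> inclusiveRange(
            Function<T, String> keyOf, String fromInclusive, String toInclusive) {
        return item -> {
            String key = keyOf.apply(item);
            boolean aboveLower = fromInclusive == null || key.compareTo(fromInclusive) >= 0;
            boolean belowUpper = toInclusive == null || key.compareTo(toInclusive) <= 0;
            return aboveLower && belowUpper;
        };
    }
}
```

Passing the same bounds to both the scan call and the filter keeps the native and fallback paths in agreement on which entries match.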
PIP-473 needs the range-query shape for idx:txn-by-deadline (key <= now)
and idx:txn-by-final-state (key <= now - retention) sweeps.
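A range predicate such as key <= now only works on string keys if their lexicographic order matches numeric order, which is why the tests in this PR use zero-padded numeric secondary keys. A minimal sketch of that encoding follows; the 20-digit width and the DeadlineKeySketch name are assumptions for illustration, not the format PIP-473 specifies.

```java
import java.util.Locale;

final class DeadlineKeySketch {
    // Fixed width of 20 digits is enough for any non-negative long (at most 19 digits).
    static String toIndexKey(long epochMillis) {
        if (epochMillis < 0) {
            throw new IllegalArgumentException("negative timestamp: " + epochMillis);
        }
        // Locale.ROOT keeps the digits ASCII regardless of the default locale.
        return String.format(Locale.ROOT, "%020d", epochMillis);
    }

    // True when the entry's key sorts at or before the key for `nowMillis` (key <= now).
    static boolean isDue(String indexKey, long nowMillis) {
        return indexKey.compareTo(toIndexKey(nowMillis)) <= 0;
    }
}
```

Unpadded, "9" sorts after "10"; padded to a fixed width, the two keys compare in numeric order. In terms of the signature above, a deadline sweep would presumably pass null as fromKeyInclusive and the key for now as toKeyInclusive.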
Also: ScanConsumer.collectInto helper for callers that want a List<GetResult>
on top of the streaming API. Migrated ScalableTopicResources, the two test
classes, and the DualMetadataStore wrapper.
…ames with imports
Motivation
PIP-473 (metadata-driven transactions for scalable topics) needs to scan secondary indexes by range, not just by point lookup: the idx:txn-by-deadline sweep finds open txns past their deadline (key <= now), and the GC sweep finds finalized txns past their retention window. The existing MetadataStore.findByIndex is point-only and returns a materialized List<GetResult>, which doesn't fit either use case.

This PR replaces findByIndex with a single streaming method that subsumes both shapes.

Modifications
API change on MetadataStore:

- Both bounds are inclusive; a point lookup is fromKey == toKey == key. Half-open ranges use null for the unbounded side. Listener shape matches scanChildren's existing ScanConsumer.
- findByIndex is removed in the same PR: one method, no separate point-lookup variant. All call sites are migrated.

Oxia (native): client.list(start, end, ListOption.UseIndex(indexName)) returns matching primary keys; results are then fetched via storeGet and streamed to the consumer. Range translation: [scanPathPrefix/fromKey, scanPathPrefix/toKey + "~"). The trailing ~ sentinel widens Oxia's half-open range to include toKey for the secondary-key shapes callers use today (numeric timestamps, fixed-tag enums). Documented in code.

ZK / Memory / RocksDB compatibility layer in AbstractMetadataStore: lists children under scanPathPrefix, fetches each, and streams those matching fallbackFilter. The fallback ignores fromKey/toKey; callers encode the range in fallbackFilter.

ScanConsumer.collectInto(List): static helper for callers that prefer the materialized-list shape on top of the streaming API.

Migrated callers:

- ScalableTopicResources.findScalableTopicsAsync(broker)
- MetadataStoreSecondaryIndexTest, MetadataCacheSecondaryIndexTest (added a small in-test findByIndex helper that wraps scanByIndex + collectInto)
- DualMetadataStore wrapper

Verifying this change
- findByIndex* tests are migrated to scanByIndex + collectInto and continue to pass on every backend (ZK, Memory, RocksDB, Oxia, MockZooKeeper).
- scanByIndexInclusiveRange and scanByIndexUnboundedFromKey cover the range-query shape end-to-end (zero-padded numeric secondary keys mirroring PIP-473's idx:txn-by-deadline).

Local results: the full :pulsar-metadata:test run passes, apart from the recurring BookKeeper.AuditorLedgerCheckerTest.setUp BindException flake (port collision; unrelated to this change).

Does this pull request potentially affect one of the following parts:
MetadataStore API: findByIndex is removed and replaced with scanByIndex. All in-tree callers are migrated in this PR. External implementations of MetadataStore would need the same migration; the new method has a default that returns a not-supported failure, matching the previous default behavior.

Matching PR in forked repository
PR in forked repository: https://github.com/merlimat/pulsar/pull/new/mmerli/metadata-scan-by-index