Skip to content

[Dataflow Streaming] Intern encoded tags across keys#36313

Merged
scwhittle merged 13 commits intoapache:masterfrom
arunpandianp:statetag
Oct 28, 2025
Merged

[Dataflow Streaming] Intern encoded tags across keys#36313
scwhittle merged 13 commits intoapache:masterfrom
arunpandianp:statetag

Conversation

@arunpandianp
Copy link
Copy Markdown
Contributor

@arunpandianp arunpandianp commented Sep 29, 2025

Intern encoded tags across keys reduces memory use, GC pressure and improves job performance.

The change also fixes a bug where tag collisions between system and user namespaces are not handled correctly by NamespacedTag class. Added a test conflictingUserAndSystemTags to cover the scenario.

#33578

Before:
image

After (~50% less number of objects in heap):
image

@arunpandianp
Copy link
Copy Markdown
Contributor Author

/gemini review

Copy link
Copy Markdown
Contributor

@gemini-code-assist gemini-code-assist Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request introduces a valuable optimization by pre-encoding and interning state keys, which reduces redundant computations and memory overhead. The use of InternedByteString with an IdentityHashMap in the state cache is a clever approach. The refactoring is applied consistently across most of the relevant classes. I have identified one area where the refactoring appears to be incomplete (WindmillCombiningState) and have also provided a minor suggestion to improve type safety. Overall, this is a solid improvement to the state management logic.

@arunpandianp arunpandianp changed the title [wip] cache encoded key [Dataflow Streaming] Cache encoded tags across keys Oct 22, 2025
@arunpandianp arunpandianp marked this pull request as ready for review October 22, 2025 08:48
@arunpandianp
Copy link
Copy Markdown
Contributor Author

R: @scwhittle

@arunpandianp arunpandianp changed the title [Dataflow Streaming] Cache encoded tags across keys [Dataflow Streaming] Intern encoded tags across keys Oct 22, 2025
@github-actions
Copy link
Copy Markdown
Contributor

Stopping reviewer notifications for this pull request: review requested by someone other than the bot, ceding control. If you'd like to restart, comment assign set of reviewers

@arunpandianp
Copy link
Copy Markdown
Contributor Author

Run PreCommit Java

/** Entry in the state cache that stores a map of values. */
private static class StateCacheEntry implements Weighted {
private final HashMap<NamespacedTag<?>, WeightedValue<?>> values;
private final IdentityHashMap<InternedByteString, WeightedValue<?>> values;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems like this might have collisions between types. For example if we have a StateTag StateTag for the same "tag", I believe the bytestring for both would be the same and we could collide in the cache.

Can you add a test covering that?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For example if we have a StateTag StateTag for the same "tag", I believe the bytestring for both would be the same and we could collide in the cache.

I don't understand this, could you share examples?

Replacing NamespacedTag with InternedByteString should be same except that InternedByteString also includes the system/user prefix. So I think it should not break anything.

Copy link
Copy Markdown
Contributor

@scwhittle scwhittle Oct 23, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sorry that was indeed a confusing comment :)

I meant for example a StateTag representing a Value and a StateTag representing a Bag. Both could be in the same namespace, DoFn, and have the same string tag. At the work item commit level we are putting this ByteString into the correct value/bag proto as the tag field. But the actual contents of the field will be the same ( something like "namespace:tag") and thus will be the same InternedByteString. That is good for reducing objects/memory but means that we can't use just the InternedByteString as the cache key here because those two types will collide.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IIUC, having two different states with same tag/StateId inside the same stateFamily is not allowed.

The logic in

"Duplicate %s \"%s\", used on both of [%s] and [%s]",
should prevent user from using duplicate StateId in a DoFn.

The existing logic with NamespacedTag will also cause a collision if there are multiple StateTag with same tags.

@arunpandianp
Copy link
Copy Markdown
Contributor Author

Run PreCommit Java

@arunpandianp
Copy link
Copy Markdown
Contributor Author

Run PreCommit Java

2 similar comments
@arunpandianp
Copy link
Copy Markdown
Contributor Author

Run PreCommit Java

@arunpandianp
Copy link
Copy Markdown
Contributor Author

Run PreCommit Java

@scwhittle
Copy link
Copy Markdown
Contributor

Run Java PreCommit

@scwhittle scwhittle merged commit 0c10658 into apache:master Oct 28, 2025
15 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants