[Dataflow Streaming] Intern encoded tags across keys#36313
[Dataflow Streaming] Intern encoded tags across keys#36313scwhittle merged 13 commits intoapache:masterfrom
Conversation
|
/gemini review |
There was a problem hiding this comment.
Code Review
This pull request introduces a valuable optimization by pre-encoding and interning state keys, which reduces redundant computations and memory overhead. The use of InternedByteString with an IdentityHashMap in the state cache is a clever approach. The refactoring is applied consistently across most of the relevant classes. I have identified one area where the refactoring appears to be incomplete (WindmillCombiningState) and have also provided a minor suggestion to improve type safety. Overall, this is a solid improvement to the state management logic.
|
R: @scwhittle |
|
Stopping reviewer notifications for this pull request: review requested by someone other than the bot, ceding control. If you'd like to restart, comment |
|
Run PreCommit Java |
| /** Entry in the state cache that stores a map of values. */ | ||
| private static class StateCacheEntry implements Weighted { | ||
| private final HashMap<NamespacedTag<?>, WeightedValue<?>> values; | ||
| private final IdentityHashMap<InternedByteString, WeightedValue<?>> values; |
There was a problem hiding this comment.
It seems like this might have collisions between types. For example if we have a StateTag StateTag for the same "tag", I believe the bytestring for both would be the same and we could collide in the cache.
Can you add a test covering that?
There was a problem hiding this comment.
For example if we have a StateTag StateTag for the same "tag", I believe the bytestring for both would be the same and we could collide in the cache.
I don't understand this, could you share examples?
Replacing NamespacedTag with InternedByteString should be same except that InternedByteString also includes the system/user prefix. So I think it should not break anything.
There was a problem hiding this comment.
sorry that was indeed a confusing comment :)
I meant for example a StateTag representing a Value and a StateTag representing a Bag. Both could be in the same namespace, DoFn, and have the same string tag. At the work item commit level we are putting this ByteString into the correct value/bag proto as the tag field. But the actual contents of the field will be the same ( something like "namespace:tag") and thus will be the same InternedByteString. That is good for reducing objects/memory but means that we can't use just the InternedByteString as the cache key here because those two types will collide.
There was a problem hiding this comment.
IIUC, having two different states with same tag/StateId inside the same stateFamily is not allowed.
The logic in
The existing logic with NamespacedTag will also cause a collision if there are multiple StateTag with same tags.
|
Run PreCommit Java |
|
Run PreCommit Java |
2 similar comments
|
Run PreCommit Java |
|
Run PreCommit Java |
|
Run Java PreCommit |
Intern encoded tags across keys reduces memory use, GC pressure and improves job performance.
The change also fixes a bug where tag collisions between system and user namespaces are not handled correctly by NamespacedTag class. Added a test conflictingUserAndSystemTags to cover the scenario.
#33578
Before:

After (~50% less number of objects in heap):
