Adds Multimap support to JAVA FnApi #36218
…no guarantee on the order keys are returned. Also fixes a couple warnings from other FnApi tests.
… a missing FnApi state proto to get all of the entries of a multimap. This type of access is part of the state API (and supported by the non-portable harness), but was not present in the protos.
… turned my entire file in IntelliJ yellow.
Summary of Changes
Hello @acrites, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!
This pull request significantly enhances the Java FnApi by introducing comprehensive support for Multimap state. It integrates the previously existing …
Checks are failing. Will not request review until checks are succeeding. If you'd like to override that behavior, comment
The failing PreCommits don't seem related to this CL.
assign set of reviewers
Assigning reviewers: R: @Abacn for label java. Note: If you would like to opt out of this review, comment Available commands:
The PR bot will only process comments in the main thread (not review comments).
Removes non-needed @nonnull annotations.
    return new PrefetchableIterator<Map.Entry<K, V>>() {
      PrefetchableIterator<Map.Entry<K, V>> persistedEntriesIterator =
          PrefetchableIterables.concat(
              Iterables.concat(
Would
    PrefetchableIterator<Map.Entry<K, V>> persistedEntriesIterator =
        PrefetchableIterables.maybePrefetchable(Iterables.transform(
            persistedEntries,
            entry ->
                Iterables.transform(
                    entry.getValue(),
                    value -> Maps.immutableEntry(entry.getKey(), value)))
work? It seems like a single iterable, so the two concats shouldn't be needed. If I'm missing something, it at least seems like you could keep just PrefetchableIterables.concat and drop the other one.
It seems to have something to do with PrefetchableIterator though. If I replace everything with just Iterators, it works fine.
I've tried to get rid of the inner concat, but then I get errors about trying to pass a PrefetchableIterator<Iterator<Map.Entry<K,V>>> to the outermost concat. I was able to get it to work by making maybePrefetchable public (it's currently private), but I still need a single concat of the combined transforms.
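The type error described above can be reproduced without Beam or Guava. The following is a minimal sketch using only JDK collections, with hypothetical names (`FlattenEntriesSketch`, `nested`, `flattened`) that are not part of the PR: mapping each key to its per-value entries produces one inner sequence per key, so some flattening step (the role a concat plays) is still needed to obtain a flat sequence of entries.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration only; not the PR's implementation.
final class FlattenEntriesSketch {
  /** Transforming per key and then per value yields a nested result: one inner list per key. */
  static <K, V> List<List<Map.Entry<K, V>>> nested(Map<K, List<V>> persisted) {
    List<List<Map.Entry<K, V>>> out = new ArrayList<>();
    for (Map.Entry<K, List<V>> e : persisted.entrySet()) {
      List<Map.Entry<K, V>> perKey = new ArrayList<>();
      for (V v : e.getValue()) {
        perKey.add(new SimpleImmutableEntry<>(e.getKey(), v));
      }
      out.add(perKey);
    }
    return out;
  }

  /** The additional flattening step turns the nested result into a flat entry sequence. */
  static <K, V> List<Map.Entry<K, V>> flattened(Map<K, List<V>> persisted) {
    List<Map.Entry<K, V>> out = new ArrayList<>();
    for (List<Map.Entry<K, V>> perKey : nested(persisted)) {
      out.addAll(perKey);
    }
    return out;
  }

  public static void main(String[] args) {
    Map<String, List<Integer>> persisted = new LinkedHashMap<>();
    persisted.put("a", Arrays.asList(1, 2));
    persisted.put("b", Arrays.asList(3));
    System.out.println(nested(persisted));    // one inner list per key
    System.out.println(flattened(persisted)); // one element per (key, value) pair
  }
}
```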
The WindmillMultimap implementation also needed a concat:
I'm a little worried we're ending up with something that is not actually plumbing prefetch down to the entries. It seems like we need PrefetchableIterables.transform; otherwise Iterables.transform is going to wrap the PrefetchableIterator, and the PrefetchableIterators.maybePrefetch wrapper is going to just make the prefetch a no-op.
I added a test similar to the other prefetch tests. It looks like a call to entries().prefetch() produces a data request across the FnApi.
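The concern raised in this thread can be illustrated with a small, self-contained sketch. It does not use Beam's classes: `Prefetchable`, `plainTransform`, `wrapMaybePrefetchable`, and `prefetchableTransform` are hypothetical stand-ins written for this example. A generic transform returns a plain `Iterator`, so a later wrapper has nothing to forward `prefetch()` to, whereas a prefetch-aware transform passes the call through to the source.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.function.Function;

// Hypothetical stand-ins for illustration; not Beam's actual interfaces.
final class PrefetchForwardingSketch {
  interface Prefetchable<T> extends Iterator<T> {
    void prefetch();
  }

  /** Source that counts how many prefetch calls actually reach it. */
  static final class CountingSource implements Prefetchable<Integer> {
    private final Iterator<Integer> delegate = Arrays.asList(1, 2, 3).iterator();
    int prefetchCalls = 0;

    @Override public void prefetch() { prefetchCalls++; }
    @Override public boolean hasNext() { return delegate.hasNext(); }
    @Override public Integer next() { return delegate.next(); }
  }

  /** Generic transform: the result is only an Iterator, so the prefetch capability is hidden. */
  static <A, B> Iterator<B> plainTransform(Iterator<A> in, Function<A, B> fn) {
    return new Iterator<B>() {
      @Override public boolean hasNext() { return in.hasNext(); }
      @Override public B next() { return fn.apply(in.next()); }
    };
  }

  /** Wraps any iterator; if it is not already Prefetchable, prefetch() is a no-op. */
  static <T> Prefetchable<T> wrapMaybePrefetchable(Iterator<T> in) {
    if (in instanceof Prefetchable) {
      return (Prefetchable<T>) in;
    }
    return new Prefetchable<T>() {
      @Override public void prefetch() {} // nothing to forward to
      @Override public boolean hasNext() { return in.hasNext(); }
      @Override public T next() { return in.next(); }
    };
  }

  /** Prefetch-aware transform: forwards prefetch() to the underlying source. */
  static <A, B> Prefetchable<B> prefetchableTransform(Prefetchable<A> in, Function<A, B> fn) {
    return new Prefetchable<B>() {
      @Override public void prefetch() { in.prefetch(); }
      @Override public boolean hasNext() { return in.hasNext(); }
      @Override public B next() { return fn.apply(in.next()); }
    };
  }

  public static void main(String[] args) {
    CountingSource hidden = new CountingSource();
    Iterator<Integer> plain = plainTransform(hidden, x -> x * 2);
    wrapMaybePrefetchable(plain).prefetch();
    System.out.println("calls reaching source via plain transform: " + hidden.prefetchCalls);

    CountingSource forwarded = new CountingSource();
    Prefetchable<Integer> aware = prefetchableTransform(forwarded, x -> x * 2);
    aware.prefetch();
    System.out.println("calls reaching source via aware transform: " + forwarded.prefetchCalls);
  }
}
```

Whether the PR's actual code path behaves like the first or the second case depends on Beam's wrappers, which is what the prefetch test mentioned above checks.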
                    value -> Maps.immutableEntry(entry.getKey(), value)))))
            .iterator();
    Iterator<Map.Entry<K, V>> pendingAddsNowIterator;
    boolean hasNext;
hasNextReady? It's confusing as is, since hasNext=false doesn't mean there is no next element, just that we haven't calculated it yet.
Or perhaps you can remove this variable, mark nextEntry nullable, and add a comment that a non-null value is the already-calculated next element to return.
Sounds good. Got rid of hasNext and reset nextEntry to null when calling next().
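The pattern agreed on here (a nullable `nextEntry` instead of a separate `hasNext` flag) might look roughly like the sketch below. The class name and the filtering predicate are hypothetical and only serve to give the lookahead something to compute; the sketch also assumes the source never yields null elements, since null is reserved to mean "not computed yet".

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.function.Predicate;

// Hypothetical illustration of a lookahead iterator; not the PR's code.
final class LookaheadIteratorSketch<T> implements Iterator<T> {
  private final Iterator<T> source;
  private final Predicate<T> keep;

  // Non-null: the next element to return has already been calculated.
  // Null: not calculated yet (it does NOT mean the iterator is exhausted).
  private T nextEntry = null;

  LookaheadIteratorSketch(Iterator<T> source, Predicate<T> keep) {
    this.source = source;
    this.keep = keep;
  }

  @Override
  public boolean hasNext() {
    // Advance only if nothing is buffered, so repeated calls are idempotent.
    while (nextEntry == null && source.hasNext()) {
      T candidate = source.next();
      if (keep.test(candidate)) {
        nextEntry = candidate;
      }
    }
    return nextEntry != null;
  }

  @Override
  public T next() {
    if (!hasNext()) {
      throw new NoSuchElementException();
    }
    T result = nextEntry;
    nextEntry = null; // reset so the following call calculates a fresh element
    return result;
  }

  public static void main(String[] args) {
    Iterator<Integer> evens =
        new LookaheadIteratorSketch<Integer>(Arrays.asList(1, 2, 3, 4).iterator(), x -> x % 2 == 0);
    while (evens.hasNext()) {
      System.out.println(evens.next());
    }
  }
}
```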
      }
    }

    if (pendingAddsNowIterator == null) {
do we need to merge the persisted and local adds? Does the iterator have any guarantees on ordering of elements for a key?
I changed this up so it merges in the local adds with backend values. Still needed two concats for some reason though. Otherwise, it complains that it's getting PrefetchableIterator<Iterable<Entry<K, V>>> instead of a PrefetchableIterator<Entry<K,V>>.
I'm not really sure if the prefetching propagates through all these Iterables.transform() calls though.
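As a rough illustration of merging backend values with local adds, here is a sketch over plain JDK maps. The names (`MergeEntriesSketch`, `mergedEntries`) and the chosen ordering (persisted values first, then pending adds for the same key, then keys that exist only locally) are assumptions made for this example, not the PR's behavior; the PR itself states that there is no guarantee on the order in which keys are returned.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration only; ordering here is an assumption of the sketch.
final class MergeEntriesSketch {
  static <K, V> List<Map.Entry<K, V>> mergedEntries(
      Map<K, List<V>> persisted, Map<K, List<V>> pendingAdds) {
    List<Map.Entry<K, V>> out = new ArrayList<>();
    // Keys known to the backend: persisted values, then local adds for the same key.
    for (Map.Entry<K, List<V>> e : persisted.entrySet()) {
      for (V v : e.getValue()) {
        out.add(new SimpleImmutableEntry<>(e.getKey(), v));
      }
      for (V v : pendingAdds.getOrDefault(e.getKey(), Collections.emptyList())) {
        out.add(new SimpleImmutableEntry<>(e.getKey(), v));
      }
    }
    // Keys that so far exist only in the local pending adds.
    for (Map.Entry<K, List<V>> e : pendingAdds.entrySet()) {
      if (!persisted.containsKey(e.getKey())) {
        for (V v : e.getValue()) {
          out.add(new SimpleImmutableEntry<>(e.getKey(), v));
        }
      }
    }
    return out;
  }

  public static void main(String[] args) {
    Map<String, List<Integer>> persisted = new LinkedHashMap<>();
    persisted.put("a", Arrays.asList(1));
    Map<String, List<Integer>> pendingAdds = new LinkedHashMap<>();
    pendingAdds.put("a", Arrays.asList(2));
    pendingAdds.put("b", Arrays.asList(3));
    System.out.println(mergedEntries(persisted, pendingAdds));
  }
}
```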
…om the backend and local adds together. Also needed to make maybePrefetchable public.
…into multimap-streaming
Reminder, please take a look at this pr: @Abacn
…st sent across FnApi.
Assigning new set of reviewers because Pr has gone too long without review. If you would like to opt out of this review, comment R: @ahmedabu98 for label java. Available commands:
scwhittle left a comment
Implementation looks good, just a question about capability protecting it
    // The response data stream will be a concatenation of pairs, where the first
    // component is the map key and the second component is a concatenation of
    // values associated with that map key.
    message MultimapEntriesUserState {
maybe we should add a capability for Multimap support similar to ordered list state?
Good idea! I added a capability to beam_runner_api.proto and specified it for the Java SDK. I don't think Python or Go support it yet.
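The idea of protecting the new request type behind a capability could be sketched as follows. This is not Beam's API: the class, the method names, and in particular the URN string are hypothetical placeholders (the real URN is whatever was added to beam_runner_api.proto). The sketch only shows the general shape of checking an SDK's advertised capabilities before relying on multimap state.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

// Hypothetical illustration of capability gating; names and URN are placeholders.
final class CapabilityGateSketch {
  // Placeholder value, NOT the real URN from beam_runner_api.proto.
  static final String MULTIMAP_STATE_URN = "example:protocol:multimap_state:v1";

  /** Returns true if the SDK advertised the (placeholder) multimap capability. */
  static boolean supportsMultimapState(Set<String> sdkCapabilities) {
    return sdkCapabilities.contains(MULTIMAP_STATE_URN);
  }

  /** Fails fast instead of sending a state request the other side cannot handle. */
  static void requireMultimapState(Set<String> sdkCapabilities) {
    if (!supportsMultimapState(sdkCapabilities)) {
      throw new UnsupportedOperationException(
          "SDK does not advertise capability " + MULTIMAP_STATE_URN);
    }
  }

  public static void main(String[] args) {
    Set<String> javaSdk = new HashSet<>(Arrays.asList(MULTIMAP_STATE_URN));
    Set<String> otherSdk = new HashSet<>();
    System.out.println(supportsMultimapState(javaSdk));
    System.out.println(supportsMultimapState(otherSdk));
  }
}
```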
Codecov Report
✅ All modified and coverable lines are covered by tests.
Additional details and impacted files
@@ Coverage Diff @@
## master #36218 +/- ##
=============================================
- Coverage 54.93% 36.12% -18.81%
Complexity 1617 1617
=============================================
Files 1057 1058 +1
Lines 164425 165037 +612
Branches 1165 1165
=============================================
- Hits 90320 59613 -30707
- Misses 71955 103274 +31319
Partials 2150 2150
Run Java_Amazon-Web-Services2_IO_Direct PreCommit
Run Java PreCommit
I understand it's ready to be merged?
A MultimapUserState class already existed, but wasn't hooked up to the FnApiStateAccessor. In addition to hooking it up, we also added an API to the FnApi to get all entries in the multimap (which was part of the SDK's state API, but was missing in the FnApi).
Added a new validates runner test for the entries API as well as a MultimapUserStateTest test for the new behavior.
This PR is to help address #23616.