Skip to content

Commit 21dbf59

Browse files
authored
Adds Multimap support to JAVA FnApi (#36218)
* Changes multimap state key() tests to not care about order. There is no guarantee on the order keys are returned. Also fixes a couple warnings from other FnApi tests. * Adds Multimap user state support to the Java FnApi harness. Also adds a missing FnApi state proto to get all of the entries of a multimap. This type of access is part of the state API (and supported by the non-portable harness), but was not present in the protos. * Adds FnApi binding for entries() method. * Changes multimap entries() iterable to put values for the same key from the backend and local adds together. Also needed to make maybePrefetchable public. * Adds a test that prefetching multimap entries results in a StateRequest sent across FnApi. * Adds an environment capability for multimap state and sets in for the java sdk.
1 parent 3432480 commit 21dbf59

9 files changed

Lines changed: 480 additions & 16 deletions

File tree

model/fn-execution/src/main/proto/org/apache/beam/model/fn_execution/v1/beam_fn_api.proto

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1017,6 +1017,29 @@ message StateKey {
10171017
bytes key = 4;
10181018
}
10191019

1020+
// Represents a request for all of the entries of a multimap associated with a
1021+
// specified user key and window for a PTransform. See
1022+
// https://s.apache.org/beam-fn-state-api-and-bundle-processing for further
1023+
// details.
1024+
//
1025+
// Can only be used to perform StateGetRequests and StateClearRequests on the
1026+
// user state.
1027+
//
1028+
// The response data stream will be a concatenation of pairs, where the first
1029+
// component is the map key and the second component is a concatenation of
1030+
// values associated with that map key.
1031+
message MultimapEntriesUserState {
1032+
// (Required) The id of the PTransform containing user state.
1033+
string transform_id = 1;
1034+
// (Required) The id of the user state.
1035+
string user_state_id = 2;
1036+
// (Required) The window encoded in a nested context.
1037+
bytes window = 3;
1038+
// (Required) The key of the currently executing element encoded in a
1039+
// nested context.
1040+
bytes key = 4;
1041+
}
1042+
10201043
// Represents a request for the values of the map key associated with a
10211044
// specified user key and window for a PTransform. See
10221045
// https://s.apache.org/beam-fn-state-api-and-bundle-processing for further
@@ -1072,6 +1095,7 @@ message StateKey {
10721095
MultimapKeysSideInput multimap_keys_side_input = 5;
10731096
MultimapKeysValuesSideInput multimap_keys_values_side_input = 8;
10741097
MultimapKeysUserState multimap_keys_user_state = 6;
1098+
MultimapEntriesUserState multimap_entries_user_state = 10;
10751099
MultimapUserState multimap_user_state = 7;
10761100
OrderedListUserState ordered_list_user_state = 9;
10771101
}

model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/beam_runner_api.proto

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1621,13 +1621,13 @@ message AnyOfEnvironmentPayload {
16211621
// environment understands.
16221622
message StandardProtocols {
16231623
enum Enum {
1624-
// Indicates suport for progress reporting via the legacy Metrics proto.
1624+
// Indicates support for progress reporting via the legacy Metrics proto.
16251625
LEGACY_PROGRESS_REPORTING = 0 [(beam_urn) = "beam:protocol:progress_reporting:v0"];
16261626

1627-
// Indicates suport for progress reporting via the new MonitoringInfo proto.
1627+
// Indicates support for progress reporting via the new MonitoringInfo proto.
16281628
PROGRESS_REPORTING = 1 [(beam_urn) = "beam:protocol:progress_reporting:v1"];
16291629

1630-
// Indicates suport for worker status protocol defined at
1630+
// Indicates support for worker status protocol defined at
16311631
// https://s.apache.org/beam-fn-api-harness-status.
16321632
WORKER_STATUS = 2 [(beam_urn) = "beam:protocol:worker_status:v1"];
16331633

@@ -1681,6 +1681,10 @@ message StandardProtocols {
16811681
// Indicates support for reading, writing and propagating Element's metadata
16821682
ELEMENT_METADATA = 11
16831683
[(beam_urn) = "beam:protocol:element_metadata:v1"];
1684+
1685+
// Indicates whether the SDK supports multimap state.
1686+
MULTIMAP_STATE = 12
1687+
[(beam_urn) = "beam:protocol:multimap_state:v1"];
16841688
}
16851689
}
16861690

sdks/java/core/src/main/java/org/apache/beam/sdk/fn/stream/PrefetchableIterables.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,7 @@ public PrefetchableIterator<T> createIterator() {
9494
* constructed that ensures that {@link PrefetchableIterator#prefetch()} is a no-op and {@link
9595
* PrefetchableIterator#isReady()} always returns true.
9696
*/
97-
private static <T> PrefetchableIterable<T> maybePrefetchable(Iterable<T> iterable) {
97+
public static <T> PrefetchableIterable<T> maybePrefetchable(Iterable<T> iterable) {
9898
if (iterable instanceof PrefetchableIterable) {
9999
return (PrefetchableIterable<T>) iterable;
100100
}

sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/Environments.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -521,6 +521,7 @@ public static Set<String> getJavaCapabilities() {
521521
capabilities.add(BeamUrns.getUrn(StandardProtocols.Enum.DATA_SAMPLING));
522522
capabilities.add(BeamUrns.getUrn(StandardProtocols.Enum.SDK_CONSUMING_RECEIVED_DATA));
523523
capabilities.add(BeamUrns.getUrn(StandardProtocols.Enum.ORDERED_LIST_STATE));
524+
capabilities.add(BeamUrns.getUrn(StandardProtocols.Enum.MULTIMAP_STATE));
524525
return capabilities.build();
525526
}
526527

sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2917,6 +2917,73 @@ public void processElement(
29172917
pipeline.run();
29182918
}
29192919

2920+
@Test
2921+
@Category({ValidatesRunner.class, UsesStatefulParDo.class, UsesMultimapState.class})
2922+
public void testMultimapStateEntries() {
2923+
final String stateId = "foo:";
2924+
final String countStateId = "count";
2925+
DoFn<KV<String, KV<String, Integer>>, KV<String, Integer>> fn =
2926+
new DoFn<KV<String, KV<String, Integer>>, KV<String, Integer>>() {
2927+
2928+
@StateId(stateId)
2929+
private final StateSpec<MultimapState<String, Integer>> multimapState =
2930+
StateSpecs.multimap(StringUtf8Coder.of(), VarIntCoder.of());
2931+
2932+
@StateId(countStateId)
2933+
private final StateSpec<CombiningState<Integer, int[], Integer>> countState =
2934+
StateSpecs.combiningFromInputInternal(VarIntCoder.of(), Sum.ofIntegers());
2935+
2936+
@ProcessElement
2937+
public void processElement(
2938+
ProcessContext c,
2939+
@Element KV<String, KV<String, Integer>> element,
2940+
@StateId(stateId) MultimapState<String, Integer> state,
2941+
@StateId(countStateId) CombiningState<Integer, int[], Integer> count,
2942+
OutputReceiver<KV<String, Integer>> r) {
2943+
// Empty before we process any elements.
2944+
if (count.read() == 0) {
2945+
assertThat(state.entries().read(), emptyIterable());
2946+
}
2947+
assertEquals(count.read().intValue(), Iterables.size(state.entries().read()));
2948+
2949+
KV<String, Integer> value = element.getValue();
2950+
state.put(value.getKey(), value.getValue());
2951+
count.add(1);
2952+
2953+
if (count.read() >= 4) {
2954+
// This should be evaluated only when ReadableState.read is called.
2955+
ReadableState<Iterable<Entry<String, Integer>>> entriesView = state.entries();
2956+
2957+
// This is evaluated immediately.
2958+
Iterable<Entry<String, Integer>> entries = state.entries().read();
2959+
2960+
state.remove("b");
2961+
assertEquals(4, Iterables.size(entries));
2962+
state.put("a", 2);
2963+
state.put("a", 3);
2964+
2965+
assertEquals(5, Iterables.size(entriesView.read()));
2966+
// Note we output the view of state before the modifications in this if statement.
2967+
for (Entry<String, Integer> entry : entries) {
2968+
r.output(KV.of(entry.getKey(), entry.getValue()));
2969+
}
2970+
}
2971+
}
2972+
};
2973+
PCollection<KV<String, Integer>> output =
2974+
pipeline
2975+
.apply(
2976+
Create.of(
2977+
KV.of("hello", KV.of("a", 97)), KV.of("hello", KV.of("a", 97)),
2978+
KV.of("hello", KV.of("a", 98)), KV.of("hello", KV.of("b", 33))))
2979+
.apply(ParDo.of(fn));
2980+
PAssert.that(output)
2981+
.containsInAnyOrder(
2982+
KV.of("a", 97), KV.of("a", 97),
2983+
KV.of("a", 98), KV.of("b", 33));
2984+
pipeline.run();
2985+
}
2986+
29202987
@Test
29212988
@Category({ValidatesRunner.class, UsesStatefulParDo.class, UsesMultimapState.class})
29222989
public void testMultimapStateRemove() {

sdks/java/core/src/test/java/org/apache/beam/sdk/util/construction/EnvironmentsTest.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -219,6 +219,9 @@ public void testCapabilities() {
219219
assertThat(
220220
Environments.getJavaCapabilities(),
221221
hasItem(BeamUrns.getUrn(RunnerApi.StandardProtocols.Enum.ORDERED_LIST_STATE)));
222+
assertThat(
223+
Environments.getJavaCapabilities(),
224+
hasItem(BeamUrns.getUrn(RunnerApi.StandardProtocols.Enum.MULTIMAP_STATE)));
222225
// Check that SDF truncation is supported
223226
assertThat(
224227
Environments.getJavaCapabilities(),

sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/FnApiStateAccessor.java

Lines changed: 113 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,7 @@ public static class Factory<K> {
117117

118118
public Factory(
119119
PipelineOptions pipelineOptions,
120-
Set<String> runnerCapabilites,
120+
Set<String> runnerCapabilities,
121121
String ptransformId,
122122
Supplier<String> processBundleInstructionId,
123123
Supplier<List<CacheToken>> cacheTokens,
@@ -128,7 +128,7 @@ public Factory(
128128
Coder<K> keyCoder,
129129
Coder<BoundedWindow> windowCoder) {
130130
this.pipelineOptions = pipelineOptions;
131-
this.runnerCapabilities = runnerCapabilites;
131+
this.runnerCapabilities = runnerCapabilities;
132132
this.ptransformId = ptransformId;
133133
this.processBundleInstructionId = processBundleInstructionId;
134134
this.cacheTokens = cacheTokens;
@@ -240,7 +240,7 @@ public FnApiStateAccessor<K> create() {
240240
}
241241

242242
private final PipelineOptions pipelineOptions;
243-
private final Set<String> runnerCapabilites;
243+
private final Set<String> runnerCapabilities;
244244
private final Map<StateKey, Object> stateKeyObjectCache;
245245
private final Map<TupleTag<?>, SideInputSpec> sideInputSpecMap;
246246
private final BeamFnStateClient beamFnStateClient;
@@ -259,7 +259,7 @@ public FnApiStateAccessor<K> create() {
259259

260260
public FnApiStateAccessor(
261261
PipelineOptions pipelineOptions,
262-
Set<String> runnerCapabilites,
262+
Set<String> runnerCapabilities,
263263
String ptransformId,
264264
Supplier<String> processBundleInstructionId,
265265
Supplier<List<CacheToken>> cacheTokens,
@@ -270,7 +270,7 @@ public FnApiStateAccessor(
270270
Coder<K> keyCoder,
271271
Coder<BoundedWindow> windowCoder) {
272272
this.pipelineOptions = pipelineOptions;
273-
this.runnerCapabilites = runnerCapabilites;
273+
this.runnerCapabilities = runnerCapabilities;
274274
this.stateKeyObjectCache = Maps.newHashMap();
275275
this.sideInputSpecMap = sideInputSpecMap;
276276
this.beamFnStateClient = beamFnStateClient;
@@ -414,7 +414,7 @@ public <T> T get(PCollectionView<T> view, BoundedWindow window) {
414414
key,
415415
((KvCoder) sideInputSpec.getCoder()).getKeyCoder(),
416416
((KvCoder) sideInputSpec.getCoder()).getValueCoder(),
417-
runnerCapabilites.contains(
417+
runnerCapabilities.contains(
418418
BeamUrns.getUrn(
419419
RunnerApi.StandardRunnerProtocols.Enum
420420
.MULTIMAP_KEYS_VALUES_SIDE_INPUT))));
@@ -762,8 +762,113 @@ public <KeyT, ValueT> MultimapState<KeyT, ValueT> bindMultimap(
762762
StateSpec<MultimapState<KeyT, ValueT>> spec,
763763
Coder<KeyT> keyCoder,
764764
Coder<ValueT> valueCoder) {
765-
// TODO(https://github.com/apache/beam/issues/23616)
766-
throw new UnsupportedOperationException("Multimap is not currently supported with Fn API.");
765+
return (MultimapState<KeyT, ValueT>)
766+
stateKeyObjectCache.computeIfAbsent(
767+
createMultimapKeysUserStateKey(id),
768+
new Function<StateKey, Object>() {
769+
@Override
770+
public Object apply(StateKey stateKey) {
771+
return new MultimapState<KeyT, ValueT>() {
772+
private final MultimapUserState<KeyT, ValueT> impl =
773+
createMultimapUserState(stateKey, keyCoder, valueCoder);
774+
775+
@Override
776+
public void put(KeyT key, ValueT value) {
777+
impl.put(key, value);
778+
}
779+
780+
@Override
781+
public ReadableState<Iterable<ValueT>> get(KeyT key) {
782+
return new ReadableState<Iterable<ValueT>>() {
783+
@Override
784+
public Iterable<ValueT> read() {
785+
return impl.get(key);
786+
}
787+
788+
@Override
789+
public ReadableState<Iterable<ValueT>> readLater() {
790+
impl.get(key).prefetch();
791+
return this;
792+
}
793+
};
794+
}
795+
796+
@Override
797+
public void remove(KeyT key) {
798+
impl.remove(key);
799+
}
800+
801+
@Override
802+
public ReadableState<Iterable<KeyT>> keys() {
803+
return new ReadableState<Iterable<KeyT>>() {
804+
@Override
805+
public Iterable<KeyT> read() {
806+
return impl.keys();
807+
}
808+
809+
@Override
810+
public ReadableState<Iterable<KeyT>> readLater() {
811+
impl.keys().prefetch();
812+
return this;
813+
}
814+
};
815+
}
816+
817+
@Override
818+
public ReadableState<Iterable<Map.Entry<KeyT, ValueT>>> entries() {
819+
return new ReadableState<Iterable<Map.Entry<KeyT, ValueT>>>() {
820+
@Override
821+
public Iterable<Map.Entry<KeyT, ValueT>> read() {
822+
return impl.entries();
823+
}
824+
825+
@Override
826+
public ReadableState<Iterable<Map.Entry<KeyT, ValueT>>> readLater() {
827+
impl.entries().prefetch();
828+
return this;
829+
}
830+
};
831+
}
832+
833+
@Override
834+
public ReadableState<Boolean> containsKey(KeyT key) {
835+
return new ReadableState<Boolean>() {
836+
@Override
837+
public Boolean read() {
838+
return !Iterables.isEmpty(impl.get(key));
839+
}
840+
841+
@Override
842+
public ReadableState<Boolean> readLater() {
843+
impl.get(key).prefetch();
844+
return this;
845+
}
846+
};
847+
}
848+
849+
@Override
850+
public ReadableState<Boolean> isEmpty() {
851+
return new ReadableState<Boolean>() {
852+
@Override
853+
public Boolean read() {
854+
return Iterables.isEmpty(impl.keys());
855+
}
856+
857+
@Override
858+
public ReadableState<Boolean> readLater() {
859+
impl.keys().prefetch();
860+
return this;
861+
}
862+
};
863+
}
864+
865+
@Override
866+
public void clear() {
867+
impl.clear();
868+
}
869+
};
870+
}
871+
});
767872
}
768873

769874
@Override

0 commit comments

Comments
 (0)