Skip to content
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.dataflow.worker.util.common.worker;

import java.util.Objects;
import javax.annotation.Nullable;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.beam.vendor.grpc.v1p69p0.com.google.protobuf.ByteString;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Interner;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Interners;

/*
* Weakly Interned ByteStrings.
* Used to save memory and GC pressure by sharing ByteStrings,
* that are repeated commonly. Encoded stateTags are an example that are Interned.
* */
@ThreadSafe
public class InternedByteString {

private static final int MAP_CONCURRENCY =
Math.max(4, Runtime.getRuntime().availableProcessors());
private static final Interner<InternedByteString> ENCODED_KEY_INTERNER =
Interners.newBuilder().weak().concurrencyLevel(MAP_CONCURRENCY).build();

// ints don't tear and it is safe to cache without synchronization.
// Defaults to 0.
private int hashCode;
private final ByteString byteString;

private InternedByteString(ByteString byteString) {
this.byteString = byteString;
}

public ByteString byteString() {
return byteString;
}

@Override
public int hashCode() {
if (hashCode == 0) {
hashCode = byteString.hashCode();
}
return hashCode;
}

@Override
public boolean equals(@Nullable Object o) {
if (this == o) {
return true;
}

if (!(o instanceof InternedByteString)) {
return false;
}
InternedByteString that = (InternedByteString) o;
return hashCode() == that.hashCode() && Objects.equals(byteString, that.byteString);
}

public static InternedByteString of(ByteString value) {
return ENCODED_KEY_INTERNER.intern(new InternedByteString(value));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.beam.runners.core.StateTable;
import org.apache.beam.runners.core.StateTag;
import org.apache.beam.runners.core.StateTags;
import org.apache.beam.runners.dataflow.worker.util.common.worker.InternedByteString;
import org.apache.beam.runners.dataflow.worker.windmill.state.WindmillStateCache.ForKeyAndFamily;
import org.apache.beam.sdk.coders.BooleanCoder;
import org.apache.beam.sdk.coders.Coder;
Expand All @@ -36,6 +37,7 @@
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Supplier;

final class CachingStateTable extends StateTable {

private final String stateFamily;
private final WindmillStateReader reader;
private final WindmillStateCache.ForKeyAndFamily cache;
Expand Down Expand Up @@ -84,23 +86,14 @@ protected StateTag.StateBinder binderForNamespace(StateNamespace namespace, Stat
public <T> BagState<T> bindBag(StateTag<BagState<T>> address, Coder<T> elemCoder) {
StateTag<BagState<T>> resolvedAddress =
isSystemTable ? StateTags.makeSystemTagInternal(address) : address;
InternedByteString encodedKey = windmillStateTagUtil.encodeKey(namespace, resolvedAddress);

WindmillBag<T> result =
cache
.get(namespace, resolvedAddress)
.map(bagState -> (WindmillBag<T>) bagState)
.orElseGet(
() ->
new WindmillBag<>(
namespace,
resolvedAddress,
stateFamily,
elemCoder,
isNewKey,
windmillStateTagUtil));

result.initializeForWorkItem(reader, scopedReadStateSupplier);
return result;
@Nullable WindmillBag<T> bag = (WindmillBag<T>) cache.get(namespace, encodedKey);
if (bag == null) {
bag = new WindmillBag<>(namespace, encodedKey, stateFamily, elemCoder, isNewKey);
}
bag.initializeForWorkItem(reader, scopedReadStateSupplier);
return bag;
}

@Override
Expand All @@ -123,20 +116,13 @@ public <KeyT, ValueT> AbstractWindmillMap<KeyT, ValueT> bindMap(
new WindmillMapViaMultimap<>(
bindMultimap(internalMultimapAddress, keyCoder, valueCoder));
} else {
result =
cache
.get(namespace, spec)
.map(mapState -> (AbstractWindmillMap<KeyT, ValueT>) mapState)
.orElseGet(
() ->
new WindmillMap<>(
namespace,
spec,
stateFamily,
keyCoder,
valueCoder,
isNewKey,
windmillStateTagUtil));
InternedByteString encodedKey = windmillStateTagUtil.encodeKey(namespace, spec);
result = (AbstractWindmillMap<KeyT, ValueT>) cache.get(namespace, encodedKey);
if (result == null) {
result =
new WindmillMap<>(
namespace, encodedKey, stateFamily, keyCoder, valueCoder, isNewKey);
}
}
result.initializeForWorkItem(reader, scopedReadStateSupplier);
return result;
Expand All @@ -147,20 +133,14 @@ public <KeyT, ValueT> WindmillMultimap<KeyT, ValueT> bindMultimap(
StateTag<MultimapState<KeyT, ValueT>> spec,
Coder<KeyT> keyCoder,
Coder<ValueT> valueCoder) {
InternedByteString encodedKey = windmillStateTagUtil.encodeKey(namespace, spec);
WindmillMultimap<KeyT, ValueT> result =
cache
.get(namespace, spec)
.map(multimapState -> (WindmillMultimap<KeyT, ValueT>) multimapState)
.orElseGet(
() ->
new WindmillMultimap<>(
namespace,
spec,
stateFamily,
keyCoder,
valueCoder,
isNewKey,
windmillStateTagUtil));
(WindmillMultimap<KeyT, ValueT>) cache.get(namespace, encodedKey);
if (result == null) {
result =
new WindmillMultimap<>(
namespace, encodedKey, stateFamily, keyCoder, valueCoder, isNewKey);
}
result.initializeForWorkItem(reader, scopedReadStateSupplier);
return result;
}
Expand All @@ -169,21 +149,21 @@ public <KeyT, ValueT> WindmillMultimap<KeyT, ValueT> bindMultimap(
public <T> OrderedListState<T> bindOrderedList(
StateTag<OrderedListState<T>> spec, Coder<T> elemCoder) {
StateTag<OrderedListState<T>> specOrInternalTag = addressOrInternalTag(spec);
InternedByteString encodedKey =
windmillStateTagUtil.encodeKey(namespace, specOrInternalTag);

WindmillOrderedList<T> result =
cache
.get(namespace, specOrInternalTag)
.map(orderedList -> (WindmillOrderedList<T>) orderedList)
.orElseGet(
() ->
new WindmillOrderedList<>(
Optional.ofNullable(derivedStateTable).orElse(CachingStateTable.this),
namespace,
specOrInternalTag,
stateFamily,
elemCoder,
isNewKey,
windmillStateTagUtil));
WindmillOrderedList<T> result = (WindmillOrderedList<T>) cache.get(namespace, encodedKey);
if (result == null) {
result =
new WindmillOrderedList<>(
Optional.ofNullable(derivedStateTable).orElse(CachingStateTable.this),
namespace,
encodedKey,
specOrInternalTag,
stateFamily,
elemCoder,
isNewKey);
}

result.initializeForWorkItem(reader, scopedReadStateSupplier);
return result;
Expand All @@ -193,21 +173,15 @@ public <T> OrderedListState<T> bindOrderedList(
public WatermarkHoldState bindWatermark(
StateTag<WatermarkHoldState> address, TimestampCombiner timestampCombiner) {
StateTag<WatermarkHoldState> addressOrInternalTag = addressOrInternalTag(address);
InternedByteString encodedKey =
windmillStateTagUtil.encodeKey(namespace, addressOrInternalTag);

WindmillWatermarkHold result =
cache
.get(namespace, addressOrInternalTag)
.map(watermarkHold -> (WindmillWatermarkHold) watermarkHold)
.orElseGet(
() ->
new WindmillWatermarkHold(
namespace,
address,
stateFamily,
timestampCombiner,
isNewKey,
windmillStateTagUtil));

WindmillWatermarkHold result = (WindmillWatermarkHold) cache.get(namespace, encodedKey);
if (result == null) {
result =
new WindmillWatermarkHold(
namespace, encodedKey, stateFamily, timestampCombiner, isNewKey);
}
result.initializeForWorkItem(reader, scopedReadStateSupplier);
return result;
}
Expand Down Expand Up @@ -248,21 +222,13 @@ CombiningState<InputT, AccumT, OutputT> bindCombiningValueWithContext(
@Override
public <T> ValueState<T> bindValue(StateTag<ValueState<T>> address, Coder<T> coder) {
StateTag<ValueState<T>> addressOrInternalTag = addressOrInternalTag(address);
InternedByteString encodedKey =
windmillStateTagUtil.encodeKey(namespace, addressOrInternalTag);

WindmillValue<T> result =
cache
.get(namespace, addressOrInternalTag)
.map(value -> (WindmillValue<T>) value)
.orElseGet(
() ->
new WindmillValue<>(
namespace,
addressOrInternalTag,
stateFamily,
coder,
isNewKey,
windmillStateTagUtil));

WindmillValue<T> result = (WindmillValue<T>) cache.get(namespace, encodedKey);
if (result == null) {
result = new WindmillValue<>(namespace, encodedKey, stateFamily, coder, isNewKey);
}
result.initializeForWorkItem(reader, scopedReadStateSupplier);
return result;
}
Expand All @@ -274,6 +240,7 @@ private <T extends State> StateTag<T> addressOrInternalTag(StateTag<T> address)
}

static class Builder {

private final String stateFamily;
private final WindmillStateReader reader;
private final WindmillStateCache.ForKeyAndFamily cache;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import org.apache.beam.runners.core.StateNamespace;
import org.apache.beam.runners.core.StateTag;
import org.apache.beam.runners.dataflow.worker.util.common.worker.InternedByteString;
import org.apache.beam.runners.dataflow.worker.windmill.Windmill;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.state.BagState;
Expand All @@ -41,8 +41,7 @@
public class WindmillBag<T> extends SimpleWindmillState implements BagState<T> {

private final StateNamespace namespace;
private final StateTag<BagState<T>> address;
private final ByteString stateKey;
private final InternedByteString stateKey;
private final String stateFamily;
private final Coder<T> elemCoder;

Expand All @@ -60,14 +59,12 @@ public class WindmillBag<T> extends SimpleWindmillState implements BagState<T> {

WindmillBag(
StateNamespace namespace,
StateTag<BagState<T>> address,
InternedByteString encodeKey,
String stateFamily,
Coder<T> elemCoder,
boolean isNewKey,
WindmillStateTagUtil windmillStateTagUtil) {
boolean isNewKey) {
this.namespace = namespace;
this.address = address;
this.stateKey = windmillStateTagUtil.encodeKey(namespace, address);
this.stateKey = encodeKey;
this.stateFamily = stateFamily;
this.elemCoder = elemCoder;
if (isNewKey) {
Expand Down Expand Up @@ -183,7 +180,7 @@ public Windmill.WorkItemCommitRequest persistDirectly(WindmillStateCache.ForKeyA
}

if (bagUpdatesBuilder != null) {
bagUpdatesBuilder.setTag(stateKey).setStateFamily(stateFamily);
bagUpdatesBuilder.setTag(stateKey.byteString()).setStateFamily(stateFamily);
}

if (cachedValues != null) {
Expand All @@ -194,7 +191,7 @@ public Windmill.WorkItemCommitRequest persistDirectly(WindmillStateCache.ForKeyA
}
// We now know the complete bag contents, and any read on it will yield a
// cached value, so cache it for future reads.
cache.put(namespace, address, this, encodedSize + stateKey.size());
cache.put(namespace, stateKey, this, encodedSize + stateKey.byteString().size());
}

// Don't reuse the localAdditions object; we don't want future changes to it to
Expand All @@ -205,6 +202,8 @@ public Windmill.WorkItemCommitRequest persistDirectly(WindmillStateCache.ForKeyA
}

private Future<Iterable<T>> getFuture() {
return cachedValues != null ? null : reader.bagFuture(stateKey, stateFamily, elemCoder);
return cachedValues != null
? null
: reader.bagFuture(stateKey.byteString(), stateFamily, elemCoder);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.beam.runners.core.StateNamespace;
import org.apache.beam.runners.core.StateTag;
import org.apache.beam.runners.core.StateTags;
import org.apache.beam.runners.dataflow.worker.util.common.worker.InternedByteString;
import org.apache.beam.runners.dataflow.worker.windmill.Windmill;
import org.apache.beam.runners.dataflow.worker.windmill.state.WindmillStateCache.ForKeyAndFamily;
import org.apache.beam.sdk.coders.Coder;
Expand Down Expand Up @@ -61,20 +62,13 @@ class WindmillCombiningState<InputT, AccumT, OutputT> extends WindmillState
boolean isNewKey,
WindmillStateTagUtil windmillStateTagUtil) {
StateTag<BagState<AccumT>> internalBagAddress = StateTags.convertToBagTagInternal(address);
this.bag =
cache
.get(namespace, internalBagAddress)
.map(state -> (WindmillBag<AccumT>) state)
.orElseGet(
() ->
new WindmillBag<>(
namespace,
internalBagAddress,
stateFamily,
accumCoder,
isNewKey,
windmillStateTagUtil));
InternedByteString encodeKey = windmillStateTagUtil.encodeKey(namespace, internalBagAddress);

WindmillBag<AccumT> bag = (WindmillBag<AccumT>) cache.get(namespace, encodeKey);
if (bag == null) {
bag = new WindmillBag<>(namespace, encodeKey, stateFamily, accumCoder, isNewKey);
}
this.bag = bag;
this.combineFn = combineFn;
this.localAdditionsAccumulator = combineFn.createAccumulator();
this.hasLocalAdditions = false;
Expand Down
Loading
Loading