Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.apache.iceberg.exceptions.NoSuchTableException;
import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.joda.time.Duration;
import org.joda.time.Instant;

/**
Expand All @@ -51,8 +52,10 @@ class AssignDestinationsAndPartitions

private final DynamicDestinations dynamicDestinations;
private final IcebergCatalogConfig catalogConfig;

static final String DESTINATION = "destination";
static final String PARTITION = "partition";

static final org.apache.beam.sdk.schemas.Schema OUTPUT_SCHEMA =
org.apache.beam.sdk.schemas.Schema.builder()
.addStringField(DESTINATION)
Expand All @@ -75,8 +78,13 @@ public PCollection<KV<Row, Row>> expand(PCollection<Row> input) {
}

static class AssignDoFn extends DoFn<Row, KV<Row, Row>> {

private static final Duration REFRESH_INTERVAL = Duration.standardMinutes(5);

private transient @MonotonicNonNull Map<String, PartitionKey> partitionKeys;
private transient @MonotonicNonNull Map<String, BeamRowWrapper> wrappers;
private transient @MonotonicNonNull Map<String, Instant> lastRefreshTimes;

private final DynamicDestinations dynamicDestinations;
private final IcebergCatalogConfig catalogConfig;

Expand All @@ -89,6 +97,7 @@ static class AssignDoFn extends DoFn<Row, KV<Row, Row>> {
public void setup() {
  // Re-create the per-destination caches. All three maps are transient
  // (workers deserialize this DoFn without them), so they must be built
  // here rather than in the constructor. Order is irrelevant — the
  // initializations are independent of one another.
  this.partitionKeys = new HashMap<>();
  this.lastRefreshTimes = new HashMap<>();
  this.wrappers = new HashMap<>();
}

@ProcessElement
Expand All @@ -98,42 +107,74 @@ public void processElement(
PaneInfo paneInfo,
@Timestamp Instant timestamp,
OutputReceiver<KV<Row, Row>> out) {

String tableIdentifier =
dynamicDestinations.getTableStringIdentifier(
ValueInSingleWindow.of(element, timestamp, window, paneInfo));

Row data = dynamicDestinations.getData(element);

@Nullable PartitionKey partitionKey = checkStateNotNull(partitionKeys).get(tableIdentifier);

@Nullable BeamRowWrapper wrapper = checkStateNotNull(wrappers).get(tableIdentifier);
if (partitionKey == null || wrapper == null) {

@Nullable Instant lastRefresh = checkStateNotNull(lastRefreshTimes).get(tableIdentifier);

Instant now = Instant.now();

boolean shouldRefresh =
partitionKey == null
|| wrapper == null
|| lastRefresh == null
|| now.isAfter(lastRefresh.plus(REFRESH_INTERVAL));

if (shouldRefresh) {

PartitionSpec spec = PartitionSpec.unpartitioned();

Schema schema = IcebergUtils.beamSchemaToIcebergSchema(data.getSchema());

@Nullable
IcebergTableCreateConfig createConfig =
dynamicDestinations.instantiateDestination(tableIdentifier).getTableCreateConfig();

if (createConfig != null && createConfig.getPartitionFields() != null) {

spec =
PartitionUtils.toPartitionSpec(createConfig.getPartitionFields(), data.getSchema());

} else {

try {
// see if table already exists with a spec
// TODO(https://github.com/apache/beam/issues/38337): improve this by periodically
// refreshing the table to fetch updated specs
spec = catalogConfig.catalog().loadTable(TableIdentifier.parse(tableIdentifier)).spec();

} catch (NoSuchTableException ignored) {
// no partition to apply
}
}

partitionKey = new PartitionKey(spec, schema);

wrapper = new BeamRowWrapper(data.getSchema(), schema.asStruct());

checkStateNotNull(partitionKeys).put(tableIdentifier, partitionKey);

checkStateNotNull(wrappers).put(tableIdentifier, wrapper);

checkStateNotNull(lastRefreshTimes).put(tableIdentifier, now);
}

partitionKey = checkStateNotNull(partitionKey);
wrapper = checkStateNotNull(wrapper);

partitionKey.partition(wrapper.wrap(data));

String partitionPath = partitionKey.toPath();

Row destAndPartition =
Row.withSchema(OUTPUT_SCHEMA).addValues(tableIdentifier, partitionPath).build();

out.output(KV.of(destAndPartition, data));
}
}
Expand Down
Loading