diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AssignDestinationsAndPartitions.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AssignDestinationsAndPartitions.java index 475786d3a4f6..e5d70d85d875 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AssignDestinationsAndPartitions.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AssignDestinationsAndPartitions.java @@ -39,6 +39,7 @@ import org.apache.iceberg.exceptions.NoSuchTableException; import org.checkerframework.checker.nullness.qual.MonotonicNonNull; import org.checkerframework.checker.nullness.qual.Nullable; +import org.joda.time.Duration; import org.joda.time.Instant; /** @@ -51,8 +52,10 @@ class AssignDestinationsAndPartitions private final DynamicDestinations dynamicDestinations; private final IcebergCatalogConfig catalogConfig; + static final String DESTINATION = "destination"; static final String PARTITION = "partition"; + static final org.apache.beam.sdk.schemas.Schema OUTPUT_SCHEMA = org.apache.beam.sdk.schemas.Schema.builder() .addStringField(DESTINATION) @@ -75,8 +78,13 @@ public PCollection> expand(PCollection input) { } static class AssignDoFn extends DoFn> { + + private static final Duration REFRESH_INTERVAL = Duration.standardMinutes(5); + private transient @MonotonicNonNull Map partitionKeys; private transient @MonotonicNonNull Map wrappers; + private transient @MonotonicNonNull Map lastRefreshTimes; + private final DynamicDestinations dynamicDestinations; private final IcebergCatalogConfig catalogConfig; @@ -89,6 +97,7 @@ static class AssignDoFn extends DoFn> { public void setup() { this.wrappers = new HashMap<>(); this.partitionKeys = new HashMap<>(); + this.lastRefreshTimes = new HashMap<>(); } @ProcessElement @@ -98,42 +107,74 @@ public void processElement( PaneInfo paneInfo, @Timestamp Instant timestamp, OutputReceiver> out) { + String tableIdentifier = dynamicDestinations.getTableStringIdentifier( ValueInSingleWindow.of(element, timestamp, window, paneInfo)); + Row data = dynamicDestinations.getData(element); @Nullable PartitionKey partitionKey = checkStateNotNull(partitionKeys).get(tableIdentifier); + @Nullable BeamRowWrapper wrapper = checkStateNotNull(wrappers).get(tableIdentifier); - if (partitionKey == null || wrapper == null) { + + @Nullable Instant lastRefresh = checkStateNotNull(lastRefreshTimes).get(tableIdentifier); + + Instant now = Instant.now(); + + boolean shouldRefresh = + partitionKey == null + || wrapper == null + || lastRefresh == null + || now.isAfter(lastRefresh.plus(REFRESH_INTERVAL)); + + if (shouldRefresh) { + PartitionSpec spec = PartitionSpec.unpartitioned(); + Schema schema = IcebergUtils.beamSchemaToIcebergSchema(data.getSchema()); + @Nullable IcebergTableCreateConfig createConfig = dynamicDestinations.instantiateDestination(tableIdentifier).getTableCreateConfig(); + if (createConfig != null && createConfig.getPartitionFields() != null) { + spec = PartitionUtils.toPartitionSpec(createConfig.getPartitionFields(), data.getSchema()); + } else { + try { // see if table already exists with a spec - // TODO(https://github.com/apache/beam/issues/38337): improve this by periodically - // refreshing the table to fetch updated specs spec = catalogConfig.catalog().loadTable(TableIdentifier.parse(tableIdentifier)).spec(); + } catch (NoSuchTableException ignored) { // no partition to apply } } + partitionKey = new PartitionKey(spec, schema); + wrapper = new BeamRowWrapper(data.getSchema(), schema.asStruct()); + checkStateNotNull(partitionKeys).put(tableIdentifier, partitionKey); + checkStateNotNull(wrappers).put(tableIdentifier, wrapper); + + checkStateNotNull(lastRefreshTimes).put(tableIdentifier, now); } + + partitionKey = checkStateNotNull(partitionKey); + wrapper = checkStateNotNull(wrapper); + partitionKey.partition(wrapper.wrap(data)); + String partitionPath = partitionKey.toPath(); Row destAndPartition = Row.withSchema(OUTPUT_SCHEMA).addValues(tableIdentifier, partitionPath).build(); + out.output(KV.of(destAndPartition, data)); } }