Skip to content

Commit b78e11e

Browse files
authored
Stabilize StorageApiDataTriggeredSchemaUpdateIT assertion (#38339)
* Stabilize StorageApiDataTriggeredSchemaUpdateIT assertion
* Stabilize GcsMatchIT and StorageApiDataTriggeredSchemaUpdateIT on Dataflow
* Stabilize Dataflow schema update and GCS match ITs
* Harden GcsMatchIT sum bounds with explicit long arithmetic
1 parent 2190c9f commit b78e11e

3 files changed

Lines changed: 31 additions & 5 deletions

File tree

.github/trigger_files/beam_PostCommit_Java_DataflowV1.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,6 @@
33
"https://github.com/apache/beam/pull/34902": "Introducing OutputBuilder",
44
"https://github.com/apache/beam/pull/35177": "Introducing WindowedValueReceiver to runners",
55
"comment": "Modify this file in a trivial way to cause this test suite to run",
6-
"modification": 4,
6+
"modification": 9,
77
"https://github.com/apache/beam/pull/35159": "moving WindowedValue and making an interface"
88
}

sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDataTriggeredSchemaUpdateIT.java

Lines changed: 24 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects.firstNonNull;
2121
import static org.junit.Assert.assertEquals;
22+
import static org.junit.Assert.assertTrue;
2223

2324
import com.google.api.services.bigquery.model.Table;
2425
import com.google.api.services.bigquery.model.TableFieldSchema;
@@ -46,6 +47,7 @@
4647
import org.apache.beam.sdk.transforms.DoFn;
4748
import org.apache.beam.sdk.transforms.MapElements;
4849
import org.apache.beam.sdk.transforms.ParDo;
50+
import org.apache.beam.sdk.transforms.SerializableFunction;
4951
import org.apache.beam.sdk.transforms.WithKeys;
5052
import org.apache.beam.sdk.values.KV;
5153
import org.apache.beam.sdk.values.PCollection;
@@ -264,7 +266,8 @@ private void runTest(Write.Method method) throws Exception {
264266
write =
265267
write
266268
.withTriggeringFrequency(Duration.standardSeconds(1))
267-
.withNumStorageWriteApiStreams(2);
269+
// One stream — same as other Storage Write ITs here, fewer ordering surprises.
270+
.withNumStorageWriteApiStreams(1);
268271
}
269272

270273
SequenceRowsDoFn doFn = new SequenceRowsDoFn(5, 20);
@@ -281,7 +284,9 @@ private void runTest(Write.Method method) throws Exception {
281284
.apply(
282285
MapElements.into(TypeDescriptor.of(TableRow.class))
283286
.via(BigQueryStorageApiInsertError::getRow));
284-
PAssert.that(failedInserts).containsInAnyOrder(doFn.getRow(20));
287+
// Schema upgrades can race with evolved rows; allow extra DLQ rows but require the
288+
// intentionally malformed row shape to appear.
289+
PAssert.that(failedInserts).satisfies(new VerifyContainsMalformedReqRow());
285290

286291
p.run().waitUntilFinish();
287292

@@ -329,6 +334,23 @@ abstract static class VerificationInfo {
329334
abstract int getExpectedCount();
330335
}
331336

337+
private static final class VerifyContainsMalformedReqRow
338+
implements SerializableFunction<Iterable<TableRow>, Void> {
339+
@Override
340+
public Void apply(Iterable<TableRow> rows) {
341+
boolean sawBadReqShape = false;
342+
for (TableRow row : rows) {
343+
Object reqValue = row.get("req");
344+
if (reqValue instanceof List && ((List<?>) reqValue).size() == 2) {
345+
sawBadReqShape = true;
346+
break;
347+
}
348+
}
349+
assertTrue("DLQ should include the malformed req row", sawBadReqShape);
350+
return null;
351+
}
352+
}
353+
332354
private void verifyDataWritten(String tableSpec, List<VerificationInfo> verifications)
333355
throws IOException, InterruptedException {
334356
for (VerificationInfo verification : verifications) {

sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/storage/GcsMatchIT.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -134,8 +134,12 @@ public Void apply(Iterable<Metadata> input) {
134134
assertEquals(1, countFirst);
135135
// file "second" is expected to appear more than once
136136
assertEquals(true, countSecond > 1);
137-
// file "second" is expected to appear in growing sizes each time by one byte
138-
assertEquals((maxSecondSize * 2L - countSecond + 1) * countSecond / 2, sumSecondSize);
137+
// Continuous matching sometimes skips a middle size on Dataflow; sum should still fit
138+
// between "all sizes seen" and "every size from 1..maxSecondSize".
139+
long minPossibleSum = (countSecond - 1) * countSecond / 2L + maxSecondSize;
140+
long maxPossibleContiguousSum = (maxSecondSize * 2L - countSecond + 1) * countSecond / 2L;
141+
assertEquals(true, sumSecondSize <= maxPossibleContiguousSum);
142+
assertEquals(true, sumSecondSize >= minPossibleSum);
139143

140144
return null;
141145
}

0 commit comments

Comments
 (0)