@@ -1949,6 +1949,8 @@ public <V extends JdbcWriteResult> WriteWithResults<T, V> withWriteResults(
19491949 .setStatement (inner .getStatement ())
19501950 .setTable (inner .getTable ())
19511951 .setAutoSharding (inner .getAutoSharding ())
1952+ .setBatchSize (inner .getBatchSize ())
1953+ .setMaxBatchBufferingDuration (inner .getMaxBatchBufferingDuration ())
19521954 .build ();
19531955 }
19541956
@@ -2055,6 +2057,10 @@ public abstract static class WriteWithResults<T, V extends JdbcWriteResult>
20552057
20562058 abstract @ Nullable RowMapper <V > getRowMapper ();
20572059
2060+ abstract @ Nullable Long getBatchSize ();
2061+
2062+ abstract @ Nullable Long getMaxBatchBufferingDuration ();
2063+
20582064 abstract Builder <T , V > toBuilder ();
20592065
20602066 @ AutoValue .Builder
@@ -2064,6 +2070,10 @@ abstract Builder<T, V> setDataSourceProviderFn(
20642070
20652071 abstract Builder <T , V > setAutoSharding (@ Nullable Boolean autoSharding );
20662072
2073+ abstract Builder <T , V > setBatchSize (@ Nullable Long batchSize );
2074+
2075+ abstract Builder <T , V > setMaxBatchBufferingDuration (@ Nullable Long maxBatchBufferingDuration );
2076+
20672077 abstract Builder <T , V > setStatement (@ Nullable ValueProvider <String > statement );
20682078
20692079 abstract Builder <T , V > setPreparedStatementSetter (
@@ -2080,6 +2090,19 @@ abstract Builder<T, V> setPreparedStatementSetter(
20802090 abstract WriteWithResults <T , V > build ();
20812091 }
20822092
2093+ public WriteWithResults <T , V > withBatchSize (long batchSize ) {
2094+ checkArgument (batchSize > 0 , "batchSize must be > 0, but was %s" , batchSize );
2095+ return toBuilder ().setBatchSize (batchSize ).build ();
2096+ }
2097+
2098+ public WriteWithResults <T , V > withMaxBatchBufferingDuration (long maxBatchBufferingDuration ) {
2099+ checkArgument (
2100+ maxBatchBufferingDuration > 0 ,
2101+ "maxBatchBufferingDuration must be > 0, but was %s" ,
2102+ maxBatchBufferingDuration );
2103+ return toBuilder ().setMaxBatchBufferingDuration (maxBatchBufferingDuration ).build ();
2104+ }
2105+
20832106 public WriteWithResults <T , V > withDataSourceConfiguration (DataSourceConfiguration config ) {
20842107 return withDataSourceProviderFn (new DataSourceProviderFromDataSourceConfiguration (config ));
20852108 }
@@ -2173,9 +2196,16 @@ public PCollection<V> expand(PCollection<T> input) {
21732196 autoSharding == null || (autoSharding && input .isBounded () != IsBounded .UNBOUNDED ),
21742197 "Autosharding is only supported for streaming pipelines." );
21752198
2199+ Long batchSizeAsLong = getBatchSize ();
2200+ long batchSize = batchSizeAsLong == null ? DEFAULT_BATCH_SIZE : batchSizeAsLong ;
2201+ Long maxBufferingDurationAsLong = getMaxBatchBufferingDuration ();
2202+ long maxBufferingDuration =
2203+ maxBufferingDurationAsLong == null
2204+ ? DEFAULT_MAX_BATCH_BUFFERING_DURATION
2205+ : maxBufferingDurationAsLong ;
2206+
21762207 PCollection <Iterable <T >> iterables =
2177- JdbcIO .<T >batchElements (
2178- input , autoSharding , DEFAULT_BATCH_SIZE , DEFAULT_MAX_BATCH_BUFFERING_DURATION );
2208+ JdbcIO .<T >batchElements (input , autoSharding , batchSize , maxBufferingDuration );
21792209 return iterables .apply (
21802210 ParDo .of (
21812211 new WriteFn <T , V >(
@@ -2187,8 +2217,8 @@ public PCollection<V> expand(PCollection<T> input) {
21872217 .setStatement (getStatement ())
21882218 .setRetryConfiguration (getRetryConfiguration ())
21892219 .setReturnResults (true )
2190- .setBatchSize (1L )
2191- .setMaxBatchBufferingDuration (DEFAULT_MAX_BATCH_BUFFERING_DURATION )
2220+ .setBatchSize (1L ) // We are writing iterables 1 at a time.
2221+ .setMaxBatchBufferingDuration (maxBufferingDuration )
21922222 .build ())));
21932223 }
21942224 }
0 commit comments