Skip to content

Commit 7235088

Browse files
authored
GCS client library migration in Java SDK - part 3 (#37900)
* Implement open method for gcsutilv2. Add an integration test. * Implement create method and add an integration test. * Store the gcs path into GcsWritableByteChannel. * Rename the new create method to createV2. * Revise according to reviewer comments.
1 parent 455075b commit 7235088

3 files changed

Lines changed: 291 additions & 11 deletions

File tree

sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@
2525
import com.google.cloud.storage.BucketInfo;
2626
import com.google.cloud.storage.Storage.BlobGetOption;
2727
import com.google.cloud.storage.Storage.BlobListOption;
28+
import com.google.cloud.storage.Storage.BlobSourceOption;
29+
import com.google.cloud.storage.Storage.BlobWriteOption;
2830
import com.google.cloud.storage.Storage.BucketGetOption;
2931
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
3032
import java.io.IOException;
@@ -186,9 +188,19 @@ public Page<Blob> listBlobs(
186188
}
187189

188190
public SeekableByteChannel open(GcsPath path) throws IOException {
191+
if (delegateV2 != null) {
192+
return delegateV2.open(path);
193+
}
189194
return delegate.open(path);
190195
}
191196

197+
public SeekableByteChannel openV2(GcsPath path, BlobSourceOption... options) throws IOException {
198+
if (delegateV2 != null) {
199+
return delegateV2.open(path, options);
200+
}
201+
throw new IOException("GcsUtil V2 not initialized.");
202+
}
203+
192204
/** @deprecated Use {@link #create(GcsPath, CreateOptions)} instead. */
193205
@Deprecated
194206
public WritableByteChannel create(GcsPath path, String type) throws IOException {
@@ -254,9 +266,20 @@ public CreateOptions build() {
254266
}
255267

256268
public WritableByteChannel create(GcsPath path, CreateOptions options) throws IOException {
269+
if (delegateV2 != null) {
270+
delegateV2.create(path, options.delegate);
271+
}
257272
return delegate.create(path, options.delegate);
258273
}
259274

275+
public WritableByteChannel createV2(
276+
GcsPath path, CreateOptions options, BlobWriteOption... writeOptions) throws IOException {
277+
if (delegateV2 != null) {
278+
return delegateV2.create(path, options.delegate, writeOptions);
279+
}
280+
throw new IOException("GcsUtil V2 not initialized.");
281+
}
282+
260283
public void verifyBucketAccessible(GcsPath path) throws IOException {
261284
if (delegateV2 != null) {
262285
delegateV2.verifyBucketAccessible(path);

sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilV2.java

Lines changed: 162 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@
2323

2424
import com.google.api.gax.paging.Page;
2525
import com.google.auto.value.AutoValue;
26+
import com.google.cloud.ReadChannel;
27+
import com.google.cloud.WriteChannel;
2628
import com.google.cloud.storage.Blob;
2729
import com.google.cloud.storage.BlobId;
2830
import com.google.cloud.storage.BlobInfo;
@@ -33,21 +35,29 @@
3335
import com.google.cloud.storage.Storage.BlobField;
3436
import com.google.cloud.storage.Storage.BlobGetOption;
3537
import com.google.cloud.storage.Storage.BlobListOption;
38+
import com.google.cloud.storage.Storage.BlobSourceOption;
39+
import com.google.cloud.storage.Storage.BlobWriteOption;
3640
import com.google.cloud.storage.Storage.BucketField;
3741
import com.google.cloud.storage.Storage.BucketGetOption;
3842
import com.google.cloud.storage.Storage.CopyRequest;
3943
import com.google.cloud.storage.StorageBatch;
4044
import com.google.cloud.storage.StorageBatchResult;
45+
import com.google.cloud.storage.StorageChannelUtils;
4146
import com.google.cloud.storage.StorageException;
4247
import com.google.cloud.storage.StorageOptions;
4348
import java.io.FileNotFoundException;
4449
import java.io.IOException;
50+
import java.nio.ByteBuffer;
51+
import java.nio.channels.SeekableByteChannel;
52+
import java.nio.channels.WritableByteChannel;
4553
import java.nio.file.AccessDeniedException;
4654
import java.nio.file.FileAlreadyExistsException;
4755
import java.util.ArrayList;
56+
import java.util.Arrays;
4857
import java.util.List;
4958
import java.util.regex.Pattern;
5059
import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
60+
import org.apache.beam.sdk.extensions.gcp.options.GcsOptions;
5161
import org.apache.beam.sdk.extensions.gcp.util.gcsfs.GcsPath;
5262
import org.apache.beam.sdk.options.DefaultValueFactory;
5363
import org.apache.beam.sdk.options.PipelineOptions;
@@ -70,6 +80,8 @@ public GcsUtilV2 create(PipelineOptions options) {
7080

7181
private Storage storage;
7282

83+
/**
 * Upload buffer size read from {@code GcsOptions#getGcsUploadBufferSizeBytes()} when this
 * instance is constructed; may be null.
 */
private final @Nullable Integer uploadBufferSizeBytes;
84+
7385
/** Maximum number of items to retrieve per Objects.List request. */
7486
private static final long MAX_LIST_BLOBS_PER_CALL = 1024;
7587

@@ -85,13 +97,14 @@ public GcsUtilV2 create(PipelineOptions options) {
8597
GcsUtilV2(PipelineOptions options) {
8698
String projectId = options.as(GcpOptions.class).getProject();
8799
storage = StorageOptions.newBuilder().setProjectId(projectId).build().getService();
100+
uploadBufferSizeBytes = options.as(GcsOptions.class).getGcsUploadBufferSizeBytes();
88101
}
89102

90103
@SuppressWarnings({
91104
"nullness" // For Creating AccessDeniedException FileNotFoundException, and
92105
// FileAlreadyExistsException with null.
93106
})
94-
private IOException translateStorageException(GcsPath gcsPath, StorageException e) {
107+
private static IOException translateStorageException(GcsPath gcsPath, StorageException e) {
95108
switch (e.getCode()) {
96109
case 403:
97110
return new AccessDeniedException(gcsPath.toString(), null, e.getMessage());
@@ -481,4 +494,152 @@ public void removeBucket(BucketInfo bucketInfo) throws IOException {
481494
throw translateStorageException(bucketInfo.getName(), null, e);
482495
}
483496
}
497+
498+
/** A bridge that allows a GCS ReadChannel to behave as a SeekableByteChannel. */
499+
private static class GcsSeekableByteChannel implements SeekableByteChannel {
500+
private final ReadChannel reader;
501+
private final long size;
502+
private long position = 0;
503+
504+
GcsSeekableByteChannel(ReadChannel reader, long size) {
505+
this.reader = reader;
506+
this.size = size;
507+
this.position = 0;
508+
}
509+
510+
@Override
511+
public int read(ByteBuffer dst) throws IOException {
512+
int count = StorageChannelUtils.blockingFillFrom(dst, reader);
513+
if (count > 0) {
514+
this.position += count;
515+
}
516+
return count;
517+
}
518+
519+
@Override
520+
public SeekableByteChannel position(long newPosition) throws IOException {
521+
checkArgument(newPosition >= 0, "Position must be non-negative: %s", newPosition);
522+
reader.seek(newPosition);
523+
this.position = newPosition;
524+
return this;
525+
}
526+
527+
@Override
528+
public long position() throws IOException {
529+
return this.position;
530+
}
531+
532+
@Override
533+
public long size() throws IOException {
534+
return size;
535+
}
536+
537+
@Override
538+
public SeekableByteChannel truncate(long size) throws IOException {
539+
throw new UnsupportedOperationException(
540+
"GcsSeekableByteChannels are read-only and cannot be truncated.");
541+
}
542+
543+
@Override
544+
public int write(ByteBuffer src) throws IOException {
545+
throw new UnsupportedOperationException(
546+
"GcsSeekableByteChannel are read-only and does not support writing.");
547+
}
548+
549+
@Override
550+
public boolean isOpen() {
551+
return reader.isOpen();
552+
}
553+
554+
@Override
555+
public void close() throws IOException {
556+
if (isOpen()) {
557+
reader.close();
558+
}
559+
}
560+
}
561+
562+
public SeekableByteChannel open(GcsPath path, BlobSourceOption... sourceOptions)
563+
throws IOException {
564+
Blob blob = getBlob(path, BlobGetOption.fields(BlobField.SIZE));
565+
ReadChannel reader = blob.getStorage().reader(blob.getBlobId(), sourceOptions);
566+
// disable internal buffering, and make the channel non-blocking
567+
reader.setChunkSize(0);
568+
return new GcsSeekableByteChannel(reader, blob.getSize());
569+
}
570+
571+
/** A bridge that allows a GCS WriteChannel to behave as a WritableByteChannel. */
572+
private static class GcsWritableByteChannel implements WritableByteChannel {
573+
private final WriteChannel writer;
574+
private final GcsPath gcsPath;
575+
576+
GcsWritableByteChannel(WriteChannel writer, GcsPath gcsPath) {
577+
this.writer = writer;
578+
this.gcsPath = gcsPath;
579+
}
580+
581+
@Override
582+
public int write(ByteBuffer src) throws IOException {
583+
try {
584+
return writer.write(src);
585+
} catch (StorageException e) {
586+
throw translateStorageException(gcsPath, e);
587+
}
588+
}
589+
590+
@Override
591+
public boolean isOpen() {
592+
return writer.isOpen();
593+
}
594+
595+
@Override
596+
public void close() throws IOException {
597+
writer.close();
598+
}
599+
}
600+
601+
public WritableByteChannel create(
602+
GcsPath path, GcsUtilV1.CreateOptions options, BlobWriteOption... writeOptions)
603+
throws IOException {
604+
try {
605+
// Define the metadata for the new object
606+
BlobInfo.Builder builder = BlobInfo.newBuilder(path.getBucket(), path.getObject());
607+
String type = options.getContentType();
608+
if (type != null) {
609+
builder.setContentType(type);
610+
}
611+
612+
BlobInfo blobInfo = builder.build();
613+
614+
List<BlobWriteOption> writeOptionList = new ArrayList<>(Arrays.asList(writeOptions));
615+
if (options.getExpectFileToNotExist()) {
616+
writeOptionList.add(BlobWriteOption.doesNotExist());
617+
} else {
618+
// We do not merge this check with the getExpectFileToNotExist() branch above
619+
// because we don't want to always make the storage.get() RPC call.
620+
Blob blob = storage.get(path.getBucket(), path.getObject());
621+
if (blob == null) {
622+
writeOptionList.add(BlobWriteOption.doesNotExist());
623+
} else {
624+
writeOptionList.add(BlobWriteOption.generationMatch(blob.getGeneration()));
625+
}
626+
}
627+
// Open a WriteChannel from the storage service
628+
WriteChannel writer =
629+
storage.writer(blobInfo, writeOptionList.toArray(new BlobWriteOption[0]));
630+
Integer uploadBufferSizeBytes =
631+
options.getUploadBufferSizeBytes() != null
632+
? options.getUploadBufferSizeBytes()
633+
: this.uploadBufferSizeBytes;
634+
if (uploadBufferSizeBytes != null) {
635+
writer.setChunkSize(uploadBufferSizeBytes);
636+
}
637+
638+
// Return the bridge wrapper
639+
return new GcsWritableByteChannel(writer, path);
640+
641+
} catch (StorageException e) {
642+
throw translateStorageException(path, e);
643+
}
644+
}
484645
}

0 commit comments

Comments
 (0)