11package com .scalar .db .storage .objectstorage .cloudstorage ;
22
33import com .google .api .gax .paging .Page ;
4- import com .google .auth .oauth2 .ServiceAccountCredentials ;
54import com .google .cloud .WriteChannel ;
65import com .google .cloud .storage .Blob ;
76import com .google .cloud .storage .BlobId ;
1615import com .scalar .db .storage .objectstorage .ObjectStorageWrapperResponse ;
1716import com .scalar .db .storage .objectstorage .PreconditionFailedException ;
1817import edu .umd .cs .findbugs .annotations .SuppressFBWarnings ;
19- import java .io .ByteArrayInputStream ;
2018import java .io .IOException ;
2119import java .nio .ByteBuffer ;
2220import java .nio .charset .StandardCharsets ;
@@ -36,22 +34,10 @@ public class CloudStorageWrapper implements ObjectStorageWrapper {
3634 private final Integer parallelUploadBlockSizeInBytes ;
3735
3836 public CloudStorageWrapper (CloudStorageConfig config ) {
39- ServiceAccountCredentials credentials ;
40- if (config .getPassword () == null ) {
41- throw new RuntimeException (
42- "Service account credentials are not provided in the password field" );
43- }
44- try (ByteArrayInputStream keyStream =
45- new ByteArrayInputStream (config .getPassword ().getBytes (StandardCharsets .UTF_8 ))) {
46- credentials = ServiceAccountCredentials .fromStream (keyStream );
47- } catch (IOException e ) {
48- throw new RuntimeException ("Failed to load the service account credentials" , e );
49- }
50-
5137 storage =
5238 StorageOptions .newBuilder ()
5339 .setProjectId (config .getProjectId ())
54- .setCredentials (credentials )
40+ .setCredentials (config . getCredentials () )
5541 .build ()
5642 .getService ();
5743 bucket = config .getBucket ();
@@ -108,18 +94,8 @@ public Set<String> getKeys(String prefix) throws ObjectStorageWrapperException {
10894 @ Override
10995 public void insert (String key , String object ) throws ObjectStorageWrapperException {
11096 try {
111- byte [] data = object .getBytes (StandardCharsets .UTF_8 );
112- BlobInfo blobInfo = BlobInfo .newBuilder (BlobId .of (bucket , key )).build ();
11397 Storage .BlobWriteOption precondition = Storage .BlobWriteOption .doesNotExist ();
114- try (WriteChannel writer = storage .writer (blobInfo , precondition )) {
115- if (parallelUploadBlockSizeInBytes != null ) {
116- writer .setChunkSize (parallelUploadBlockSizeInBytes );
117- }
118- ByteBuffer buffer = ByteBuffer .wrap (data );
119- while (buffer .hasRemaining ()) {
120- writer .write (buffer );
121- }
122- }
98+ writeData (key , object , precondition );
12399 } catch (StorageException e ) {
124100 if (e .getCode () == CloudStorageErrorCode .PRECONDITION_FAILED .get ()) {
125101 throw new PreconditionFailedException (
@@ -139,19 +115,9 @@ public void insert(String key, String object) throws ObjectStorageWrapperExcepti
139115 public void update (String key , String object , String version )
140116 throws ObjectStorageWrapperException {
141117 try {
142- byte [] data = object .getBytes (StandardCharsets .UTF_8 );
143- BlobInfo blobInfo = BlobInfo .newBuilder (BlobId .of (bucket , key )).build ();
144118 Storage .BlobWriteOption precondition =
145119 Storage .BlobWriteOption .generationMatch (Long .parseLong (version ));
146- try (WriteChannel writer = storage .writer (blobInfo , precondition )) {
147- if (parallelUploadBlockSizeInBytes != null ) {
148- writer .setChunkSize (parallelUploadBlockSizeInBytes );
149- }
150- ByteBuffer buffer = ByteBuffer .wrap (data );
151- while (buffer .hasRemaining ()) {
152- writer .write (buffer );
153- }
154- }
120+ writeData (key , object , precondition );
155121 } catch (StorageException e ) {
156122 if (e .getCode () == CloudStorageErrorCode .PRECONDITION_FAILED .get ()) {
157123 throw new PreconditionFailedException (
@@ -246,4 +212,20 @@ public void close() throws ObjectStorageWrapperException {
246212 throw new ObjectStorageWrapperException ("Failed to close the storage wrapper" , e );
247213 }
248214 }
215+
216+ private void writeData (String key , String object , Storage .BlobWriteOption precondition )
217+ throws IOException {
218+ byte [] data = object .getBytes (StandardCharsets .UTF_8 );
219+ BlobInfo blobInfo = BlobInfo .newBuilder (BlobId .of (bucket , key )).build ();
220+
221+ try (WriteChannel writer = storage .writer (blobInfo , precondition )) {
222+ if (parallelUploadBlockSizeInBytes != null ) {
223+ writer .setChunkSize (parallelUploadBlockSizeInBytes );
224+ }
225+ ByteBuffer buffer = ByteBuffer .wrap (data );
226+ while (buffer .hasRemaining ()) {
227+ writer .write (buffer );
228+ }
229+ }
230+ }
249231}
0 commit comments