Skip to content

Commit d3a5a6c

Browse files
committed
CNDB-14950 follow up for SequentialWriter
1 parent 3b91075 commit d3a5a6c

File tree

4 files changed

+428
-4
lines changed

4 files changed

+428
-4
lines changed

src/java/org/apache/cassandra/io/compress/EncryptedSequentialWriter.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,7 @@ public EncryptedSequentialWriter(File file,
9292
SequentialWriterOption option,
9393
ICompressor encryptor)
9494
{
95-
super(file, SequentialWriterOption.newBuilder()
95+
super(file, true, SequentialWriterOption.newBuilder()
9696
.bufferSize(maxBytesInPage(encryptor))
9797
.bufferType(BufferType.preferredForCompression())
9898
.finishOnClose(option.finishOnClose())

src/java/org/apache/cassandra/io/util/SequentialWriter.java

Lines changed: 40 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -105,16 +105,25 @@ protected Throwable doAbort(Throwable accumulate)
105105

106106
// TODO: we should specify as a parameter if we permit an existing file or not
107107
private static FileChannel openChannel(File file)
108+
{
109+
return openChannel(file, true);
110+
}
111+
112+
private static FileChannel openChannel(File file, boolean readable)
108113
{
109114
try
110115
{
111116
if (file.exists())
112117
{
113-
return FileChannel.open(file.toPath(), StandardOpenOption.READ, StandardOpenOption.WRITE);
118+
StandardOpenOption[] options = readable ? new StandardOpenOption[] {StandardOpenOption.WRITE, StandardOpenOption.READ}
119+
: new StandardOpenOption[] {StandardOpenOption.WRITE};
120+
return FileChannel.open(file.toPath(), options);
114121
}
115122
else
116123
{
117-
FileChannel channel = FileChannel.open(file.toPath(), StandardOpenOption.READ, StandardOpenOption.WRITE, StandardOpenOption.CREATE_NEW);
124+
StandardOpenOption[] options = readable ? new StandardOpenOption[] {StandardOpenOption.WRITE, StandardOpenOption.READ, StandardOpenOption.CREATE_NEW}
125+
: new StandardOpenOption[] {StandardOpenOption.WRITE, StandardOpenOption.CREATE_NEW};
126+
FileChannel channel = FileChannel.open(file.toPath(), options);
118127
try
119128
{
120129
SyncUtil.trySyncDir(file.parent());
@@ -162,7 +171,12 @@ public SequentialWriter(File file, SequentialWriterOption option)
162171
*/
163172
public SequentialWriter(File file, SequentialWriterOption option, boolean strictFlushing)
164173
{
165-
super(openChannel(file), option.allocateBuffer());
174+
this(file, true, option, strictFlushing);
175+
}
176+
177+
public SequentialWriter(File file, boolean readable, SequentialWriterOption option, boolean strictFlushing)
178+
{
179+
super(openChannel(file, readable), option.allocateBuffer());
166180
this.strictFlushing = strictFlushing;
167181
this.fchannel = (FileChannel)channel;
168182

@@ -295,6 +309,29 @@ public long paddedPosition()
295309
return PageAware.padded(position());
296310
}
297311

312+
public void updateFileHandle(FileHandle.Builder fhBuilder)
313+
{
314+
updateFileHandle(fhBuilder, -1);
315+
}
316+
317+
public void updateFileHandle(FileHandle.Builder fhBuilder, long dataLength)
318+
{
319+
// Set actual length to avoid having to read it off the file system.
320+
fhBuilder.withLengthOverride(dataLength > 0 ? dataLength : lastFlushOffset);
321+
}
322+
323+
/**
324+
* Some writers cannot feasibly calculate the exact length of a file. If any user needs to be able to store
325+
* metadata at the end, they should use this function to ensure the content to be written can be addressed
326+
* using `fileLength - bytesNeeded`.
327+
*
328+
* See PartitionIndexBuilder#complete and PartitionIndex#load for usage example.
329+
*/
330+
public void establishEndAddressablePosition(int bytesNeeded) throws IOException
331+
{
332+
// Nothing to do when file length can be exactly determined.
333+
}
334+
298335
/**
299336
* Returns the current file pointer of the underlying on-disk file.
300337
* Note that since write works by buffering data, the value of this will increase by buffer
Lines changed: 123 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,123 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.cassandra.io.compress;
19+
20+
import java.io.IOException;
21+
import java.nio.ByteBuffer;
22+
import java.nio.channels.FileChannel;
23+
import java.nio.file.StandardOpenOption;
24+
import java.util.HashMap;
25+
import java.util.Map;
26+
import java.util.concurrent.ThreadLocalRandom;
27+
28+
import org.junit.BeforeClass;
29+
import org.junit.Test;
30+
31+
import org.apache.cassandra.config.DatabaseDescriptor;
32+
import org.apache.cassandra.io.util.File;
33+
import org.apache.cassandra.io.util.FileUtils;
34+
import org.apache.cassandra.io.util.SequentialWriterOption;
35+
import org.apache.cassandra.schema.CompressionParams;
36+
37+
import static org.junit.Assert.*;
38+
39+
public class EncryptedSequentialWriterReadableTest
40+
{
41+
static final ICompressor AESEncryptor;
42+
static final CompressionParams AESEncryptorParams;
43+
44+
static
45+
{
46+
DatabaseDescriptor.daemonInitialization();
47+
48+
Map<String, String> opts = new HashMap<>();
49+
opts.put(CompressionParams.CLASS, Encryptor.class.getName());
50+
opts.put(EncryptionConfig.CIPHER_ALGORITHM, "AES/CBC/PKCS5Padding");
51+
opts.put(EncryptionConfig.SECRET_KEY_STRENGTH, Integer.toString(128));
52+
opts.put(EncryptionConfig.KEY_PROVIDER, EncryptorTest.KeyProviderFactoryStub.class.getName());
53+
AESEncryptor = Encryptor.create(opts);
54+
AESEncryptorParams = CompressionParams.fromMap(opts);
55+
}
56+
57+
@BeforeClass
58+
public static void setupDD()
59+
{
60+
// Already initialized in static block
61+
}
62+
63+
@Test
64+
public void testEncryptedWriterWithReadableChannel() throws IOException
65+
{
66+
File tempFile = FileUtils.createTempFile("encrypted", "test");
67+
tempFile.tryDelete();
68+
69+
byte[] testData = new byte[8192];
70+
ThreadLocalRandom.current().nextBytes(testData);
71+
72+
ICompressor encryptor = AESEncryptor;
73+
SequentialWriterOption option = SequentialWriterOption.newBuilder()
74+
.bufferSize(1024)
75+
.build();
76+
77+
try (EncryptedSequentialWriter writer = new EncryptedSequentialWriter(tempFile, option, encryptor))
78+
{
79+
writer.write(testData);
80+
writer.finish();
81+
82+
assertTrue(tempFile.exists());
83+
assertTrue(tempFile.length() > 0);
84+
85+
try (FileChannel channel = FileChannel.open(tempFile.toPath(), StandardOpenOption.READ))
86+
{
87+
ByteBuffer headerBuffer = ByteBuffer.allocate(128);
88+
int bytesRead = channel.read(headerBuffer);
89+
assertTrue(bytesRead > 0);
90+
}
91+
}
92+
finally
93+
{
94+
tempFile.tryDelete();
95+
}
96+
}
97+
98+
@Test
99+
public void testEncryptedWriterConstructorPassesReadableTrue() throws IOException
100+
{
101+
File tempFile = FileUtils.createTempFile("encrypted2", "test");
102+
tempFile.tryDelete();
103+
104+
ICompressor encryptor = AESEncryptor;
105+
SequentialWriterOption option = SequentialWriterOption.newBuilder()
106+
.bufferSize(1024)
107+
.finishOnClose(true)
108+
.build();
109+
110+
byte[] testData = "test data for encryption".getBytes();
111+
112+
try (EncryptedSequentialWriter writer = new EncryptedSequentialWriter(tempFile, option, encryptor))
113+
{
114+
writer.write(testData);
115+
}
116+
117+
assertTrue(tempFile.exists());
118+
assertTrue(tempFile.length() > testData.length);
119+
120+
tempFile.tryDelete();
121+
}
122+
123+
}

0 commit comments

Comments
 (0)