Skip to content

Commit bd890c0

Browse files
zymapsrinath-ctds
authored andcommitted
Revert "Revert "[improve][offload] Use filesystemURI as the storage path (apache#23591)""
This reverts commit bb909e6. (cherry picked from commit 720135b)
1 parent 7e58085 commit bd890c0

File tree

5 files changed

+170
-3
lines changed

5 files changed

+170
-3
lines changed

tests/integration/src/test/java/org/apache/pulsar/tests/integration/offload/TestFileSystemOffload.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ protected Map<String, String> getEnv() {
4949
result.put("managedLedgerMaxEntriesPerLedger", String.valueOf(getNumEntriesPerLedger()));
5050
result.put("managedLedgerMinLedgerRolloverTimeMinutes", "0");
5151
result.put("managedLedgerOffloadDriver", "filesystem");
52-
result.put("fileSystemURI", "file:///");
52+
result.put("fileSystemURI", "file:///pulsar/data");
5353

5454
return result;
5555
}

tests/integration/src/test/java/org/apache/pulsar/tests/integration/offload/TestOffloadDeletionFS.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,7 @@ protected Map<String, String> getEnv() {
9696
result.put("managedLedgerMaxEntriesPerLedger", String.valueOf(getNumEntriesPerLedger()));
9797
result.put("managedLedgerMinLedgerRolloverTimeMinutes", "0");
9898
result.put("managedLedgerOffloadDriver", "filesystem");
99-
result.put("fileSystemURI", "file:///");
99+
result.put("fileSystemURI", "file:///pulsar/data");
100100

101101
return result;
102102
}

tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileSystemManagedLedgerOffloader.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,7 @@ private FileSystemManagedLedgerOffloader(OffloadPoliciesImpl conf, OrderedSchedu
105105

106106
this.configuration.setClassLoader(FileSystemLedgerOffloaderFactory.class.getClassLoader());
107107
this.driverName = conf.getManagedLedgerOffloadDriver();
108-
this.storageBasePath = configuration.get("hadoop.tmp.dir");
108+
this.storageBasePath = configuration.get("fs.defaultFS");
109109
this.scheduler = scheduler;
110110
this.fileSystem = FileSystem.get(configuration);
111111
this.assignmentScheduler = OrderedScheduler.newSchedulerBuilder()
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,119 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.bookkeeper.mledger.offload.filesystem.impl;
20+
21+
import static org.testng.Assert.assertEquals;
22+
23+
import java.nio.file.Files;
24+
import java.nio.file.Path;
25+
import java.nio.file.Paths;
26+
import java.util.HashMap;
27+
import java.util.Iterator;
28+
import java.util.Map;
29+
import java.util.UUID;
30+
import org.apache.bookkeeper.client.BookKeeper;
31+
import org.apache.bookkeeper.client.LedgerHandle;
32+
import org.apache.bookkeeper.client.PulsarMockBookKeeper;
33+
import org.apache.bookkeeper.client.api.DigestType;
34+
import org.apache.bookkeeper.client.api.LedgerEntries;
35+
import org.apache.bookkeeper.client.api.LedgerEntry;
36+
import org.apache.bookkeeper.client.api.ReadHandle;
37+
import org.apache.bookkeeper.common.util.OrderedScheduler;
38+
import org.apache.bookkeeper.mledger.LedgerOffloaderStats;
39+
import org.apache.pulsar.common.naming.TopicName;
40+
import org.apache.pulsar.common.policies.data.OffloadPoliciesImpl;
41+
import org.testng.annotations.Test;
42+
43+
public class FileSystemOffloaderLocalFileTest {
44+
private OrderedScheduler scheduler = OrderedScheduler.newSchedulerBuilder().numThreads(1).name("offloader").build();
45+
private LedgerOffloaderStats offloaderStats = LedgerOffloaderStats.create(true, true, scheduler, 60);
46+
47+
48+
private String getResourceFilePath(String name) {
49+
return getClass().getClassLoader().getResource(name).getPath();
50+
}
51+
52+
@Test
53+
public void testReadWriteWithLocalFileUsingFileSystemURI() throws Exception {
54+
// prepare the offload policies
55+
final String basePath = "/tmp";
56+
OffloadPoliciesImpl offloadPolicies = new OffloadPoliciesImpl();
57+
offloadPolicies.setFileSystemURI("file://" + basePath);
58+
offloadPolicies.setManagedLedgerOffloadDriver("filesystem");
59+
offloadPolicies.setFileSystemProfilePath(getResourceFilePath("filesystem_offload_core_site.xml"));
60+
61+
// initialize the offloader with the offload policies
62+
var offloader = FileSystemManagedLedgerOffloader.create(offloadPolicies, scheduler, offloaderStats);
63+
64+
int numberOfEntries = 100;
65+
66+
// prepare the data in bookkeeper
67+
BookKeeper bk = new PulsarMockBookKeeper(scheduler);
68+
LedgerHandle lh = bk.createLedger(1,1,1, BookKeeper.DigestType.CRC32, "".getBytes());
69+
for (int i = 0; i < numberOfEntries; i++) {
70+
byte[] entry = ("foobar"+i).getBytes();
71+
lh.addEntry(entry);
72+
}
73+
lh.close();
74+
75+
ReadHandle read = bk.newOpenLedgerOp()
76+
.withLedgerId(lh.getId())
77+
.withDigestType(DigestType.CRC32)
78+
.withPassword("".getBytes()).execute().get();
79+
80+
final String mlName = TopicName.get("testWriteLocalFIle").getPersistenceNamingEncoding();
81+
Map<String, String> offloadDriverMetadata = new HashMap<>();
82+
offloadDriverMetadata.put("ManagedLedgerName", mlName);
83+
84+
UUID uuid = UUID.randomUUID();
85+
offloader.offload(read, uuid, offloadDriverMetadata).get();
86+
ReadHandle toTest = offloader.readOffloaded(read.getId(), uuid, offloadDriverMetadata).get();
87+
assertEquals(toTest.getLastAddConfirmed(), read.getLastAddConfirmed());
88+
LedgerEntries toTestEntries = toTest.read(0, numberOfEntries - 1);
89+
LedgerEntries toWriteEntries = read.read(0,numberOfEntries - 1);
90+
Iterator<LedgerEntry> toTestIter = toTestEntries.iterator();
91+
Iterator<LedgerEntry> toWriteIter = toWriteEntries.iterator();
92+
while(toTestIter.hasNext()) {
93+
LedgerEntry toWriteEntry = toWriteIter.next();
94+
LedgerEntry toTestEntry = toTestIter.next();
95+
96+
assertEquals(toWriteEntry.getLedgerId(), toTestEntry.getLedgerId());
97+
assertEquals(toWriteEntry.getEntryId(), toTestEntry.getEntryId());
98+
assertEquals(toWriteEntry.getLength(), toTestEntry.getLength());
99+
assertEquals(toWriteEntry.getEntryBuffer(), toTestEntry.getEntryBuffer());
100+
}
101+
toTestEntries = toTest.read(1, numberOfEntries - 1);
102+
toWriteEntries = read.read(1,numberOfEntries - 1);
103+
toTestIter = toTestEntries.iterator();
104+
toWriteIter = toWriteEntries.iterator();
105+
while(toTestIter.hasNext()) {
106+
LedgerEntry toWriteEntry = toWriteIter.next();
107+
LedgerEntry toTestEntry = toTestIter.next();
108+
109+
assertEquals(toWriteEntry.getLedgerId(), toTestEntry.getLedgerId());
110+
assertEquals(toWriteEntry.getEntryId(), toTestEntry.getEntryId());
111+
assertEquals(toWriteEntry.getLength(), toTestEntry.getLength());
112+
assertEquals(toWriteEntry.getEntryBuffer(), toTestEntry.getEntryBuffer());
113+
}
114+
115+
// check the file located in the local file system
116+
Path offloadedFilePath = Paths.get(basePath, mlName);
117+
assertEquals(Files.exists(offloadedFilePath), true);
118+
}
119+
}
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
<!--
2+
3+
Licensed to the Apache Software Foundation (ASF) under one
4+
or more contributor license agreements. See the NOTICE file
5+
distributed with this work for additional information
6+
regarding copyright ownership. The ASF licenses this file
7+
to you under the Apache License, Version 2.0 (the
8+
"License"); you may not use this file except in compliance
9+
with the License. You may obtain a copy of the License at
10+
11+
http://www.apache.org/licenses/LICENSE-2.0
12+
13+
Unless required by applicable law or agreed to in writing,
14+
software distributed under the License is distributed on an
15+
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16+
KIND, either express or implied. See the License for the
17+
specific language governing permissions and limitations
18+
under the License.
19+
20+
-->
21+
<configuration>
22+
<!--file system uri, necessary-->
23+
<property>
24+
<name>fs.defaultFS</name>
25+
<value></value>
26+
</property>
27+
<property>
28+
<name>hadoop.tmp.dir</name>
29+
<value>pulsar</value>
30+
</property>
31+
<property>
32+
<name>io.file.buffer.size</name>
33+
<value>4096</value>
34+
</property>
35+
<property>
36+
<name>io.seqfile.compress.blocksize</name>
37+
<value>1000000</value>
38+
</property>
39+
<property>
40+
<name>io.seqfile.compression.type</name>
41+
<value>BLOCK</value>
42+
</property>
43+
<property>
44+
<name>io.map.index.interval</name>
45+
<value>128</value>
46+
</property>
47+
48+
</configuration>

0 commit comments

Comments
 (0)