|
| 1 | +/* |
| 2 | + * dCache - http://www.dcache.org/ |
| 3 | + * |
| 4 | + * Copyright (C) 2025 Deutsches Elektronen-Synchrotron |
| 5 | + * |
| 6 | + * This program is free software: you can redistribute it and/or modify |
| 7 | + * it under the terms of the GNU Affero General Public License as |
| 8 | + * published by the Free Software Foundation, either version 3 of the |
| 9 | + * License, or (at your option) any later version. |
| 10 | + * |
| 11 | + * This program is distributed in the hope that it will be useful, |
| 12 | + * but WITHOUT ANY WARRANTY; without even the implied warranty of |
| 13 | + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
| 14 | + * GNU Affero General Public License for more details. |
| 15 | + * |
| 16 | + * You should have received a copy of the GNU Affero General Public License |
| 17 | + * along with this program. If not, see <http://www.gnu.org/licenses/>. |
| 18 | + */ |
| 19 | +package org.dcache.util.list; |
| 20 | + |
| 21 | +import com.google.common.collect.Range; |
| 22 | +import diskCacheV111.util.CacheException; |
| 23 | +import diskCacheV111.util.FsPath; |
| 24 | +import diskCacheV111.util.PnfsHandler; |
| 25 | +import dmg.cells.nucleus.CellMessageReceiver; |
| 26 | +import java.util.Iterator; |
| 27 | +import java.util.Map; |
| 28 | +import java.util.NoSuchElementException; |
| 29 | +import java.util.Set; |
| 30 | +import java.util.UUID; |
| 31 | +import java.util.concurrent.BlockingQueue; |
| 32 | +import java.util.concurrent.ConcurrentHashMap; |
| 33 | +import java.util.concurrent.LinkedBlockingQueue; |
| 34 | +import java.util.concurrent.TimeUnit; |
| 35 | +import javax.security.auth.Subject; |
| 36 | +import org.dcache.auth.attributes.Restriction; |
| 37 | +import org.dcache.namespace.FileAttribute; |
| 38 | +import org.dcache.util.CacheExceptionFactory; |
| 39 | + |
| 40 | +import org.dcache.vehicles.PnfsListDirectoryMessage; |
| 41 | +import org.slf4j.Logger; |
| 42 | +import org.slf4j.LoggerFactory; |
| 43 | + |
| 44 | +/** |
| 45 | + * VirtualDirectoryListSource which delegates the virtual directory listing operation to the PnfsManager. |
| 46 | + * <p> |
| 47 | + * Large virtual directories are broken into several reply messages by the PnfsManager. For that reason the |
| 48 | + * regular Cells callback mechanism for replies cannot be used. Instead messages of type |
| 49 | + * PnfsListDirectoryMessage must be routed to the VirtualDirectoryListHandler. This also has the |
| 50 | + * consequence that a VirtualDirectoryListHandler cannot be used from the Cells messages thread. Any |
| 51 | + * attempt to do so will cause the message thread to block, as the replies cannot be delivered to |
| 52 | + * the VirtualDirectoryListHandler. |
| 53 | + */ |
| 54 | +public class VirtualDirectoryListHandler |
| 55 | + implements CellMessageReceiver, VirtualDirectoryListSource { |
| 56 | + |
| 57 | + private static final Logger LOGGER = |
| 58 | + LoggerFactory.getLogger(ListDirectoryHandler.class); |
| 59 | + |
| 60 | + private final PnfsHandler _pnfs; |
| 61 | + private final Map<UUID, Stream> _replies = |
| 62 | + new ConcurrentHashMap<>(); |
| 63 | + |
| 64 | + public VirtualDirectoryListHandler(PnfsHandler pnfs) { |
| 65 | + _pnfs = pnfs; |
| 66 | + } |
| 67 | + |
| 68 | + |
| 69 | + @Override |
| 70 | + public DirectoryStream |
| 71 | + listVirtualDirectory(Subject subject, Restriction restriction, FsPath path, |
| 72 | + Range<Integer> range, Set<FileAttribute> attributes) |
| 73 | + throws InterruptedException, CacheException |
| 74 | + { |
| 75 | + String dir = path.toString(); |
| 76 | + PnfsListDirectoryMessage msg = |
| 77 | + new PnfsListDirectoryMessage(dir, null, range, attributes); |
| 78 | + UUID uuid = msg.getUUID(); |
| 79 | + boolean success = false; |
| 80 | + Stream stream = new Stream(dir, uuid); |
| 81 | + try { |
| 82 | + msg.setPathType(PnfsListDirectoryMessage.PathType.LABEL); |
| 83 | + msg.setSubject(subject); |
| 84 | + |
| 85 | + msg.setRestriction(restriction); |
| 86 | + _replies.put(uuid, stream); |
| 87 | + _pnfs.send(msg); |
| 88 | + stream.waitForMoreEntries(); |
| 89 | + success = true; |
| 90 | + return stream; |
| 91 | + } finally { |
| 92 | + if (!success) { |
| 93 | + _replies.remove(uuid); |
| 94 | + } |
| 95 | + } |
| 96 | + } |
| 97 | + |
| 98 | + /** |
| 99 | + * Callback for delivery of replies from PnfsManager. PnfsListDirectoryMessage have to be routed |
| 100 | + * to this message. |
| 101 | + */ |
| 102 | + public void messageArrived(PnfsListDirectoryMessage reply) { |
| 103 | + if (reply.isReply()) { |
| 104 | + try { |
| 105 | + UUID uuid = reply.getUUID(); |
| 106 | + VirtualDirectoryListHandler.Stream stream = _replies.get(uuid); |
| 107 | + if (stream != null) { |
| 108 | + stream.put(reply); |
| 109 | + } else { |
| 110 | + LOGGER.warn( |
| 111 | + "Received list result for an unknown request. Virtual Directory listing was possibly incomplete."); |
| 112 | + } |
| 113 | + } catch (InterruptedException e) { |
| 114 | + Thread.currentThread().interrupt(); |
| 115 | + } |
| 116 | + } |
| 117 | + } |
| 118 | + |
| 119 | + |
| 120 | + /** |
| 121 | + * Implementation of DirectoryStream, translating PnfsListDirectoryMessage replies to a stream |
| 122 | + * of DirectoryEntries. |
| 123 | + * <p> |
| 124 | + * The stream acts as its own iterator, and multiple iterators are not supported. |
| 125 | + */ |
| 126 | + public class Stream |
| 127 | + implements DirectoryStream, Iterator<DirectoryEntry> { |
| 128 | + |
| 129 | + private final BlockingQueue<PnfsListDirectoryMessage> _queue = |
| 130 | + new LinkedBlockingQueue<>(); |
| 131 | + private final UUID _uuid; |
| 132 | + private final String _path; |
| 133 | + private boolean _isFinal; |
| 134 | + private Iterator<DirectoryEntry> _iterator; |
| 135 | + private int _count; |
| 136 | + private int _total; |
| 137 | + |
| 138 | + public Stream(String path, UUID uuid) { |
| 139 | + _path = path; |
| 140 | + _uuid = uuid; |
| 141 | + } |
| 142 | + |
| 143 | + @Override |
| 144 | + public void close() { |
| 145 | + _replies.remove(_uuid); |
| 146 | + } |
| 147 | + |
| 148 | + private void put(PnfsListDirectoryMessage msg) |
| 149 | + throws InterruptedException { |
| 150 | + _queue.put(msg); |
| 151 | + } |
| 152 | + |
| 153 | + private void waitForMoreEntries() |
| 154 | + throws InterruptedException, CacheException { |
| 155 | + if (_isFinal) { |
| 156 | + _iterator = null; |
| 157 | + return; |
| 158 | + } |
| 159 | + |
| 160 | + PnfsListDirectoryMessage msg = |
| 161 | + _queue.poll(_pnfs.getPnfsTimeout(), TimeUnit.MILLISECONDS); |
| 162 | + if (msg == null) { |
| 163 | + throw new CacheException(CacheException.TIMEOUT, |
| 164 | + "Timeout during virtual directory listing."); |
| 165 | + } |
| 166 | + |
| 167 | + if (msg.isFinal()) { |
| 168 | + _total = msg.getMessageCount(); |
| 169 | + } |
| 170 | + _count++; |
| 171 | + if (_count == _total) { |
| 172 | + _isFinal = true; |
| 173 | + } |
| 174 | + |
| 175 | + if (msg.getReturnCode() != 0) { |
| 176 | + throw CacheExceptionFactory.exceptionOf(msg); |
| 177 | + } |
| 178 | + |
| 179 | + _iterator = msg.getEntries().iterator(); |
| 180 | + |
| 181 | + /* If the message is empty, then the iterator has no next |
| 182 | + * element. In that case we wait for the next reply. This |
| 183 | + * may in particular happen with the final message. |
| 184 | + */ |
| 185 | + if (!_iterator.hasNext()) { |
| 186 | + waitForMoreEntries(); |
| 187 | + } |
| 188 | + } |
| 189 | + |
| 190 | + @Override |
| 191 | + public Iterator<DirectoryEntry> iterator() { |
| 192 | + return this; |
| 193 | + } |
| 194 | + |
| 195 | + @Override |
| 196 | + public boolean hasNext() { |
| 197 | + try { |
| 198 | + if (_iterator == null || !_iterator.hasNext()) { |
| 199 | + waitForMoreEntries(); |
| 200 | + if (_iterator == null) { |
| 201 | + return false; |
| 202 | + } |
| 203 | + } |
| 204 | + } catch (CacheException e) { |
| 205 | + LOGGER.error("Listing of {} incomplete: {}", _path, e.getMessage()); |
| 206 | + return false; |
| 207 | + } catch (InterruptedException e) { |
| 208 | + Thread.currentThread().interrupt(); |
| 209 | + return false; |
| 210 | + } |
| 211 | + |
| 212 | + return true; |
| 213 | + } |
| 214 | + |
| 215 | + @Override |
| 216 | + public DirectoryEntry next() { |
| 217 | + if (!hasNext()) { |
| 218 | + throw new NoSuchElementException(); |
| 219 | + } |
| 220 | + |
| 221 | + return _iterator.next(); |
| 222 | + } |
| 223 | + |
| 224 | + @Override |
| 225 | + public void remove() { |
| 226 | + throw new UnsupportedOperationException(); |
| 227 | + } |
| 228 | + } |
| 229 | + |
| 230 | +} |
0 commit comments