1+ package htmlpublisher .util ;
2+
3+ import java .io .File ;
4+ import java .io .Serializable ;
5+
6+ import java .util .concurrent .atomic .AtomicInteger ;
7+ import java .util .concurrent .atomic .AtomicLong ;
8+ import java .util .concurrent .ConcurrentHashMap ;
9+ import java .util .concurrent .LinkedBlockingQueue ;
10+ import java .util .UUID ;
11+ import java .util .Map ;
12+
13+ /**
14+ * A queue that contains files
15+ *
16+ * When reading entries, the queue returns a file or waits until a file is
17+ * added.<br>
18+ * Also implements signaling the end of the queue with shutdown operations.
19+ */
20+ public class FileEntryQueue {
21+
22+ /**
23+ * Marker for the end of the queue
24+ */
25+ private static final FileEntry POISON_PILL = new FileEntry (null , null );
26+
27+ /**
28+ * Local cache that contains queues
29+ */
30+ private static final Map <UUID , FileEntryQueue > queues = new ConcurrentHashMap <>();
31+
32+ /**
33+ * Internal queue for managing the entries
34+ */
35+ private final LinkedBlockingQueue <FileEntry > queue = new LinkedBlockingQueue <>();
36+
37+ /**
38+ * Number of all added files since the creation of this queue
39+ */
40+ private final AtomicInteger overallCount = new AtomicInteger (0 );
41+
42+ /**
43+ * Size of all added files since the creation of this queue
44+ */
45+ private final AtomicLong overallSize = new AtomicLong (0 );
46+
47+ /**
48+ * An entry in a queue
49+ */
50+ public static class FileEntry implements Serializable {
51+
52+ private File file ;
53+ private String relativePath ;
54+
55+ public FileEntry (File file , String relativePath ) {
56+
57+ this .file = file ;
58+ this .relativePath = relativePath ;
59+
60+ }
61+
62+ public File getFile () {
63+ return this .file ;
64+ }
65+
66+ public String getRelativePath () {
67+ return this .relativePath ;
68+ }
69+
70+ private static final long serialVersionUID = 1L ;
71+
72+ }
73+
74+ /**
75+ * Some statistical data about the queue
76+ */
77+ public static class Statistic implements Serializable {
78+
79+ private int overallCount ;
80+ private long overallSize ;
81+
82+ public Statistic (int overallCount , long overallSize ) {
83+ this .overallCount = overallCount ;
84+ this .overallSize = overallSize ;
85+ }
86+
87+ public int getOverallCount () {
88+ return this .overallCount ;
89+ }
90+
91+ public long getOverallSize () {
92+ return this .overallSize ;
93+ }
94+
95+ private static final long serialVersionUID = 1L ;
96+
97+ }
98+
99+ /**
100+ * Get the queue specified by the key from the cache. If the key does not exist,
101+ * a new queue will be created and added to the cache. The queue cache is
102+ * located only on the local machine (agent or controller) and will not be
103+ * remotely synchronized.
104+ */
105+ public static FileEntryQueue getOrCreateQueue (UUID queueKey ) {
106+
107+ return queues .computeIfAbsent (queueKey , key -> new FileEntryQueue ());
108+
109+ }
110+
111+ /**
112+ * Remove the queue specified by the key from the cache
113+ *
114+ * @return the removed queue or null, if queue was not in the cache before
115+ */
116+ public static FileEntryQueue remove (UUID queueKey ) {
117+
118+ return queues .remove (queueKey );
119+
120+ }
121+
122+ /**
123+ * Inserts the specified file into this queue if it is possible to do so
124+ * immediately without violating capacity restrictions
125+ *
126+ * @return the newly created file entry
127+ * @throws IllegalStateException if no space is currently available
128+ *
129+ */
130+ public FileEntry add (File file , String relativePath ) {
131+
132+ FileEntry entry = new FileEntry (file , relativePath );
133+
134+ this .queue .add (entry );
135+
136+ this .overallCount .incrementAndGet ();
137+ this .overallSize .addAndGet (file .length ());
138+
139+ return entry ;
140+
141+ }
142+
143+ /**
144+ * Retrieves and removes the head of this queue, waiting if necessary until an
145+ * element becomes available.
146+ *
147+ * @throws InterruptedException if the queue is closed
148+ *
149+ */
150+ public FileEntry take () throws InterruptedException {
151+
152+ FileEntry entry = this .queue .take ();
153+
154+ if (entry == POISON_PILL ) {
155+ this .queue .add (FileEntryQueue .POISON_PILL );
156+ throw new InterruptedException ();
157+ }
158+
159+ return entry ;
160+
161+ }
162+
163+ /**
164+ * Shutdown the queue, so no new work will be accepted but the existing work
165+ * remains until processed
166+ */
167+ public void shutdown () {
168+
169+ this .queue .add (POISON_PILL );
170+
171+ }
172+
173+ /**
174+ * All workers should stop there work as we want to stop as soon as possible -
175+ * regardless if there is more to do or not
176+ */
177+ public void shutdownNow () {
178+
179+ // Remove all upcoming work
180+ this .queue .clear ();
181+
182+ // Signal, that this is the end and no more work will come
183+ this .shutdown ();
184+
185+ }
186+
187+ /**
188+ * @return the number of all added files since the creation of this queue
189+ */
190+ public int getOverallCount () {
191+
192+ return this .overallCount .get ();
193+
194+ }
195+
196+ /**
197+ * @return the size of all added files since the creation of this queue
198+ */
199+ public long getOverallSize () {
200+
201+ return this .overallSize .get ();
202+
203+ }
204+
205+ /**
206+ * @return some statistic about this queue
207+ */
208+ public Statistic getStatistic () {
209+
210+ return new FileEntryQueue .Statistic (this .getOverallCount (), this .getOverallSize ());
211+
212+ }
213+
214+ }
0 commit comments