@@ -64,12 +64,6 @@ class UserDataProcessor:
        return None
```

A working example of a simple processor that rotates Area Detector images
can be found [here](../examples/hpcAdImageProcessorExample.py). There are
also several processor classes for Area Detector images that can be used
@@ -81,7 +75,14 @@ images
- [AD Output File Processor](../pvapy/hpc/adOutputFileProcessor.py): saves output files

The encryptor and decryptor processors require the python 'rsa' and 'pycryptodome'
packages for encryption utilities, while the output file processor requires the python
'PIL' module ('pillow' package).

A python class that derives from the UserDataProcessor class and implements
the above interface is passed into one of the two main command line utilities:
- pvapy-hpc-consumer: used for splitting streams and processing stream objects
- pvapy-hpc-collector: used for gathering streams, sorting and processing
  stream objects

In addition to the above consumer and collector commands, the streaming
framework also relies on the following:
@@ -544,6 +545,82 @@ Once the data source starts publishing images, they will be streamed through
the workflow and processed by each stage, resulting in files being saved
in the designated output folder.

### Parallel Processing Chains

In certain cases (e.g., when the client load on the PVA server needs to be
reduced) it might be beneficial to split the raw data stream into multiple
parallel streams before processing it. This can be accomplished by inserting a
passthrough/distributor stage in front of the processing stages.

<p align="center">
  <img alt="Parallel Processing Chains" src="images/StreamingFrameworkParallelProcessingChains.jpg">
</p>

On terminal 1, start two consumers splitting the incoming 'pvapy:image' stream to two 'passthrough:*:output'
channels:

```sh
$ pvapy-hpc-consumer \
    --input-channel pvapy:image \
    --control-channel passthrough:*:control \
    --status-channel passthrough:*:status \
    --output-channel passthrough:*:output \
    --processor-class pvapy.hpc.userDataProcessor.UserDataProcessor \
    --server-queue-size 100 \
    --report-period 10 \
    --distributor-updates 8 \
    --n-consumers 2
```

In this case we are splitting the stream between the two passthrough consumers
in batches of 8 frames.
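
As a simplified model of this behavior, the sketch below distributes frame ids in 8-frame batches between two consumers. This is an illustration only, assuming in-order delivery and a simple round-robin distributor, not the actual distributor implementation:

```python
# Simplified model of '--distributor-updates 8' with '--n-consumers 2':
# consecutive batches of 8 frames alternate between the two consumers.
nConsumers = 2
batchSize = 8
frames = list(range(32))  # frame ids 0..31

received = {c: [] for c in range(nConsumers)}
for frameId in frames:
    consumer = (frameId // batchSize) % nConsumers
    received[consumer].append(frameId)

# Consumer 0 gets frames 0-7 and 16-23, consumer 1 gets 8-15 and 24-31.
print(received[0][:10])  # → [0, 1, 2, 3, 4, 5, 6, 7, 16, 17]
```

Each consumer thus sees half of the frames, in contiguous runs of 8.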

On terminal 2, start the first set of processing consumers (1-4) using the following command:

```sh
$ pvapy-hpc-consumer \
    --consumer-id 1 \
    --input-channel passthrough:1:output \
    --control-channel consumer:*:control \
    --status-channel consumer:*:status \
    --output-channel consumer:*:output \
    --processor-file /path/to/hpcAdImageProcessorExample.py \
    --processor-class HpcAdImageProcessor \
    --report-period 10 \
    --server-queue-size 100 \
    --n-consumers 4 \
    --distributor-updates 8 \
    --oid-offset 57
```

On terminal 3, start the second set of processing consumers (5-8) using the following command:

```sh
$ pvapy-hpc-consumer \
    --consumer-id 5 \
    --input-channel passthrough:2:output \
    --control-channel consumer:*:control \
    --status-channel consumer:*:status \
    --output-channel consumer:*:output \
    --processor-file /path/to/hpcAdImageProcessorExample.py \
    --processor-class HpcAdImageProcessor \
    --report-period 10 \
    --server-queue-size 100 \
    --n-consumers 4 \
    --distributor-updates 8 \
    --oid-offset 57
```

On terminal 4, generate images on the 'pvapy:image' channel:

```sh
$ pvapy-ad-sim-server -cn pvapy:image -nx 128 -ny 128 -dt uint8 -rt 60 -fps 8000 -rp 8000
```

Once the simulation server starts publishing images, the consumer statistics
should indicate that the passthrough and processing consumers are receiving frames
at one half and one eighth of the original input stream data rate, respectively.

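The expected rates follow directly from the fan-out arithmetic: the input stream is split between 2 passthrough consumers, and each passthrough output is further split among 4 processing consumers. A quick sketch of the arithmetic:

```python
# Expected per-consumer frame rates for the setup above.
inputRateFps = 8000           # rate published by the simulation server
nPassthrough = 2              # first-stage passthrough consumers
nProcessingPerChain = 4       # processing consumers per passthrough output

passthroughRate = inputRateFps / nPassthrough           # one half of input
processingRate = passthroughRate / nProcessingPerChain  # one eighth of input

print(passthroughRate, processingRate)  # → 4000.0 1000.0
```
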
### Data Collector

In this example we show how to use the 'pvapy-hpc-collector' utility
@@ -652,9 +729,28 @@ channel names/PvObject queues.

This example uses the [sample AD metadata processor](../examples/hpcAdMetadataProcessorExample.py) module, which is capable of
associating images with available metadata based on timestamp comparison, and of producing NtNdArray objects
that contain additional metadata attributes. Note that the streaming framework itself does not care what structure
metadata channels produce, as anything that comes out of metadata channels is simply added
to metadata queues. However, the actual user processor must know the structure of the metadata PvObject in
order to make use of it. In particular, the sample metadata processor works with objects produced
by the simulation server that have the following structure for both CA and PVA metadata channels:

```sh
$ pvinfo x
x
Server: ...
Type:
    structure
        double value
        time_t timeStamp
            long secondsPastEpoch
            int nanoseconds
            int userTag
```
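
As a rough illustration of timestamp-based association, the sketch below picks, for a given image timestamp, the queued metadata entry whose timestamp is closest. The helper names and the dictionary layout are hypothetical stand-ins written for this document; the actual hpcAdMetadataProcessorExample.py code works with PvObject instances:

```python
# Illustrative sketch of associating an image with queued metadata by
# timestamp comparison; not the actual sample processor implementation.

def toSeconds(timeStamp):
    """Convert a time_t-style dict (see pvinfo output above) to float seconds."""
    return timeStamp['secondsPastEpoch'] + timeStamp['nanoseconds'] * 1e-9

def associate(imageTime, metadataQueue):
    """Return the metadata entry whose timestamp is closest to imageTime."""
    return min(metadataQueue,
               key=lambda m: abs(toSeconds(m['timeStamp']) - imageTime))

# Fake metadata queue entries mimicking the structure shown above.
metadataQueue = [
    {'value': 1.0, 'timeStamp': {'secondsPastEpoch': 100, 'nanoseconds': 0}},
    {'value': 2.0, 'timeStamp': {'secondsPastEpoch': 100, 'nanoseconds': 500000000}},
    {'value': 3.0, 'timeStamp': {'secondsPastEpoch': 101, 'nanoseconds': 0}},
]
best = associate(100.4, metadataQueue)
print(best['value'])  # → 2.0 (timestamp 100.5 is closest to 100.4)
```
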

To see how things work in this example,
[download](../examples/hpcAdMetadataProcessorExample.py) the sample metadata processor
and start the data collector on terminal 1 using the following command:

```sh
$ pvapy-hpc-collector \
@@ -689,7 +785,7 @@ $ pvget pvapy:image # original image, no metadata
$ pvget collector:1:output # should contain x,y,z metadata
```

Keep in mind that the generated PVA metadata channels have a structure containing value and timestamp:

```sh
$ pvinfo x
@@ -769,7 +865,7 @@ $ pvapy-ad-sim-server \
Processing speed gains are not linear when compared to the single consumer case, because
each consumer receives an alternate set of images but all metadata values, and hence some
metadata values will have to be discarded. This will be reflected in the metadata
processor statistics.

### Data Encryption

@@ -1040,6 +1136,8 @@ and process are shown below:

As the number of data consumers increases, the number of metadata updates that each consumer has to
discard increases as well, and hence the gains in processing capabilities and in the corresponding
data throughput become smaller. Some optimization could be achieved by batching
sequential images received by consumers (e.g., using the '--distributor-updates 10' option), as well as
by reducing the client load on the image data source via the mirror server or by using parallel
processing chains as in one of the previous examples.