2424import java .io .IOException ;
2525import java .io .InputStream ;
2626import java .util .ArrayList ;
27+ import java .util .Collection ;
2728import java .util .Collections ;
2829import java .util .Comparator ;
30+ import java .util .HashSet ;
2931import java .util .LinkedList ;
3032import java .util .List ;
31- import java .util .HashSet ;
3233import java .util .Set ;
3334import java .util .concurrent .ExecutionException ;
3435import java .util .concurrent .ExecutorService ;
3536import java .util .concurrent .Executors ;
3637import java .util .concurrent .Future ;
3738import java .util .function .BiConsumer ;
39+ import java .util .stream .Collectors ;
3840import org .apache .hadoop .conf .Configuration ;
3941import org .apache .hadoop .fs .FSDataInputStream ;
4042import org .apache .hadoop .fs .FSDataOutputStream ;
7274import org .apache .hadoop .mapreduce .TaskAttemptContext ;
7375import org .apache .hadoop .mapreduce .lib .output .NullOutputFormat ;
7476import org .apache .hadoop .mapreduce .security .TokenCache ;
77+ import org .apache .hadoop .util .ReflectionUtils ;
7578import org .apache .hadoop .util .StringUtils ;
7679import org .apache .hadoop .util .Tool ;
7780import org .apache .yetus .audience .InterfaceAudience ;
7881import org .slf4j .Logger ;
7982import org .slf4j .LoggerFactory ;
8083
84+ import org .apache .hbase .thirdparty .com .google .common .collect .ImmutableList ;
85+ import org .apache .hbase .thirdparty .com .google .common .collect .ImmutableSet ;
8186import org .apache .hbase .thirdparty .org .apache .commons .cli .CommandLine ;
8287import org .apache .hbase .thirdparty .org .apache .commons .cli .Option ;
8388
@@ -115,6 +120,10 @@ public class ExportSnapshot extends AbstractHBaseTool implements Tool {
115120 private static final String CONF_MAP_GROUP = "snapshot.export.default.map.group" ;
116121 private static final String CONF_BANDWIDTH_MB = "snapshot.export.map.bandwidth.mb" ;
117122 private static final String CONF_MR_JOB_NAME = "mapreduce.job.name" ;
123+ private static final String CONF_INPUT_FILE_GROUPER_CLASS =
124+ "snapshot.export.input.file.grouper.class" ;
125+ private static final String CONF_INPUT_FILE_LOCATION_RESOLVER_CLASS =
126+ "snapshot.export.input.file.location.resolver.class" ;
118127 protected static final String CONF_SKIP_TMP = "snapshot.export.skip.tmp" ;
119128 private static final String CONF_COPY_MANIFEST_THREADS =
120129 "snapshot.export.copy.references.threads" ;
@@ -152,11 +161,19 @@ static final class Options {
152161 static final Option CHMOD =
153162 new Option (null , "chmod" , true , "Change the permission of the files to the specified one." );
154163 static final Option MAPPERS = new Option (null , "mappers" , true ,
155- "Number of mappers to use during the copy (mapreduce.job.maps)." );
164+ "Number of mappers to use during the copy (mapreduce.job.maps). "
165+ + "If you provide a --custom-file-grouper, "
166+ + "then --mappers is interpreted as the number of mappers per group." );
156167 static final Option BANDWIDTH =
157168 new Option (null , "bandwidth" , true , "Limit bandwidth to this value in MB/second." );
158169 static final Option RESET_TTL =
159170 new Option (null , "reset-ttl" , false , "Do not copy TTL for the snapshot" );
171+ static final Option CUSTOM_FILE_GROUPER = new Option (null , "custom-file-grouper" , true ,
172+ "Fully qualified class name of an implementation of ExportSnapshot.CustomFileGrouper. "
173+ + "See JavaDoc on that class for more information." );
174+ static final Option FILE_LOCATION_RESOLVER = new Option (null , "file-location-resolver" , true ,
175+ "Fully qualified class name of an implementation of ExportSnapshot.FileLocationResolver. "
176+ + "See JavaDoc on that class for more information." );
160177 }
161178
162179 // Export Map-Reduce Counters, to keep track of the progress
@@ -179,6 +196,54 @@ public enum ChecksumComparison {
179196 INCOMPATIBLE , // checksum comparison is not compatible.
180197 }
181198
199+ /**
200+ * If desired, you may implement a CustomFileGrouper in order to influence how ExportSnapshot
201+ * chooses which input files go into the MapReduce job's {@link InputSplit}s. Your implementation
202+ * must return a data structure that contains each input file exactly once. Files that appear in
203+ * separate entries in the top-level returned Collection are guaranteed to not be placed in the
204+ * same InputSplit. This can be used to segregate your input files by the rack or host on which
205+ * they are available, which, used in conjunction with {@link FileLocationResolver}, can improve
206+ * the performance of your ExportSnapshot runs. To use this, pass the --custom-file-grouper
207+ * argument with the fully qualified class name of an implementation of CustomFileGrouper that's
208+ * on the classpath. If this argument is not used, no particular grouping logic will be applied.
209+ */
210+ @ InterfaceAudience .Public
211+ public interface CustomFileGrouper {
212+ Collection <Collection <Pair <SnapshotFileInfo , Long >>>
213+ getGroupedInputFiles (final Collection <Pair <SnapshotFileInfo , Long >> snapshotFiles );
214+ }
215+
216+ private static class NoopCustomFileGrouper implements CustomFileGrouper {
217+ @ Override
218+ public Collection <Collection <Pair <SnapshotFileInfo , Long >>>
219+ getGroupedInputFiles (final Collection <Pair <SnapshotFileInfo , Long >> snapshotFiles ) {
220+ return ImmutableList .of (snapshotFiles );
221+ }
222+ }
223+
224+ /**
225+ * If desired, you may implement a FileLocationResolver in order to influence the _location_
226+ * metadata attached to each {@link InputSplit} that ExportSnapshot will submit to YARN. The
227+ * method {@link #getLocationsForInputFiles(Collection)} method is called once for each InputSplit
228+ * being constructed. Whatever is returned will ultimately be reported by that split's
229+ * {@link InputSplit#getLocations()} method. This can be used to encourage YARN to schedule the
230+ * ExportSnapshot's mappers on rack-local or host-local NodeManagers. To use this, pass the
231+ * --file-location-resolver argument with the fully qualified class name of an implementation of
232+ * FileLocationResolver that's on the classpath. If this argument is not used, no locations will
233+ * be attached to the InputSplits.
234+ */
235+ @ InterfaceAudience .Public
236+ public interface FileLocationResolver {
237+ Set <String > getLocationsForInputFiles (final Collection <Pair <SnapshotFileInfo , Long >> files );
238+ }
239+
240+ static class NoopFileLocationResolver implements FileLocationResolver {
241+ @ Override
242+ public Set <String > getLocationsForInputFiles (Collection <Pair <SnapshotFileInfo , Long >> files ) {
243+ return ImmutableSet .of ();
244+ }
245+ }
246+
182247 private static class ExportMapper
183248 extends Mapper <BytesWritable , NullWritable , NullWritable , NullWritable > {
184249 private static final Logger LOG = LoggerFactory .getLogger (ExportMapper .class );
@@ -710,8 +775,9 @@ private static Pair<SnapshotFileInfo, Long> getSnapshotFileAndSize(FileSystem fs
710775 * The algorithm used is pretty straightforward; the file list is sorted by size, and then each
711776 * group fetch the bigger file available, iterating through groups alternating the direction.
712777 */
713- static List <List <Pair <SnapshotFileInfo , Long >>>
714- getBalancedSplits (final List <Pair <SnapshotFileInfo , Long >> files , final int ngroups ) {
778+ static List <List <Pair <SnapshotFileInfo , Long >>> getBalancedSplits (
779+ final Collection <Pair <SnapshotFileInfo , Long >> unsortedFiles , final int ngroups ) {
780+ List <Pair <SnapshotFileInfo , Long >> files = new ArrayList <>(unsortedFiles );
715781 // Sort files by size, from small to big
716782 Collections .sort (files , new Comparator <Pair <SnapshotFileInfo , Long >>() {
717783 public int compare (Pair <SnapshotFileInfo , Long > a , Pair <SnapshotFileInfo , Long > b ) {
@@ -722,7 +788,6 @@ public int compare(Pair<SnapshotFileInfo, Long> a, Pair<SnapshotFileInfo, Long>
722788
723789 // create balanced groups
724790 List <List <Pair <SnapshotFileInfo , Long >>> fileGroups = new LinkedList <>();
725- long [] sizeGroups = new long [ngroups ];
726791 int hi = files .size () - 1 ;
727792 int lo = 0 ;
728793
@@ -741,7 +806,6 @@ public int compare(Pair<SnapshotFileInfo, Long> a, Pair<SnapshotFileInfo, Long>
741806 Pair <SnapshotFileInfo , Long > fileInfo = files .get (hi --);
742807
743808 // add the hi one
744- sizeGroups [g ] += fileInfo .getSecond ();
745809 group .add (fileInfo );
746810
747811 // change direction when at the end or the beginning
@@ -755,16 +819,10 @@ public int compare(Pair<SnapshotFileInfo, Long> a, Pair<SnapshotFileInfo, Long>
755819 }
756820 }
757821
758- if (LOG .isDebugEnabled ()) {
759- for (int i = 0 ; i < sizeGroups .length ; ++i ) {
760- LOG .debug ("export split=" + i + " size=" + StringUtils .humanReadableInt (sizeGroups [i ]));
761- }
762- }
763-
764822 return fileGroups ;
765823 }
766824
767- private static class ExportSnapshotInputFormat extends InputFormat <BytesWritable , NullWritable > {
825+ static class ExportSnapshotInputFormat extends InputFormat <BytesWritable , NullWritable > {
768826 @ Override
769827 public RecordReader <BytesWritable , NullWritable > createRecordReader (InputSplit split ,
770828 TaskAttemptContext tac ) throws IOException , InterruptedException {
@@ -778,37 +836,78 @@ public List<InputSplit> getSplits(JobContext context) throws IOException, Interr
778836 FileSystem fs = FileSystem .get (snapshotDir .toUri (), conf );
779837
780838 List <Pair <SnapshotFileInfo , Long >> snapshotFiles = getSnapshotFiles (conf , fs , snapshotDir );
839+
840+ Collection <List <Pair <SnapshotFileInfo , Long >>> balancedGroups =
841+ groupFilesForSplits (conf , snapshotFiles );
842+
843+ Class <? extends FileLocationResolver > fileLocationResolverClass =
844+ conf .getClass (CONF_INPUT_FILE_LOCATION_RESOLVER_CLASS , NoopFileLocationResolver .class ,
845+ FileLocationResolver .class );
846+ FileLocationResolver fileLocationResolver =
847+ ReflectionUtils .newInstance (fileLocationResolverClass , conf );
848+ LOG .info ("FileLocationResolver {} will provide location metadata for each InputSplit" ,
849+ fileLocationResolverClass );
850+
851+ List <InputSplit > splits = new ArrayList <>(balancedGroups .size ());
852+ for (Collection <Pair <SnapshotFileInfo , Long >> files : balancedGroups ) {
853+ splits .add (new ExportSnapshotInputSplit (files , fileLocationResolver ));
854+ }
855+ return splits ;
856+ }
857+
858+ Collection <List <Pair <SnapshotFileInfo , Long >>> groupFilesForSplits (Configuration conf ,
859+ List <Pair <SnapshotFileInfo , Long >> snapshotFiles ) {
781860 int mappers = conf .getInt (CONF_NUM_SPLITS , 0 );
782- if (mappers == 0 && snapshotFiles .size () > 0 ) {
861+ if (mappers == 0 && ! snapshotFiles .isEmpty () ) {
783862 mappers = 1 + (snapshotFiles .size () / conf .getInt (CONF_MAP_GROUP , 10 ));
784863 mappers = Math .min (mappers , snapshotFiles .size ());
785864 conf .setInt (CONF_NUM_SPLITS , mappers );
786865 conf .setInt (MR_NUM_MAPS , mappers );
787866 }
788867
789- List <List <Pair <SnapshotFileInfo , Long >>> groups = getBalancedSplits (snapshotFiles , mappers );
790- List <InputSplit > splits = new ArrayList (groups .size ());
791- for (List <Pair <SnapshotFileInfo , Long >> files : groups ) {
792- splits .add (new ExportSnapshotInputSplit (files ));
793- }
794- return splits ;
868+ Class <? extends CustomFileGrouper > inputFileGrouperClass = conf .getClass (
869+ CONF_INPUT_FILE_GROUPER_CLASS , NoopCustomFileGrouper .class , CustomFileGrouper .class );
870+ CustomFileGrouper customFileGrouper =
871+ ReflectionUtils .newInstance (inputFileGrouperClass , conf );
872+ Collection <Collection <Pair <SnapshotFileInfo , Long >>> groups =
873+ customFileGrouper .getGroupedInputFiles (snapshotFiles );
874+
875+ LOG .info ("CustomFileGrouper {} split input files into {} groups" , inputFileGrouperClass ,
876+ groups .size ());
877+ int mappersPerGroup = groups .isEmpty () ? 1 : Math .max (mappers / groups .size (), 1 );
878+ LOG .info (
879+ "Splitting each group into {} InputSplits, "
880+ + "to achieve closest possible amount of mappers to target of {}" ,
881+ mappersPerGroup , mappers );
882+
883+ // Within each group, create splits of equal size. Groups are not mixed together.
884+ return groups .stream ().map (g -> getBalancedSplits (g , mappersPerGroup ))
885+ .flatMap (Collection ::stream ).collect (Collectors .toList ());
795886 }
796887
797- private static class ExportSnapshotInputSplit extends InputSplit implements Writable {
888+ static class ExportSnapshotInputSplit extends InputSplit implements Writable {
889+
798890 private List <Pair <BytesWritable , Long >> files ;
891+ private String [] locations ;
799892 private long length ;
800893
/**
 * Creates an empty split whose file list and location hints are both unset (null).
 * NOTE(review): presumably this exists so the framework can instantiate the split and then
 * populate it through {@code readFields} - confirm against the callers.
 */
public ExportSnapshotInputSplit() {
  files = null;
  locations = null;
}
804898
805- public ExportSnapshotInputSplit (final List <Pair <SnapshotFileInfo , Long >> snapshotFiles ) {
806- this .files = new ArrayList (snapshotFiles .size ());
899+ public ExportSnapshotInputSplit (final Collection <Pair <SnapshotFileInfo , Long >> snapshotFiles ,
900+ FileLocationResolver fileLocationResolver ) {
901+ this .files = new ArrayList <>(snapshotFiles .size ());
807902 for (Pair <SnapshotFileInfo , Long > fileInfo : snapshotFiles ) {
808903 this .files .add (
809904 new Pair <>(new BytesWritable (fileInfo .getFirst ().toByteArray ()), fileInfo .getSecond ()));
810905 this .length += fileInfo .getSecond ();
811906 }
907+ this .locations =
908+ fileLocationResolver .getLocationsForInputFiles (snapshotFiles ).toArray (new String [0 ]);
909+ LOG .trace ("This ExportSnapshotInputSplit has files {} of collective size {}, "
910+ + "with location hints: {}" , files , length , locations );
812911 }
813912
814913 private List <Pair <BytesWritable , Long >> getSplitKeys () {
@@ -822,7 +921,7 @@ public long getLength() throws IOException, InterruptedException {
822921
823922 @ Override
824923 public String [] getLocations () throws IOException , InterruptedException {
825- return new String [] {} ;
924+ return locations ;
826925 }
827926
828927 @ Override
@@ -837,6 +936,12 @@ public void readFields(DataInput in) throws IOException {
837936 files .add (new Pair <>(fileInfo , size ));
838937 length += size ;
839938 }
939+ int locationCount = in .readInt ();
940+ List <String > locations = new ArrayList <>(locationCount );
941+ for (int i = 0 ; i < locationCount ; ++i ) {
942+ locations .add (in .readUTF ());
943+ }
944+ this .locations = locations .toArray (new String [0 ]);
840945 }
841946
842947 @ Override
@@ -846,6 +951,10 @@ public void write(DataOutput out) throws IOException {
846951 fileInfo .getFirst ().write (out );
847952 out .writeLong (fileInfo .getSecond ());
848953 }
954+ out .writeInt (locations .length );
955+ for (String location : locations ) {
956+ out .writeUTF (location );
957+ }
849958 }
850959 }
851960
@@ -905,7 +1014,8 @@ public boolean nextKeyValue() {
9051014 */
9061015 private void runCopyJob (final Path inputRoot , final Path outputRoot , final String snapshotName ,
9071016 final Path snapshotDir , final boolean verifyChecksum , final String filesUser ,
908- final String filesGroup , final int filesMode , final int mappers , final int bandwidthMB )
1017+ final String filesGroup , final int filesMode , final int mappers , final int bandwidthMB ,
1018+ final String customFileGrouper , final String fileLocationResolver )
9091019 throws IOException , InterruptedException , ClassNotFoundException {
9101020 Configuration conf = getConf ();
9111021 if (filesGroup != null ) conf .set (CONF_FILES_GROUP , filesGroup );
@@ -921,6 +1031,12 @@ private void runCopyJob(final Path inputRoot, final Path outputRoot, final Strin
9211031 conf .setInt (CONF_BANDWIDTH_MB , bandwidthMB );
9221032 conf .set (CONF_SNAPSHOT_NAME , snapshotName );
9231033 conf .set (CONF_SNAPSHOT_DIR , snapshotDir .toString ());
1034+ if (customFileGrouper != null ) {
1035+ conf .set (CONF_INPUT_FILE_GROUPER_CLASS , customFileGrouper );
1036+ }
1037+ if (fileLocationResolver != null ) {
1038+ conf .set (CONF_INPUT_FILE_LOCATION_RESOLVER_CLASS , fileLocationResolver );
1039+ }
9241040
9251041 String jobname = conf .get (CONF_MR_JOB_NAME , "ExportSnapshot-" + snapshotName );
9261042 Job job = new Job (conf );
@@ -1021,6 +1137,8 @@ private void setPermissionParallel(final FileSystem outputFs, final short filesM
10211137 private int filesMode = 0 ;
10221138 private int mappers = 0 ;
10231139 private boolean resetTtl = false ;
1140+ private String customFileGrouper = null ;
1141+ private String fileLocationResolver = null ;
10241142
10251143 @ Override
10261144 protected void processOptions (CommandLine cmd ) {
@@ -1043,6 +1161,12 @@ protected void processOptions(CommandLine cmd) {
10431161 verifyTarget = !cmd .hasOption (Options .NO_TARGET_VERIFY .getLongOpt ());
10441162 verifySource = !cmd .hasOption (Options .NO_SOURCE_VERIFY .getLongOpt ());
10451163 resetTtl = cmd .hasOption (Options .RESET_TTL .getLongOpt ());
1164+ if (cmd .hasOption (Options .CUSTOM_FILE_GROUPER .getLongOpt ())) {
1165+ customFileGrouper = cmd .getOptionValue (Options .CUSTOM_FILE_GROUPER .getLongOpt ());
1166+ }
1167+ if (cmd .hasOption (Options .FILE_LOCATION_RESOLVER .getLongOpt ())) {
1168+ fileLocationResolver = cmd .getOptionValue (Options .FILE_LOCATION_RESOLVER .getLongOpt ());
1169+ }
10461170 }
10471171
10481172 /**
@@ -1211,7 +1335,7 @@ public int doWork() throws IOException {
12111335 // by the HFileArchiver, since they have no references.
12121336 try {
12131337 runCopyJob (inputRoot , outputRoot , snapshotName , snapshotDir , verifyChecksum , filesUser ,
1214- filesGroup , filesMode , mappers , bandwidthMB );
1338+ filesGroup , filesMode , mappers , bandwidthMB , customFileGrouper , fileLocationResolver );
12151339
12161340 LOG .info ("Finalize the Snapshot Export" );
12171341 if (!skipTmp ) {
@@ -1269,6 +1393,8 @@ protected void addOptions() {
12691393 addOption (Options .MAPPERS );
12701394 addOption (Options .BANDWIDTH );
12711395 addOption (Options .RESET_TTL );
1396+ addOption (Options .CUSTOM_FILE_GROUPER );
1397+ addOption (Options .FILE_LOCATION_RESOLVER );
12721398 }
12731399
12741400 public static void main (String [] args ) {
0 commit comments