22
22
import java .io .FileNotFoundException ;
23
23
import java .io .FileOutputStream ;
24
24
import java .io .IOException ;
25
+ import java .net .URI ;
25
26
import java .net .URL ;
26
27
import java .nio .charset .StandardCharsets ;
27
28
import java .util .ArrayList ;
74
75
75
76
import static org .apache .hadoop .fs .s3a .S3ATestUtils .disableFilesystemCaching ;
76
77
import static org .apache .hadoop .fs .s3a .S3ATestUtils .lsR ;
78
+ import static org .apache .hadoop .fs .s3a .S3ATestUtils .removeBaseAndBucketOverrides ;
77
79
import static org .apache .hadoop .fs .s3a .S3AUtils .applyLocatedFiles ;
78
80
import static org .apache .hadoop .fs .s3a .commit .CommitConstants .FS_S3A_COMMITTER_STAGING_TMP_PATH ;
79
81
import static org .apache .hadoop .fs .s3a .commit .CommitConstants .MAGIC_PATH_PREFIX ;
82
84
import static org .apache .hadoop .fs .s3a .commit .staging .Paths .getMultipartUploadCommitsDirectory ;
83
85
import static org .apache .hadoop .fs .s3a .commit .staging .StagingCommitterConstants .STAGING_UPLOADS ;
84
86
import static org .apache .hadoop .mapred .JobConf .MAPRED_TASK_ENV ;
87
+ import static org .apache .hadoop .mapreduce .lib .input .FileInputFormat .LIST_STATUS_NUM_THREADS ;
85
88
86
89
/**
87
90
* Test an MR Job with all the different committers.
105
108
* <li>
106
109
* The test suites are declared to be executed in ascending order, so
107
110
* that for a specific binding, the order is
108
- * {@link #test_000(CommitterTestBinding )},
109
- * {@link #test_100(CommitterTestBinding )}
110
- * {@link #test_200_execute(CommitterTestBinding, java.nio.file.Path )} and finally
111
- * {@link #test_500(CommitterTestBinding )}.
111
+ * {@link #test_000()},
112
+ * {@link #test_100()}
113
+ * {@link #test_200_execute()} and finally
114
+ * {@link #test_500()}.
112
115
* </li>
113
116
* <li>
114
- * {@link #test_000(CommitterTestBinding )} calls
117
+ * {@link #test_000()} calls
115
118
* {@link CommitterTestBinding#validate()} to
116
119
* as to validate the state of the committer. This is primarily to
117
120
* verify that the binding setup mechanism is working.
118
121
* </li>
119
122
* <li>
120
- * {@link #test_100(CommitterTestBinding )} is relayed to
123
+ * {@link #test_100()} is relayed to
121
124
* {@link CommitterTestBinding#test_100()},
122
125
* for any preflight tests.
123
126
* </li>
124
127
* <li>
125
- * The {@link #test_200_execute(CommitterTestBinding, java.nio.file.Path )}
128
+ * The {@link #test_200_execute()}
126
129
* test runs the MR job for that
127
130
* particular binding with standard reporting and verification of the
128
131
* outcome.
@@ -165,6 +168,9 @@ public static Collection<Object[]> params() {
165
168
*/
166
169
private final CommitterTestBinding committerTestBinding ;
167
170
171
+ @ TempDir
172
+ private java .nio .file .Path localFilesDir ;
173
+
168
174
/**
169
175
* Parameterized constructor.
170
176
* @param committerTestBinding binding for the test.
@@ -186,6 +192,9 @@ public void setup() throws Exception {
186
192
protected Configuration createConfiguration () {
187
193
Configuration conf = super .createConfiguration ();
188
194
disableFilesystemCaching (conf );
195
+ removeBaseAndBucketOverrides (conf ,
196
+ LIST_STATUS_NUM_THREADS );
197
+ conf .setInt (LIST_STATUS_NUM_THREADS , 16 );
189
198
return conf ;
190
199
}
191
200
@@ -208,9 +217,9 @@ public void test_100() throws Throwable {
208
217
}
209
218
210
219
@ Test
211
- public void test_200_execute (
212
- @ TempDir java .nio .file .Path localFilesDir ) throws Exception {
220
+ public void test_200_execute () throws Exception {
213
221
describe ("Run an MR with committer %s" , committerName ());
222
+ LOG .info ("Local Temp directory is {}" , localFilesDir );
214
223
215
224
S3AFileSystem fs = getFileSystem ();
216
225
// final dest is in S3A
@@ -253,8 +262,10 @@ public void test_200_execute(
253
262
jobConf .set (FS_S3A_COMMITTER_UUID , commitUUID );
254
263
255
264
mrJob .setInputFormatClass (TextInputFormat .class );
256
- FileInputFormat .addInputPath (mrJob ,
257
- new Path (localFilesDir .getRoot ().toUri ()));
265
+
266
+ final URI inputPath = localFilesDir .toUri ();
267
+ LOG .info ("Job input path {}" , inputPath );
268
+ FileInputFormat .addInputPath (mrJob , new Path (inputPath ));
258
269
259
270
mrJob .setMapperClass (MapClass .class );
260
271
mrJob .setNumReduceTasks (0 );
0 commit comments