Commit b30dc60

Address partial comments
1 parent d10a5e2 commit b30dc60

6 files changed: +0 -88 lines changed

hudi-common/src/avro/java/org/apache/parquet/schema/AvroSchemaRepair.java

Lines changed: 0 additions & 15 deletions
@@ -32,7 +32,6 @@
 import java.util.List;
 
 public class AvroSchemaRepair {
-  public static boolean isLocalTimestampSupported = isLocalTimestampMillisSupported();
 
   public static Schema repairLogicalTypes(Schema fileSchema, Schema tableSchema) {
     Schema repairedSchema = repairAvroSchema(fileSchema, tableSchema);
@@ -242,18 +241,4 @@ public static boolean hasTimestampMillisField(Schema tableSchema) {
         && (tableSchema.getLogicalType() instanceof LogicalTypes.TimestampMillis || tableSchema.getLogicalType() instanceof LogicalTypes.LocalTimestampMillis);
   }
 }
-
-  /**
-   * Check if LogicalTypes.LocalTimestampMillis is supported in the current Avro version
-   *
-   * @return true if LocalTimestampMillis is available, false otherwise
-   */
-  public static boolean isLocalTimestampMillisSupported() {
-    try {
-      return Arrays.stream(LogicalTypes.class.getDeclaredClasses())
-          .anyMatch(c -> c.getSimpleName().equals("LocalTimestampMillis"));
-    } catch (Exception e) {
-      return false;
-    }
-  }
 }
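
For reference, the guard removed above is a reflection-based capability probe: it detects whether the running Avro version declares the LogicalTypes.LocalTimestampMillis inner class (introduced in Avro 1.10). A minimal standalone sketch of that pattern, reconstructed from the removed lines; the class name here is illustrative:

    import java.util.Arrays;

    import org.apache.avro.LogicalTypes;

    public class AvroFeatureProbe {
      // True when the running Avro version declares the
      // LogicalTypes.LocalTimestampMillis inner class (Avro 1.10+).
      public static boolean isLocalTimestampMillisSupported() {
        try {
          return Arrays.stream(LogicalTypes.class.getDeclaredClasses())
              .anyMatch(c -> c.getSimpleName().equals("LocalTimestampMillis"));
        } catch (Exception e) {
          return false;
        }
      }

      public static void main(String[] args) {
        System.out.println(isLocalTimestampMillisSupported());
      }
    }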

hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java

Lines changed: 0 additions & 5 deletions
@@ -483,11 +483,6 @@ protected boolean shouldReadAsPartitionedTable() {
     return (partitionColumns.length > 0 && canParsePartitionValues()) || HoodieTableMetadata.isMetadataTable(basePath);
   }
 
-  protected PartitionPath convertToPartitionPath(String partitionPath) {
-    Object[] partitionColumnValues = parsePartitionColumnValues(partitionColumns, partitionPath);
-    return new PartitionPath(partitionPath, partitionColumnValues);
-  }
-
   private static long fileSliceSize(FileSlice fileSlice) {
     long logFileSize = fileSlice.getLogFiles().map(HoodieLogFile::getFileSize)
         .filter(s -> s > 0)
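
The removed helper only composed two steps: parse the partition column values, then pair the path with those values. A self-contained sketch of that composition; the PartitionPath holder and the col=value parser below are illustrative stand-ins, not Hudi's implementations:

    import java.util.Arrays;

    public class PartitionPathSketch {
      // Illustrative stand-in for Hudi's PartitionPath pair of (path, values).
      static class PartitionPath {
        final String path;
        final Object[] columnValues;

        PartitionPath(String path, Object[] columnValues) {
          this.path = path;
          this.columnValues = columnValues;
        }
      }

      // Illustrative parser: assumes one Hive-style "col=value" segment per column.
      static Object[] parsePartitionColumnValues(String[] partitionColumns, String partitionPath) {
        String[] segments = partitionPath.split("/");
        Object[] values = new Object[partitionColumns.length];
        for (int i = 0; i < partitionColumns.length; i++) {
          values[i] = segments[i].substring(segments[i].indexOf('=') + 1);
        }
        return values;
      }

      public static void main(String[] args) {
        String[] partitionColumns = {"dt", "hh"};
        String relativePath = "dt=2024-01-01/hh=05";
        // The removed convertToPartitionPath was exactly this two-step composition:
        PartitionPath pp = new PartitionPath(relativePath,
            parsePartitionColumnValues(partitionColumns, relativePath));
        System.out.println(pp.path + " -> " + Arrays.toString(pp.columnValues));
      }
    }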

hudi-common/src/main/java/org/apache/hudi/avro/AvroSchemaCache.java

Lines changed: 0 additions & 2 deletions
@@ -31,7 +31,6 @@
  */
 public class AvroSchemaCache {
 
-
   // Ensure that there is only one variable instance of the same schema within an entire JVM lifetime
   private static final LoadingCache<Schema, Schema> SCHEMA_CACHE = Caffeine.newBuilder().weakValues().maximumSize(1024).build(k -> k);
 
@@ -43,5 +42,4 @@ public class AvroSchemaCache {
   public static Schema intern(Schema schema) {
     return SCHEMA_CACHE.get(schema);
   }
-
 }
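
The comment kept above describes schema interning. A minimal runnable sketch of the same Caffeine pattern, assuming the Avro and Caffeine dependencies are on the classpath: the identity loader (k -> k) pins each structurally equal schema to its first-seen instance, and weakValues() lets unused canonical instances be garbage-collected.

    import com.github.benmanes.caffeine.cache.Caffeine;
    import com.github.benmanes.caffeine.cache.LoadingCache;
    import org.apache.avro.Schema;
    import org.apache.avro.SchemaBuilder;

    public class SchemaInternDemo {
      private static final LoadingCache<Schema, Schema> SCHEMA_CACHE =
          Caffeine.newBuilder().weakValues().maximumSize(1024).build(k -> k);

      public static void main(String[] args) {
        Schema a = SchemaBuilder.record("r").fields().requiredInt("f").endRecord();
        Schema b = SchemaBuilder.record("r").fields().requiredInt("f").endRecord();
        // Structurally equal schemas intern to one canonical instance per JVM.
        System.out.println(SCHEMA_CACHE.get(a) == SCHEMA_CACHE.get(b)); // true
      }
    }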

hudi-common/src/main/java/org/apache/hudi/avro/AvroSchemaUtils.java

Lines changed: 0 additions & 4 deletions
@@ -223,10 +223,6 @@ public static Option<Schema> findNestedFieldSchema(Schema schema, String fieldNa
     return Option.of(getNonNullTypeFromUnion(schema));
   }
 
-  public static Option<Schema.Type> findNestedFieldType(Schema schema, String fieldName) {
-    return findNestedFieldSchema(schema, fieldName).map(Schema::getType);
-  }
-
   /**
    * Appends provided new fields at the end of the given schema
    *
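
Callers of the removed findNestedFieldType can inline the same one-line composition over findNestedFieldSchema. A runnable sketch reduced to top-level record fields, with java.util.Optional standing in for Hudi's Option:

    import java.util.Optional;

    import org.apache.avro.Schema;
    import org.apache.avro.SchemaBuilder;

    public class NestedFieldTypeDemo {
      // Stand-in for AvroSchemaUtils.findNestedFieldSchema, reduced to
      // top-level fields for the sake of a self-contained example.
      static Optional<Schema> findNestedFieldSchema(Schema schema, String fieldName) {
        return Optional.ofNullable(schema.getField(fieldName)).map(Schema.Field::schema);
      }

      public static void main(String[] args) {
        Schema record = SchemaBuilder.record("r").fields().requiredLong("ts").endRecord();
        // The removed findNestedFieldType was exactly this map over the schema:
        Optional<Schema.Type> type = findNestedFieldSchema(record, "ts").map(Schema::getType);
        System.out.println(type); // Optional[LONG]
      }
    }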

hudi-common/src/main/java/org/apache/hudi/common/util/DateTimeUtils.java

Lines changed: 0 additions & 6 deletions
@@ -52,12 +52,6 @@ public static Instant microsToInstant(long microsFromEpoch) {
     return Instant.ofEpochSecond(epochSeconds, nanoAdjustment);
   }
 
-  public static Instant nanosToInstant(long nanosFromEpoch) {
-    long epochSeconds = nanosFromEpoch / (1_000_000_000L);
-    long nanoAdjustment = nanosFromEpoch % (1_000_000_000L);
-    return Instant.ofEpochSecond(epochSeconds, nanoAdjustment);
-  }
-
   /**
    * Converts provided {@link Instant} to microseconds (from epoch)
    */
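
Since Instant.ofEpochSecond(seconds, nanoAdjustment) normalizes an arbitrary nano adjustment itself, the removed nanosToInstant reduces to a one-liner; a quick check of the equivalence:

    import java.time.Instant;

    public class NanosToInstantDemo {
      public static void main(String[] args) {
        long nanosFromEpoch = 1_700_000_000_123_456_789L;
        // The removed helper's divide/modulo split...
        Instant split = Instant.ofEpochSecond(
            nanosFromEpoch / 1_000_000_000L, nanosFromEpoch % 1_000_000_000L);
        // ...is equivalent to letting ofEpochSecond normalize the whole value:
        Instant direct = Instant.ofEpochSecond(0, nanosFromEpoch);
        System.out.println(split.equals(direct)); // true
      }
    }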

hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/ColumnStatsIndexSupport.scala

Lines changed: 0 additions & 56 deletions
@@ -147,62 +147,6 @@ class ColumnStatsIndexSupport(spark: SparkSession,
     }
   }
 
-  /**
-   * Loads view of the Column Stats Index in a transposed format where single row coalesces every columns'
-   * statistics for a single file, returning it as [[DataFrame]]
-   *
-   * Please check out scala-doc of the [[transpose]] method explaining this view in more details
-   */
-  def loadTransposed[T](targetColumns: Seq[String],
-                        shouldReadInMemory: Boolean,
-                        prunedPartitions: Option[Set[String]] = None,
-                        prunedFileNamesOpt: Option[Set[String]] = None)(block: DataFrame => T): T = {
-    cachedColumnStatsIndexViews.get(targetColumns) match {
-      case Some(cachedDF) =>
-        block(cachedDF)
-      case None =>
-        val colStatsRecords: HoodieData[HoodieMetadataColumnStats] = prunedFileNamesOpt match {
-          case Some(prunedFileNames) =>
-            val filterFunction = new SerializableFunction[HoodieMetadataColumnStats, java.lang.Boolean] {
-              override def apply(r: HoodieMetadataColumnStats): java.lang.Boolean = {
-                prunedFileNames.contains(r.getFileName)
-              }
-            }
-            loadColumnStatsIndexRecords(targetColumns, shouldReadInMemory).filter(filterFunction)
-          case None =>
-            loadColumnStatsIndexRecords(targetColumns, shouldReadInMemory)
-        }
-
-        withPersistedData(colStatsRecords, StorageLevel.MEMORY_ONLY) {
-          val (transposedRows, indexSchema) = transpose(colStatsRecords, targetColumns)
-          val df = if (shouldReadInMemory) {
-            // NOTE: This will instantiate a [[Dataset]] backed by [[LocalRelation]] holding all of the rows
-            //       of the transposed table in memory, facilitating execution of the subsequently chained operations
-            //       on it locally (on the driver; all such operations are actually going to be performed by Spark's
-            //       Optimizer)
-            HoodieUnsafeUtils.createDataFrameFromRows(spark, transposedRows.collectAsList().asScala.toSeq, indexSchema)
-          } else {
-            val rdd = HoodieJavaRDD.getJavaRDD(transposedRows)
-            spark.createDataFrame(rdd, indexSchema)
-          }
-
-          if (allowCaching) {
-            cachedColumnStatsIndexViews.put(targetColumns, df)
-            // NOTE: Instead of collecting the rows from the index and hold them in memory, we instead rely
-            //       on Spark as (potentially distributed) cache managing data lifecycle, while we simply keep
-            //       the referenced to persisted [[DataFrame]] instance
-            df.persist(StorageLevel.MEMORY_ONLY)
-
-            block(df)
-          } else {
-            withPersistedDataset(df) {
-              block(df)
-            }
-          }
-        }
-    }
-  }
-
   /**
    * Loads a view of the Column Stats Index in a raw format, returning it as [[DataFrame]]
    *
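
The NOTE comments in the removed loadTransposed capture its two materialization strategies: collect the transposed rows into a driver-local DataFrame, or persist a distributed one and hand out only the reference. A minimal sketch of the persist-and-reuse half via Spark's Java API (session setup is illustrative):

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;
    import org.apache.spark.storage.StorageLevel;

    public class PersistSketch {
      public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
            .master("local[1]").appName("persist-sketch").getOrCreate();
        Dataset<Row> df = spark.range(100).toDF("id");
        // Persist is lazy: Spark caches the partitions on the first action and
        // manages their lifecycle; the caller keeps only the DataFrame reference.
        df.persist(StorageLevel.MEMORY_ONLY());
        System.out.println(df.count()); // first action populates the cache
        System.out.println(df.count()); // served from cached blocks
        df.unpersist();
        spark.stop();
      }
    }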
