2 changes: 1 addition & 1 deletion .gitignore
@@ -15,9 +15,9 @@ target
dependency-reduced-pom.xml
.idea/*
target/
examples/data/**
.cache
*~
mvn_install.log
.vscode/*
.DS_Store

6 changes: 6 additions & 0 deletions README.md
@@ -108,6 +108,12 @@ Parquet-Java has supported Java Vector API to speed up reading, to enable this feature
* Edit spark class#VectorizedRleValuesReader, function#readNextGroup refer to parquet class#ParquetReadRouter, function#readBatchUsing512Vector
* Build spark with maven and replace spark-sql_2.12-{VERSION}.jar on the spark jars folder

## Documentation

For usage documentation, examples, and tutorials, see:

- **[Examples](examples/)** - Practical examples demonstrating basic and advanced usage

## Map/Reduce integration

[Input](https://github.com/apache/parquet-java/blob/master/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetInputFormat.java) and [Output](https://github.com/apache/parquet-java/blob/master/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java) formats.
61 changes: 61 additions & 0 deletions examples/README.md
@@ -0,0 +1,61 @@
<!--
~ Licensed to the Apache Software Foundation (ASF) under one
~ or more contributor license agreements. See the NOTICE file
~ distributed with this work for additional information
~ regarding copyright ownership. The ASF licenses this file
~ to you under the Apache License, Version 2.0 (the
~ "License"); you may not use this file except in compliance
~ with the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing,
~ software distributed under the License is distributed on an
~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
~ KIND, either express or implied. See the License for the
~ specific language governing permissions and limitations
~ under the License.
-->

# Parquet Java Examples

This directory contains self-contained example programs that demonstrate how to use the Apache Parquet Java library.

## Examples Overview

### 1. BasicReadWriteExample.java
Demonstrates basic reading and writing of Parquet files using the example API; a minimal sketch follows the list below.

- Schema definition
- Writing data with compression
- Reading data and calculating statistics
- Basic configuration options
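
A minimal sketch of the write/read round trip this example covers, using the example `Group` API. The schema, file name, and values here are illustrative rather than taken from the example source:

```java
import org.apache.hadoop.fs.Path;
import org.apache.parquet.example.data.Group;
import org.apache.parquet.example.data.simple.SimpleGroupFactory;
import org.apache.parquet.hadoop.ParquetReader;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.example.ExampleParquetWriter;
import org.apache.parquet.hadoop.example.GroupReadSupport;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.PrimitiveType;
import org.apache.parquet.schema.Types;

public class BasicReadWriteSketch {
    public static void main(String[] args) throws Exception {
        // Illustrative two-column schema and output path.
        MessageType schema = Types.buildMessage()
                .required(PrimitiveType.PrimitiveTypeName.INT32).named("id")
                .required(PrimitiveType.PrimitiveTypeName.BINARY).named("name")
                .named("record");
        Path file = new Path("data/basic.parquet");

        // Write two rows with Snappy compression.
        try (ParquetWriter<Group> writer = ExampleParquetWriter.builder(file)
                .withType(schema)
                .withCompressionCodec(CompressionCodecName.SNAPPY)
                .build()) {
            SimpleGroupFactory factory = new SimpleGroupFactory(schema);
            writer.write(factory.newGroup().append("id", 1).append("name", "alice"));
            writer.write(factory.newGroup().append("id", 2).append("name", "bob"));
        }

        // Read the rows back and print them.
        try (ParquetReader<Group> reader =
                ParquetReader.builder(new GroupReadSupport(), file).build()) {
            Group group;
            while ((group = reader.read()) != null) {
                System.out.println(group.getInteger("id", 0) + ": " + group.getString("name", 0));
            }
        }
    }
}
```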

### 2. AvroIntegrationExample.java
Shows how to integrate Avro with the Parquet format; a minimal sketch follows the list below.

- Avro schema definition
- Writing Avro records to Parquet
- Reading Parquet files as Avro records
- Schema projection for performance
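
A minimal sketch of the Avro round trip, assuming a hypothetical `User` schema and file name (not the example's actual code):

```java
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.avro.AvroParquetReader;
import org.apache.parquet.avro.AvroParquetWriter;
import org.apache.parquet.hadoop.ParquetReader;
import org.apache.parquet.hadoop.ParquetWriter;

public class AvroIntegrationSketch {
    public static void main(String[] args) throws Exception {
        Path file = new Path("data/users.parquet");

        // Illustrative Avro record schema.
        Schema schema = SchemaBuilder.record("User").fields()
                .requiredInt("id")
                .requiredString("name")
                .endRecord();

        // Write generic Avro records to a Parquet file.
        try (ParquetWriter<GenericRecord> writer =
                AvroParquetWriter.<GenericRecord>builder(file)
                        .withSchema(schema)
                        .build()) {
            GenericRecord user = new GenericData.Record(schema);
            user.put("id", 1);
            user.put("name", "alice");
            writer.write(user);
        }

        // Read the records back as generic Avro records. For projection,
        // AvroReadSupport.setRequestedProjection(conf, subsetSchema) can be
        // applied through withConf(conf) on the builder.
        try (ParquetReader<GenericRecord> reader =
                AvroParquetReader.<GenericRecord>builder(file).build()) {
            GenericRecord record;
            while ((record = reader.read()) != null) {
                System.out.println(record);
            }
        }
    }
}
```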

### 3. AdvancedFeaturesExample.java
Demonstrates advanced Parquet features.

- Predicate pushdown filtering
- Performance optimization with projections
- Complex filter conditions

## Prerequisites

- Java 8 or higher
- Maven 3.6+

## Contributing

Feel free to contribute additional examples by:

1. Creating new example classes
2. Improving existing examples
3. Adding more comprehensive test cases
4. Updating documentation
@@ -0,0 +1,176 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.parquet.examples;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.example.data.Group;
import org.apache.parquet.example.data.simple.SimpleGroupFactory;
import org.apache.parquet.filter2.compat.FilterCompat;
import org.apache.parquet.filter2.predicate.FilterApi;
import org.apache.parquet.filter2.predicate.FilterPredicate;
import org.apache.parquet.filter2.predicate.Operators;
import org.apache.parquet.hadoop.ParquetReader;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.api.ReadSupport;
import org.apache.parquet.hadoop.example.GroupReadSupport;
import org.apache.parquet.hadoop.example.GroupWriteSupport;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.parquet.io.api.Binary;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.PrimitiveType;
import org.apache.parquet.schema.Types;

import java.io.IOException;
import java.util.Random;

public class AdvancedFeaturesExample {

public static void main(String[] args) throws IOException {
String filename = "data/sales.parquet";

MessageType schema = Types.buildMessage()
.required(PrimitiveType.PrimitiveTypeName.INT32).named("id")
.required(PrimitiveType.PrimitiveTypeName.BINARY).named("product")
.required(PrimitiveType.PrimitiveTypeName.DOUBLE).named("amount")
.required(PrimitiveType.PrimitiveTypeName.BINARY).named("region")
.named("sale");

writeSalesData(filename, schema);

testPredicatePushdown(filename);

testPerformanceOptimization(filename);
}

private static void writeSalesData(String filename, MessageType schema) throws IOException {
Path file = new Path(filename);

Configuration conf = new Configuration();
GroupWriteSupport.setSchema(schema, conf);

try (ParquetWriter<Group> writer = new ParquetWriter<>(
file, // destination path
new GroupWriteSupport(), // write support implementation
CompressionCodecName.SNAPPY, // compression codec
64 * 1024 * 1024, // row-group size
1024 * 1024, // page size
1024 * 1024, // dictionary page size
true, // enable dictionary encoding
false, // disable validation
ParquetWriter.DEFAULT_WRITER_VERSION, // writer version
conf)) {

SimpleGroupFactory factory = new SimpleGroupFactory(schema);
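// A fixed seed keeps the generated data identical across runs.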
Random random = new Random(42);
String[] products = {"Laptop", "Phone", "Tablet", "Monitor", "Keyboard"};
String[] regions = {"North", "South", "East", "West"};

for (int i = 0; i < 10000; i++) {
Group group = factory.newGroup()
.append("id", i)
.append("product", products[random.nextInt(products.length)])
.append("amount", 100.0 + random.nextDouble() * 900.0)
.append("region", regions[random.nextInt(regions.length)]);
writer.write(group);
}

System.out.println("Wrote 10000 sales records");
}
}

private static void testPredicatePushdown(String filename) throws IOException {
Path file = new Path(filename);

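// Predicate pushdown lets Parquet use column statistics to skip row groups
// and pages that cannot match, so this filter avoids decoding most of the
// file instead of scanning every record.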
Operators.DoubleColumn amountColumn = FilterApi.doubleColumn("amount");
FilterPredicate amountFilter = FilterApi.gt(amountColumn, 500.0);

long startTime = System.currentTimeMillis();
int count = 0;
try (ParquetReader<Group> reader = ParquetReader.builder(new GroupReadSupport(), file)
.withFilter(FilterCompat.get(amountFilter))
.build()) {

Group group;
while ((group = reader.read()) != null) {
count++;
}
}
long filterTime = System.currentTimeMillis() - startTime;
System.out.printf("Simple filter (amount > 500): found %d records in %dms%n", count, filterTime);

Operators.BinaryColumn regionColumn = FilterApi.binaryColumn("region");
FilterPredicate complexFilter = FilterApi.and(
FilterApi.gt(amountColumn, 500.0),
FilterApi.eq(regionColumn, Binary.fromString("North"))
);

startTime = System.currentTimeMillis();
count = 0;
try (ParquetReader<Group> reader = ParquetReader.builder(new GroupReadSupport(), file)
.withFilter(FilterCompat.get(complexFilter))
.build()) {

Group group;
while ((group = reader.read()) != null) {
count++;
}
}
filterTime = System.currentTimeMillis() - startTime;
System.out.printf("Found %d records in %dms%n", count, filterTime);
}

private static void testPerformanceOptimization(String filename) throws IOException {
Path file = new Path(filename);

System.out.println("Testing default reading performance...");
long startTime = System.currentTimeMillis();
int count = 0;
try (ParquetReader<Group> reader = ParquetReader.builder(new GroupReadSupport(), file).build()) {
Group group;
while ((group = reader.read()) != null) {
count++;
}
}
long defaultTime = System.currentTimeMillis() - startTime;
System.out.printf("Default: Read %d records in %dms%n", count, defaultTime);

System.out.println("Testing projection reading performance...");
MessageType projection = Types.buildMessage()
.required(PrimitiveType.PrimitiveTypeName.INT32).named("id")
.required(PrimitiveType.PrimitiveTypeName.DOUBLE).named("amount")
.named("sale");

startTime = System.currentTimeMillis();
count = 0;
// ParquetReader.Builder does not expose a projection setter; request the
// projection through the read schema property that GroupReadSupport understands.
Configuration conf = new Configuration();
conf.set(ReadSupport.PARQUET_READ_SCHEMA, projection.toString());

try (ParquetReader<Group> reader = ParquetReader.builder(new GroupReadSupport(), file)
.withConf(conf)
.build()) {

Group group;
while ((group = reader.read()) != null) {
count++;
}
}
long projectionTime = System.currentTimeMillis() - startTime;
System.out.printf("Projection: Read %d records in %dms (%.1fx faster)%n",
count, projectionTime, (double) defaultTime / projectionTime);
}
}