diff --git a/.gitignore b/.gitignore
index 2fd06049ea..f37540bc54 100644
--- a/.gitignore
+++ b/.gitignore
@@ -15,9 +15,9 @@ target
 dependency-reduced-pom.xml
 .idea/*
 target/
+examples/data/**
 .cache
 *~
 mvn_install.log
 .vscode/*
 .DS_Store
-
diff --git a/README.md b/README.md
index 4c2223cf3b..a3fccbf312 100644
--- a/README.md
+++ b/README.md
@@ -108,6 +108,12 @@ Parquet-Java has supported Java Vector API to speed up reading, to enable this f
 * Edit spark class#VectorizedRleValuesReader, function#readNextGroup refer to parquet class#ParquetReadRouter, function#readBatchUsing512Vector
 * Build spark with maven and replace spark-sql_2.12-{VERSION}.jar on the spark jars folder
 
+## Documentation
+
+For usage documentation, examples, and tutorials, see:
+
+- **[Examples](examples/)** - Practical examples demonstrating basic and advanced usage
+
 ## Map/Reduce integration
 
 [Input](https://github.com/apache/parquet-java/blob/master/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetInputFormat.java) and [Output](https://github.com/apache/parquet-java/blob/master/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java) formats.
diff --git a/examples/README.md b/examples/README.md
new file mode 100644
index 0000000000..4410f4b2cc
--- /dev/null
+++ b/examples/README.md
@@ -0,0 +1,61 @@
+
+# Parquet Java Examples
+
+This directory contains self-contained code snippets that demonstrate how to use the Apache Parquet Java library.
+
+## Examples Overview
+
+### 1. BasicReadWriteExample.java
+Demonstrates basic reading and writing of Parquet files using the example API.
+
+- Schema definition
+- Writing data with compression
+- Reading data and calculating statistics
+- Basic configuration options
+
+### 2. AvroIntegrationExample.java
+Shows how to integrate Avro with the Parquet format.
+
+- Avro schema definition
+- Writing Avro records to Parquet
+- Reading Parquet files as Avro records
+- Schema projection for performance
+
+### 3. AdvancedFeaturesExample.java
+Demonstrates advanced Parquet features.
+
+- Predicate pushdown filtering
+- Performance optimization with projections
+- Complex filter conditions
+
+## Prerequisites
+
+- Java 8 or higher
+- Maven 3.6+
+
+## Contributing
+
+Feel free to contribute additional examples by:
+
+1. Creating new example classes
+2. Improving existing examples
+3. Adding more comprehensive test cases
+4. Updating documentation
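+
+## Quick Start Sketch
+
+The snippet below is a minimal write/read round trip distilled from the examples above. It is an illustrative sketch rather than canonical usage: the `QuickStart` class name and the `data/quickstart.parquet` path are placeholders, and it assumes the example-model helpers from `parquet-hadoop` (`ExampleParquetWriter`, `GroupReadSupport`) are on the classpath.
+
+```java
+import java.io.IOException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.example.data.simple.SimpleGroupFactory;
+import org.apache.parquet.hadoop.ParquetReader;
+import org.apache.parquet.hadoop.ParquetWriter;
+import org.apache.parquet.hadoop.example.ExampleParquetWriter;
+import org.apache.parquet.hadoop.example.GroupReadSupport;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.PrimitiveType;
+import org.apache.parquet.schema.Types;
+
+public class QuickStart {
+    public static void main(String[] args) throws IOException {
+        MessageType schema = Types.buildMessage()
+                .required(PrimitiveType.PrimitiveTypeName.INT32).named("id")
+                .named("record");
+        Path file = new Path("data/quickstart.parquet"); // placeholder path
+
+        // Write one record with the builder-style example writer
+        try (ParquetWriter<Group> writer = ExampleParquetWriter.builder(file)
+                .withConf(new Configuration())
+                .withType(schema)
+                .build()) {
+            writer.write(new SimpleGroupFactory(schema).newGroup().append("id", 1));
+        }
+
+        // Read it back
+        try (ParquetReader<Group> reader = ParquetReader.builder(new GroupReadSupport(), file).build()) {
+            Group group;
+            while ((group = reader.read()) != null) {
+                System.out.println(group.getInteger("id", 0));
+            }
+        }
+    }
+}
+```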
diff --git a/examples/src/main/java/org/apache/parquet/examples/AdvancedFeaturesExample.java b/examples/src/main/java/org/apache/parquet/examples/AdvancedFeaturesExample.java
new file mode 100644
index 0000000000..52f2d11689
--- /dev/null
+++ b/examples/src/main/java/org/apache/parquet/examples/AdvancedFeaturesExample.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.examples;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.example.data.simple.SimpleGroupFactory;
+import org.apache.parquet.filter2.compat.FilterCompat;
+import org.apache.parquet.filter2.predicate.FilterApi;
+import org.apache.parquet.filter2.predicate.FilterPredicate;
+import org.apache.parquet.filter2.predicate.Operators;
+import org.apache.parquet.hadoop.ParquetReader;
+import org.apache.parquet.hadoop.ParquetWriter;
+import org.apache.parquet.hadoop.example.GroupReadSupport;
+import org.apache.parquet.hadoop.example.GroupWriteSupport;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.PrimitiveType;
+import org.apache.parquet.schema.Types;
+
+import java.io.IOException;
+import java.util.Random;
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * Example demonstrating advanced Parquet features.
+ *
+ * This example shows:
+ * 1. Predicate pushdown filtering
+ * 2. Complex filter conditions
+ * 3. Performance optimization with projections
+ */
+public class AdvancedFeaturesExample {
+
+    public static void main(String[] args) throws IOException {
+        String filename = "data/sales.parquet";
+
+        MessageType schema = Types.buildMessage()
+                .required(PrimitiveType.PrimitiveTypeName.INT32).named("id")
+                .required(PrimitiveType.PrimitiveTypeName.BINARY).named("product")
+                .required(PrimitiveType.PrimitiveTypeName.DOUBLE).named("amount")
+                .required(PrimitiveType.PrimitiveTypeName.BINARY).named("region")
+                .named("sale");
+
+        writeSalesData(filename, schema);
+
+        testPredicatePushdown(filename);
+
+        testPerformanceOptimization(filename);
+    }
+
+    private static void writeSalesData(String filename, MessageType schema) throws IOException {
+        Path file = new Path(filename);
+
+        Configuration conf = new Configuration();
+        GroupWriteSupport.setSchema(schema, conf);
+
+        try (ParquetWriter<Group> writer = new ParquetWriter<>(
+                file,                                  // destination path
+                new GroupWriteSupport(),               // write support implementation
+                CompressionCodecName.SNAPPY,           // compression codec
+                64 * 1024 * 1024,                      // row-group size
+                1024 * 1024,                           // page size
+                1024 * 1024,                           // dictionary page size
+                true,                                  // enable dictionary encoding
+                false,                                 // disable validation
+                ParquetWriter.DEFAULT_WRITER_VERSION,  // writer version
+                conf)) {
+
+            SimpleGroupFactory factory = new SimpleGroupFactory(schema);
+            Random random = new Random(42);
+            String[] products = {"Laptop", "Phone", "Tablet", "Monitor", "Keyboard"};
+            String[] regions = {"North", "South", "East", "West"};
+
+            for (int i = 0; i < 10000; i++) {
+                Group group = factory.newGroup()
+                        .append("id", i)
+                        .append("product", products[random.nextInt(products.length)])
+                        .append("amount", 100.0 + random.nextDouble() * 900.0)
+                        .append("region", regions[random.nextInt(regions.length)]);
+                writer.write(group);
+            }
+
+            System.out.println("Wrote 10000 sales records");
+        }
+    }
+
+    private static void testPredicatePushdown(String filename) throws IOException {
+        Path file = new Path(filename);
+
+        Operators.DoubleColumn amountColumn = FilterApi.doubleColumn("amount");
+        FilterPredicate amountFilter = FilterApi.gt(amountColumn, 500.0);
+
+        long startTime = System.currentTimeMillis();
+        int count = 0;
+        try (ParquetReader<Group> reader = ParquetReader.builder(new GroupReadSupport(), file)
+                .withFilter(FilterCompat.get(amountFilter))
+                .build()) {
+
+            Group group;
+            while ((group = reader.read()) != null) {
+                count++;
+            }
+        }
+        long filterTime = System.currentTimeMillis() - startTime;
+        System.out.printf("Simple filter (amount > 500): found %d records in %dms%n", count, filterTime);
+
+        Operators.BinaryColumn regionColumn = FilterApi.binaryColumn("region");
+        FilterPredicate complexFilter = FilterApi.and(
+                FilterApi.gt(amountColumn, 500.0),
+                FilterApi.eq(regionColumn, org.apache.parquet.io.api.Binary.fromString("North")));
+
+        startTime = System.currentTimeMillis();
+        count = 0;
+        try (ParquetReader<Group> reader = ParquetReader.builder(new GroupReadSupport(), file)
+                .withFilter(FilterCompat.get(complexFilter))
+                .build()) {
+
+            Group group;
+            while ((group = reader.read()) != null) {
+                count++;
+            }
+        }
+        filterTime = System.currentTimeMillis() - startTime;
+        System.out.printf("Complex filter (amount > 500 AND region = North): found %d records in %dms%n", count, filterTime);
+    }
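+
+    // -----------------------------------------------------------------
+    // Added sketch, not wired into main(): predicate pushdown works by
+    // checking the filter predicates against the per-row-group column
+    // statistics stored in the file footer, so non-matching row groups
+    // can be skipped without being decoded. This helper (the name is
+    // ours, not part of the example API) shows a minimal way to look at
+    // those statistics.
+    // -----------------------------------------------------------------
+    private static void inspectRowGroupStatistics(String filename) throws IOException {
+        Configuration conf = new Configuration();
+        try (org.apache.parquet.hadoop.ParquetFileReader fileReader =
+                org.apache.parquet.hadoop.ParquetFileReader.open(
+                        org.apache.parquet.hadoop.util.HadoopInputFile.fromPath(new Path(filename), conf))) {
+            // The footer describes every row group (block) and column chunk
+            for (org.apache.parquet.hadoop.metadata.BlockMetaData block :
+                    fileReader.getFooter().getBlocks()) {
+                System.out.printf("Row group: %d rows, %d bytes%n",
+                        block.getRowCount(), block.getTotalByteSize());
+                for (org.apache.parquet.hadoop.metadata.ColumnChunkMetaData column : block.getColumns()) {
+                    // Min/max statistics are what the filters above are evaluated against
+                    System.out.printf("  %s: %s%n", column.getPath(), column.getStatistics());
+                }
+            }
+        }
+    }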
+
+    private static void testPerformanceOptimization(String filename) throws IOException {
+        Path file = new Path(filename);
+
+        System.out.println("Testing default reading performance...");
+        long startTime = System.currentTimeMillis();
+        int count = 0;
+        try (ParquetReader<Group> reader = ParquetReader.builder(new GroupReadSupport(), file).build()) {
+            Group group;
+            while ((group = reader.read()) != null) {
+                count++;
+            }
+        }
+        long defaultTime = System.currentTimeMillis() - startTime;
+        System.out.printf("Default: Read %d records in %dms%n", count, defaultTime);
+
+        System.out.println("Testing projection reading performance...");
+        MessageType projection = Types.buildMessage()
+                .required(PrimitiveType.PrimitiveTypeName.INT32).named("id")
+                .required(PrimitiveType.PrimitiveTypeName.DOUBLE).named("amount")
+                .named("sale");
+
+        startTime = System.currentTimeMillis();
+        count = 0;
+        // Note: withProjection is not available in this version, so the projection
+        // is passed through the read schema configuration property instead
+        Configuration conf = new Configuration();
+        conf.set("parquet.read.schema", projection.toString());
+
+        try (ParquetReader<Group> reader = ParquetReader.builder(new GroupReadSupport(), file)
+                .withConf(conf)
+                .build()) {
+
+            Group group;
+            while ((group = reader.read()) != null) {
+                count++;
+            }
+        }
+        long projectionTime = System.currentTimeMillis() - startTime;
+        System.out.printf("Projection: Read %d records in %dms (%.1fx faster)%n",
+                count, projectionTime, (double) defaultTime / Math.max(projectionTime, 1));
+    }
+}
diff --git a/examples/src/main/java/org/apache/parquet/examples/AvroIntegrationExample.java b/examples/src/main/java/org/apache/parquet/examples/AvroIntegrationExample.java
new file mode 100644
index 0000000000..703dbd4aa3
--- /dev/null
+++ b/examples/src/main/java/org/apache/parquet/examples/AvroIntegrationExample.java
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.examples;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.avro.AvroParquetReader;
+import org.apache.parquet.avro.AvroParquetWriter;
+import org.apache.parquet.hadoop.ParquetReader;
+import org.apache.parquet.hadoop.ParquetWriter;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+
+import java.io.IOException;
+import java.util.Random;
+
+/**
+ * Example demonstrating Avro integration with Parquet.
+ *
+ * This example shows:
+ * 1. How to define an Avro schema
+ * 2. How to write Avro records to Parquet format
+ * 3. How to read Parquet files as Avro records
+ * 4. How to use different Avro data models (Generic vs Specific)
+ */
+public class AvroIntegrationExample {
+
+    public static void main(String[] args) throws IOException {
+        String outputFile = "data/products.parquet";
+
+        String schemaJson = "{\n" +
+                "  \"type\": \"record\",\n" +
+                "  \"name\": \"Product\",\n" +
+                "  \"fields\": [\n" +
+                "    {\"name\": \"id\", \"type\": \"int\"},\n" +
+                "    {\"name\": \"name\", \"type\": \"string\"},\n" +
+                "    {\"name\": \"category\", \"type\": \"string\"},\n" +
+                "    {\"name\": \"price\", \"type\": \"double\"},\n" +
+                "    {\"name\": \"inStock\", \"type\": \"boolean\"},\n" +
+                "    {\"name\": \"tags\", \"type\": {\"type\": \"array\", \"items\": \"string\"}}\n" +
+                "  ]\n" +
+                "}";
+
+        Schema schema = new Schema.Parser().parse(schemaJson);
+
+        writeProductData(outputFile, schema);
+
+        readProductData(outputFile);
+
+        readProductDataWithProjection(outputFile);
+    }
+
+    private static void writeProductData(String filename, Schema schema) throws IOException {
+        Path file = new Path(filename);
+
+        try (ParquetWriter<GenericRecord> writer = AvroParquetWriter.<GenericRecord>builder(file)
+                .withSchema(schema)
+                .withDataModel(GenericData.get())
+                .withCompressionCodec(CompressionCodecName.SNAPPY)
+                .withRowGroupSize(64 * 1024 * 1024)
+                .withPageSize(1024 * 1024)
+                .withDictionaryEncoding(true)
+                .build()) {
+
+            Random random = new Random(42);
+            String[] categories = {"Electronics", "Clothing", "Books", "Home", "Sports"};
+            String[] tagSets = {
+                "new,featured",
+                "sale,discount",
+                "premium,quality",
+                "popular,bestseller",
+                "limited,exclusive"
+            };
+
+            for (int i = 0; i < 500; i++) {
+                GenericRecord record = new GenericData.Record(schema);
+                record.put("id", i);
+                record.put("name", "Product " + i);
+                record.put("category", categories[random.nextInt(categories.length)]);
+                record.put("price", 10.0 + random.nextDouble() * 990.0);
+                record.put("inStock", random.nextBoolean());
+
+                String[] tags = tagSets[random.nextInt(tagSets.length)].split(",");
+                record.put("tags", java.util.Arrays.asList(tags));
+
+                writer.write(record);
+            }
+
+            System.out.println("Successfully wrote 500 product records");
+        }
+    }
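+
+    // -----------------------------------------------------------------
+    // Added sketch, not wired into main(): the class javadoc mentions the
+    // Generic vs Specific data models, but specific records need classes
+    // generated from an .avsc file, which this self-contained example does
+    // not include. The closest standalone illustration is ReflectData,
+    // which derives the Avro schema from a plain Java class; the Point
+    // class and this method are ours, added for illustration only.
+    // -----------------------------------------------------------------
+    public static class Point {
+        public int x;
+        public int y;
+    }
+
+    private static void writeWithReflectDataModel(String filename) throws IOException {
+        Schema schema = org.apache.avro.reflect.ReflectData.get().getSchema(Point.class);
+        Point point = new Point();
+        point.x = 1;
+        point.y = 2;
+        try (ParquetWriter<Point> writer = AvroParquetWriter.<Point>builder(new Path(filename))
+                .withSchema(schema)
+                // Reflection-based data model instead of GenericData.get()
+                .withDataModel(org.apache.avro.reflect.ReflectData.get())
+                .build()) {
+            writer.write(point);
+        }
+    }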
+
+    private static void readProductData(String filename) throws IOException {
+        Path file = new Path(filename);
+
+        try (ParquetReader<GenericRecord> reader = AvroParquetReader.<GenericRecord>builder(file)
+                .withDataModel(GenericData.get())
+                .build()) {
+
+            GenericRecord record;
+            int count = 0;
+            double totalPrice = 0.0;
+            int inStockCount = 0;
+
+            while ((record = reader.read()) != null) {
+                Integer id = (Integer) record.get("id");
+                String name = record.get("name").toString();
+                String category = record.get("category").toString();
+                Double price = (Double) record.get("price");
+                Boolean inStock = (Boolean) record.get("inStock");
+                @SuppressWarnings("unchecked")
+                java.util.List<Object> tagsList = (java.util.List<Object>) record.get("tags");
+                java.util.List<String> tags = new java.util.ArrayList<>();
+                for (Object tag : tagsList) {
+                    tags.add(tag.toString());
+                }
+
+                totalPrice += price;
+                if (inStock) {
+                    inStockCount++;
+                }
+
+                if (count < 3) {
+                    System.out.printf("Product %d: %s (%s), Price: $%.2f, In Stock: %s, Tags: %s%n",
+                            id, name, category, price, inStock ? "Yes" : "No", tags);
+                }
+
+                count++;
+            }
+
+            System.out.printf("%nStatistics:%n");
+            System.out.printf("Total products: %d%n", count);
+            System.out.printf("Average price: $%.2f%n", totalPrice / count);
+            System.out.printf("Products in stock: %d (%.1f%%)%n",
+                    inStockCount, (inStockCount * 100.0) / count);
+        }
+    }
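+
+    // -----------------------------------------------------------------
+    // Added sketch, not wired into main(): parquet-avro also offers a
+    // typed helper, AvroReadSupport.setRequestedProjection, which sets the
+    // same "parquet.avro.projection" property that
+    // readProductDataWithProjection below sets by hand. The method name
+    // here is ours, added for illustration only.
+    // -----------------------------------------------------------------
+    private static void readWithTypedProjection(String filename, Schema projectionSchema) throws IOException {
+        Configuration conf = new Configuration();
+        org.apache.parquet.avro.AvroReadSupport.setRequestedProjection(conf, projectionSchema);
+        try (ParquetReader<GenericRecord> reader = AvroParquetReader.<GenericRecord>builder(new Path(filename))
+                .withDataModel(GenericData.get())
+                .withConf(conf)
+                .build()) {
+            GenericRecord record;
+            while ((record = reader.read()) != null) {
+                System.out.printf("Product %s: %s%n", record.get("id"), record.get("name"));
+            }
+        }
+    }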
+
+    private static void readProductDataWithProjection(String filename) throws IOException {
+        Path file = new Path(filename);
+
+        String projectionSchemaJson = "{\n" +
+                "  \"type\": \"record\",\n" +
+                "  \"name\": \"ProductProjection\",\n" +
+                "  \"fields\": [\n" +
+                "    {\"name\": \"id\", \"type\": \"int\"},\n" +
+                "    {\"name\": \"name\", \"type\": \"string\"}\n" +
+                "  ]\n" +
+                "}";
+
+        // Parse first so an invalid projection fails fast
+        Schema projectionSchema = new Schema.Parser().parse(projectionSchemaJson);
+
+        // Note: withProjection is not available in this version, so the projection
+        // is passed through the AvroReadSupport configuration property instead
+        Configuration conf = new Configuration();
+        conf.set("parquet.avro.projection", projectionSchema.toString());
+
+        try (ParquetReader<GenericRecord> reader = AvroParquetReader.<GenericRecord>builder(file)
+                .withDataModel(GenericData.get())
+                .withConf(conf)
+                .build()) {
+
+            GenericRecord record;
+            int count = 0;
+
+            while ((record = reader.read()) != null && count < 5) {
+                Integer id = (Integer) record.get("id");
+                String name = record.get("name").toString();
+
+                System.out.printf("Product %d: %s%n", id, name);
+                count++;
+            }
+
+            System.out.println("... (showing only first 5 records with projection)");
+        }
+    }
+}
diff --git a/examples/src/main/java/org/apache/parquet/examples/BasicReadWriteExample.java b/examples/src/main/java/org/apache/parquet/examples/BasicReadWriteExample.java
new file mode 100644
index 0000000000..97ce5c0756
--- /dev/null
+++ b/examples/src/main/java/org/apache/parquet/examples/BasicReadWriteExample.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.examples;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.example.data.simple.SimpleGroupFactory;
+import org.apache.parquet.hadoop.ParquetReader;
+import org.apache.parquet.hadoop.ParquetWriter;
+import org.apache.parquet.hadoop.example.GroupReadSupport;
+import org.apache.parquet.hadoop.example.GroupWriteSupport;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.PrimitiveType;
+import org.apache.parquet.schema.Types;
+import org.apache.hadoop.conf.Configuration;
+
+import java.io.IOException;
+import java.util.Random;
+
+/**
+ * Basic example demonstrating how to read and write Parquet files
+ * using the example API provided by the Parquet library.
+ *
+ * This example shows:
+ * 1. How to define a schema
+ * 2. How to write data to a Parquet file
+ * 3. How to read data from a Parquet file
+ * 4. Basic configuration options
+ */
+public class BasicReadWriteExample {
+
+    public static void main(String[] args) throws IOException {
+        String outputFile = "data/employees.parquet";
+
+        MessageType schema = Types.buildMessage()
+                .required(PrimitiveType.PrimitiveTypeName.INT32).named("id")
+                .required(PrimitiveType.PrimitiveTypeName.BINARY).named("name")
+                .required(PrimitiveType.PrimitiveTypeName.DOUBLE).named("salary")
+                .required(PrimitiveType.PrimitiveTypeName.BOOLEAN).named("active")
+                .named("employee");
+
+        writeEmployeeData(outputFile, schema);
+
+        readEmployeeData(outputFile);
+    }
+
+    private static void writeEmployeeData(String filename, MessageType schema) throws IOException {
+        Path file = new Path(filename);
+
+        Configuration conf = new Configuration();
+        // Propagate the schema for the write support
+        GroupWriteSupport.setSchema(schema, conf);
+
+        try (ParquetWriter<Group> writer = new ParquetWriter<>(
+                file,                                  // destination path
+                new GroupWriteSupport(),               // write support implementation
+                CompressionCodecName.SNAPPY,           // compression codec
+                64 * 1024 * 1024,                      // row-group size (64 MB)
+                1024 * 1024,                           // page size (1 MB)
+                1024 * 1024,                           // dictionary page size (1 MB)
+                true,                                  // enable dictionary encoding
+                false,                                 // disable validation
+                ParquetWriter.DEFAULT_WRITER_VERSION,  // writer version
+                conf)) {
+
+            SimpleGroupFactory factory = new SimpleGroupFactory(schema);
+            Random random = new Random(42);
+
+            for (int i = 0; i < 1000; i++) {
+                Group group = factory.newGroup()
+                        .append("id", i)
+                        .append("name", "Employee " + i)
+                        .append("salary", 50000.0 + random.nextDouble() * 50000.0)
+                        .append("active", random.nextBoolean());
+
+                writer.write(group);
+            }
+        }
+    }
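+
+    // -----------------------------------------------------------------
+    // Added sketch, not wired into main(): writeEmployeeData above uses a
+    // deprecated ParquetWriter constructor. A builder-based equivalent,
+    // using ExampleParquetWriter from parquet-hadoop, looks roughly like
+    // this; the method name is ours and the settings mirror
+    // writeEmployeeData.
+    // -----------------------------------------------------------------
+    private static void writeWithBuilder(String filename, MessageType schema) throws IOException {
+        try (ParquetWriter<Group> writer =
+                org.apache.parquet.hadoop.example.ExampleParquetWriter.builder(new Path(filename))
+                        .withConf(new Configuration())
+                        .withType(schema) // the builder carries the schema, no GroupWriteSupport.setSchema needed
+                        .withCompressionCodec(CompressionCodecName.SNAPPY)
+                        .withRowGroupSize(64 * 1024 * 1024)
+                        .withPageSize(1024 * 1024)
+                        .withDictionaryPageSize(1024 * 1024)
+                        .withDictionaryEncoding(true)
+                        .withValidation(false)
+                        .build()) {
+            writer.write(new SimpleGroupFactory(schema).newGroup()
+                    .append("id", 0)
+                    .append("name", "Employee 0")
+                    .append("salary", 50000.0)
+                    .append("active", true));
+        }
+    }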
+
+    private static void readEmployeeData(String filename) throws IOException {
+        Path file = new Path(filename);
+
+        try (ParquetReader<Group> reader = ParquetReader.builder(new GroupReadSupport(), file).build()) {
+            Group group;
+            int count = 0;
+            double totalSalary = 0.0;
+            int activeCount = 0;
+
+            while ((group = reader.read()) != null) {
+                int id = group.getInteger("id", 0);
+                String name = group.getString("name", 0);
+                double salary = group.getDouble("salary", 0);
+                boolean active = group.getBoolean("active", 0);
+
+                totalSalary += salary;
+                if (active) {
+                    activeCount++;
+                }
+
+                if (count < 5) {
+                    System.out.printf("Employee %d: %s, Salary: $%.2f, Active: %s%n",
+                            id, name, salary, active ? "Yes" : "No");
+                }
+
+                count++;
+            }
+
+            System.out.printf("%nStatistics:%n");
+            System.out.printf("Total employees: %d%n", count);
+            System.out.printf("Average salary: $%.2f%n", totalSalary / count);
+            System.out.printf("Active employees: %d (%.1f%%)%n",
+                    activeCount, (activeCount * 100.0) / count);
+        }
+    }
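+
+    // -----------------------------------------------------------------
+    // Added sketch, not wired into main(): a common first step with an
+    // unfamiliar Parquet file is to read its schema from the file footer
+    // rather than assume it. Minimal illustration; the method name is
+    // ours.
+    // -----------------------------------------------------------------
+    private static void printFileSchema(String filename) throws IOException {
+        Configuration conf = new Configuration();
+        try (org.apache.parquet.hadoop.ParquetFileReader fileReader =
+                org.apache.parquet.hadoop.ParquetFileReader.open(
+                        org.apache.parquet.hadoop.util.HadoopInputFile.fromPath(new Path(filename), conf))) {
+            MessageType schema = fileReader.getFooter().getFileMetaData().getSchema();
+            System.out.println(schema);
+        }
+    }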
"Yes" : "No"); + } + + count++; + } + + System.out.printf("%nStatistics:%n"); + System.out.printf("Total employees: %d%n", count); + System.out.printf("Average salary: $%.2f%n", totalSalary / count); + System.out.printf("Active employees: %d (%.1f%%)%n", + activeCount, (activeCount * 100.0) / count); + } + } +}