File tree Expand file tree Collapse file tree 4 files changed +8
-16
lines changed
src/main/java/com/example/dataflow Expand file tree Collapse file tree 4 files changed +8
-16
lines changed Original file line number Diff line number Diff line change 3737 <maven .compiler.source>11</maven .compiler.source>
3838 <maven .compiler.target>11</maven .compiler.target>
3939 <project .build.sourceEncoding>UTF-8</project .build.sourceEncoding>
40- <apache_beam .version>2.56 .0</apache_beam .version>
40+ <apache_beam .version>2.57 .0</apache_beam .version>
4141 <slf4j .version>2.0.12</slf4j .version>
4242 <parquet .version>1.14.0</parquet .version>
4343 <iceberg .version>1.4.2</iceberg .version>
Original file line number Diff line number Diff line change @@ -59,7 +59,7 @@ public static void main(String[] args) {
5959
6060 // Parse the pipeline options passed into the application. Example:
6161 // --runner=DirectRunner --warehouseLocation=$LOCATION --catalogName=$CATALOG \
62- // -tableName= $TABLE_NAME --outputPath=$OUTPUT_FILE
62+ // -- tableName= $TABLE_NAME --outputPath=$OUTPUT_FILE
6363 // For more information, see https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options
6464 Options options = PipelineOptionsFactory .fromArgs (args ).withValidation ().as (Options .class );
6565 Pipeline pipeline = Pipeline .create (options );
@@ -77,11 +77,8 @@ public static void main(String[] args) {
7777 .build ();
7878
7979 // Build the pipeline.
80- PCollectionRowTuple .empty (pipeline ).apply (
81- Managed .read (Managed .ICEBERG )
82- .withConfig (config )
83- )
84- .get ("output" )
80+ pipeline .apply (Managed .read (Managed .ICEBERG ).withConfig (config ))
81+ .getSinglePCollection ()
8582 // Format each record as a string with the format 'id:name'.
8683 .apply (MapElements
8784 .into (TypeDescriptors .strings ())
Original file line number Diff line number Diff line change @@ -85,14 +85,9 @@ public static void main(String[] args) {
8585 .build ();
8686
8787 // Build the pipeline.
88- var input = pipeline
89- .apply (Create .of (TABLE_ROWS ))
90- .apply (JsonToRow .withSchema (SCHEMA ));
91-
92- PCollectionRowTuple .of ("input" , input ).apply (
93- Managed .write (Managed .ICEBERG )
94- .withConfig (config )
95- );
88+ pipeline .apply (Create .of (TABLE_ROWS ))
89+ .apply (JsonToRow .withSchema (SCHEMA ))
90+ .apply (Managed .write (Managed .ICEBERG ).withConfig (config ));
9691
9792 pipeline .run ().waitUntilFinish ();
9893 }
Original file line number Diff line number Diff line change @@ -70,7 +70,7 @@ public static void main(String[] args) {
7070 );
7171
7272 // Parse the pipeline options passed into the application. Example:
73- // ---- runner=DirectRunner --topic=projects/MY_PROJECT/topics/MY_TOPIC"
73+ // --runner=DirectRunner --topic=projects/MY_PROJECT/topics/MY_TOPIC"
7474 // For more information, see https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options
7575 var options = PipelineOptionsFactory .fromArgs (args ).withValidation ().as (Options .class );
7676 var pipeline = Pipeline .create (options );
You can’t perform that action at this time.
0 commit comments