1717 */
1818package org .apache .beam .sdk .io .gcp .spanner ;
1919
20+ import static org .apache .beam .sdk .util .Preconditions .checkArgumentNotNull ;
2021import static org .apache .beam .vendor .guava .v32_1_2_jre .com .google .common .base .Preconditions .checkArgument ;
2122import static org .apache .beam .vendor .guava .v32_1_2_jre .com .google .common .base .Preconditions .checkNotNull ;
2223
2324import com .google .auto .service .AutoService ;
2425import com .google .auto .value .AutoValue ;
2526import com .google .cloud .spanner .Struct ;
2627import java .io .Serializable ;
28+ import java .util .Arrays ;
2729import java .util .Collections ;
2830import java .util .List ;
2931import javax .annotation .Nullable ;
32+ import org .apache .beam .sdk .metrics .Counter ;
33+ import org .apache .beam .sdk .metrics .Metrics ;
3034import org .apache .beam .sdk .schemas .AutoValueSchema ;
3135import org .apache .beam .sdk .schemas .Schema ;
3236import org .apache .beam .sdk .schemas .annotations .DefaultSchema ;
3337import org .apache .beam .sdk .schemas .annotations .SchemaFieldDescription ;
3438import org .apache .beam .sdk .schemas .transforms .SchemaTransform ;
3539import org .apache .beam .sdk .schemas .transforms .SchemaTransformProvider ;
3640import org .apache .beam .sdk .schemas .transforms .TypedSchemaTransformProvider ;
37- import org .apache .beam .sdk .transforms .MapElements ;
41+ import org .apache .beam .sdk .schemas .transforms .providers .ErrorHandling ;
42+ import org .apache .beam .sdk .transforms .DoFn ;
43+ import org .apache .beam .sdk .transforms .DoFn .FinishBundle ;
44+ import org .apache .beam .sdk .transforms .DoFn .FinishBundleContext ;
45+ import org .apache .beam .sdk .transforms .DoFn .MultiOutputReceiver ;
46+ import org .apache .beam .sdk .transforms .DoFn .ProcessElement ;
47+ import org .apache .beam .sdk .transforms .ParDo ;
3848import org .apache .beam .sdk .values .PCollection ;
3949import org .apache .beam .sdk .values .PCollectionRowTuple ;
50+ import org .apache .beam .sdk .values .PCollectionTuple ;
4051import org .apache .beam .sdk .values .Row ;
41- import org .apache .beam .sdk .values .TypeDescriptor ;
52+ import org .apache .beam .sdk .values .TupleTag ;
53+ import org .apache .beam .sdk .values .TupleTagList ;
4254import org .apache .beam .vendor .guava .v32_1_2_jre .com .google .common .base .Strings ;
4355
4456/** A provider for reading from Cloud Spanner using a Schema Transform Provider. */
@@ -61,6 +73,11 @@ public class SpannerReadSchemaTransformProvider
6173 extends TypedSchemaTransformProvider <
6274 SpannerReadSchemaTransformProvider .SpannerReadSchemaTransformConfiguration > {
6375
76+ public static final TupleTag <Row > OUTPUT_TAG = new TupleTag <Row >() {};
77+ public static final TupleTag <Row > ERROR_TAG = new TupleTag <Row >() {};
78+ public static final Schema ERROR_SCHEMA =
79+ Schema .builder ().addStringField ("error" ).addStringField ("row" ).build ();
80+
6481 @ Override
6582 public String identifier () {
6683 return "beam:schematransform:org.apache.beam:spanner_read:v1" ;
@@ -133,6 +150,7 @@ static class SpannerSchemaTransformRead extends SchemaTransform implements Seria
133150 @ Override
134151 public PCollectionRowTuple expand (PCollectionRowTuple input ) {
135152 checkNotNull (input , "Input to SpannerReadSchemaTransform cannot be null." );
153+ boolean handleErrors = ErrorHandling .hasOutput (configuration .getErrorHandling ());
136154 SpannerIO .Read read =
137155 SpannerIO .readWithSchema ()
138156 .withProjectId (configuration .getProjectId ())
@@ -152,12 +170,66 @@ public PCollectionRowTuple expand(PCollectionRowTuple input) {
152170 }
153171 PCollection <Struct > spannerRows = input .getPipeline ().apply (read );
154172 Schema schema = spannerRows .getSchema ();
155- PCollection <Row > rows =
173+
174+ PCollectionTuple outputTuple =
156175 spannerRows .apply (
157- MapElements .into (TypeDescriptor .of (Row .class ))
158- .via ((Struct struct ) -> StructUtils .structToBeamRow (struct , schema )));
176+ ParDo .of (
177+ new ErrorFn ("spanner-read-error-counter" , ERROR_SCHEMA , schema , handleErrors ))
178+ .withOutputTags (OUTPUT_TAG , TupleTagList .of (ERROR_TAG )));
179+
180+ PCollectionRowTuple outputRows =
181+ PCollectionRowTuple .of ("output" , outputTuple .get (OUTPUT_TAG ).setRowSchema (schema ));
182+
183+ // Error handling
184+ PCollection <Row > errorOutput = outputTuple .get (ERROR_TAG ).setRowSchema (ERROR_SCHEMA );
185+ if (handleErrors ) {
186+ outputRows =
187+ outputRows .and (
188+ checkArgumentNotNull (configuration .getErrorHandling ()).getOutput (), errorOutput );
189+ }
159190
160- return PCollectionRowTuple .of ("output" , rows .setRowSchema (schema ));
191+ return outputRows ;
192+ }
193+ }
194+
195+ public static class ErrorFn extends DoFn <Struct , Row > {
196+ private final Counter errorCounter ;
197+ private Long errorsInBundle = 0L ;
198+ private final boolean handleErrors ;
199+ private final Schema errorSchema ;
200+ private final Schema schema ;
201+
202+ public ErrorFn (String name , Schema errorSchema , Schema schema , boolean handleErrors ) {
203+ this .errorCounter = Metrics .counter (SpannerReadSchemaTransformProvider .class , name );
204+ this .handleErrors = handleErrors ;
205+ this .errorSchema = errorSchema ;
206+ this .schema = schema ;
207+ }
208+
209+ @ ProcessElement
210+ public void processElement (@ DoFn .Element Struct struct , MultiOutputReceiver receiver ) {
211+ Row mappedRow = null ;
212+ try {
213+ mappedRow = StructUtils .structToBeamRow (struct , schema );
214+ } catch (Exception e ) {
215+ if (!handleErrors ) {
216+ throw new RuntimeException (e );
217+ }
218+ errorsInBundle += 1 ;
219+ receiver
220+ .get (ERROR_TAG )
221+ .output (
222+ Row .withSchema (errorSchema ).addValues (e .getMessage (), struct .toString ()).build ());
223+ }
224+ if (mappedRow != null ) {
225+ receiver .get (OUTPUT_TAG ).output (mappedRow );
226+ }
227+ }
228+
229+ @ FinishBundle
230+ public void finish (FinishBundleContext c ) {
231+ errorCounter .inc (errorsInBundle );
232+ errorsInBundle = 0L ;
161233 }
162234 }
163235
@@ -168,7 +240,7 @@ public List<String> inputCollectionNames() {
168240
169241 @ Override
170242 public List <String > outputCollectionNames () {
171- return Collections . singletonList ("output" );
243+ return Arrays . asList ("output" , "errors " );
172244 }
173245
174246 @ DefaultSchema (AutoValueSchema .class )
@@ -193,6 +265,8 @@ public abstract static class Builder {
193265
194266 public abstract Builder setBatching (Boolean batching );
195267
268+ public abstract Builder setErrorHandling (ErrorHandling errorHandling );
269+
196270 public abstract SpannerReadSchemaTransformConfiguration build ();
197271 }
198272
@@ -261,6 +335,10 @@ public static Builder builder() {
261335 "Set to false to disable batching. Useful when using a query that is not compatible with the PartitionQuery API. Defaults to true." )
262336 @ Nullable
263337 public abstract Boolean getBatching ();
338+
339+ @ SchemaFieldDescription ("This option specifies whether and where to output unwritable rows." )
340+ @ Nullable
341+ public abstract ErrorHandling getErrorHandling ();
264342 }
265343
266344 @ Override
0 commit comments