-
Notifications
You must be signed in to change notification settings - Fork 4.5k
Closed
Description
What happened?
Can u help me how to define TIMESTAMP in schema in python?
import apache_beam as beam
from apache_beam import Row
from apache_beam.transforms import managed
from apache_beam.typehints import row_type
import datetime
from google.cloud import bigquery
from numpy._core.numerictypes import int64
client = bigquery.Client()
from apache_beam.options.pipeline_options import PipelineOptions, GoogleCloudOptions
opts = GoogleCloudOptions(project="myproject",
region="us-west1",
temp_location="gs://myproject/temp")
from typing import NamedTuple
import typing
class TestCol(NamedTuple):
color: str
size: float
class TestRow(NamedTuple):
id: str
name: str
value: int
nested: TestCol
ts: datetime.datetime
def cast_to_schema(schema_class, data):
result = {}
field_types = typing.get_type_hints(schema_class)
if not data:
return None
for field in schema_class._fields:
field_type = field_types.get(field)
value = data.get(field)
if hasattr(field_type, '_fields'):
result[field] = cast_to_schema(field_type, value)
else:
result[field] = value
return schema_class(**result)
with beam.Pipeline(options=opts) as p:
rows = (
p
| "CreateRows" >> beam.Create([
{"id": "a", "name": "Amy", "value": 100, "nested": {"color": "red", "size": 1.0}, "ts": datetime.datetime(2025, 1, 1, 1, 0, 0, tzinfo=datetime.timezone.utc)},
{"id": "b", "name": "Bob", "value": 200, "nested": {"color": "blue", "size": 2.0}, "ts": datetime.datetime(2025, 1, 1, 2, 0, 0, tzinfo=datetime.timezone.utc)},
])
| "ToRow" >> beam.Map(lambda x: cast_to_schema(TestRow, x)).with_output_types(row_type.RowTypeConstraint.from_fields(list(TestRow.__annotations__.items())))
)
_ = rows | "WriteToBQ" >> managed.Write(
managed.BIGQUERY,
config={
"table": "myproject.mydataset.mytable",
"write_disposition": "WRITE_APPEND",
},
)"errors": [
{
"message": "Provided Schema does not match Table xxxxx. Field ts has changed type from TIMESTAMP to INTEGER",
"reason": "invalid"
}
],
...
name: "ts"
type {
nullable: true
logical_type {
urn: "beam:logical:pythonsdk_any:v1"
}
}
Issue Priority
Priority: 2 (default / most bugs should be filed as P2)
Issue Components
- Component: Python SDK
- Component: Java SDK
- Component: Go SDK
- Component: Typescript SDK
- Component: IO connector
- Component: Beam YAML
- Component: Beam examples
- Component: Beam playground
- Component: Beam katas
- Component: Website
- Component: Infrastructure
- Component: Spark Runner
- Component: Flink Runner
- Component: Samza Runner
- Component: Twister2 Runner
- Component: Hazelcast Jet Runner
- Component: Google Cloud Dataflow Runner