
[Question][chunjun-oracle] Oracle CLOB type throws an error when using the transformer #1893

@J1aHe


Search before asking

  • I have searched the issues and found no similar question.

  • I have googled my question but didn't get any help.

  • I have read the documentation (ChunJun doc), but it didn't help me.

Description

Here is the error:

 Exception in thread "main" org.apache.flink.table.api.ValidationException: SQL validation failed. com.dtstack.chunjun.connector.oracle.converter.ClobType cannot be cast to org.apache.flink.table.types.logical.VarCharType
	at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:152)
	at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:111)
	at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:189)
	at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:77)
	at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:640)
	at com.dtstack.chunjun.Main.syncStreamToTable(Main.java:288)
	at com.dtstack.chunjun.Main.exeSyncJob(Main.java:246)
	at com.dtstack.chunjun.Main.main(Main.java:137)
	at com.dtstack.chunjun.local.test.LocalTest.main(LocalTest.java:136)
Caused by: java.lang.ClassCastException: com.dtstack.chunjun.connector.oracle.converter.ClobType cannot be cast to org.apache.flink.table.types.logical.VarCharType
	at org.apache.flink.table.planner.calcite.FlinkTypeFactory.newRelDataType$1(FlinkTypeFactory.scala:76)
	at org.apache.flink.table.planner.calcite.FlinkTypeFactory.createFieldTypeFromLogicalType(FlinkTypeFactory.scala:167)
	at org.apache.flink.table.planner.calcite.FlinkTypeFactory.$anonfun$buildStructType$1(FlinkTypeFactory.scala:254)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:58)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:51)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
	at org.apache.flink.table.planner.calcite.FlinkTypeFactory.buildStructType(FlinkTypeFactory.scala:252)
	at org.apache.flink.table.planner.calcite.FlinkTypeFactory.buildRelNodeRowType(FlinkTypeFactory.scala:224)
	at org.apache.flink.table.planner.sources.TableSourceUtil$.getSourceRowType(TableSourceUtil.scala:191)
	at org.apache.flink.table.planner.sources.TableSourceUtil.getSourceRowType(TableSourceUtil.scala)
	at org.apache.flink.table.planner.catalog.CatalogSchemaTable.getRowType(CatalogSchemaTable.java:171)
	at org.apache.calcite.sql.validate.EmptyScope.resolve_(EmptyScope.java:159)
	at org.apache.calcite.sql.validate.EmptyScope.resolveTable(EmptyScope.java:99)
	at org.apache.calcite.sql.validate.DelegatingScope.resolveTable(DelegatingScope.java:203)
	at org.apache.calcite.sql.validate.IdentifierNamespace.resolveImpl(IdentifierNamespace.java:112)
	at org.apache.calcite.sql.validate.IdentifierNamespace.validateImpl(IdentifierNamespace.java:184)
	at org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
	at org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:1067)
	at org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:1041)
	at org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3205)
	at org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3187)
	at org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelect(SqlValidatorImpl.java:3461)
	at org.apache.calcite.sql.validate.SelectNamespace.validateImpl(SelectNamespace.java:60)
	at org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
	at org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:1067)
	at org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:1041)
	at org.apache.calcite.sql.SqlSelect.validate(SqlSelect.java:232)
	at org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:1016)
	at org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:724)
	at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:147)
	... 8 more

Here is the JSON job config:

{
  "job": {
    "content": [
      {
        "reader": {
          "name": "oraclereader",
          "parameter": {
            "allowCreateSlot": false,
            "column": [
              {
                "index": 10,
                "name": "SHIPPING_ADDRESS",
                "type": "CLOB"
              },
              {
                "index": 11,
                "name": "BILLING_ADDRESS",
                "type": "CLOB"
              }
            ],
            "connection": [
              {
                "jdbcUrl": [
                  ""
                ],
                "table": [
                  "ORDERS1"
                ]
              }
            ],
            "password": "",
            "username": "",
            "where": ""
          },
          "table": {
            "tableName": "sourceTable"
          }
        },
        "transformer": {
          "transformSql": "select SHIPPING_ADDRESS,BILLING_ADDRESS from sourceTable "
        },
        "writer": {
          "name": "mysqlwriter",
          "parameter": {
            "column": [
              {
                "index": 10,
                "name": "SHIPPING_ADDRESS",
                "type": "longtext"
              },
              {
                "index": 11,
                "name": "BILLING_ADDRESS",
                "type": "longtext"
              }
            ],
            "connection": [
              {
                "jdbcUrl": "",
                "table": [
                  "ORDERS1"
                ]
              }
            ],
            "password": "",
            "preSql": [
              "truncate table ORDERS1"
            ],
            "username": ""
          },
          "table": {
            "tableName": "sinkTable"
          }
        }
      }
    ],
    "setting": {
      "errorLimit": {
        "record": 0
      },
      "speed": {
        "bytes": 10485760,
        "writerChannel": 2
      }
    }
  }
}

After debugging locally, it looks like the error is thrown by Flink, but I'm still not clear on the specifics. How should CLOB columns be synchronized when the transformer is used?
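From the stack trace, the failure happens while Flink validates the transformSql: FlinkTypeFactory only translates standard Flink logical types (such as VarCharType) into the planner's row type, so the connector's custom ClobType cannot be cast and validation fails. A workaround that might be worth trying, purely as a sketch and not confirmed against the Oracle reader converter, is to declare the CLOB columns with a plain string type in the reader so the planner only ever sees a standard VarCharType:

"column": [
  {
    "index": 10,
    "name": "SHIPPING_ADDRESS",
    "type": "VARCHAR"
  },
  {
    "index": 11,
    "name": "BILLING_ADDRESS",
    "type": "VARCHAR"
  }
]

Whether the Oracle reader accepts VARCHAR for a CLOB column (and reads it through the JDBC driver's string conversion) is an assumption here; if it does not, the fix would likely have to be in the connector's type mapping for ClobType.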

Code of Conduct

  • I agree to follow this project's Code of Conduct.
