Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions common/utils/src/main/resources/error/error-conditions.json
Original file line number Diff line number Diff line change
Expand Up @@ -5575,6 +5575,32 @@
},
"sqlState" : "42616"
},
"STATE_REWRITER_INVALID_CHECKPOINT" : {
"message" : [
"The state rewrite checkpoint location '<checkpointLocation>' is in an invalid state."
],
"subClass" : {
"MISSING_KEY_ENCODER_SPEC" : {
"message" : [
"Key state encoder spec is expected for column family '<colFamilyName>' but was not found.",
"This is likely a bug, please report it."
]
},
"MISSING_OPERATOR_METADATA" : {
"message" : [
"No stateful operator metadata was found for batch <batchId>.",
"Ensure that the checkpoint is for a stateful streaming query and the query ran on a Spark version that supports operator metadata (Spark 4.0+)."
]
},
"UNSUPPORTED_STATE_STORE_METADATA_VERSION" : {
"message" : [
"Unsupported state store metadata version encountered.",
"Only StateStoreMetadataV1 and StateStoreMetadataV2 are supported."
]
}
},
"sqlState" : "55019"
},
"STATE_STORE_CANNOT_CREATE_COLUMN_FAMILY_WITH_RESERVED_CHARS" : {
"message" : [
"Failed to create column family with unsupported starting character and name=<colFamilyName>."
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package org.apache.spark.sql.execution.streaming.state
import java.io.{BufferedReader, InputStreamReader}
import java.nio.charset.StandardCharsets

import scala.collection.immutable.ArraySeq
import scala.reflect.ClassTag

import org.apache.hadoop.conf.Configuration
Expand Down Expand Up @@ -80,19 +81,27 @@ trait OperatorStateMetadata {
def version: Int

def operatorInfo: OperatorInfo

def stateStoresMetadata: Seq[StateStoreMetadata]
}

case class OperatorStateMetadataV1(
operatorInfo: OperatorInfoV1,
stateStoreInfo: Array[StateStoreMetadataV1]) extends OperatorStateMetadata {
override def version: Int = 1

override def stateStoresMetadata: Seq[StateStoreMetadata] =
ArraySeq.unsafeWrapArray(stateStoreInfo)
}

case class OperatorStateMetadataV2(
operatorInfo: OperatorInfoV1,
stateStoreInfo: Array[StateStoreMetadataV2],
operatorPropertiesJson: String) extends OperatorStateMetadata {
override def version: Int = 2

override def stateStoresMetadata: Seq[StateStoreMetadata] =
ArraySeq.unsafeWrapArray(stateStoreInfo)
}

object OperatorStateMetadataUtils extends Logging {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ class StatePartitionAllColumnFamiliesWriter(
hadoopConf: Configuration,
partitionId: Int,
targetCpLocation: String,
operatorId: Int,
operatorId: Long,
storeName: String,
currentBatchId: Long,
colFamilyToWriterInfoMap: Map[String, StatePartitionWriterColumnFamilyInfo],
Expand Down Expand Up @@ -153,6 +153,7 @@ class StatePartitionAllColumnFamiliesWriter(
if (!stateStore.hasCommitted) {
stateStore.abort()
}
provider.close()
}
}

Expand Down
Loading