
[FEATURE] Getting inner_hits using Spark connector on Databricks #1235

@PlacidPages


Is your feature request related to a problem?
Yes. I have an application in which Databricks workflows read from and write to an OpenSearch cluster, and the two are in different VPCs. I want to reduce the load of read operations by adding filters to the OpenSearch query.
My index uses a parent-child structure: parent and child exist as separate index records connected by a document_join field, and both parent and child records have a file_name field. I need the file_name and _id (parent_hashkey) of the parent record, together with the _id (child_hashkey) of the child record, for child records that do not have a file_name. I need to get this data into a Databricks dataframe.

I am getting only the child_hashkey, and nothing from the inner_hits, in my output "opensearch_df".
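To make the desired output concrete, here is a minimal sketch of the row shape I am after. It assumes the raw search response is available as a Python dict (for example, fetched with a REST client instead of the connector). The function name `flatten_inner_hits` is hypothetical, and the `parent_doc` key under `inner_hits` is an assumption based on the `parent_type` used in the query.

```python
from typing import Any, Dict, Iterator, Optional


def flatten_inner_hits(
    response: Dict[str, Any],
    inner_name: str = "parent_doc",  # assumed inner_hits key; adjust if the query names it
) -> Iterator[Dict[str, Optional[str]]]:
    """Yield one flat row per (child, parent) pair in a raw search response."""
    for child in response.get("hits", {}).get("hits", []):
        # Parent documents matched by has_parent are nested under inner_hits.
        parents = (
            child.get("inner_hits", {})
            .get(inner_name, {})
            .get("hits", {})
            .get("hits", [])
        )
        for parent in parents:
            source = parent.get("_source", {})
            yield {
                "child_hashkey": child["_id"],
                "parent_hashkey": parent["_id"],
                "file_name": source.get("file_name"),
            }
```

Rows of this shape could then be turned into a dataframe, for instance with `spark.createDataFrame(list(rows))`, but ideally the connector would expose them directly.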

Connector: elasticsearch-spark-30_2.12-8.6.0.jar
(Please note that this connector is being used after getting buy-in from an AWS architect.)

What solution would you like?
To be able to read the inner_hits into a Databricks dataframe.

What alternatives have you considered?
A clear and concise description of any alternative solutions or features you've considered.

Do you have any additional context?
Code:
# `spark` and `secrets_storage` are assumed to be defined in the notebook.
def read_opensearch_missing_filename(indexName, timeout, retry_num):

    # Child records without file_name whose parent has a file_name,
    # restricted to records updated in the last 1000 hours.
    query = """
    {
      "track_total_hits": true,
      "_source": ["child_hashkey"],
      "query": {
        "bool": {
          "must": [
            {
              "has_parent": {
                "parent_type": "parent_doc",
                "query": {
                  "bool": {
                    "must": [
                      {
                        "exists": {
                          "field": "file_name"
                        }
                      }
                    ]
                  }
                },
                "inner_hits": {
                  "_source": ["parent_hashkey", "file_name"]
                }
              }
            },
            {
              "range": {
                "index_record_update_date": {
                  "gte": "now-1000h/h",
                  "lte": "now"
                }
              }
            }
          ],
          "must_not": [
            {
              "exists": {
                "field": "file_name"
              }
            }
          ]
        }
      }
    }
    """

    df = (spark.read
          .format("org.elasticsearch.spark.sql")
          .option("es.nodes", 'https://' + secrets_storage["HOST"])
          .option("es.port", secrets_storage["PORT"])
          .option("es.net.ssl", "true")
          .option("es.net.ssl.cert.allow.self.signed", "true")
          .option("es.net.http.auth.user", secrets_storage["USER"])
          .option("es.net.http.auth.pass", secrets_storage["PASSWORD"])
          .option("es.nodes.wan.only", "true")
          .option("es.nodes.discovery", "false")
          .option("es.mapping.date.rich", "false")
          .option("es.http.timeout", timeout)
          .option("es.http.retries", retry_num)
          .option("es.read.field.include", "child_hashkey,routing,parent_hashkey,file_name")
          .option("es.query", query)
          .load(indexName)
    )

    return df


# Usage
opensearch_df = read_opensearch_missing_filename('index-name', '1h', '3')
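
The query string above could also be built from a dict, so that the time window becomes a parameter and the JSON passed to `es.query` is guaranteed to be well-formed. This is only a sketch: the function name `build_missing_filename_query` and its `hours` parameter are hypothetical, and it does not change what the connector returns for inner_hits.

```python
import json


def build_missing_filename_query(hours: int = 1000) -> str:
    """Return the query DSL above as a JSON string with a configurable time window."""
    query = {
        "track_total_hits": True,
        "_source": ["child_hashkey"],
        "query": {
            "bool": {
                "must": [
                    {
                        "has_parent": {
                            "parent_type": "parent_doc",
                            "query": {
                                "bool": {"must": [{"exists": {"field": "file_name"}}]}
                            },
                            "inner_hits": {"_source": ["parent_hashkey", "file_name"]},
                        }
                    },
                    {
                        "range": {
                            "index_record_update_date": {
                                "gte": f"now-{hours}h/h",
                                "lte": "now",
                            }
                        }
                    },
                ],
                "must_not": [{"exists": {"field": "file_name"}}],
            }
        },
    }
    # json.dumps emits lowercase true/false, as the query DSL expects.
    return json.dumps(query)
```

The returned string can be passed as the value of the `es.query` option in place of the hand-written literal.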
