
Conversation

karta0807913

  • use the FROM_JSON function to unwrap the JSON
  • collapse the casting operation to match the user's intention

fixes #11036

Description of changes

This PR enables the PySpark backend to support the ToJSONArray operation.

In this PR, I use the FROM_JSON function to convert a JSON string into an array object.

Additionally, this PR modifies the visit_Cast function to reuse the FROM_JSON function when casting the value returned by FROM_JSON to other types.

For typical usage, users might use the following snippet:

import ibis
backend = ibis.pyspark.connect()
tab = backend.table("my_table")
tab.select(tab.val.unwrap_as("array<struct<a: int64>>").name("value")).execute()

However, the generated SQL currently looks like this, which leads to incorrect results:

SELECT CAST(FROM_JSON(`t0`.`val`, 'array<string>') AS ARRAY<STRUCT<`a`: BIGINT>>) AS `value` FROM `my_table`

The correct SQL should be:

SELECT FROM_JSON(`t0`.`val`, 'array<struct<a:bigint>>') AS `value` FROM `my_table`

Please let me know if this PR helps. Thank you! :D

Issues closed


fixes ibis-project#11036
github-actions bot added labels on Mar 31, 2025: tests (Issues or PRs related to tests), pyspark (The Apache PySpark backend), sql (Backends that generate SQL)
karta0807913 (Author)

Oops, the test failed, apparently because pyspark is not available.
I can either:

  1. Remove the pyspark dependency and implement simpleString by hand (see the sketch below)
  2. Add pyspark to the test dependencies

Which one is better?
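For option 1, here is a minimal hand-rolled sketch of what simpleString would need to produce for the types in play here. This is illustrative only, not the PR's actual code; the function name and the set of covered dtypes are my own assumptions.

import ibis
import ibis.expr.datatypes as dt


# Hypothetical stand-in for pyspark's DataType.simpleString(), covering
# only the dtypes this PR exercises.
def simple_string(dtype: dt.DataType) -> str:
    if dtype.is_array():
        return f"array<{simple_string(dtype.value_type)}>"
    if dtype.is_map():
        return f"map<{simple_string(dtype.key_type)},{simple_string(dtype.value_type)}>"
    if dtype.is_struct():
        fields = ",".join(
            f"{name}:{simple_string(typ)}" for name, typ in dtype.fields.items()
        )
        return f"struct<{fields}>"
    if dtype.is_int64():
        return "bigint"
    if dtype.is_string():
        return "string"
    raise NotImplementedError(f"no Spark simpleString mapping for {dtype}")


# simple_string(ibis.dtype("array<struct<a: int64>>")) -> "array<struct<a:bigint>>"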

@@ -683,5 +689,8 @@ def visit_ArraySum(self, op, *, arg):
     def visit_ArrayMean(self, op, *, arg):
         return self._array_reduction(dtype=op.dtype, arg=arg, output=operator.truediv)
 
+    def visit_ToJSONArray(self, op, *, arg):
+        return self.f.from_json(arg, sge.Literal.string("array<string>"))
Member

Suggested change
-        return self.f.from_json(arg, sge.Literal.string("array<string>"))
+        return arg

I think this should probably do nothing, since there's no official JSON type in Spark.

Author

But if we do this, the following code will produce SQL that throws an error in Spark, because val is still a plain JSON string rather than an array. 🤔

tab = backend.table("table")
print(tab.select(tab.val.array[0].name("value")).compile())

result:

SELECT ELEMENT_AT(`t0`.`val`, 0 + 1) AS `value` from `table`;

Member

It looks like the variant type is landing in PySpark 4.0.0, which will solve the problem of having a type to represent JSON-like data. I'm not sure it's worth working around this problem for earlier Spark versions.

Author

That's awesome!
Let me do some tests and get back here later 😃

Author

So, I tried the variant type in the Spark 4.0.0-preview2, and it actually works great with ibis's current implementation.

SELECT CAST(FROM_JSON(a.a, 'variant') as ARRAY<STRUCT<A: STRING>>).A FROM (
    SELECT '[{"A":"B"}]' as a
) a

It outputs (amazingly)

[B]

But I think it's too early to support Spark 4.0.0 now.

So I have another proposal.

Another thought
We create a custom function class named FromJson, and in visit_Cast we check whether the input expression is a FromJson node. If it is, we extract the first parameter from it and parse that original string directly to the target type.

Snippet:

import sqlglot.expressions as sge


# Custom node so the cast rewrite can recognize FROM_JSON calls by type.
class FromJson(sge.Func):
    arg_types = {"expressions": True}
    is_var_len_args = True


class PySparkCompiler(SQLGlotCompiler):
    # ...
    def visit_ToJSONArray(self, op, *, arg):
        return FromJson.from_arg_list([arg, sge.Literal.string("array<string>")])

    def visit_ToJSONMap(self, op, *, arg):
        return FromJson.from_arg_list([arg, sge.Literal.string("map<string, string>")])

    def visit_Cast(self, op, *, arg, to):
        # Collapse CAST(FROM_JSON(x, ...) AS T) into FROM_JSON(x, T).
        if isinstance(arg, FromJson):
            return FromJson.from_arg_list(
                [
                    arg.expressions[0],
                    sge.Literal.string(PySparkTypeMapper.from_ibis(to).simpleString()),
                ]
            )
        # ...

What do you think?
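For reference, a quick check of how sqlglot would render the custom node. This is a sketch assuming the FromJson class above; sqlglot derives the SQL function name FROM_JSON from the class name, and the column and literal here are just for illustration.

import sqlglot.expressions as sge

# Build the node the way visit_ToJSONArray would, then print its Spark SQL.
expr = FromJson.from_arg_list(
    [sge.column("val", table="t0"), sge.Literal.string("array<string>")]
)
print(expr.sql(dialect="spark"))  # FROM_JSON(t0.val, 'array<string>')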

Comment on lines +138 to +142
        if isinstance(arg, sge.Func) and arg.name.upper() == "FROM_JSON":
            return self.f.from_json(
                arg.expressions[0],
                sge.Literal.string(PySparkTypeMapper.from_ibis(to).simpleString()),
            )
Member

I think a less fragile way to do this would be to instead check if op.arg.type().is_json() (or json array or json map) and then use from_json in that case. Checking for a specific function seems very fragile given that there are plenty of APIs that can produce JSON that aren't .array and .map.

Suggested change
-        if isinstance(arg, sge.Func) and arg.name.upper() == "FROM_JSON":
-            return self.f.from_json(
-                arg.expressions[0],
-                sge.Literal.string(PySparkTypeMapper.from_ibis(to).simpleString()),
-            )
+        dtype = op.arg.dtype
+        # does ibis think it's JSON, ARRAY<JSON> or MAP<STRING, JSON>?
+        if (
+            dtype.is_json()
+            or (dtype.is_array() and dtype.value_type.is_json())
+            or (dtype.is_map() and dtype.value_type.is_json())
+        ):
+            return self.f.from_json(
+                arg,
+                PySparkTypeMapper.from_ibis(to).simpleString(),
+            )

Author

This patch won't work, because it calls FROM_JSON twice.

t = ibis.literal('[{},{}]]').cast('json').name('a').as_table()
t1 = t.a.unwrap_as('array<json>')
t2 = t.a.unwrap_as('array<json>').cast('array<struct<>>')

backend.compile(t2)

output:

SELECT FROM_JSON(FROM_JSON(`t0`.`a`, 'array<string>'), 'array<struct<>>') AS `Cast(ToJSONArray(a), array<struct<>>)` FROM (SELECT '[{},{}]]' AS `a`) AS `t0`

We need to extract the first parameter from the FROM_JSON call. ><
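For what it's worth, here is a sketch of a visit_Cast that combines both ideas: key off the ibis dtype as suggested, but unwrap an existing FROM_JSON call instead of nesting a second one. It assumes the PySparkTypeMapper helper used earlier in this thread and is not the PR's final code.

def visit_Cast(self, op, *, arg, to):
    from_ = op.arg.dtype
    # Does ibis think the source is JSON, ARRAY<JSON>, or MAP<STRING, JSON>?
    if (
        from_.is_json()
        or (from_.is_array() and from_.value_type.is_json())
        or (from_.is_map() and from_.value_type.is_json())
    ):
        # If the value already comes from FROM_JSON, reuse its raw string
        # argument so only a single FROM_JSON is emitted.
        if isinstance(arg, sge.Func) and arg.name.upper() == "FROM_JSON":
            arg = arg.expressions[0]
        return self.f.from_json(
            arg,
            sge.Literal.string(PySparkTypeMapper.from_ibis(to).simpleString()),
        )
    return super().visit_Cast(op, arg=arg, to=to)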
