Skip to content
Draft
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions spark/common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,21 @@
</properties>

<dependencies>

<dependency>
<groupId>org.apache.arrow</groupId>
<artifactId>arrow-java-root</artifactId>
<version>18.2.0</version>
<type>pom</type>
</dependency>


<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-connect-common_${scala.compat.version}</artifactId>
<version>${spark.version}</version>
</dependency>
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can remove this once the internal ArrowSerializer can be used


<dependency>
<groupId>org.apache.sedona</groupId>
<artifactId>sedona-common</artifactId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.sedona.sql.datasources.arrow;

import org.apache.spark.sql.connector.write.BatchWrite;
import org.apache.spark.sql.connector.write.DataWriterFactory;
import org.apache.spark.sql.connector.write.LogicalWriteInfo;
import org.apache.spark.sql.connector.write.PhysicalWriteInfo;
import org.apache.spark.sql.connector.write.WriterCommitMessage;

/**
 * Driver-side coordinator for a batch write of Arrow data.
 *
 * <p>Captures the logical write request and hands its schema to per-task
 * {@link ArrowDataWriterFactory} instances. Commit/abort coordination is not
 * implemented yet (work in progress).
 */
class ArrowBatchWrite implements BatchWrite {

  /** Write request captured at planning time; supplies the output schema. */
  private final LogicalWriteInfo writeInfo;

  public ArrowBatchWrite(LogicalWriteInfo logicalWriteInfo) {
    this.writeInfo = logicalWriteInfo;
  }

  /** Builds the serializable factory shipped to executors for this physical write. */
  @Override
  public DataWriterFactory createBatchWriterFactory(PhysicalWriteInfo physicalInfo) {
    return new ArrowDataWriterFactory(writeInfo.schema());
  }

  /** Not implemented yet: would finalize the write from per-task commit messages. */
  @Override
  public void commit(WriterCommitMessage[] messages) {
    // TODO Auto-generated method stub
    throw new UnsupportedOperationException("Unimplemented method 'commit'");
  }

  /** Not implemented yet: would clean up any partially written output. */
  @Override
  public void abort(WriterCommitMessage[] messages) {
    // TODO Auto-generated method stub
    throw new UnsupportedOperationException("Unimplemented method 'abort'");
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.sedona.sql.datasources.arrow;

import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.connector.write.DataWriter;
import org.apache.spark.sql.connector.write.DataWriterFactory;
import org.apache.spark.sql.types.StructType;

/**
 * Serializable factory sent to executors; creates one {@link ArrowWriter}
 * per write task, all sharing the same output schema.
 */
class ArrowDataWriterFactory implements DataWriterFactory {

  /** Output schema every task writer serializes rows against. */
  private final StructType writeSchema;

  public ArrowDataWriterFactory(StructType schema) {
    this.writeSchema = schema;
  }

  /** Returns a fresh writer bound to this task attempt's partition/task ids. */
  @Override
  public DataWriter<InternalRow> createWriter(int partitionId, long taskId) {
    return new ArrowWriter(partitionId, taskId, writeSchema);
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.sedona.sql.datasources.arrow;

import java.util.HashSet;
import java.util.Set;
import org.apache.spark.sql.connector.catalog.SupportsWrite;
import org.apache.spark.sql.connector.catalog.TableCapability;
import org.apache.spark.sql.connector.write.LogicalWriteInfo;
import org.apache.spark.sql.connector.write.SupportsOverwrite;
import org.apache.spark.sql.connector.write.WriteBuilder;
import org.apache.spark.sql.sources.Filter;
import org.apache.spark.sql.types.StructType;

/**
 * DataSource V2 table representing an Arrow write target.
 *
 * <p>Currently advertises only {@link TableCapability#BATCH_WRITE}; overwrite
 * support is declared via {@link SupportsOverwrite} but not implemented yet.
 */
class ArrowTable implements SupportsWrite, SupportsOverwrite {
  // Fixed, immutable-by-construction capability set; built once in the
  // constructor instead of the previous non-thread-safe lazy init.
  private final Set<TableCapability> capabilities;
  private final StructType schema;

  ArrowTable(StructType schema) {
    this.schema = schema;
    Set<TableCapability> caps = new HashSet<>();
    caps.add(TableCapability.BATCH_WRITE);
    this.capabilities = caps;
  }

  /** Human-readable identifier for this table (used in plans and error messages). */
  @Override
  public String name() {
    return "ArrowTable";
  }

  /**
   * Returns the schema supplied at construction time.
   *
   * <p>Fix: the field was stored but this method previously threw
   * {@code UnsupportedOperationException}, leaving the table unusable.
   */
  @Override
  public StructType schema() {
    return schema;
  }

  @Override
  public Set<TableCapability> capabilities() {
    return capabilities;
  }

  @Override
  public WriteBuilder newWriteBuilder(LogicalWriteInfo info) {
    return new ArrowWriteBuilder(info);
  }

  /** Not implemented yet: filter-based overwrite is declared but unsupported. */
  @Override
  public WriteBuilder overwrite(Filter[] filters) {
    // TODO Auto-generated method stub
    throw new UnsupportedOperationException("Unimplemented method 'overwrite'");
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.sedona.sql.datasources.arrow;

import java.util.Map;
import org.apache.spark.sql.connector.catalog.Table;
import org.apache.spark.sql.connector.catalog.TableProvider;
import org.apache.spark.sql.connector.expressions.Transform;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.util.CaseInsensitiveStringMap;

/**
 * DataSource V2 provider for the Arrow sink.
 *
 * <p>Schema inference is not supported — callers must supply a schema
 * explicitly (e.g. via {@code DataFrameWriter}), which is why
 * {@link #supportsExternalMetadata()} returns {@code true}.
 */
class ArrowTableProvider implements TableProvider {

  /** Not supported: this source cannot derive a schema from options alone. */
  @Override
  public StructType inferSchema(CaseInsensitiveStringMap options) {
    // TODO Auto-generated method stub
    throw new UnsupportedOperationException("Unimplemented method 'inferSchema'");
  }

  /**
   * Fix: without this override Spark rejects (or never forwards) a
   * user-specified schema and falls back to {@link #inferSchema}, which
   * always throws — making the source unusable. Declaring external-metadata
   * support lets the user-provided schema reach {@link #getTable}.
   */
  @Override
  public boolean supportsExternalMetadata() {
    return true;
  }

  /** Builds the writable table for the (user-supplied) schema; partitioning is ignored. */
  @Override
  public Table getTable(
      StructType schema, Transform[] partitioning, Map<String, String> properties) {
    return new ArrowTable(schema);
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.sedona.sql.datasources.arrow;

import org.apache.spark.sql.connector.write.BatchWrite;
import org.apache.spark.sql.connector.write.LogicalWriteInfo;
import org.apache.spark.sql.connector.write.WriteBuilder;

/**
 * Minimal {@link WriteBuilder}: performs no configuration of its own and
 * simply wraps the logical write info into an {@link ArrowBatchWrite}.
 */
class ArrowWriteBuilder implements WriteBuilder {

  /** Logical write request forwarded untouched to the batch write. */
  private final LogicalWriteInfo info;

  public ArrowWriteBuilder(LogicalWriteInfo writeInfo) {
    this.info = writeInfo;
  }

  @Override
  public BatchWrite buildForBatch() {
    return new ArrowBatchWrite(info);
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.sedona.sql.datasources.arrow;

import java.io.IOException;
import org.apache.arrow.memory.RootAllocator;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.catalyst.encoders.AgnosticEncoder;
import org.apache.spark.sql.catalyst.encoders.RowEncoder;
import org.apache.spark.sql.connect.client.arrow.ArrowSerializer;
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can't seem to change this to org.apache.sedona.arrow.ArrowSerializer. Do I need to do anything special to make Scala classes accessible to Java here? (Or should the whole thing be done in Scala?)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is ok to use Scala classes in Java code in the same project. However, it seems that the javadoc generation runs before the Scala compilation finishes, and thus the javadoc process fails. We can set failOnError to false since we are ignoring the doclint anyway in the maven-javadoc-plugin. E.g.,

            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-javadoc-plugin</artifactId>
                <version>2.10.4</version>
                <executions>
                    <execution>
                        <id>attach-javadocs</id>
                        <goals>
                            <goal>jar</goal>
                        </goals>
                    </execution>
                </executions>
                <configuration>
                    <additionalparam>-Xdoclint:none</additionalparam>
                    <failOnError>false</failOnError>
                </configuration>
            </plugin>

Also, some of the classes created in this PR uses spark classes (e.g., SparkIntervalUtils) introduced in 3.5 so it won't build on 3.3 and 3.4 unless these codes are moved to version specific folders.

import org.apache.spark.sql.connector.write.DataWriter;
import org.apache.spark.sql.connector.write.WriterCommitMessage;
import org.apache.spark.sql.types.StructType;

/**
 * Per-task writer that serializes Spark rows into Arrow IPC form.
 *
 * <p>WORK IN PROGRESS: every DataWriter lifecycle method is still stubbed,
 * and the serialization path is blocked on {@code ArrowSerializer} not
 * exposing schema-only / batch-only writes (see inline "Problem:" notes).
 */
class ArrowWriter implements DataWriter<InternalRow> {
  // Task identity — intended for naming output and building commit messages.
  private final int partitionId;
  private final long taskId;
  // Rows appended since the last (intended) batch flush.
  private int rowCount;
  // Encoder from external Row values to the Catalyst representation of `schema`.
  private AgnosticEncoder<Row> encoder;
  // NOTE(review): never assigned or read anywhere in this class — candidate for removal.
  private Encoder<Row> rowEncoder;
  // https://github.com/apache/spark/blob/9353e94e50f3f73565f5f0023effd7e265c177b9/sql/connect/common/src/main/scala/org/apache/spark/sql/connect/client/arrow/ArrowSerializer.scala#L50
  private ArrowSerializer<Row> serializer;

  public ArrowWriter(int partitionId, long taskId, StructType schema) {
    this.partitionId = partitionId;
    this.taskId = taskId;
    this.rowCount = 0;
    this.encoder = RowEncoder.encoderFor(schema);
    // "UTC" is the session time zone passed to the serializer for timestamp
    // conversion. NOTE(review): a fresh RootAllocator is created here and
    // never closed — confirm whether close() should release it.
    this.serializer = new ArrowSerializer<Row>(encoder, new RootAllocator(), "UTC");

    // Create file, write schema
    // Problem: ArrowSerializer() does not expose internal to write just the schema
    // bytes.
  }

  /** Not implemented yet: should flush remaining rows and close the output file. */
  @Override
  public void close() throws IOException {
    // Close file
    // TODO Auto-generated method stub
    throw new UnsupportedOperationException("Unimplemented method 'close'");
  }

  /**
   * Counts rows toward a 1024-row batch; the actual serialization calls are
   * commented out pending ArrowSerializer API changes, so this is currently
   * a no-op apart from the counter.
   */
  @Override
  public void write(InternalRow record) throws IOException {
    // Problem: serializer needs a Row but we have an InternalRow
    // serializer.append(encoder.fromRow(record));

    rowCount++;
    // As written the first flush would trigger on the 1025th row (> not >=).
    if (rowCount > 1024) {
      // Problem: writeIpcStream() writes both the schema and the batch, but
      // we only want the batch
      // serializer.writeIpcStream(null);
      rowCount = 0;
    }
  }

  /** Not implemented yet: should report this task's output back to the driver. */
  @Override
  public WriterCommitMessage commit() throws IOException {
    // TODO Auto-generated method stub
    throw new UnsupportedOperationException("Unimplemented method 'commit'");
  }

  /** Not implemented yet: should discard any partially written output. */
  @Override
  public void abort() throws IOException {
    // TODO Auto-generated method stub
    throw new UnsupportedOperationException("Unimplemented method 'abort'");
  }
}
Loading
Loading