|
| 1 | +# 文件的导入与导出 |
| 2 | + |
| 3 | +Arctern RESTful 服务借助 Spark 的文件读写功能完成数据的导入和导出。Spark 支持多种数据格式文件导入,以下是针对 CSV、PARQUET、JSON、ORC 等数据格式文件的导入导出例子。更多的文件格式支持请查看 [Spark 官方文档](https://spark.apache.org/docs/latest/sql-data-sources-load-save-functions.html)。
| 4 | + |
| 5 | +```python |
| 6 | +>>> # 导入 CSV 文件,导出 PARQUET 文件 |
| 7 | +''' |
| 8 | +CSV 文件内容: |
| 9 | +geos |
| 10 | +POINT (30 10) |
| 11 | +POLYGON ((30 10, 40 40, 20 40, 10 20, 30 10)) |
| 12 | +POLYGON ((1 2, 3 4, 5 6, 1 2)) |
| 13 | +POLYGON ((1 1, 3 1, 3 3, 1 3, 1 1)) |
| 14 | +''' |
| 15 | +>>> from pyspark.sql import SparkSession |
| 16 | +>>> from arctern_pyspark import register_funcs |
| 17 | +>>> |
| 18 | +>>> # 创建 SparkSession 并对其进行配置 |
| 19 | +>>> spark_session = SparkSession.builder.appName("Python Arrow-in-Spark example").getOrCreate() |
| 20 | +>>> spark_session.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true") |
| 21 | +>>> |
| 22 | +>>> # 注册 Arctern-Spark 提供的函数 |
| 23 | +>>> register_funcs(spark_session) |
| 24 | +>>> |
| 25 | +>>> # 数据导入 |
| 26 | +>>> df = spark_session.read.format('csv').options(header='true',sep='|').load("/path/to/geos.csv") |
| 27 | +>>> df.show(100,0) |
| 28 | ++---------------------------------------------+ |
| 29 | +|geos | |
| 30 | ++---------------------------------------------+ |
| 31 | +|POINT (30 10) | |
| 32 | +|POLYGON ((30 10, 40 40, 20 40, 10 20, 30 10))| |
| 33 | +|POLYGON ((1 2, 3 4, 5 6, 1 2)) | |
| 34 | +|POLYGON ((1 1, 3 1, 3 3, 1 3, 1 1)) | |
| 35 | ++---------------------------------------------+ |
| 36 | +>>> |
| 37 | +>>> # 为导入数据创建数据表 'simple' 并对其进行处理
| 38 | +>>> df.createOrReplaceTempView("simple") |
| 39 | +>>> spark_session.sql("select ST_IsSimple(ST_GeomFromText(geos)) from simple").show(100,0) |
| 40 | ++----------------------------------+ |
| 41 | +|ST_IsSimple(ST_GeomFromText(geos))| |
| 42 | ++----------------------------------+ |
| 43 | +|true | |
| 44 | +|true | |
| 45 | +|false | |
| 46 | +|true | |
| 47 | ++----------------------------------+ |
| 48 | +>>> |
| 49 | +>>> # 数据导出 |
| 50 | +>>> df.select("geos").write.save("/path/to/geos.parquet", format="parquet") |
| 51 | +>>> |
| 52 | +>>> # 导入 PARQUET 文件,导出 JSON 文件 |
| 53 | +>>> df = spark_session.read.format('parquet').options(header='true',sep='|').load("/path/to/geos.parquet") |
| 54 | +>>> df.show(100,0) |
| 55 | ++---------------------------------------------+ |
| 56 | +|geos | |
| 57 | ++---------------------------------------------+ |
| 58 | +|POINT (30 10) | |
| 59 | +|POLYGON ((30 10, 40 40, 20 40, 10 20, 30 10))| |
| 60 | +|POLYGON ((1 2, 3 4, 5 6, 1 2)) | |
| 61 | +|POLYGON ((1 1, 3 1, 3 3, 1 3, 1 1)) | |
| 62 | ++---------------------------------------------+ |
| 63 | +>>> |
| 64 | +>>> df.createOrReplaceTempView("simple") |
| 65 | +>>> spark_session.sql("select ST_IsSimple(ST_GeomFromText(geos)) from simple").show(100,0) |
| 66 | ++----------------------------------+ |
| 67 | +|ST_IsSimple(ST_GeomFromText(geos))| |
| 68 | ++----------------------------------+ |
| 69 | +|true | |
| 70 | +|true | |
| 71 | +|false | |
| 72 | +|true | |
| 73 | ++----------------------------------+ |
| 74 | +>>> |
| 75 | +>>> df.select("geos").write.save("/path/to/geos.json", format="json") |
| 76 | +>>> |
| 77 | +>>> # 导入 JSON 文件,导出 ORC 文件 |
| 78 | +>>> df = spark_session.read.format('json').options(header='true',sep='|').load("/path/to/geos.json") |
| 79 | +>>> df.show(100,0) |
| 80 | ++---------------------------------------------+ |
| 81 | +|geos | |
| 82 | ++---------------------------------------------+ |
| 83 | +|POINT (30 10) | |
| 84 | +|POLYGON ((30 10, 40 40, 20 40, 10 20, 30 10))| |
| 85 | +|POLYGON ((1 2, 3 4, 5 6, 1 2)) | |
| 86 | +|POLYGON ((1 1, 3 1, 3 3, 1 3, 1 1)) | |
| 87 | ++---------------------------------------------+ |
| 88 | +>>> |
| 89 | +>>> df.createOrReplaceTempView("simple") |
| 90 | +>>> spark_session.sql("select ST_IsSimple(ST_GeomFromText(geos)) from simple").show(100,0) |
| 91 | ++----------------------------------+ |
| 92 | +|ST_IsSimple(ST_GeomFromText(geos))| |
| 93 | ++----------------------------------+ |
| 94 | +|true | |
| 95 | +|true | |
| 96 | +|false | |
| 97 | +|true | |
| 98 | ++----------------------------------+ |
| 99 | +>>> |
| 100 | +>>> df.write.save("/path/to/geos.orc", format="orc") |
| 101 | +>>> |
| 102 | +>>> # 导入 ORC 文件,导出 CSV 文件 |
| 103 | +>>> df = spark_session.read.format('orc').options(header='true',sep='|').load("/path/to/geos.orc") |
| 104 | +>>> df.show(100,0) |
| 105 | ++---------------------------------------------+ |
| 106 | +|geos | |
| 107 | ++---------------------------------------------+ |
| 108 | +|POINT (30 10) | |
| 109 | +|POLYGON ((30 10, 40 40, 20 40, 10 20, 30 10))| |
| 110 | +|POLYGON ((1 2, 3 4, 5 6, 1 2)) | |
| 111 | +|POLYGON ((1 1, 3 1, 3 3, 1 3, 1 1)) | |
| 112 | ++---------------------------------------------+ |
| 113 | +>>> |
| 114 | +>>> df.createOrReplaceTempView("simple") |
| 115 | +>>> spark_session.sql("select ST_IsSimple(ST_GeomFromText(geos)) from simple").show(100,0) |
| 116 | ++----------------------------------+ |
| 117 | +|ST_IsSimple(ST_GeomFromText(geos))| |
| 118 | ++----------------------------------+ |
| 119 | +|true | |
| 120 | +|true | |
| 121 | +|false | |
| 122 | +|true | |
| 123 | ++----------------------------------+ |
| 124 | +>>> |
| 125 | +>>> df.select("geos").write.save("/path/to/geos.csv", format="csv") |
| 126 | +``` |
0 commit comments