Skip to content

Commit a369370

Browse files
committed
Merge branch 'main' of github.com:irfanghat/spark-connect-cpp into feature/cmake-vcpkg-toolchain
2 parents ff3b29a + b71fb46 commit a369370

18 files changed

+2546
-35
lines changed

.github/workflows/build_and_test.yml

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,8 +32,21 @@ jobs:
3232
- name: Start Spark Connect Server
3333
run: |
3434
$HOME/spark/sbin/start-connect-server.sh \
35-
--packages org.apache.spark:spark-connect_2.12:3.5.1
36-
sleep 10
35+
--packages "org.apache.spark:spark-connect_2.12:3.5.3,io.delta:delta-spark_2.12:3.2.0,io.graphframes:graphframes-spark3_2.12:0.10.0,io.graphframes:graphframes-connect-spark3_2.12:0.10.0" \
36+
--conf "spark.connect.extensions.relation.classes=org.apache.spark.sql.graphframes.GraphFramesConnect" \
37+
--conf "spark.driver.extraJavaOptions=-Divy.cache.dir=/tmp -Divy.home=/tmp -XX:+UseG1GC -XX:+HeapDumpOnOutOfMemoryError" \
38+
--conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" \
39+
--conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog" \
40+
--conf "spark.driver.memory=4g" \
41+
--conf "spark.executor.memory=4g" \
42+
--conf "spark.memory.fraction=0.8" \
43+
--conf "spark.memory.storageFraction=0.3" \
44+
--conf "spark.sql.shuffle.partitions=8" \
45+
--conf "spark.default.parallelism=8" \
46+
--conf "spark.driver.maxResultSize=2g"
47+
48+
# wait for server
49+
sleep 15
3750
3851
- name: Install gcovr
3952
run: |

README.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,7 @@ The Spark Connect C++ client is **not a replacement** for Python or Scala Spark
112112
| Analytics | Window Functions || Planned |
113113
| Catalog | Table/Database Management || Planned |
114114
| Streaming | Structured Streaming || Not Implemented |
115+
| GraphFrames | Graph processing & analytics || Implemented |
115116

116117
---
117118

@@ -349,7 +350,7 @@ Running a sample application:
349350
int main()
350351
{
351352
auto spark = &SparkSession::builder()
352-
.master("localhost")
353+
.master("sc://localhost")
353354
.appName("demo")
354355
.getOrCreate();
355356

docker-compose.yaml

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,17 +3,31 @@ services:
33
image: "apache/spark:3.5.3-scala2.12-java17-python3-ubuntu"
44
command: >
55
/opt/spark/sbin/start-connect-server.sh
6-
--packages "org.apache.spark:spark-connect_2.12:3.5.3,io.delta:delta-spark_2.12:3.2.0"
7-
--conf "spark.driver.extraJavaOptions=-Divy.cache.dir=/tmp -Divy.home=/tmp"
6+
--packages "org.apache.spark:spark-connect_2.12:3.5.3,io.delta:delta-spark_2.12:3.2.0,io.graphframes:graphframes-spark3_2.12:0.10.0,io.graphframes:graphframes-connect-spark3_2.12:0.10.0"
7+
--conf "spark.connect.extensions.relation.classes=org.apache.spark.sql.graphframes.GraphFramesConnect"
8+
--conf "spark.driver.extraJavaOptions=-Divy.cache.dir=/tmp -Divy.home=/tmp -XX:+UseG1GC -XX:+HeapDumpOnOutOfMemoryError"
89
--conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension"
910
--conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog"
11+
--conf "spark.driver.memory=4g"
12+
--conf "spark.executor.memory=4g"
13+
--conf "spark.memory.fraction=0.8"
14+
--conf "spark.memory.storageFraction=0.3"
15+
--conf "spark.sql.shuffle.partitions=8"
16+
--conf "spark.default.parallelism=8"
17+
--conf "spark.driver.maxResultSize=2g"
1018
environment:
1119
- SPARK_NO_DAEMONIZE=true
1220
ports:
1321
- "4040:4040"
1422
- "15002:15002"
1523
volumes:
1624
- ./datasets:/opt/spark/work-dir/datasets
25+
deploy:
26+
resources:
27+
limits:
28+
memory: 8g
29+
reservations:
30+
memory: 4g
1731
networks:
1832
- default
1933

docs/API_REFERENCE.md

Lines changed: 166 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,12 @@
1010
int main() {
1111
Config conf;
1212
conf.setHost("localhost").setPort(15002);
13+
// -----------------------------------------------------------
14+
// Alternatively...
15+
// conf.setHost("sc://localhost").setPort(15002);
1316
// conf.setHost("123.45.67.8").setPort(15002);
17+
// ...
18+
// -----------------------------------------------------------
1419
SparkSession spark(conf);
1520

1621
auto df = spark->sql("SELECT * FROM range(100)");
@@ -1194,4 +1199,165 @@ df.show();
11941199
| Grant | Martin | 72 | grrm@cmpny.com | Grant Martin | adult |
11951200
| Hannah | Abbott | 18 | h.abbott@hogwarts... | Hannah Abbott | minor |
11961201
+----------------------+----------------------+----------------------+----------------------+----------------------+----------------------+
1202+
```
1203+
1204+
## GraphFrames
1205+
1206+
### Page Rank
1207+
1208+
```cpp
1209+
DataFrame *vertices = nullptr;
1210+
DataFrame *edges = nullptr;
1211+
1212+
vertices = spark->sql(R"(
1213+
SELECT CAST(id AS INT) AS id, name, age FROM VALUES
1214+
(1, 'Alice', 34),
1215+
(2, 'Bob', 36),
1216+
(3, 'Charlie', 30),
1217+
(4, 'Anne', 29)
1218+
AS people(id, name, age)
1219+
)");
1220+
1221+
edges = spark->sql(R"(
1222+
SELECT CAST(src AS INT) AS src, CAST(dst AS INT) AS dst, relationship FROM VALUES
1223+
(1, 2, 'friend'),
1224+
(2, 3, 'follow'),
1225+
(3, 1, 'friend'),
1226+
(1, 4, 'colleague')
1227+
AS connections(src, dst, relationship)
1228+
)");
1229+
1230+
auto gf = GraphFrame(*vertices, *edges);
1231+
1232+
auto rows = gf().pageRank(0.15, 5).collect();
1233+
gf().pageRank(0.15, 5).show();
1234+
```
1235+
1236+
### Motif Matching
1237+
1238+
```cpp
1239+
auto gf = GraphFrame(*vertices, *edges);
1240+
1241+
gf().find("(a)-[e]->(b)");
1242+
gf().find("(a)-[e1]->(b); (b)-[e2]->(c); (c)-[e3]->(a)");
1243+
gf().find("(a)-[e1]->(b); (b)-[e2]->(c)");
1244+
gf().find("(a)-[e]->(b)");
1245+
gf().find("(a)-[e]->(b)").show();
1246+
```
1247+
1248+
### Triplets
1249+
1250+
```cpp
1251+
auto gf = GraphFrame(*vertices, *edges);
1252+
gf().triplets();
1253+
gf().triplets().show();
1254+
```
1255+
1256+
### Filter Edges
1257+
1258+
```cpp
1259+
auto gf = GraphFrame(*vertices, *edges);
1260+
gf().filterEdges("relationship = 'friend'");
1261+
gf().filterEdges(col("relationship") == lit("friend"));
1262+
gf().filterEdges("relationship = 'enemy'");
1263+
gf().filterEdges("relationship = 'friend'").show();
1264+
```
1265+
1266+
### Filter Vertices
1267+
1268+
```cpp
1269+
auto gf = GraphFrame(*vertices, *edges);
1270+
gf().filterVertices("age < 34");
1271+
gf().filterVertices(col("age") < lit(34));
1272+
gf().filterVertices("age > 100");
1273+
gf().filterVertices("age < 34").show();
1274+
```
1275+
1276+
### Drop Isolated Vertices
1277+
1278+
```cpp
1279+
auto gf = GraphFrame(*vertices, *edges);
1280+
gf().dropIsolatedVertices();
1281+
1282+
auto v_with_isolated = spark->sql(R"(
1283+
SELECT * FROM VALUES
1284+
(1, 'Alice', 34),
1285+
(2, 'Bob', 36),
1286+
(3, 'Charlie', 30),
1287+
(4, 'Anne', 29),
1288+
(99, 'Ghost', 99)
1289+
AS people(id, name, age)
1290+
)");
1291+
1292+
GraphFrame(*v_with_isolated, *edges).dropIsolatedVertices().show();
1293+
```
1294+
1295+
### Breadth First Search
1296+
1297+
```cpp
1298+
auto gf = GraphFrame(*vertices, *edges);
1299+
gf().bfs("id = 1", "id = 3");
1300+
gf().bfs("id = 4", "id = 1");
1301+
gf().bfs("id = 1", "id = 2", "relationship = 'friend'");
1302+
gf().bfs(col("id") == lit(1), col("id") == lit(3));
1303+
gf().bfs("id = 1", "id = 3").show();
1304+
```
1305+
1306+
### Connected Components
1307+
1308+
```cpp
1309+
auto gf = GraphFrame(*vertices, *edges);
1310+
gf().connectedComponents();
1311+
gf().connectedComponents().show();
1312+
```
1313+
1314+
### Strongly Connected Components
1315+
1316+
```cpp
1317+
auto gf = GraphFrame(*vertices, *edges);
1318+
gf().stronglyConnectedComponents(10);
1319+
gf().stronglyConnectedComponents();
1320+
gf().stronglyConnectedComponents().show();
1321+
```
1322+
1323+
### Shortest Paths
1324+
1325+
```cpp
1326+
auto gf = GraphFrame(*vertices, *edges);
1327+
gf().shortestPaths(std::vector<int32_t>{1, 3});
1328+
gf().shortestPaths(std::vector<int32_t>{1});
1329+
gf().shortestPaths(std::vector<int32_t>{1}).show();
1330+
```
1331+
1332+
### Triangle Count
1333+
1334+
```cpp
1335+
auto gf = GraphFrame(*vertices, *edges);
1336+
1337+
gf().triangleCount();
1338+
gf().triangleCount().show();
1339+
1340+
auto rows = gf().triangleCount().collect();
1341+
1342+
std::map<int32_t, int64_t> counts;
1343+
for (auto &row : rows)
1344+
counts[row.get<int32_t>("id")] = row.get<int64_t>("count");
1345+
```
1346+
1347+
### Label Propagation
1348+
1349+
```cpp
1350+
auto gf = GraphFrame(*vertices, *edges);
1351+
gf().labelPropagation(5);
1352+
```
1353+
1354+
### Method Chaining (GraphFrames)
1355+
1356+
```cpp
1357+
// GraphFrames result into plain DataFrame ops
1358+
auto result = gf()
1359+
.find("(a)-[e]->(b)")
1360+
.filter("e.relationship = 'friend'");
1361+
1362+
auto ranked = gf().pageRank(0.15, 5).filter("pagerank > 0.0");
11971363
```

src/buf.yaml

Lines changed: 0 additions & 25 deletions
This file was deleted.

src/dataframe.cpp

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -603,12 +603,9 @@ int64_t DataFrame::count()
603603
auto result_df = this->groupBy().count();
604604
auto rows = result_df.collect();
605605

606-
if (rows.empty())
606+
if (rows.empty() || rows[0].values.empty())
607607
return 0;
608608

609-
// -----------------------------------------------------------
610-
// Extract the first column of the first row i.e. the count
611-
// -----------------------------------------------------------
612609
return rows[0].get_long(rows[0].column_names[0]);
613610
}
614611

src/dataframe.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,11 @@
1212

1313
using namespace spark::sql::types;
1414

15+
namespace graphframes
16+
{
17+
class GraphFrame;
18+
}
19+
1520
class DataFrameWriter;
1621
class GroupedData;
1722

@@ -369,6 +374,7 @@ class DataFrame
369374

370375
private:
371376
friend class GroupedData;
377+
friend class graphframes::GraphFrame;
372378

373379
std::shared_ptr<spark::connect::SparkConnectService::Stub> stub_;
374380
spark::connect::Plan plan_;

src/functions.cpp

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,15 @@ namespace spark::sql::functions
6767
return Column(std::move(e));
6868
}
6969

70+
Column Column::cast(const std::string &type) const
71+
{
72+
spark::connect::Expression e;
73+
auto *cast = e.mutable_cast();
74+
*cast->mutable_expr() = *this->expr;
75+
cast->mutable_type()->mutable_unparsed()->set_data_type_string(type);
76+
return Column(std::move(e));
77+
}
78+
7079
Column lower(const Column &e)
7180
{
7281
spark::connect::Expression expr;
@@ -85,6 +94,13 @@ namespace spark::sql::functions
8594
return Column(std::move(e));
8695
}
8796

97+
Column lit(int64_t value)
98+
{
99+
spark::connect::Expression e;
100+
e.mutable_literal()->set_integer(value);
101+
return Column(std::move(e));
102+
}
103+
88104
Column lit(double value)
89105
{
90106
spark::connect::Expression e;

src/functions.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ namespace spark::sql::functions
3737

3838
Column alias(const std::string &name) const;
3939
Column otherwise(const Column &value) const;
40+
Column cast(const std::string &type) const;
4041
};
4142

4243
/**
@@ -48,6 +49,7 @@ namespace spark::sql::functions
4849
* @brief Creates a `Column` of literal value.
4950
*/
5051
Column lit(int32_t value);
52+
Column lit(int64_t value);
5153
Column lit(double value);
5254
Column lit(const std::string &value);
5355

0 commit comments

Comments
 (0)