Skip to content

Commit 0875830

Browse files
committed
Merge branch 'main' of github.com:irfanghat/spark-connect-cpp into feature/cmake-vcpkg-toolchain
2 parents 08aa60e + 77b107a commit 0875830

File tree

5 files changed

+407
-4
lines changed

5 files changed

+407
-4
lines changed

src/runtime_config.cpp

Lines changed: 133 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,133 @@
1+
#include "runtime_config.h"
2+
3+
#include <stdexcept>
4+
#include <sstream>
5+
6+
RuntimeConfig::RuntimeConfig(
7+
std::shared_ptr<spark::connect::SparkConnectService::Stub> stub,
8+
const std::string &session_id,
9+
const std::string &user_id)
10+
: stub_(std::move(stub)), session_id_(session_id), user_id_(user_id)
11+
{
12+
}
13+
14+
spark::connect::ConfigResponse RuntimeConfig::sendConfig(
15+
const spark::connect::ConfigRequest::Operation &op) const
16+
{
17+
spark::connect::ConfigRequest request;
18+
request.set_session_id(session_id_);
19+
request.mutable_user_context()->set_user_id(user_id_);
20+
*request.mutable_operation() = op;
21+
22+
grpc::ClientContext context;
23+
spark::connect::ConfigResponse response;
24+
grpc::Status status = stub_->Config(&context, request, &response);
25+
26+
if (!status.ok())
27+
throw std::runtime_error("Config RPC failed: " + status.error_message());
28+
29+
return response;
30+
}
31+
32+
void RuntimeConfig::set(const std::string &key, const std::string &value)
33+
{
34+
spark::connect::ConfigRequest::Operation op;
35+
auto *kv = op.mutable_set()->add_pairs();
36+
kv->set_key(key);
37+
kv->set_value(value);
38+
sendConfig(op);
39+
}
40+
41+
void RuntimeConfig::set(const std::string &key, bool value)
42+
{
43+
set(key, value ? std::string("true") : std::string("false"));
44+
}
45+
46+
void RuntimeConfig::set(const std::string &key, int64_t value)
47+
{
48+
set(key, std::to_string(value));
49+
}
50+
51+
void RuntimeConfig::set(const std::string &key, const char *value)
52+
{
53+
set(key, std::string(value));
54+
}
55+
56+
std::string RuntimeConfig::get(const std::string &key) const
57+
{
58+
spark::connect::ConfigRequest::Operation op;
59+
op.mutable_get()->add_keys(key);
60+
auto response = sendConfig(op);
61+
62+
if (response.pairs_size() == 0 || !response.pairs(0).has_value())
63+
throw std::runtime_error("Config key not found: " + key);
64+
65+
return response.pairs(0).value();
66+
}
67+
68+
std::string RuntimeConfig::get(const std::string &key,
69+
const std::string &default_value) const
70+
{
71+
spark::connect::ConfigRequest::Operation op;
72+
auto *gd = op.mutable_get_with_default()->add_pairs();
73+
gd->set_key(key);
74+
gd->set_value(default_value);
75+
auto response = sendConfig(op);
76+
77+
if (response.pairs_size() == 0)
78+
return default_value;
79+
80+
return response.pairs(0).has_value() ? response.pairs(0).value() : default_value;
81+
}
82+
83+
std::optional<std::string> RuntimeConfig::getOption(const std::string &key) const
84+
{
85+
spark::connect::ConfigRequest::Operation op;
86+
op.mutable_get_option()->add_keys(key);
87+
auto response = sendConfig(op);
88+
89+
if (response.pairs_size() == 0 || !response.pairs(0).has_value())
90+
return std::nullopt;
91+
92+
return response.pairs(0).value();
93+
}
94+
95+
std::map<std::string, std::string> RuntimeConfig::getAll() const
96+
{
97+
spark::connect::ConfigRequest::Operation op;
98+
99+
// -------------------------------------------------
100+
// No arguments are needed, this returns all pairs
101+
// -------------------------------------------------
102+
op.mutable_get_all();
103+
auto response = sendConfig(op);
104+
105+
std::map<std::string, std::string> result;
106+
for (const auto &pair : response.pairs())
107+
{
108+
if (pair.has_value())
109+
result[pair.key()] = pair.value();
110+
else
111+
result[pair.key()] = "";
112+
}
113+
return result;
114+
}
115+
116+
void RuntimeConfig::unset(const std::string &key)
117+
{
118+
spark::connect::ConfigRequest::Operation op;
119+
op.mutable_unset()->add_keys(key);
120+
sendConfig(op);
121+
}
122+
123+
bool RuntimeConfig::isModifiable(const std::string &key) const
124+
{
125+
spark::connect::ConfigRequest::Operation op;
126+
op.mutable_is_modifiable()->add_keys(key);
127+
auto response = sendConfig(op);
128+
129+
if (response.pairs_size() == 0 || !response.pairs(0).has_value())
130+
return false;
131+
132+
return response.pairs(0).value() == "true";
133+
}

src/runtime_config.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,11 @@ class RuntimeConfig
3737
*/
3838
void set(const std::string &key, int64_t value);
3939

40+
/**
41+
* @brief Sets a config key to a string value.
42+
*/
43+
void set(const std::string &key, const char *value);
44+
4045
/**
4146
* @brief Gets a config value. Throws if the key is not set.
4247
*/

src/session.cpp

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -162,6 +162,27 @@ SparkSession SparkSession::newSession()
162162
return SparkSession(newConfig);
163163
}
164164

165+
/**
166+
* @brief Runtime configuration interface for Spark.
167+
* This is the interface through which the user can get and set all Spark and Hadoop configurations that are relevant to Spark SQL.
168+
* When getting the value of a config, this defaults to the value set in the underlying `SparkContext`, if any.
169+
*/
170+
RuntimeConfig SparkSession::conf()
171+
{
172+
return RuntimeConfig(stub_, config_.session_id, config_.user_id);
173+
}
174+
175+
/**
176+
* @brief Set the directory under which RDDs are going to be checkpointed.
177+
* The directory must be an HDFS path if running on a cluster.
178+
*
179+
* @param path path to the directory where checkpoint files will be stored (must be HDFS path if running in cluster)
180+
*/
181+
void SparkSession::setCheckpointDir(const std::string &path)
182+
{
183+
conf().set("spark.checkpoint.dir", path);
184+
}
185+
165186
/**
166187
* @brief Stops the underlying Spark session.
167188
*/

src/session.h

Lines changed: 28 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
#include "config.h"
1414
#include "dataframe.h"
1515
#include "reader.h"
16+
#include "runtime_config.h"
1617

1718
class DataFrameReader;
1819

@@ -38,7 +39,7 @@ class SparkSession
3839
public:
3940
/**
4041
* @brief Sets the master URL for the SparkSession.
41-
* @param master The master URL (e.g., "adb-xxx.azuredatabricks.net").
42+
* @param master The master URL (e.g., "sc://localhost" or "adb-xxx.azuredatabricks.net").
4243
* @return A reference to the Builder for chaining calls.
4344
*/
4445
Builder &master(const std::string &master)
@@ -122,9 +123,32 @@ class SparkSession
122123
void stop();
123124

124125
/**
125-
* @brief Access the session configuration.
126+
* @brief Returns a RuntimeConfig for reading and writing Spark session config.
127+
*
128+
* Changes are applied on the server and survive across DataFrame operations within this session.
129+
*
130+
* @example
131+
* spark.conf().set("spark.sql.shuffle.partitions", "8");
132+
*
133+
* auto val = spark.conf().get("spark.sql.shuffle.partitions");
126134
*/
127-
Config &conf() { return config_; }
135+
RuntimeConfig conf();
136+
137+
/**
138+
* @brief Sets the Spark checkpoint directory for this session.
139+
*
140+
* Required by algorithms that use checkpointing internally, such as
141+
* GraphFrames ConnectedComponents. Equivalent to calling `spark.conf().set("spark.checkpoint.dir", path)`
142+
*
143+
* @param path A path writable by the Spark executors (e.g. `/tmp/checkpoints`,
144+
* `hdfs:///checkpoints`, or an `ABFS/S3` URI for cloud deployments).
145+
*/
146+
void setCheckpointDir(const std::string &path);
147+
148+
/**
149+
* @brief Access the internal connection configuration (host, port, auth, etc.)
150+
*/
151+
Config &connection() { return config_; }
128152

129153
std::string session_id() const { return config_.session_id; }
130154
std::string user_id() const { return config_.user_id; }
@@ -139,7 +163,7 @@ class SparkSession
139163
static std::once_flag once_flag_;
140164

141165
/**
142-
* @brief Internal Spark Configuration
166+
* @brief Internal Spark connection configuration
143167
*/
144168
Config config_;
145169
std::shared_ptr<spark::connect::SparkConnectService::Stub> stub_;

0 commit comments

Comments
 (0)