Skip to content

Commit 211ec61

Browse files
committed
Slightly refactor bigtable implementation
This is just a few renames and refactors
1 parent 2367574 commit 211ec61

File tree

1 file changed

+41
-43
lines changed

1 file changed

+41
-43
lines changed

objectstore-service/src/backend/bigtable.rs

Lines changed: 41 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -49,73 +49,92 @@ pub struct BigTableConfig {
4949
}
5050

5151
pub struct BigTableBackend {
52-
config: BigTableConfig,
5352
bigtable: BigTableConnection,
5453
admin: BigTableTableAdminConnection,
54+
5555
instance_path: String,
5656
table_path: String,
57+
table_name: String,
58+
}
59+
60+
impl fmt::Debug for BigTableBackend {
61+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
62+
f.debug_struct("BigTableBackend")
63+
.field("instance_path", &self.instance_path)
64+
.field("table_path", &self.table_path)
65+
.field("table_name", &self.table_name)
66+
.finish_non_exhaustive()
67+
}
5768
}
5869

5970
impl BigTableBackend {
60-
pub async fn new(config: BigTableConfig) -> Result<Self> {
71+
pub async fn new(
72+
BigTableConfig {
73+
project_id,
74+
instance_name,
75+
table_name,
76+
}: BigTableConfig,
77+
) -> Result<Self> {
6178
let tokio_runtime = Handle::current();
6279
let tokio_workers = tokio_runtime.metrics().num_workers();
6380

6481
// TODO on channel_size: Idle connections are automatically closed in “a few minutes”. We
6582
// need to make sure that on longer idle period the channels are re-opened.
83+
let channel_size = 2 * tokio_workers;
6684

6785
// NB: Defaults to gcp_auth::provider() internally, but first checks the
6886
// BIGTABLE_EMULATOR_HOST environment variable for local dev & tests.
6987
let bigtable = BigTableConnection::new(
70-
&config.project_id,
71-
&config.instance_name,
72-
false, // is_read_only
73-
2 * tokio_workers, // channel_size
88+
&project_id,
89+
&instance_name,
90+
false, // is_read_only
91+
channel_size,
7492
Some(CONNECT_TIMEOUT),
7593
)
7694
.await?;
7795

7896
let client = bigtable.client();
79-
let instance_path = format!(
80-
"projects/{}/instances/{}",
81-
config.project_id, config.instance_name
82-
);
83-
let table_path = client.get_full_table_name(&config.table_name);
97+
let instance_path = format!("projects/{project_id}/instances/{instance_name}");
98+
let table_path = client.get_full_table_name(&table_name);
8499

85100
let admin = BigTableTableAdminConnection::new(
86-
&config.project_id,
87-
&config.instance_name,
88-
2 * tokio_workers, // channel_size
101+
&project_id,
102+
&instance_name,
103+
channel_size,
89104
Some(CONNECT_TIMEOUT),
90105
)
91106
.await?;
92107

93108
let backend = Self {
94-
config,
95109
bigtable,
96110
admin,
111+
97112
instance_path,
98113
table_path,
114+
table_name,
99115
};
100116

101117
backend.ensure_table().await?;
102118
Ok(backend)
103119
}
104-
}
105120

106-
impl BigTableBackend {
107121
async fn ensure_table(&self) -> Result<Table> {
108122
let mut admin = self.admin.client();
109123

110-
match admin.get_table(self.get_table_request()).await {
124+
let get_request = GetTableRequest {
125+
name: self.table_path.clone(),
126+
view: View::Unspecified as i32,
127+
};
128+
129+
match admin.get_table(get_request.clone()).await {
111130
Err(AdminError::RpcError(e)) if e.code() == Code::NotFound => (), // fall through
112131
Err(e) => return Err(e.into()),
113132
Ok(table) => return Ok(table),
114133
}
115134

116-
let request = CreateTableRequest {
135+
let create_request = CreateTableRequest {
117136
parent: self.instance_path.clone(),
118-
table_id: self.config.table_name.clone(), // name without full path
137+
table_id: self.table_name.clone(), // name without full path
119138
table: Some(Table {
120139
name: String::new(), // Must be empty during creation
121140
cluster_states: Default::default(),
@@ -152,23 +171,16 @@ impl BigTableBackend {
152171
initial_splits: vec![],
153172
};
154173

155-
match admin.create_table(request).await {
174+
match admin.create_table(create_request).await {
156175
// Race condition: table was created by another concurrent call.
157176
Err(AdminError::RpcError(e)) if e.code() == Code::AlreadyExists => {
158-
Ok(admin.get_table(self.get_table_request()).await?)
177+
Ok(admin.get_table(get_request).await?)
159178
}
160179
Err(e) => Err(e.into()),
161180
Ok(created_table) => Ok(created_table),
162181
}
163182
}
164183

165-
fn get_table_request(&self) -> GetTableRequest {
166-
GetTableRequest {
167-
name: self.table_path.clone(),
168-
view: View::Unspecified as i32,
169-
}
170-
}
171-
172184
async fn mutate<I>(&self, path: Vec<u8>, mutations: I) -> Result<MutateRowResponse>
173185
where
174186
I: IntoIterator<Item = Mutation>,
@@ -189,20 +201,6 @@ impl BigTableBackend {
189201
}
190202
}
191203

192-
impl fmt::Debug for BigTableBackend {
193-
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
194-
f.debug_struct("BigTableBackend")
195-
.field("config", &self.config)
196-
.field("connection", &format_args!("BigTableConnection {{ ... }}"))
197-
.field("client", &format_args!("BigTableClient {{ ... }}"))
198-
// .field("admin", &self.admin)
199-
.field("table_name", &self.table_path)
200-
.finish_non_exhaustive()
201-
}
202-
}
203-
204-
impl BigTableBackend {}
205-
206204
#[async_trait::async_trait]
207205
impl Backend for BigTableBackend {
208206
async fn put_object(

0 commit comments

Comments
 (0)