Skip to content

Commit 7a2f0ed

Browse files
authored
safekeeper: lift decoding and interpretation of WAL to the safekeeper (#9746)
## Problem For any given tenant shard, pageservers receive all of the tenant's WAL from the safekeeper. This soft-blocks us from using larger shard counts due to bandwidth concerns and CPU overhead of filtering out the records. ## Summary of changes This PR lifts the decoding and interpretation of WAL from the pageserver into the safekeeper. A customised PG replication protocol is used where instead of sending raw WAL, the safekeeper sends filtered, interpreted records. The receiver drives the protocol selection, so, on the pageserver side, usage of the new protocol is gated by a new pageserver config: `wal_receiver_protocol`. More granularly the changes are: 1. Optionally inject the protocol and shard identity into the arguments used for starting replication 2. On the safekeeper side, implement a new wal sending primitive which decodes and interprets records before sending them over 3. On the pageserver side, implement the ingestion of this new replication message type. It's very similar to what we already have for raw wal (minus decoding and interpreting). ## Notes * This PR currently uses my [branch of rust-postgres](https://github.com/neondatabase/rust-postgres/tree/vlad/interpreted-wal-record-replication-support) which includes the deserialization logic for the new replication message type. PR for that is open [here](neondatabase/rust-postgres#32). * This PR contains changes for both pageservers and safekeepers. It's safe to merge because the new protocol is disabled by default on the pageserver side. We can gradually start enabling it in subsequent releases. * CI tests are running on #9747 ## Links Related: #9336 Epic: #9329
1 parent 5c23569 commit 7a2f0ed

26 files changed

+870
-86
lines changed

Cargo.lock

Lines changed: 7 additions & 4 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

libs/pageserver_api/src/config.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ use std::{
1818
str::FromStr,
1919
time::Duration,
2020
};
21-
use utils::logging::LogFormat;
21+
use utils::{logging::LogFormat, postgres_client::PostgresClientProtocol};
2222

2323
use crate::models::ImageCompressionAlgorithm;
2424
use crate::models::LsnLease;
@@ -120,6 +120,7 @@ pub struct ConfigToml {
120120
pub no_sync: Option<bool>,
121121
#[serde(with = "humantime_serde")]
122122
pub server_side_batch_timeout: Option<Duration>,
123+
pub wal_receiver_protocol: PostgresClientProtocol,
123124
}
124125

125126
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
@@ -330,6 +331,9 @@ pub mod defaults {
330331
pub const DEFAULT_IO_BUFFER_ALIGNMENT: usize = 512;
331332

332333
pub const DEFAULT_SERVER_SIDE_BATCH_TIMEOUT: Option<&str> = None;
334+
335+
pub const DEFAULT_WAL_RECEIVER_PROTOCOL: utils::postgres_client::PostgresClientProtocol =
336+
utils::postgres_client::PostgresClientProtocol::Vanilla;
333337
}
334338

335339
impl Default for ConfigToml {
@@ -418,6 +422,7 @@ impl Default for ConfigToml {
418422
.map(|duration| humantime::parse_duration(duration).unwrap()),
419423
tenant_config: TenantConfigToml::default(),
420424
no_sync: None,
425+
wal_receiver_protocol: DEFAULT_WAL_RECEIVER_PROTOCOL,
421426
}
422427
}
423428
}

libs/pq_proto/src/lib.rs

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -562,6 +562,9 @@ pub enum BeMessage<'a> {
562562
options: &'a [&'a str],
563563
},
564564
KeepAlive(WalSndKeepAlive),
565+
/// Batch of interpreted, shard filtered WAL records,
566+
/// ready for the pageserver to ingest
567+
InterpretedWalRecords(InterpretedWalRecordsBody<'a>),
565568
}
566569

567570
/// Common shorthands.
@@ -672,6 +675,25 @@ pub struct WalSndKeepAlive {
672675
pub request_reply: bool,
673676
}
674677

678+
/// Batch of interpreted WAL records used in the interpreted
679+
/// safekeeper to pageserver protocol.
680+
///
681+
/// Note that the pageserver uses the RawInterpretedWalRecordsBody
682+
/// counterpart of this from the neondatabase/rust-postgres repo.
683+
/// If you're changing this struct, you likely need to change its
684+
/// twin as well.
685+
#[derive(Debug)]
686+
pub struct InterpretedWalRecordsBody<'a> {
687+
/// End of raw WAL in [`Self::data`]
688+
pub streaming_lsn: u64,
689+
/// Current end of WAL on the server
690+
pub commit_lsn: u64,
691+
/// Start LSN of the next record in PG WAL.
692+
/// Is 0 if the portion of PG WAL did not contain any records.
693+
pub next_record_lsn: u64,
694+
pub data: &'a [u8],
695+
}
696+
675697
pub static HELLO_WORLD_ROW: BeMessage = BeMessage::DataRow(&[Some(b"hello world")]);
676698

677699
// single text column
@@ -996,6 +1018,20 @@ impl BeMessage<'_> {
9961018
Ok(())
9971019
})?
9981020
}
1021+
1022+
BeMessage::InterpretedWalRecords(rec) => {
1023+
// We use the COPY_DATA_TAG for our custom message
1024+
// since this tag is interpreted as raw bytes.
1025+
buf.put_u8(b'd');
1026+
write_body(buf, |buf| {
1027+
buf.put_u8(b'0'); // matches INTERPRETED_WAL_RECORD_TAG in postgres-protocol
1028+
// dependency
1029+
buf.put_u64(rec.streaming_lsn);
1030+
buf.put_u64(rec.commit_lsn);
1031+
buf.put_u64(rec.next_record_lsn);
1032+
buf.put_slice(rec.data);
1033+
});
1034+
}
9991035
}
10001036
Ok(())
10011037
}

libs/utils/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ pprof.workspace = true
3333
regex.workspace = true
3434
routerify.workspace = true
3535
serde.workspace = true
36+
serde_with.workspace = true
3637
serde_json.workspace = true
3738
signal-hook.workspace = true
3839
thiserror.workspace = true

libs/utils/src/postgres_client.rs

Lines changed: 80 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -7,29 +7,94 @@ use postgres_connection::{parse_host_port, PgConnectionConfig};
77

88
use crate::id::TenantTimelineId;
99

10+
/// Postgres client protocol types
11+
#[derive(
12+
Copy,
13+
Clone,
14+
PartialEq,
15+
Eq,
16+
strum_macros::EnumString,
17+
strum_macros::Display,
18+
serde_with::DeserializeFromStr,
19+
serde_with::SerializeDisplay,
20+
Debug,
21+
)]
22+
#[strum(serialize_all = "kebab-case")]
23+
#[repr(u8)]
24+
pub enum PostgresClientProtocol {
25+
/// Usual Postgres replication protocol
26+
Vanilla,
27+
/// Custom shard-aware protocol that replicates interpreted records.
28+
/// Used to send wal from safekeeper to pageserver.
29+
Interpreted,
30+
}
31+
32+
impl TryFrom<u8> for PostgresClientProtocol {
33+
type Error = u8;
34+
35+
fn try_from(value: u8) -> Result<Self, Self::Error> {
36+
Ok(match value {
37+
v if v == (PostgresClientProtocol::Vanilla as u8) => PostgresClientProtocol::Vanilla,
38+
v if v == (PostgresClientProtocol::Interpreted as u8) => {
39+
PostgresClientProtocol::Interpreted
40+
}
41+
x => return Err(x),
42+
})
43+
}
44+
}
45+
46+
pub struct ConnectionConfigArgs<'a> {
47+
pub protocol: PostgresClientProtocol,
48+
49+
pub ttid: TenantTimelineId,
50+
pub shard_number: Option<u8>,
51+
pub shard_count: Option<u8>,
52+
pub shard_stripe_size: Option<u32>,
53+
54+
pub listen_pg_addr_str: &'a str,
55+
56+
pub auth_token: Option<&'a str>,
57+
pub availability_zone: Option<&'a str>,
58+
}
59+
60+
impl<'a> ConnectionConfigArgs<'a> {
61+
fn options(&'a self) -> Vec<String> {
62+
let mut options = vec![
63+
"-c".to_owned(),
64+
format!("timeline_id={}", self.ttid.timeline_id),
65+
format!("tenant_id={}", self.ttid.tenant_id),
66+
format!("protocol={}", self.protocol as u8),
67+
];
68+
69+
if self.shard_number.is_some() {
70+
assert!(self.shard_count.is_some());
71+
assert!(self.shard_stripe_size.is_some());
72+
73+
options.push(format!("shard_count={}", self.shard_count.unwrap()));
74+
options.push(format!("shard_number={}", self.shard_number.unwrap()));
75+
options.push(format!(
76+
"shard_stripe_size={}",
77+
self.shard_stripe_size.unwrap()
78+
));
79+
}
80+
81+
options
82+
}
83+
}
84+
1085
/// Create client config for fetching WAL from safekeeper on particular timeline.
1186
/// listen_pg_addr_str is in form host:\[port\].
1287
pub fn wal_stream_connection_config(
13-
TenantTimelineId {
14-
tenant_id,
15-
timeline_id,
16-
}: TenantTimelineId,
17-
listen_pg_addr_str: &str,
18-
auth_token: Option<&str>,
19-
availability_zone: Option<&str>,
88+
args: ConnectionConfigArgs,
2089
) -> anyhow::Result<PgConnectionConfig> {
2190
let (host, port) =
22-
parse_host_port(listen_pg_addr_str).context("Unable to parse listen_pg_addr_str")?;
91+
parse_host_port(args.listen_pg_addr_str).context("Unable to parse listen_pg_addr_str")?;
2392
let port = port.unwrap_or(5432);
2493
let mut connstr = PgConnectionConfig::new_host_port(host, port)
25-
.extend_options([
26-
"-c".to_owned(),
27-
format!("timeline_id={}", timeline_id),
28-
format!("tenant_id={}", tenant_id),
29-
])
30-
.set_password(auth_token.map(|s| s.to_owned()));
94+
.extend_options(args.options())
95+
.set_password(args.auth_token.map(|s| s.to_owned()));
3196

32-
if let Some(availability_zone) = availability_zone {
97+
if let Some(availability_zone) = args.availability_zone {
3398
connstr = connstr.extend_options([format!("availability_zone={}", availability_zone)]);
3499
}
35100

libs/wal_decoder/src/models.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,18 @@ pub struct InterpretedWalRecord {
6565
pub xid: TransactionId,
6666
}
6767

68+
impl InterpretedWalRecord {
69+
/// Checks if the WAL record is empty
70+
///
71+
/// An empty interpreted WAL record has no data or metadata and does not have to be sent to the
72+
/// pageserver.
73+
pub fn is_empty(&self) -> bool {
74+
self.batch.is_empty()
75+
&& self.metadata_record.is_none()
76+
&& matches!(self.flush_uncommitted, FlushUncommittedRecords::No)
77+
}
78+
}
79+
6880
/// The interpreted part of the Postgres WAL record which requires metadata
6981
/// writes to the underlying storage engine.
7082
#[derive(Serialize, Deserialize)]

libs/wal_decoder/src/serialized_batch.rs

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -496,11 +496,16 @@ impl SerializedValueBatch {
496496
}
497497
}
498498

499-
/// Checks if the batch is empty
500-
///
501-
/// A batch is empty when it contains no serialized values.
502-
/// Note that it may still contain observed values.
499+
/// Checks if the batch contains any serialized or observed values
503500
pub fn is_empty(&self) -> bool {
501+
!self.has_data() && self.metadata.is_empty()
502+
}
503+
504+
/// Checks if the batch contains data
505+
///
506+
/// Note that if this returns false, it may still contain observed values or
507+
/// a metadata record.
508+
pub fn has_data(&self) -> bool {
504509
let empty = self.raw.is_empty();
505510

506511
if cfg!(debug_assertions) && empty {
@@ -510,7 +515,7 @@ impl SerializedValueBatch {
510515
.all(|meta| matches!(meta, ValueMeta::Observed(_))));
511516
}
512517

513-
empty
518+
!empty
514519
}
515520

516521
/// Returns the number of values serialized in the batch

pageserver/src/bin/pageserver.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,7 @@ fn main() -> anyhow::Result<()> {
126126
// after setting up logging, log the effective IO engine choice and read path implementations
127127
info!(?conf.virtual_file_io_engine, "starting with virtual_file IO engine");
128128
info!(?conf.virtual_file_io_mode, "starting with virtual_file IO mode");
129+
info!(?conf.wal_receiver_protocol, "starting with WAL receiver protocol");
129130

130131
// The tenants directory contains all the pageserver local disk state.
131132
// Create if not exists and make sure all the contents are durable before proceeding.

pageserver/src/config.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ use remote_storage::{RemotePath, RemoteStorageConfig};
1414
use std::env;
1515
use storage_broker::Uri;
1616
use utils::logging::SecretString;
17+
use utils::postgres_client::PostgresClientProtocol;
1718

1819
use once_cell::sync::OnceCell;
1920
use reqwest::Url;
@@ -190,6 +191,8 @@ pub struct PageServerConf {
190191
/// Maximum amount of time for which a get page request request
191192
/// might be held up for request merging.
192193
pub server_side_batch_timeout: Option<Duration>,
194+
195+
pub wal_receiver_protocol: PostgresClientProtocol,
193196
}
194197

195198
/// Token for authentication to safekeepers
@@ -350,6 +353,7 @@ impl PageServerConf {
350353
server_side_batch_timeout,
351354
tenant_config,
352355
no_sync,
356+
wal_receiver_protocol,
353357
} = config_toml;
354358

355359
let mut conf = PageServerConf {
@@ -393,6 +397,7 @@ impl PageServerConf {
393397
import_pgdata_upcall_api,
394398
import_pgdata_upcall_api_token: import_pgdata_upcall_api_token.map(SecretString::from),
395399
import_pgdata_aws_endpoint_url,
400+
wal_receiver_protocol,
396401

397402
// ------------------------------------------------------------
398403
// fields that require additional validation or custom handling

pageserver/src/pgdatadir_mapping.rs

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1229,10 +1229,9 @@ impl<'a> DatadirModification<'a> {
12291229
}
12301230

12311231
pub(crate) fn has_dirty_data(&self) -> bool {
1232-
!self
1233-
.pending_data_batch
1232+
self.pending_data_batch
12341233
.as_ref()
1235-
.map_or(true, |b| b.is_empty())
1234+
.map_or(false, |b| b.has_data())
12361235
}
12371236

12381237
/// Set the current lsn
@@ -1408,7 +1407,7 @@ impl<'a> DatadirModification<'a> {
14081407
Some(pending_batch) => {
14091408
pending_batch.extend(batch);
14101409
}
1411-
None if !batch.is_empty() => {
1410+
None if batch.has_data() => {
14121411
self.pending_data_batch = Some(batch);
14131412
}
14141413
None => {

0 commit comments

Comments
 (0)