Skip to content

Commit bf3af88

Browse files
Simplify COPY command
Signed-off-by: Abhi Agarwal <[email protected]>
1 parent e8b2e3c commit bf3af88

File tree

1 file changed

+6
-74
lines changed

1 file changed

+6
-74
lines changed

etl/src/replication/client.rs

Lines changed: 6 additions & 74 deletions
Original file line numberDiff line numberDiff line change
@@ -814,31 +814,12 @@ impl PgReplicationClient {
814814
.collect::<Vec<_>>()
815815
.join(", ");
816816

817-
let copy_query = if self.is_partitioned_table(table_id).await?
818-
&& let leaf_partitions = self.get_leaf_partition_ids(table_id).await?
819-
&& !leaf_partitions.is_empty()
820-
{
821-
let mut selects = Vec::with_capacity(leaf_partitions.len());
822-
for child_id in leaf_partitions {
823-
let child_name = self.get_table_name(child_id).await?;
824-
let select = format!(
825-
"select {} from {}",
826-
column_list,
827-
child_name.as_quoted_identifier()
828-
);
829-
selects.push(select);
830-
}
831-
832-
let union_query = selects.join(" union all ");
833-
format!(r#"copy ({union_query}) to stdout with (format text);"#)
834-
} else {
835-
let table_name = self.get_table_name(table_id).await?;
836-
format!(
837-
r#"copy {} ({}) to stdout with (format text);"#,
838-
table_name.as_quoted_identifier(),
839-
column_list
840-
)
841-
};
817+
let table_name = self.get_table_name(table_id).await?;
818+
let copy_query = format!(
819+
r#"copy (select {} from {}) to stdout with (format text);"#,
820+
column_list,
821+
table_name.as_quoted_identifier()
822+
);
842823

843824
// TODO: allow passing in format binary or text
844825
let stream = self.client.copy_out_simple(&copy_query).await?;
@@ -876,53 +857,4 @@ impl PgReplicationClient {
876857
)
877858
})
878859
}
879-
880-
/// Returns true if the given table id refers to a partitioned table (relkind = 'p').
881-
async fn is_partitioned_table(&self, table_id: TableId) -> EtlResult<bool> {
882-
let query = format!("select c.relkind from pg_class c where c.oid = {table_id}");
883-
884-
for msg in self.client.simple_query(&query).await? {
885-
if let SimpleQueryMessage::Row(row) = msg {
886-
let relkind = Self::get_row_value::<String>(&row, "relkind", "pg_class").await?;
887-
return Ok(relkind == "p");
888-
}
889-
}
890-
891-
bail!(
892-
ErrorKind::SourceSchemaError,
893-
"Table not found",
894-
format!("Table not found in database (table id: {})", table_id)
895-
);
896-
}
897-
898-
/// Returns all leaf partition OIDs for a partitioned table.
899-
async fn get_leaf_partition_ids(&self, parent_id: TableId) -> EtlResult<Vec<TableId>> {
900-
let query = format!(
901-
r#"
902-
with recursive parts(relid) as (
903-
select i.inhrelid
904-
from pg_inherits i
905-
where i.inhparent = {parent_id}
906-
union all
907-
select i.inhrelid
908-
from pg_inherits i
909-
join parts p on p.relid = i.inhparent
910-
)
911-
select p.relid as oid
912-
from parts p
913-
left join pg_inherits i on i.inhparent = p.relid
914-
where i.inhrelid is null
915-
"#
916-
);
917-
918-
let mut ids = Vec::new();
919-
for msg in self.client.simple_query(&query).await? {
920-
if let SimpleQueryMessage::Row(row) = msg {
921-
let id = Self::get_row_value::<TableId>(&row, "oid", "pg_inherits").await?;
922-
ids.push(id);
923-
}
924-
}
925-
926-
Ok(ids)
927-
}
928860
}

0 commit comments

Comments
 (0)