|
| 1 | +use std::sync::Arc; |
| 2 | + |
| 3 | +use datafusion::arrow::array::{ |
| 4 | + ArrayRef, BooleanArray, Int32Array, ListBuilder, RecordBatch, StringArray, StringBuilder, |
| 5 | + TimestampMicrosecondBuilder, |
| 6 | +}; |
| 7 | +use datafusion::arrow::datatypes::{DataType, Field, Schema, SchemaRef, TimeUnit}; |
| 8 | +use datafusion::error::Result; |
| 9 | +use datafusion::execution::{SendableRecordBatchStream, TaskContext}; |
| 10 | +use datafusion::physical_plan::stream::RecordBatchStreamAdapter; |
| 11 | +use datafusion::physical_plan::streaming::PartitionStream; |
| 12 | + |
| 13 | +use crate::auth::AuthManager; |
| 14 | + |
| 15 | +#[derive(Debug, Clone)] |
| 16 | +pub(crate) struct PgRolesTable { |
| 17 | + schema: SchemaRef, |
| 18 | + auth_manager: Arc<AuthManager>, |
| 19 | +} |
| 20 | + |
| 21 | +impl PgRolesTable { |
| 22 | + pub(crate) fn new(auth_manager: Arc<AuthManager>) -> Self { |
| 23 | + let schema = Arc::new(Schema::new(vec![ |
| 24 | + Field::new("rolname", DataType::Utf8, true), |
| 25 | + Field::new("rolsuper", DataType::Boolean, true), |
| 26 | + Field::new("rolinherit", DataType::Boolean, true), |
| 27 | + Field::new("rolcreaterole", DataType::Boolean, true), |
| 28 | + Field::new("rolcreatedb", DataType::Boolean, true), |
| 29 | + Field::new("rolcanlogin", DataType::Boolean, true), |
| 30 | + Field::new("rolreplication", DataType::Boolean, true), |
| 31 | + Field::new("rolconnlimit", DataType::Int32, true), |
| 32 | + Field::new("rolpassword", DataType::Utf8, true), |
| 33 | + Field::new( |
| 34 | + "rolvaliduntil", |
| 35 | + DataType::Timestamp( |
| 36 | + datafusion::arrow::datatypes::TimeUnit::Microsecond, |
| 37 | + Some(Arc::from("UTC")), |
| 38 | + ), |
| 39 | + true, |
| 40 | + ), |
| 41 | + Field::new("rolbypassrls", DataType::Boolean, true), |
| 42 | + Field::new( |
| 43 | + "rolconfig", |
| 44 | + DataType::List(Arc::new(Field::new("item", DataType::Utf8, true))), |
| 45 | + true, |
| 46 | + ), |
| 47 | + Field::new("oid", DataType::Int32, true), |
| 48 | + ])); |
| 49 | + |
| 50 | + Self { |
| 51 | + schema, |
| 52 | + auth_manager, |
| 53 | + } |
| 54 | + } |
| 55 | + |
| 56 | + async fn get_data(this: Self) -> Result<RecordBatch> { |
| 57 | + let mut rolname = Vec::new(); |
| 58 | + let mut rolsuper = Vec::new(); |
| 59 | + let mut rolinherit = Vec::new(); |
| 60 | + let mut rolcreaterole = Vec::new(); |
| 61 | + let mut rolcreatedb = Vec::new(); |
| 62 | + let mut rolcanlogin = Vec::new(); |
| 63 | + let mut rolreplication = Vec::new(); |
| 64 | + let mut rolconnlimit = Vec::new(); |
| 65 | + let mut rolpassword: Vec<Option<String>> = Vec::new(); |
| 66 | + let mut rolvaliduntil: Vec<Option<i64>> = Vec::new(); |
| 67 | + let mut rolbypassrls = Vec::new(); |
| 68 | + let mut rolconfig: Vec<Option<Vec<String>>> = Vec::new(); |
| 69 | + let mut oid: Vec<i32> = Vec::new(); |
| 70 | + |
| 71 | + for role_name in &this.auth_manager.list_roles().await { |
| 72 | + let role = &this.auth_manager.get_role(role_name).await.unwrap(); |
| 73 | + rolname.push(role.name.clone()); |
| 74 | + rolsuper.push(role.is_superuser); |
| 75 | + rolinherit.push(true); |
| 76 | + rolcreaterole.push(role.can_create_role); |
| 77 | + rolcreatedb.push(role.can_create_db); |
| 78 | + rolcanlogin.push(role.can_login); |
| 79 | + rolreplication.push(role.can_replication); |
| 80 | + rolconnlimit.push(-1); |
| 81 | + rolpassword.push(None); |
| 82 | + rolvaliduntil.push(None); |
| 83 | + rolbypassrls.push(None); |
| 84 | + rolconfig.push(None); |
| 85 | + oid.push(0); // TODO: handle oid properly somehow |
| 86 | + } |
| 87 | + |
| 88 | + let arrays: Vec<ArrayRef> = vec![ |
| 89 | + Arc::new(StringArray::from(rolname)), |
| 90 | + Arc::new(BooleanArray::from(rolsuper)), |
| 91 | + Arc::new(BooleanArray::from(rolinherit)), |
| 92 | + Arc::new(BooleanArray::from(rolcreaterole)), |
| 93 | + Arc::new(BooleanArray::from(rolcreatedb)), |
| 94 | + Arc::new(BooleanArray::from(rolcanlogin)), |
| 95 | + Arc::new(BooleanArray::from(rolreplication)), |
| 96 | + Arc::new(Int32Array::from(rolconnlimit)), |
| 97 | + Arc::new(StringArray::from(rolpassword)), |
| 98 | + Arc::new({ |
| 99 | + let mut builder = |
| 100 | + TimestampMicrosecondBuilder::with_capacity(rolconfig.len()).with_data_type( |
| 101 | + DataType::Timestamp(TimeUnit::Microsecond, Some(Arc::from("UTC"))), |
| 102 | + ); |
| 103 | + for field in &rolvaliduntil { |
| 104 | + builder.append_option(field.as_ref().copied()); |
| 105 | + } |
| 106 | + builder.finish() |
| 107 | + }), |
| 108 | + Arc::new(BooleanArray::from(rolbypassrls)), |
| 109 | + Arc::new({ |
| 110 | + let mut builder = ListBuilder::new(StringBuilder::new()); |
| 111 | + for field in &rolconfig { |
| 112 | + match field { |
| 113 | + Some(values) => { |
| 114 | + for s in values { |
| 115 | + builder.values().append_value(s); |
| 116 | + } |
| 117 | + builder.append(true); |
| 118 | + } |
| 119 | + None => builder.append(false), |
| 120 | + } |
| 121 | + } |
| 122 | + builder.finish() |
| 123 | + }), |
| 124 | + Arc::new(Int32Array::from(oid)), |
| 125 | + ]; |
| 126 | + |
| 127 | + Ok(RecordBatch::try_new(this.schema.clone(), arrays)?) |
| 128 | + } |
| 129 | +} |
| 130 | + |
| 131 | +impl PartitionStream for PgRolesTable { |
| 132 | + fn schema(&self) -> &SchemaRef { |
| 133 | + &self.schema |
| 134 | + } |
| 135 | + |
| 136 | + fn execute(&self, _ctx: Arc<TaskContext>) -> SendableRecordBatchStream { |
| 137 | + let this = self.clone(); |
| 138 | + Box::pin(RecordBatchStreamAdapter::new( |
| 139 | + this.schema.clone(), |
| 140 | + futures::stream::once(async move { PgRolesTable::get_data(this).await }), |
| 141 | + )) |
| 142 | + } |
| 143 | +} |
0 commit comments