Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/carnot/funcs/metadata/metadata_ops.cc
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ void RegisterMetadataOpsOrDie(px::carnot::udf::Registry* registry) {
registry->RegisterOrDie<HasServiceNameUDF>("has_service_name");
registry->RegisterOrDie<HasValueUDF>("has_value");
registry->RegisterOrDie<IPToPodIDUDF>("ip_to_pod_id");
registry->RegisterOrDie<UPIDtoPodNameLocalAddrFallback>("_upid_to_podname_local_addr_fallback");
registry->RegisterOrDie<IPToPodIDAtTimeUDF>("ip_to_pod_id");
registry->RegisterOrDie<PodIDToPodNameUDF>("pod_id_to_pod_name");
registry->RegisterOrDie<PodIDToPodLabelsUDF>("pod_id_to_pod_labels");
Expand Down
32 changes: 32 additions & 0 deletions src/carnot/funcs/metadata/metadata_ops.h
Original file line number Diff line number Diff line change
Expand Up @@ -2977,6 +2977,38 @@ class IPToPodIDUDF : public ScalarUDF {
static udfspb::UDFSourceExecutor Executor() { return udfspb::UDFSourceExecutor::UDF_KELVIN; }
};

/**
* This UDF is a compiler internal function. It should only be used on local IP addresses
* since this function is forced to run on PEMs. In cases where the IP could be a remote address,
* then it is more correct to have the function run on Kelvin (IPToPodIDUDF or IPToPodIDAtTimeUDF).
*/
class UPIDtoPodNameLocalAddrFallback : public ScalarUDF {
public:
/**
* @brief Gets the pod name from UPID or from local addr if first lookup fails
*/
StringValue Exec(FunctionContext* ctx, UInt128Value upid_value, StringValue pod_ip,
Time64NSValue time) {
auto md = GetMetadataState(ctx);
auto pod_info = UPIDtoPod(md, upid_value);
if (pod_info == nullptr) {
auto pod_id = md->k8s_metadata_state().PodIDByIPAtTime(pod_ip, time.val);
pod_info = md->k8s_metadata_state().PodInfoByID(pod_id);
if (pod_info == nullptr) {
return "";
}
}
return absl::Substitute("$0/$1", pod_info->ns(), pod_info->name());
}

static udfspb::UDFSourceExecutor Executor() { return udfspb::UDFSourceExecutor::UDF_PEM; }

static udf::InfRuleVec SemanticInferenceRules() {
return {udf::ExplicitRule::Create<UPIDtoPodNameLocalAddrFallback>(
types::ST_POD_NAME, {types::ST_NONE, types::ST_NONE, types::ST_NONE})};
}
};

class IPToPodIDAtTimeUDF : public ScalarUDF {
public:
/**
Expand Down
37 changes: 32 additions & 5 deletions src/carnot/planner/compiler/analyzer/convert_metadata_rule.cc
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,12 @@ StatusOr<std::string> ConvertMetadataRule::FindKeyColumn(std::shared_ptr<TableTy
absl::StrJoin(parent_type->ColumnNames(), ","));
}

bool CheckBackupConversionAvailable(std::shared_ptr<TableType> parent_type,
const std::string& func_name) {
return parent_type->HasColumn("time_") && parent_type->HasColumn("local_addr") &&
func_name == "upid_to_pod_name";
}

StatusOr<bool> ConvertMetadataRule::Apply(IRNode* ir_node) {
if (!Match(ir_node, Metadata())) {
return false;
Expand All @@ -85,17 +91,38 @@ StatusOr<bool> ConvertMetadataRule::Apply(IRNode* ir_node) {
PX_ASSIGN_OR_RETURN(auto parent, metadata->ReferencedOperator());
PX_ASSIGN_OR_RETURN(auto containing_ops, metadata->ContainingOperators());

auto resolved_table_type = parent->resolved_table_type();
PX_ASSIGN_OR_RETURN(std::string key_column_name,
FindKeyColumn(parent->resolved_table_type(), md_property, ir_node));
FindKeyColumn(resolved_table_type, md_property, ir_node));

PX_ASSIGN_OR_RETURN(ColumnIR * key_column,
graph->CreateNode<ColumnIR>(ir_node->ast(), key_column_name, parent_op_idx));

PX_ASSIGN_OR_RETURN(std::string func_name, md_property->UDFName(key_column_name));
PX_ASSIGN_OR_RETURN(
FuncIR * conversion_func,
graph->CreateNode<FuncIR>(ir_node->ast(), FuncIR::Op{FuncIR::Opcode::non_op, "", func_name},
std::vector<ExpressionIR*>{key_column}));
auto backup_conversion_available = CheckBackupConversionAvailable(resolved_table_type, func_name);

FuncIR* conversion_func;

// TODO(ddelnano): Until the short lived process issue (gh#1638) is resolved, use a
// conversion function that uses local_addr for pod lookups when the upid based default
// (upid_to_pod_name) fails.
if (backup_conversion_available) {
func_name = "_upid_to_podname_local_addr_fallback";
PX_ASSIGN_OR_RETURN(ColumnIR * local_addr_column,
graph->CreateNode<ColumnIR>(ir_node->ast(), "local_addr", parent_op_idx));
PX_ASSIGN_OR_RETURN(ColumnIR * time_column,
graph->CreateNode<ColumnIR>(ir_node->ast(), "time_", parent_op_idx));
PX_ASSIGN_OR_RETURN(
conversion_func,
graph->CreateNode<FuncIR>(
ir_node->ast(), FuncIR::Op{FuncIR::Opcode::non_op, "", func_name},
std::vector<ExpressionIR*>{key_column, local_addr_column, time_column}));
} else {
PX_ASSIGN_OR_RETURN(
conversion_func,
graph->CreateNode<FuncIR>(ir_node->ast(), FuncIR::Op{FuncIR::Opcode::non_op, "", func_name},
std::vector<ExpressionIR*>{key_column}));
}
for (int64_t parent_id : graph->dag().ParentsOf(metadata->id())) {
// For each container node of the metadata expression, update it to point to the
// new conversion func instead.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ using table_store::schema::Relation;

using ConvertMetadataRuleTest = RulesTest;

TEST_F(ConvertMetadataRuleTest, multichild) {
TEST_F(ConvertMetadataRuleTest, multichild_without_fallback_func) {
auto relation = Relation(cpu_relation);
MetadataType conversion_column = MetadataType::UPID;
std::string conversion_column_str = MetadataProperty::GetMetadataString(conversion_column);
Expand Down Expand Up @@ -114,6 +114,73 @@ TEST_F(ConvertMetadataRuleTest, missing_conversion_column) {
skip_check_stray_nodes_ = true;
}

TEST_F(ConvertMetadataRuleTest, multichild_with_fallback_func) {
auto relation = Relation(cpu_relation);
MetadataType conversion_column = MetadataType::UPID;
std::string conversion_column_str = MetadataProperty::GetMetadataString(conversion_column);
relation.AddColumn(types::DataType::UINT128, conversion_column_str);
relation.AddColumn(types::DataType::STRING, "local_addr");
relation.AddColumn(types::DataType::TIME64NS, "time_");
compiler_state_->relation_map()->emplace("table", relation);

auto metadata_name = "pod_name";
MetadataProperty* property = md_handler->GetProperty(metadata_name).ValueOrDie();
MetadataIR* metadata_ir = MakeMetadataIR(metadata_name, /* parent_op_idx */ 0);
metadata_ir->set_property(property);

auto src = MakeMemSource(relation);
auto map1 = MakeMap(src, {{"md", metadata_ir}});
auto map2 = MakeMap(src, {{"other_col", MakeInt(2)}, {"md", metadata_ir}});
auto filter = MakeFilter(src, MakeEqualsFunc(metadata_ir, MakeString("pl/foobar")));

ResolveTypesRule type_rule(compiler_state_.get());
ASSERT_OK(type_rule.Execute(graph.get()));

ConvertMetadataRule rule(compiler_state_.get());
auto result = rule.Execute(graph.get());
ASSERT_OK(result);
EXPECT_TRUE(result.ValueOrDie());

EXPECT_EQ(0, graph->FindNodesThatMatch(Metadata()).size());

// Check the contents of the new func.
EXPECT_MATCH(filter->filter_expr(), Equals(Func(), String()));
auto converted_md = static_cast<FuncIR*>(filter->filter_expr())->all_args()[0];
EXPECT_MATCH(converted_md, Func());
auto converted_md_func = static_cast<FuncIR*>(converted_md);
EXPECT_EQ("_upid_to_podname_local_addr_fallback", converted_md_func->func_name());
EXPECT_EQ(3, converted_md_func->all_args().size());
auto upid_col = converted_md_func->all_args()[0];
auto local_addr_col = converted_md_func->all_args()[1];
auto time_col = converted_md_func->all_args()[2];
EXPECT_MATCH(upid_col, ColumnNode("upid"));
EXPECT_MATCH(local_addr_col, ColumnNode("local_addr"));
EXPECT_MATCH(time_col, ColumnNode("time_"));

EXPECT_MATCH(converted_md, ResolvedExpression());
EXPECT_MATCH(upid_col, ResolvedExpression());
EXPECT_MATCH(local_addr_col, ResolvedExpression());
EXPECT_MATCH(time_col, ResolvedExpression());
EXPECT_EQ(types::DataType::STRING, converted_md->EvaluatedDataType());
EXPECT_EQ(types::DataType::UINT128, upid_col->EvaluatedDataType());
EXPECT_EQ(types::DataType::STRING, local_addr_col->EvaluatedDataType());
EXPECT_EQ(types::DataType::TIME64NS, time_col->EvaluatedDataType());
EXPECT_EQ(ExpressionIR::Annotations(MetadataType::POD_NAME), converted_md->annotations());
EXPECT_EQ(1, converted_md_func->func_id());

// Check to make sure that all of the operators and expressions depending on the metadata
// now have an updated reference to the func.
EXPECT_EQ(converted_md, map1->col_exprs()[0].node);
EXPECT_EQ(converted_md, map2->col_exprs()[1].node);

// Check that the semantic type of the conversion func is propagated properly.
auto type_or_s = map2->resolved_table_type()->GetColumnType("md");
ASSERT_OK(type_or_s);
auto type = std::static_pointer_cast<ValueType>(type_or_s.ConsumeValueOrDie());
EXPECT_EQ(types::STRING, type->data_type());
EXPECT_EQ(types::ST_POD_NAME, type->semantic_type());
}

} // namespace compiler
} // namespace planner
} // namespace carnot
Expand Down
80 changes: 80 additions & 0 deletions src/carnot/planner/logical_planner_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,19 @@ class LogicalPlannerTest : public ::testing::Test {
*query_request.mutable_logical_planner_state() = state;
return query_request;
}
plannerpb::QueryRequest MakeQueryRequestWithExecArgs(
const distributedpb::LogicalPlannerState& state, const std::string& query,
const std::vector<std::string>& exec_funcs) {
plannerpb::QueryRequest query_request;
query_request.set_query_str(query);
*query_request.mutable_logical_planner_state() = state;
for (const auto& exec_func : exec_funcs) {
auto f = query_request.add_exec_funcs();
f->set_func_name(exec_func);
f->set_output_table_prefix(exec_func);
}
return query_request;
}
udfspb::UDFInfo info_;
};

Expand Down Expand Up @@ -770,6 +783,73 @@ TEST_F(LogicalPlannerTest, filter_pushdown_bug) {
ASSERT_OK(plan->ToProto());
}

const char kPodNameFallbackConversion[] = R"pxl(
import px

df = px.DataFrame(table='http_events', start_time='-6m')
df.pod = df.ctx['pod']

px.display(df)
)pxl";
TEST_F(LogicalPlannerTest, pod_name_fallback_conversion) {
auto planner = LogicalPlanner::Create(info_).ConsumeValueOrDie();
auto state = testutils::CreateTwoPEMsOneKelvinPlannerState(testutils::kHttpEventsSchema);
ASSERT_OK_AND_ASSIGN(auto plan,
planner->Plan(MakeQueryRequest(state, kPodNameFallbackConversion)));
ASSERT_OK(plan->ToProto());
}

const char kPodNameFallbackConversionWithFilter[] = R"pxl(
import px

df = px.DataFrame(table='http_events', start_time='-6m')
df[df.ctx['pod'] != ""]

px.display(df)
)pxl";
TEST_F(LogicalPlannerTest, pod_name_fallback_conversion_with_filter) {
auto planner = LogicalPlanner::Create(info_).ConsumeValueOrDie();
auto state = testutils::CreateTwoPEMsOneKelvinPlannerState(testutils::kHttpEventsSchema);
ASSERT_OK_AND_ASSIGN(
auto plan, planner->Plan(MakeQueryRequest(state, kPodNameFallbackConversionWithFilter)));
ASSERT_OK(plan->ToProto());
}

// Use a data table that doesn't contain local_addr to test df.ctx['pod'] conversion without
// the fallback conversion.
const char kPodNameConversionWithoutFallback[] = R"pxl(
import px

def cql_flow_graph():
df = px.DataFrame('cql_events', start_time='-5m')
df.pod = df.ctx['pod']

df.ra_pod = px.pod_id_to_pod_name(px.ip_to_pod_id(df.remote_addr))
df.is_ra_pod = df.ra_pod != ''
df.ra_name = px.select(df.is_ra_pod, df.ra_pod, df.remote_addr)

df.is_server_tracing = df.trace_role == 2

df.source = px.select(df.is_server_tracing, df.ra_name, df.pod)
df.destination = px.select(df.is_server_tracing, df.pod, df.ra_name)

return df


def cql_summary_with_links():
df = cql_flow_graph()

return df
)pxl";
TEST_F(LogicalPlannerTest, pod_name_conversion_without_fallback) {
auto planner = LogicalPlanner::Create(info_).ConsumeValueOrDie();
auto state = testutils::CreateTwoPEMsOneKelvinPlannerState(testutils::kHttpEventsSchema);
ASSERT_OK_AND_ASSIGN(auto plan, planner->Plan(MakeQueryRequestWithExecArgs(
state, kPodNameConversionWithoutFallback,
{"cql_flow_graph", "cql_summary_with_links"})));
ASSERT_OK(plan->ToProto());
}

const char kHttpDataScript[] = R"pxl(
import px

Expand Down
Loading
Loading