Skip to content
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@

* New highly anticipated feature X added to Python SDK ([#X](https://github.com/apache/beam/issues/X)).
* New highly anticipated feature Y added to Java SDK ([#Y](https://github.com/apache/beam/issues/Y)).
* [Python] Add YAML Editor and Visualization Panel ([#35947](https://github.com/apache/beam/pull/35947)).

## I/Os

Expand Down Expand Up @@ -96,7 +97,7 @@

* New highly anticipated feature X added to Python SDK ([#X](https://github.com/apache/beam/issues/X)).
* New highly anticipated feature Y added to Java SDK ([#Y](https://github.com/apache/beam/issues/Y)).
* (Python) Prism runner now enabled by default for most Python pipelines using the direct runner ([#34612](https://github.com/apache/beam/pull/34612)). This may break some tests, see https://github.com/apache/beam/pull/34612 for details on how to handle issues.
* [Python] Prism runner now enabled by default for most Python pipelines using the direct runner ([#34612](https://github.com/apache/beam/pull/34612)). This may break some tests, see https://github.com/apache/beam/pull/34612 for details on how to handle issues.

## I/Os

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,172 @@
# Licensed under the Apache License, Version 2.0 (the 'License'); you may not
# use this file except in compliance with the License. You may obtain a copy of
# the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an 'AS IS' BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.

import json
from typing import Dict, List, TypedDict, Any
import dataclasses
from dataclasses import dataclass

from apache_beam.yaml.main import build_pipeline_components_from_yaml
import apache_beam as beam
import yaml

# ======================== Type Definitions ========================


@dataclass
class NodeData:
  """A single node of the flow graph.

  Attributes are the node identifier, its display label and a node type
  string (empty by default).

  Raises:
    ValueError: If ``id`` is empty (falsy) at construction time.
  """
  id: str
  label: str
  type: str = ""

  def __post_init__(self):
    # A node without an identifier cannot be referenced by any edge.
    if self.id:
      return
    raise ValueError("Node ID cannot be empty")


@dataclass
class EdgeData:
  """A directed edge of the flow graph, from ``source`` to ``target``.

  ``label`` is an optional description of the edge and defaults to an empty
  string.

  Raises:
    ValueError: If ``source`` or ``target`` is empty (falsy) at construction
      time.
  """
  source: str
  target: str
  label: str = ""

  def __post_init__(self):
    # Both endpoints are required for the edge to be meaningful.
    both_endpoints_set = bool(self.source and self.target)
    if not both_endpoints_set:
      raise ValueError("Edge source and target cannot be empty")


class FlowGraph(TypedDict):
  """Typed shape of a flow graph: a list of node dicts and a list of edge
  dicts."""
  # One dictionary per graph node.
  nodes: List[Dict[str, Any]]
  # One dictionary per graph edge.
  edges: List[Dict[str, Any]]


# ======================== Main Function ========================


def parse_beam_yaml(yaml_str: str, isDryRunMode: bool = False) -> str:
  """Parse Beam YAML and convert it to a flow graph serialized as JSON.

  The work happens in three phases:

  1. The text is parsed with ``yaml.safe_load`` and must yield a mapping that
     contains a 'pipeline' key.
  2. The pipeline is validated with ``build_pipeline_components_from_yaml``
     (per-transform schema validation).
  3. One graph node is emitted per mapping entry of ``pipeline.transforms``.
     The nodes are chained in order between a synthetic 'Input' node (id '0')
     and a synthetic 'Output' node (id '1'). Every key of a transform other
     than 'type' is copied onto its node; 'type' becomes the node label.

  Args:
    yaml_str: Input YAML string.
    isDryRunMode: When True, the validated pipeline is additionally
      constructed inside a ``beam.Pipeline`` context manager.
      NOTE(review): leaving that context may actually run the pipeline -
      confirm this is the intended meaning of "dry run".

  Returns:
    A JSON string in the standardized response format:
    - Success: {'data': {'nodes': [...], 'edges': [...]}, 'error': None}
    - Failure: {'data': None, 'error': 'message'}
  """
  # Phase 1: YAML Parsing
  try:
    parsed_yaml = yaml.safe_load(yaml_str)
  except yaml.YAMLError as e:
    return build_error_response(f"YAML parsing error: {str(e)}")
  # safe_load may return a scalar or a list; only a mapping can carry a
  # 'pipeline' section, and a membership test on e.g. an int would raise.
  if not isinstance(parsed_yaml, dict) or 'pipeline' not in parsed_yaml:
    return build_error_response(
        "Invalid YAML structure: missing 'pipeline' section")

  # Phase 2: Pipeline Validation
  try:
    options, constructor = build_pipeline_components_from_yaml(
        yaml_str,
        [],
        validate_schema='per_transform'
    )
    if isDryRunMode:
      with beam.Pipeline(options=options) as p:
        constructor(p)
  except Exception as e:
    return build_error_response(f"Pipeline validation failed: {str(e)}")

  # Phase 3: Graph Construction
  try:
    transforms = parsed_yaml['pipeline'].get('transforms', [])

    # Nodes are plain dicts because transforms carry arbitrary extra keys
    # (name, config, input, ...) that NodeData has no fields for; NodeData is
    # still used to validate and seed the three core fields.
    nodes: List[Dict[str, Any]] = [
        dataclasses.asdict(NodeData(id='0', label='Input', type='input')),
        dataclasses.asdict(NodeData(id='1', label='Output', type='output')),
    ]
    # Ids of the transform nodes actually created, in pipeline order.
    transform_ids: List[str] = []

    # Process transform nodes
    for idx, transform in enumerate(transforms):
      if not isinstance(transform, dict):
        continue

      node_id = f"t{idx}"
      node = dataclasses.asdict(
          NodeData(
              id=node_id,
              label=transform.get('type', 'unnamed'),
              type='default'))
      # Copy the remaining transform keys. Skipping keys already present
      # drops the transform's own 'type' (it is the label) and keeps the
      # graph's id/label from being clobbered by user data.
      for key, value in transform.items():
        if key not in node:
          node[key] = value
      nodes.append(node)
      transform_ids.append(node_id)

    # Create connections between nodes. Edges are derived from the ids that
    # were really created, so skipped (non-mapping) entries never leave an
    # edge pointing at a node that does not exist.
    edges: List[EdgeData] = [
        EdgeData(source=src, target=dst, label='chain')
        for src, dst in zip(transform_ids, transform_ids[1:])
    ]
    if transform_ids:
      edges.append(
          EdgeData(source='0', target=transform_ids[0], label='start'))
      edges.append(
          EdgeData(source=transform_ids[-1], target='1', label='stop'))

    return build_success_response(
        nodes=nodes, edges=[dataclasses.asdict(e) for e in edges])

  except Exception as e:
    return build_error_response(f"Graph construction failed: {str(e)}")


# ======================== Utility Functions ========================


def build_success_response(
    nodes: List[Dict[str, Any]], edges: List[Dict[str, Any]]) -> str:
  """Serialize a successful result to JSON.

  Args:
    nodes: Node dictionaries of the graph.
    edges: Edge dictionaries of the graph.

  Returns:
    A JSON string of the form
    {"data": {"nodes": [...], "edges": [...]}, "error": null}.
  """
  graph = {'nodes': nodes, 'edges': edges}
  response = {'data': graph, 'error': None}
  return json.dumps(response)


def build_error_response(error_msg: str) -> str:
  """Serialize a failure to JSON.

  Args:
    error_msg: Human-readable description of what went wrong.

  Returns:
    A JSON string of the form {"data": null, "error": "<error_msg>"}.
  """
  response = {'data': None, 'error': error_msg}
  return json.dumps(response)


if __name__ == "__main__":
  # Example usage: parse a small three-transform pipeline and print the JSON
  # response. The YAML nesting matters: 'transforms' must sit under
  # 'pipeline', and 'name'/'config'/'input' under their transform entry,
  # otherwise 'pipeline' parses to null and no transform is found.
  example_yaml = """
pipeline:
  transforms:
    - type: ReadFromCsv
      name: A
      config:
        path: /path/to/input*.csv
    - type: WriteToJson
      name: B
      config:
        path: /path/to/output.json
      input: ReadFromCsv
    - type: Join
      input: [A, B]
"""

  response = parse_beam_yaml(example_yaml, isDryRunMode=False)
  print(response)
Original file line number Diff line number Diff line change
Expand Up @@ -47,27 +47,37 @@
"@jupyterlab/launcher": "^4.3.6",
"@jupyterlab/mainmenu": "^4.3.6",
"@lumino/widgets": "^2.2.1",
"@monaco-editor/react": "^4.7.0",
"@rmwc/base": "^14.0.0",
"@rmwc/button": "^8.0.6",
"@rmwc/card": "^14.3.5",
"@rmwc/data-table": "^8.0.6",
"@rmwc/dialog": "^8.0.6",
"@rmwc/drawer": "^8.0.6",
"@rmwc/fab": "^8.0.6",
"@rmwc/grid": "^14.3.5",
"@rmwc/list": "^8.0.6",
"@rmwc/ripple": "^14.0.0",
"@rmwc/textfield": "^8.0.6",
"@rmwc/tooltip": "^8.0.6",
"@rmwc/top-app-bar": "^8.0.6",
"@rmwc/touch-target": "^14.3.5",
"@xyflow/react": "^12.8.2",
"dagre": "^0.8.5",
"lodash": "^4.17.21",
"material-design-icons": "^3.0.1",
"react": "^18.2.0",
"react-dom": "^18.2.0"
"react-dom": "^18.2.0",
"react-split": "^2.0.14"
},
"devDependencies": {
"@jupyterlab/builder": "^4.3.6",
"@testing-library/dom": "^9.3.0",
"@testing-library/jest-dom": "^6.1.4",
"@testing-library/react": "^14.0.0",
"@types/dagre": "^0.7.53",
"@types/jest": "^29.5.14",
"@types/lodash": "^4.17.20",
"@types/react": "^18.2.0",
"@types/react-dom": "^18.2.0",
"@typescript-eslint/eslint-plugin": "^7.3.1",
Expand Down Expand Up @@ -97,5 +107,6 @@
"test": "jest",
"resolutions": {
"@types/react": "^18.2.0"
}
}
},
"packageManager": "yarn@1.22.22+sha512.a6b2f7906b721bba3d67d4aff083df04dad64c399707841b7acf00f6b133b7ac24255f2652fa22ae3534329dc6180534e98d17432037ff6fd140556e2bb3137e"
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,14 +58,17 @@ export class SidePanel extends BoxPanel {
const sessionModelItr = manager.sessions.running();
const firstModel = sessionModelItr.next();
let onlyOneUniqueKernelExists = true;
if (firstModel === undefined) {
// There is zero unique running kernel.

if (firstModel.done) {
// No Running kernel
onlyOneUniqueKernelExists = false;
} else {
// firstModel.value is the first session
let sessionModel = sessionModelItr.next();
while (sessionModel !== undefined) {

while (!sessionModel.done) {
// Check if there is more than one unique kernel
if (sessionModel.value.kernel.id !== firstModel.value.kernel.id) {
// There is more than one unique running kernel.
onlyOneUniqueKernelExists = false;
break;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,15 @@ import { SidePanel } from './SidePanel';
import {
InteractiveInspectorWidget
} from './inspector/InteractiveInspectorWidget';
import { YamlWidget } from './yaml/YamlWidget';

// Identifiers of the commands this extension registers. The same IDs are
// reused by the launcher, top menu and command palette entry points.
namespace CommandIDs {
  export const open_inspector =
    'apache-beam-jupyterlab-sidepanel:open_inspector';
  export const open_clusters_panel =
    'apache-beam-jupyterlab-sidepanel:open_clusters_panel';
  // Opens the YAML pipeline editor panel.
  export const open_yaml_editor =
    'apache-beam-jupyterlab-sidepanel:open_yaml_editor';
}

/**
Expand Down Expand Up @@ -67,6 +70,7 @@ function activate(
const category = 'Interactive Beam';
const inspectorCommandLabel = 'Open Inspector';
const clustersCommandLabel = 'Manage Clusters';
const yamlCommandLabel = 'Edit YAML Pipeline';
const { commands, shell, serviceManager } = app;

async function createInspectorPanel(): Promise<SidePanel> {
Expand Down Expand Up @@ -105,6 +109,24 @@ function activate(
return panel;
}

/**
 * Creates the side panel hosting the YAML editor and activates it.
 *
 * A dedicated session context named 'Interactive Beam YAML Session' is
 * created and shared by the YamlWidget and the SidePanel that wraps it.
 *
 * @returns The newly created panel, after it has been handed to
 *   activatePanel.
 */
async function createYamlPanel(): Promise<SidePanel> {
  const { sessions, kernelspecs } = serviceManager;
  const yamlSession = new SessionContext({
    sessionManager: sessions,
    specsManager: kernelspecs,
    name: 'Interactive Beam YAML Session'
  });
  const yamlPanel = new SidePanel(
    serviceManager,
    rendermime,
    yamlSession,
    'Interactive Beam YAML Editor',
    new YamlWidget(yamlSession)
  );
  activatePanel(yamlPanel);
  return yamlPanel;
}

function activatePanel(panel: SidePanel): void {
shell.add(panel, 'main');
shell.activateById(panel.id);
Expand All @@ -122,6 +144,12 @@ function activate(
execute: createClustersPanel
});

// The open_yaml_editor command is also used by the below entry points.
commands.addCommand(CommandIDs.open_yaml_editor, {
label: yamlCommandLabel,
execute: createYamlPanel
});

// Entry point in launcher.
if (launcher) {
launcher.add({
Expand All @@ -132,6 +160,10 @@ function activate(
command: CommandIDs.open_clusters_panel,
category: category
});
launcher.add({
command: CommandIDs.open_yaml_editor,
category: category
});
}

// Entry point in top menu.
Expand All @@ -140,10 +172,11 @@ function activate(
mainMenu.addMenu(menu);
menu.addItem({ command: CommandIDs.open_inspector });
menu.addItem({ command: CommandIDs.open_clusters_panel });
menu.addItem({ command: CommandIDs.open_yaml_editor });

// Entry point in commands palette.
palette.addItem({ command: CommandIDs.open_inspector, category });
palette.addItem({ command: CommandIDs.open_clusters_panel, category });
palette.addItem({ command: CommandIDs.open_yaml_editor, category });
}

export default extension;
Loading
Loading