Skip to content

Commit f2516ae

Browse files
authored
Merge pull request #27 from kush124k/adding-blackbox
Feat: Generic Parquet Recorder Node (dora-parquet-recorder)
2 parents 58d380b + 8d01e58 commit f2516ae

File tree

6 files changed

+288
-0
lines changed

6 files changed

+288
-0
lines changed
Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,108 @@
1+
# dora-dataset-record
2+
3+
Node for recording robot datasets in LeRobot format. It captures synchronized camera feeds and robot poses to create high-quality datasets for imitation learning and robot training.
4+
5+
- **Robot pose recording** - Capture both state and action data
6+
- **Multi-camera support** - Record from multiple cameras simultaneously
7+
- **LeRobot dataset format (v2.1)** - Direct integration with HuggingFace LeRobot datasets
8+
- **Episode management** - Automatic episode segmentation with reset phases
9+
10+
## Quick Start
11+
12+
### 1. Installation
13+
14+
```bash
15+
# Source your venv
16+
cd dora/node-hub/dora-dataset-record
17+
uv pip install -e .
18+
```
19+
20+
### 2. Usage Guide
21+
22+
Create a dataflow file, see `examples/lerobot-dataset-record/dataset_record.yml`:
23+
24+
```yaml
25+
nodes:
26+
# Dataset recorder
27+
- id: dataset_recorder
28+
build: pip install -e ../../dora-dataset-record
29+
path: dora-dataset-record
30+
inputs:
31+
laptop: laptop_cam/image
32+
front: front_cam/image
33+
robot_state: robot_follower/pose
34+
robot_action: leader_interface/pose
35+
outputs:
36+
- text
37+
env:
38+
# Required settings
39+
REPO_ID: "your_username/your_dataset_name"
40+
SINGLE_TASK: "Pick up the cube and place it in the box"
41+
ROBOT_TYPE: "your_robot_type"
42+
43+
# Recording settings
44+
FPS: "30"
45+
TOTAL_EPISODES: "50"
46+
EPISODE_DURATION_S: "60"
47+
RESET_DURATION_S: "15"
48+
49+
# Camera configuration
50+
CAMERA_NAMES: "laptop,front"
51+
CAMERA_LAPTOP_RESOLUTION: "480,640,3"
52+
CAMERA_FRONT_RESOLUTION: "480,640,3"
53+
54+
# Robot configuration
55+
ROBOT_JOINTS: "joint1,joint2,joint3,joint4,joint5,gripper"
56+
57+
# Optional settings
58+
USE_VIDEOS: "true"
59+
SAVE_AVIF_FRAMES: "true" # This will additionally save frames
60+
PUSH_TO_HUB: "false"
61+
PRIVATE: "false"
62+
TAGS: "robotics,manipulation,imitation_learning"
63+
64+
# Visualization with rerun
65+
- id: plot
66+
build: pip install dora-rerun
67+
path: dora-rerun
68+
inputs:
69+
text: dataset_recorder/text
70+
```
71+
72+
### 3. Start Recording the dataset
73+
74+
```bash
75+
dora build dataset_record.yml
76+
dora run dataset_record.yml
77+
```
78+
79+
The node sends status messages to dora-rerun about episode starts, reset periods, episode saving, and more.
80+
81+
## Configuration
82+
83+
### Required Environment Variables
84+
85+
| Variable | Description | Example |
86+
| --------------------- | ---------------------------- | -------------------------- |
87+
| `REPO_ID` | HuggingFace dataset repo | `"username/dataset_name"` |
88+
| `SINGLE_TASK` | Task description | `"Pick and place objects"` |
89+
| `CAMERA_NAMES` | Comma-separated camera names | `"laptop,front,top"` |
90+
| `CAMERA_*_RESOLUTION` | Resolution for each camera | `"480,640,3"` |
91+
| `ROBOT_JOINTS` | Comma-separated joint names | `"joint1,joint2,gripper"` |
92+
93+
### Optional Settings
94+
95+
| Variable | Default | Description |
96+
| -------------------- | ------------------------------------------- | ----------------------------------------------------- |
97+
| `FPS` | `30` | Recording frame rate (match camera fps) |
98+
| `TOTAL_EPISODES` | `10` | Number of episodes to record |
99+
| `EPISODE_DURATION_S` | `60` | Episode length in seconds |
100+
| `RESET_DURATION_S` | `15` | Break between episodes to reset the environment |
101+
| `USE_VIDEOS` | `true` | Encode as MP4 videos, else saves images |
102+
| `PUSH_TO_HUB` | `false` | Upload to HuggingFace Hub |
103+
| `PRIVATE` | `false` | Make dataset private |
104+
| `ROOT_PATH` | `~/.cache/huggingface/lerobot/your_repo_id` | Local storage path where you want to save the dataset |
105+
106+
## License
107+
108+
This project is released under the MIT License.

node-hub/dora-parquet-recorder/dora_parquet_recorder/__init__.py

Whitespace-only changes.
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
"""Package entry point so the node can be run as ``python -m dora_parquet_recorder``."""

from .main import main

if __name__ == "__main__":
    main()
Lines changed: 152 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,152 @@
1+
"""
2+
High-Performance Batched Parquet Recorder
3+
"""
4+
5+
import os
6+
import queue
7+
import threading
8+
import json
9+
import pyarrow as pa
10+
import pyarrow.parquet as pq
11+
from dora import Node
12+
from datetime import datetime
13+
from typing import Any
14+
15+
# CONFIGURATION — both values are overridable via environment variables.
BATCH_SIZE = int(os.getenv("BATCH_SIZE", "30"))  # tables buffered before each parquet write
LOG_DIR = os.getenv("LOG_DIR", "data_logs")  # output directory for the .parquet files
18+
19+
class DoraParquetRecorder:
    """Batched Parquet recorder.

    Incoming Arrow payloads are serialized on the caller's thread, pushed
    onto a queue, and written by a background thread — one Parquet file per
    input id. Rows are written in batches of ``BATCH_SIZE`` to amortize
    file I/O.
    """

    def __init__(self):
        # Thread-safe hand-off between handle_input() and the writer thread.
        self.write_queue = queue.Queue()
        # One ParquetWriter per input id, created lazily on first flush.
        self.writers = {}
        self.shutdown_flag = False

        os.makedirs(LOG_DIR, exist_ok=True)

        # Daemon thread so a crashed main loop cannot hang process exit.
        self.writer_thread = threading.Thread(target=self._writer_loop, daemon=True)
        self.writer_thread.start()
        print(f"[Recorder] Online. Batch Size: {BATCH_SIZE}", flush=True)

    def _writer_loop(self):
        """Collect small tables per input id and write them in big chunks.

        Runs until shutdown is requested AND the queue is drained, then
        flushes any partial batches and closes every writer.
        """
        # Buffer of pending tables per input id: {"cam_feed": [t1, t2, ...]}
        buffers = {}

        while not self.shutdown_flag or not self.write_queue.empty():
            try:
                # Short timeout so the shutdown flag is re-checked often.
                input_id, table = self.write_queue.get(timeout=0.1)
            except queue.Empty:
                continue

            try:
                buffers.setdefault(input_id, []).append(table)

                # Flush once this input's bucket reaches the batch size.
                if len(buffers[input_id]) >= BATCH_SIZE:
                    self._flush_buffer(input_id, buffers[input_id])
                    buffers[input_id] = []  # empty the bucket
            except Exception as e:
                print(f"[Recorder] Write error: {e}", flush=True)

        # FINAL CLEANUP: flush whatever is left in the buckets.
        print("[Recorder] Flushing remaining data...", flush=True)
        for input_id, buf in buffers.items():
            if buf:
                self._flush_buffer(input_id, buf)

        # Close files so Parquet footers are written.
        for writer in self.writers.values():
            writer.close()

    def _flush_buffer(self, input_id, table_list):
        """Merge buffered tables into one table and append it to the file."""
        try:
            if not table_list:
                return

            # Concatenation is cheap: Arrow tables share underlying buffers.
            batch_table = pa.concat_tables(table_list)

            # Lazily create the writer, pinned to the first batch's schema.
            if input_id not in self.writers:
                file_path = os.path.join(LOG_DIR, f"{input_id}.parquet")
                # compression='NONE' favors CPU speed; 'snappy' saves disk.
                self.writers[input_id] = pq.ParquetWriter(
                    file_path, batch_table.schema, compression='NONE'
                )
                print(f"[Recorder] Created log: {file_path}", flush=True)

            # One single write call for the whole batch.
            self.writers[input_id].write_table(batch_table)

        except Exception as e:
            print(f"[Recorder] Flush failed: {e}", flush=True)

    def handle_input(self, input_id: str, value: Any, metadata: Any):
        """Serialize one event and enqueue it for the writer thread.

        Args:
            input_id: Dora input name; each id gets its own parquet file.
            value: Arrow array (or plain Python value) carried by the event.
            metadata: Event metadata dict; stored as a JSON string column.
        """
        if self.shutdown_flag:
            return

        try:
            # default=str so non-JSON-native metadata values (timestamps,
            # numpy scalars, ...) are stringified instead of dropping the
            # whole event with a serialize error.
            meta_json = json.dumps(metadata, default=str)

            # Fast path: grab the raw value buffer with minimal copying.
            if hasattr(value, "buffers"):
                try:
                    data_blob = value.buffers()[1].to_pybytes()
                except Exception:
                    # buffers()[1] can be None (e.g. all-null arrays) or the
                    # layout may differ; fall back to a textual encoding.
                    data_blob = value.to_string().encode('utf-8')
            else:
                # Fallback for plain strings / other Python values.
                if not isinstance(value, (pa.Array, pa.ChunkedArray)):
                    value = pa.array([value])
                data_blob = value.to_pylist()[0]  # slower but safe

            timestamp = datetime.now().isoformat()

            table = pa.Table.from_pydict({
                "timestamp": [timestamp],
                "data": [data_blob],
                "metadata": [meta_json]
            })

            self.write_queue.put((input_id, table))

        except Exception as e:
            print(f"[Recorder] Serialize error: {e}", flush=True)

    def _shutdown(self):
        """Signal the writer thread to drain its queue and wait (max 5 s)."""
        self.shutdown_flag = True
        if self.writer_thread.is_alive():
            self.writer_thread.join(timeout=5.0)
131+
132+
def main():
    """Run the recorder node: handshake, then record every INPUT until STOP."""
    node = Node()
    recorder = DoraParquetRecorder()

    # --- HANDSHAKE ---
    # Tell downstream nodes the recorder is ready to receive data.
    print("[Recorder] Ready. Sending Signal...", flush=True)
    node.send_output("status", pa.array(["READY"]))
    # -----------------

    for event in node:
        event_type = event["type"]
        if event_type == "STOP":
            break
        if event_type == "INPUT":
            recorder.handle_input(
                event["id"], event["value"], event.get("metadata", {})
            )

    recorder._shutdown()

if __name__ == "__main__":
    main()
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
[build-system]
2+
requires = ["setuptools", "wheel"]
3+
build-backend = "setuptools.build_meta"
4+
5+
[project]
6+
name = "dora-parquet-recorder"
7+
version = "0.1.0"
8+
description = "A generic, zero-copy data recorder for dora."
9+
readme = "README.md"
10+
requires-python = ">=3.8"
11+
dependencies = [
12+
"dora-rs",
13+
"pyarrow",
14+
"pandas",
15+
"numpy",
16+
"opencv-python"
17+
]
18+
19+
[project.scripts]
20+
# This lets users run "dora-parquet-recorder" from command line
21+
dora-parquet-recorder = "dora_parquet_recorder.main:main"
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
def test_import():
    """Smoke test: the recorder class must be importable from the package."""
    from dora_parquet_recorder.main import DoraParquetRecorder

    assert DoraParquetRecorder is not None

0 commit comments

Comments
 (0)