Demo ứng dụng đơn giản sử dụng Spark Streaming (DStreams) để đếm tần suất các từ trong luồng dữ liệu văn bản thời gian thực. Dự án này cung cấp các phiên bản:
word_count.py: Đếm tất cả các từ (cho Docker)word_count_windows.py: Đếm tất cả các từ (cho Windows)word_count_filtered.py: Đếm các từ có độ dài > 4 ký tựword_count_number.py: Đếm các từ có định dạng số (Number Formats)word_count_top5.py: In top 5 những từ xuất hiện nhiều lần nhất
- Windows 10/11
- Java 21
- Python 3.11
- Apache Spark
- Netcat cho Windows
- Hadoop winutils
- Windows 10/11
- Docker Desktop for Windows (bao gồm Docker Compose)
- Terminal/Command Prompt hoặc PowerShell
Nếu đã cài đặt Java 21, hãy kiểm tra biến môi trường:
java -version
Đảm bảo biến môi trường JAVA_HOME đã được thiết lập:
JAVA_HOME=C:\Program Files\Java\jdk-21
PATH=%JAVA_HOME%\bin;%PATH%
-
Tải Apache Spark từ https://spark.apache.org/downloads.html
Lưu ý quan trọng: Chọn đúng phiên bản prebuilt để phù hợp với winutils:
- Spark prebuilt for Apache Hadoop 2.7: Tương thích với winutils
hadoop-2.7.1 - Spark prebuilt for Apache Hadoop 3.2 and later: Tương thích với winutils
hadoop-3.0.0trở lên
- Spark prebuilt for Apache Hadoop 2.7: Tương thích với winutils
-
Giải nén vào
C:\spark -
Thiết lập biến môi trường:
SPARK_HOME=C:\spark
PATH=%SPARK_HOME%\bin;%PATH%
Cài đặt winutils phù hợp với phiên bản Spark prebuilt đã chọn ở Bước 2:
-
Tải
winutils.exetừ https://github.com/cdarlint/winutils- Chọn thư mục phiên bản Hadoop tương ứng với Spark prebuilt
- Tải file
winutils.exetừ thư mụcbin/
-
Tạo thư mục
C:\hadoop\bin -
Copy
winutils.exevàoC:\hadoop\bin -
Thiết lập biến môi trường:
HADOOP_HOME=C:\hadoop
PATH=%HADOOP_HOME%\bin;%PATH%
Lưu ý: Sử dụng đúng phiên bản winutils để tránh lỗi khi chạy Spark trên Windows.
Sử dụng ncat (từ Nmap):
- Tải Nmap từ https://nmap.org/download.html
- Cài đặt Nmap (bao gồm ncat)
Nếu đã có Python 3.11, chỉ cần cài đặt PySpark:
pip install pyspark==3.3.0
Thiết lập biến môi trường để PySpark sử dụng Python 3.11 (tương thích với PySpark 3.3.0):
PYSPARK_PYTHON=C:\Python311\python.exe
PYSPARK_DRIVER_PYTHON=C:\Python311\python.exe
ncat -lk 9999
Có hai lựa chọn:
Tùy chọn 1: Đếm tất cả các từ
cd spark-streaming-word-count-demo/Windows
spark-submit word_count_windows.py
Tùy chọn 2: Đếm chỉ các từ có độ dài > 4 ký tự
cd spark-streaming-word-count-demo/Windows
spark-submit word_count_filtered.py
Tùy chọn 3: Đếm chỉ các từ có định dạng số
cd spark-streaming-word-count-demo/Windows
spark-submit word_count_number.py
Tùy chọn 4: In top 5 những từ xuất hiện nhiều lần nhất
cd spark-streaming-word-count-demo/Windows
spark-submit word_count_top5.py
Quay lại terminal netcat (Terminal 1), gõ một vài câu và nhấn Enter:
hello spark hello world
spark streaming is fun
hello world again
Quan sát terminal spark-submit (Terminal 2). Sau mỗi 2 giây (batch interval), kết quả đếm từ sẽ được in ra màn hình.
Lưu ý về kết quả:
- Nếu sử dụng
word_count_windows.py: Tất cả các từ sẽ được đếm - Nếu sử dụng
word_count_filtered.py: Chỉ những từ có độ dài > 4 ký tự sẽ được đếm (ví dụ: "hello", "spark", "streaming", "world", "again" nhưng không có "is") - Nếu sử dụng
word_count_number.py: Chỉ những từ có định dạng số sẽ được đếm (ví dụ: "123", "3.14", "42") - Nếu sử dụng
word_count_top5.py: Hiển thị 5 từ có tần suất xuất hiện nhiều nhất
Nhấn Ctrl+C để dừng cả hai terminal.
Mở terminal, điều hướng đến thư mục Docker trong dự án và chạy:
cd Docker
docker-compose up -d
Mở một terminal mới và chạy lệnh sau:
docker-compose exec spark-master spark-submit /app/word_count.py
Mở một terminal khác và kết nối tới netcat server:
docker attach netcat-server
Bây giờ có thể gõ dữ liệu trực tiếp để gửi tới Spark streaming:
hello spark hello world
spark streaming is fun
hello world again
Quan sát terminal spark-submit (Bước 2). Sau mỗi 2 giây (batch interval), kết quả đếm từ sẽ được in ra màn hình.
Lưu ý: Để thoát khỏi netcat server, nhấn Ctrl+P sau đó Ctrl+Q.
Khi hoàn thành demo, nhấn Ctrl+C để dừng docker-compose, sau đó chạy:
docker-compose down
spark-streaming-word-count-demo/
│
├── data/ # Thư mục chứa dữ liệu mẫu
│ └── sample_text.txt # File văn bản mẫu để kiểm thử
│
├── Docker/ # Cấu hình cho môi trường Docker
│ ├── docker-compose.yml # Định nghĩa services Docker (Spark, Netcat)
│ └── word_count.py # Script đếm từ cho Docker
│
├── Windows/ # Scripts cho Windows native
│ ├── word_count_windows.py # Đếm tất cả các từ
│ ├── word_count_filtered.py # Đếm các từ có độ dài > 4
│ ├── word_count_number.py # Đếm các từ dạng số
│ └── word_count_top5.py # Lấy top 5 từ xuất hiện nhiều nhất
│
├── README.md # Tài liệu hướng dẫn
└── requirements.txt # Dependencies Python cần thiết
Tất cả các script đều tuân theo cùng một luồng xử lý cơ bản:
- Khởi tạo Spark Context: Tạo SparkContext và StreamingContext
- Tạo DStream: Kết nối đến nguồn dữ liệu streaming (netcat trên port 9999)
- Xử lý dữ liệu: Áp dụng các phép biến đổi trên DStream (flatMap, filter, map, reduce)
- Tính toán và hiển thị kết quả: Đếm tần suất và in kết quả
- Khởi động streaming: Bắt đầu quá trình xử lý và đợi cho đến khi kết thúc
- Chức năng: Đếm tần suất xuất hiện của tất cả các từ trong dòng dữ liệu
- Đặc điểm: Mã nguồn cơ bản nhất, thực hiện word count truyền thống
- Biến đổi chính: flatMap → map → reduceByKey
- Chức năng: Đếm tần suất xuất hiện của các từ có độ dài > 4 ký tự
- Đặc điểm: Thêm bước lọc (filter) sau khi tách từ
- Biến đổi chính: flatMap → filter → map → reduceByKey
- Điểm quan trọng: Sử dụng lambda để lọc từ dựa trên độ dài (len(w) > 4)
- Chức năng: Đếm tần suất xuất hiện của các từ có định dạng số
- Đặc điểm: Sử dụng hàm trợ giúp is_number() để kiểm tra định dạng số
- Biến đổi chính: flatMap → filter → map → reduceByKey
- Điểm quan trọng: Sử dụng try-except để xác định liệu một chuỗi có thể chuyển thành số hay không
- Chức năng: In ra 5 từ xuất hiện nhiều nhất
- Đặc điểm: Sử dụng foreachRDD để xử lý RDD trong mỗi batch
- Biến đổi chính: flatMap → map → reduceByKey → foreachRDD
- Điểm quan trọng: Trong hàm process_rdd, thu thập kết quả về driver, sắp xếp và lấy top 5
- DStream (Discretized Stream): Trừu tượng hóa luồng dữ liệu liên tục thành các batch nhỏ
- Batch Interval: Khoảng thời gian giữa các lần xử lý (1-2 giây trong ví dụ)
- Transformations: Các phép biến đổi không trạng thái (stateless) như map, filter
- Output Operations: Các thao tác đầu ra như pprint(), foreachRDD()
- Driver và Worker: Driver quản lý quy trình, Worker thực hiện tính toán phân tán