Lattice

A production-grade data ingestion pipeline that scrapes HTML tables, infers schemas, and streams data through Kafka into relational databases.

What It Does

Lattice extracts tabular data from any URL, automatically detects column types, and persists it to MySQL or PostgreSQL. The pipeline is built for reliability with circuit breakers, retries, idempotency, and full observability.
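Column-type detection works by sampling rows (see LATTICE_PARSER_SAMPLE_SIZE below) and picking the narrowest type every sample fits. A minimal sketch of how such inference might work — the candidate types, names, and voting rule here are illustrative, not Lattice's actual parser code:

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
	"time"
)

// inferType guesses a SQL column type from sampled cell values, falling
// back to TEXT when no narrower type fits every non-empty sample.
// Candidate order (BIGINT -> DOUBLE -> TIMESTAMP -> TEXT) is illustrative.
func inferType(samples []string) string {
	isInt, isFloat, isTime := true, true, true
	for _, s := range samples {
		v := strings.TrimSpace(s)
		if v == "" {
			continue // empty cells don't vote
		}
		if _, err := strconv.ParseInt(v, 10, 64); err != nil {
			isInt = false
		}
		if _, err := strconv.ParseFloat(v, 64); err != nil {
			isFloat = false
		}
		if _, err := time.Parse("2006-01-02", v); err != nil {
			isTime = false
		}
	}
	switch {
	case isInt:
		return "BIGINT"
	case isFloat:
		return "DOUBLE"
	case isTime:
		return "TIMESTAMP"
	default:
		return "TEXT"
	}
}

func main() {
	fmt.Println(inferType([]string{"1", "42", "7"}))             // BIGINT
	fmt.Println(inferType([]string{"3.14", "2"}))                // DOUBLE
	fmt.Println(inferType([]string{"2024-01-01", "2023-05-02"})) // TIMESTAMP
	fmt.Println(inferType([]string{"India", "China"}))           // TEXT
}
```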

Quick Start

# Start infrastructure
make infra-up

# Start consumer (terminal 1)
make run-consumer

# Start producer (terminal 2)
make run-producer

# Ingest a table
curl -X POST http://localhost:8081/ingest \
  -H "Content-Type: application/json" \
  -d '{"url":"https://en.wikipedia.org/wiki/List_of_countries_by_population_(United_Nations)"}'

# Check results
curl http://localhost:8082/tables

Prerequisites

Required

  • Go 1.25+
  • Docker
  • Docker Compose - usually bundled with Docker Desktop

Verify Installation

go version        # Should show 1.25 or higher
docker --version
docker-compose --version

Installation

# Clone repository
git clone https://github.com/chiraag-kakar/lattice.git
cd lattice

# Build binaries
make build

# Start infrastructure (Kafka, MySQL, PostgreSQL)
make infra-up

Wait 30 seconds for services to initialize.

Running Locally

Environment Setup

Copy the example environment file:

cp .env.example .env

Default configuration uses MySQL. To switch to PostgreSQL:

# Edit .env
LATTICE_DATABASE_DRIVER=postgres
LATTICE_DATABASE_PORT=5432

Start Services

Terminal 1 - Consumer:

make run-consumer

Expected output:

Connected to database: mysql://localhost:3306/lattice
Consumer API listening on :8082
Starting Kafka consumer...

Terminal 2 - Producer:

make run-producer

Expected output:

Producer listening on port 8081

API Endpoints

Producer (Port 8081)

Ingest Table

POST /ingest
{
  "url": "https://example.com/table"
}

Health Check

GET /health

Metrics

GET /metrics

Consumer (Port 8082)

List Tables

GET /tables

Table Info

GET /tables/{table_name}

Query Data

GET /tables/{table_name}/data?limit=10

Consumer Stats

GET /stats

Kafka Metadata

GET /kafka/brokers
GET /kafka/topic
GET /kafka/consumer-group

See API.md for detailed contracts.

Testing

Using Postman

Import the collection:

postman/Lattice.postman_collection.json

Using curl

1. Ingest Wikipedia table:

curl -X POST http://localhost:8081/ingest \
  -H "Content-Type: application/json" \
  -d '{"url":"https://en.wikipedia.org/wiki/List_of_countries_by_population_(United_Nations)"}'

2. List ingested tables:

curl http://localhost:8082/tables

3. Query table data:

curl "http://localhost:8082/tables/list_of_countries_by_population_united_nations/data?limit=5"

4. Check metrics:

curl http://localhost:8081/metrics
curl http://localhost:8082/stats

Verify in Database

MySQL:

docker exec -it lattice-mysql-1 mysql -ulattice -plattice lattice

PostgreSQL:

docker exec -it lattice-postgres-1 psql -U lattice -d lattice

Project Structure

lattice/
├── cmd/
│   ├── producer/          # HTTP API for ingestion
│   └── consumer/          # Kafka consumer + DB writer
├── internal/
│   ├── api/               # HTTP handlers
│   ├── config/            # Configuration management
│   ├── domain/            # Core entities
│   ├── messaging/         # Kafka producer/consumer
│   ├── repository/        # Database layer
│   └── service/           # Business logic
├── pkg/
│   ├── fetcher/           # HTTP client with retries
│   ├── metrics/           # Metrics tracking
│   ├── parser/            # HTML parsing + type inference
│   └── validator/         # URL validation
├── docker-compose.yml     # Local infrastructure
├── Makefile              # Build commands
└── .env.example          # Configuration template

Configuration

All configuration is supplied via environment variables with the LATTICE_ prefix.

Key Variables

# Database
LATTICE_DATABASE_DRIVER=mysql          # or postgres
LATTICE_DATABASE_HOST=localhost
LATTICE_DATABASE_PORT=3306
LATTICE_DATABASE_NAME=lattice
LATTICE_DATABASE_USER=lattice
LATTICE_DATABASE_PASSWORD=lattice

# Kafka
LATTICE_KAFKA_BROKERS=localhost:9092
LATTICE_KAFKA_TOPIC=lattice.table.records
LATTICE_KAFKA_CONSUMER_GROUP=lattice-table-writer
LATTICE_KAFKA_BATCH_SIZE=100
LATTICE_KAFKA_PARTITION_STRATEGY=table_name   # table_name, source_url, or round_robin

# Parser
LATTICE_PARSER_SAMPLE_SIZE=100         # Rows to sample for type inference

# Fetcher
LATTICE_FETCHER_TIMEOUT=30s
LATTICE_FETCHER_MAX_RETRIES=3

See .env.example for complete list.
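The three LATTICE_KAFKA_PARTITION_STRATEGY values map to different Kafka message keys. A sketch of that mapping, assuming key-based partitioning — the function shape is illustrative, not Lattice's actual messaging code:

```go
package main

import "fmt"

// partitionKey returns the Kafka message key for a record under the
// configured strategy. Keying by table_name keeps all rows of one table
// on one partition (ordered writes); round_robin returns an empty key
// so the client spreads messages across partitions.
func partitionKey(strategy, tableName, sourceURL string) string {
	switch strategy {
	case "source_url":
		return sourceURL
	case "round_robin":
		return "" // empty key lets the Kafka client balance partitions
	default: // "table_name"
		return tableName
	}
}

func main() {
	fmt.Println(partitionKey("table_name", "countries", "https://example.com"))
	// prints "countries"
}
```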

Troubleshooting

Producer won't start

  • Check if port 8081 is free: lsof -i :8081
  • Verify Kafka is running: docker ps | grep kafka

Consumer errors

  • Check database connection in logs
  • Verify LATTICE_DATABASE_DRIVER matches running database

No data in database

  • Check consumer logs for processing errors
  • Verify Kafka topic exists: docker exec lattice-kafka-1 kafka-topics --list --bootstrap-server localhost:9092

Kafka connection refused

  • Wait 30 seconds after make infra-up
  • Restart Kafka: docker-compose restart kafka

Development

Run tests:

make test

Clean build:

make clean
make build

View logs:

make infra-logs

Stop infrastructure:

make infra-down

Architecture

See DESIGN.md for detailed architecture, design decisions, and tradeoffs.

License

MIT
