Skip to content

Can't get a proxy to work (XSUB/XPUB) #136

@bartekpacia

Description

@bartekpacia

I'm trying to recreate this system (from the ZMQ Guide), but I can't get it to work:

Screenshot 2022-11-06 at 6 30 06 PM

I've got 3 programs: a publisher in pub/main.go, a proxy in proxy/main.go, and a subscriber in sub/main.go. Below is the full code of each program:

pub

package main

import (
	"flag"
	"fmt"
	"log"
	"time"

	"github.com/docker/distribution/context"
	zmq "github.com/go-zeromq/zmq4"
)

// defaultMsg is the JSON payload used as the default value of the -msg flag.
var defaultMsg = `{"action": 2,"container_id": "0123456789AB"}`

// port points at the value of the -port flag; it is assigned in init.
var port *int

// msg points at the value of the -msg flag; it is assigned in init.
var msg *string

// init registers the -port and -msg flags and parses the command line, so
// both package-level flag pointers are populated before main starts.
func init() {
	port = new(int)
	flag.IntVar(port, "port", 8080, "port to dial to")

	msg = new(string)
	flag.StringVar(msg, "msg", defaultMsg, "default message to send")

	flag.Parse()
}

// main dials tcp://localhost:<port> with a PUB socket and publishes the
// configured message every two seconds until a send fails.
func main() {
	addr := fmt.Sprintf("tcp://localhost:%d", *port)
	socket := zmq.NewPub(context.Background())
	if err := socket.Dial(addr); err != nil {
		log.Fatalf("failed to dial: %v\n", err)
	}
	defer socket.Close()

	// The publisher connects out to addr; it does not bind/listen on it.
	log.Printf("dialed %s\n", addr)
	for {
		// Use a distinct local name so the package-level msg flag pointer is
		// not shadowed inside the loop.
		out := zmq.NewMsgFromString([]string{*msg})
		if err := socket.Send(out); err != nil {
			// Panic (rather than log.Fatal) so the deferred Close still runs.
			log.Panicf("failed to send: %v", err)
		}

		log.Println("message sent")
		time.Sleep(2 * time.Second) // pace the publisher: one message every 2 seconds
	}
}

proxy

package main

import (
	"context"
	"flag"
	"fmt"
	"log"

	zmq "github.com/go-zeromq/zmq4"
)

// pubPort points at the value of the -port-pub flag; it is assigned in main.
var pubPort *int

// subPort points at the value of the -port-sub flag; it is assigned in main.
var subPort *int

// init does nothing; the proxy's flags are defined and parsed in main.
func init() {
}

// main runs an XSUB/XPUB proxy on localhost.
//
// The XSUB socket listens on -port-sub and the XPUB socket listens on
// -port-pub; both are handed to zmq.NewProxy as frontend and backend. A PAIR
// socket bound to inproc://pipe is passed as the proxy's capture socket, and a
// second PAIR socket dials that endpoint so a goroutine can print every
// captured message.
func main() {
	subPort = flag.Int("port-sub", 8080, "port to listen for publisher input on")
	pubPort = flag.Int("port-pub", 8081, "port to publish output on")
	flag.Parse()

	ctx := context.Background()

	subSocket := zmq.NewXSub(ctx)
	subAddr := fmt.Sprintf("tcp://localhost:%v", *subPort)
	if err := subSocket.Listen(subAddr); err != nil {
		// This socket binds, so report a listen failure (not a dial failure).
		log.Fatalf("failed to listen: %v\n", err)
	}
	// NOTE(review): kept from the original. In ZeroMQ, XSUB subscriptions are
	// normally forwarded from the XPUB side as messages; confirm that zmq4's
	// XSUB socket actually honours OptionSubscribe.
	if err := subSocket.SetOption(zmq.OptionSubscribe, ""); err != nil {
		log.Fatalf("failed to set option: %v\n", err)
	}

	pubSocket := zmq.NewXPub(ctx)
	pubAddr := fmt.Sprintf("tcp://localhost:%v", *pubPort)
	if err := pubSocket.Listen(pubAddr); err != nil {
		log.Fatalf("failed to listen: %v\n", err)
	}

	// The proxy *sends* copies of proxied messages to the capture socket, so
	// the reader must be a separate PAIR peer connected to it. Reading from
	// the capture socket itself can never yield anything, because it has no
	// peer.
	const captureAddr = "inproc://pipe"

	capture := zmq.NewPair(ctx)
	if err := capture.Listen(captureAddr); err != nil {
		log.Fatalf("failed to listen: %v\n", err)
	}

	// Dial only after the capture socket is bound to the inproc endpoint.
	listener := zmq.NewPair(ctx)
	if err := listener.Dial(captureAddr); err != nil {
		log.Fatalf("failed to dial: %v\n", err)
	}

	// Print everything the proxy mirrors onto the capture socket. The
	// goroutine lives for the whole process; any receive error is fatal.
	go func() {
		for {
			log.Println("listener is waiting for a message")
			msg, err := listener.Recv()
			if err != nil {
				log.Fatalf("failed to recv from socket: %v\n", err)
			}
			b := msg.Bytes()
			fmt.Printf("message: %s, len: %v\n", b, len(b))
		}
	}()

	proxy := zmq.NewProxy(ctx, subSocket, pubSocket, capture)

	log.Printf("listening for publishers on %s\n", subAddr)
	log.Printf("listening for subscribers on %s\n", pubAddr)

	fmt.Println("running proxy...")
	if err := proxy.Run(); err != nil {
		log.Fatalln(err)
	}
}

sub

package main

import (
	"context"
	"flag"
	"fmt"
	"log"

	zmq "github.com/go-zeromq/zmq4"
)

// main connects a SUB socket to tcp://localhost:<port>, subscribes with the
// empty topic, and prints every message it receives until Recv fails.
func main() {
	portFlag := flag.Int("port", 8081, "port to listen on")
	flag.Parse()

	sub := zmq.NewSub(context.Background())
	defer sub.Close()

	endpoint := fmt.Sprintf("tcp://localhost:%d", *portFlag)
	if dialErr := sub.Dial(endpoint); dialErr != nil {
		log.Fatalf("failed to dial: %v\n", dialErr)
	}

	// Subscribe with the empty prefix.
	if optErr := sub.SetOption(zmq.OptionSubscribe, ""); optErr != nil {
		log.Fatalf("failed to set option: %v\n", optErr)
	}

	fmt.Printf("dialing to %s\n", endpoint)

	for {
		received, recvErr := sub.Recv()
		if recvErr != nil {
			log.Fatalf("failed to recv from socket: %v\n", recvErr)
		}

		payload := received.Bytes()
		fmt.Printf("message: %s, len: %v\n", payload, len(payload))
	}
}

I'm pretty sure I'm using the correct socket types, and I also subscribe to all topics using socket.SetOption(zmq.OptionSubscribe, ""). I don't have any more ideas on how to get this simple example to work.

Might be related to #108

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions