Skip to content
75 changes: 75 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,81 @@ go get github.com/memphisdev/memphis.go
import "github.com/memphisdev/memphis.go"
```

### Quickstart - Producing and Consuming

The most basic functionaly of memphis is the ability to produce messages to a station and to consume those messages.

First, a connection to Memphis must be made:

```js
const { memphis } = require('memphis-dev');

// Connecting to the broker
memphis = Memphis()

let conn, err := memphis.Connect(
"<memphis-host>",
"<memphis-username>",
memphis.AccountId(<memphis-accountid>), //You can find it on the profile page in the Memphis UI. This field should be sent only on the cloud version of Memphis, otherwise it will be ignored
memphis.Password(<memphis-password>),
)

if err != nil{
return
}

defer conn.Close()

```

Then, to produce a message, call the `memphis.produce` function or create a producer and call its `producer.produce` function:

```go
message := make(map[string]any)

message["Hello"] = "World"

err := conn.Produce(
"<station-name>",
"<producer-name>",
message,
[]memphis.ProducerOpt{},
[]memphis.ProduceOpt{},
)

if err != nil{
return
}
```

Lastly, to consume this message, call the `memphis.fetch_messages` function or create a consumer and call its `consumer.fetch` function:

```go
messages, err := conn.FetchMessages(
"<station-name>",
"<consumer-name>",
)

if err != nil{
return
}

var msg_map map[string]any
for _, message := range messages{
err = json.Unmarshal(message.Data(), &msg_map)
if err != nil{
continue
}

// Do something with the message

err = message.Ack()
if err != nil{
continue
}
}
```

### Connecting to Memphis
```go
c, err := memphis.Connect("<memphis-host>",
Expand Down
68 changes: 39 additions & 29 deletions examples/consumer.go
Original file line number Diff line number Diff line change
@@ -1,47 +1,57 @@
package main

import (
"context"
"encoding/json"
"fmt"
"os"
"time"

"github.com/memphisdev/memphis.go"
)

func main() {
conn, err := memphis.Connect("localhost", "root", memphis.ConnectionToken("<broker-token>"))
if err != nil {
fmt.Printf("Connection failed: %v", err)
os.Exit(1)
func main(){
conn, err := memphis.Connect(
"<memphis-host>",
"<memphis-username>",
memphis.AccountId(<memphis-accountid>),
memphis.Password(<mempis-password>),
)

if err != nil{
fmt.Print(err)
return
}

defer conn.Close()

consumer, err := conn.CreateConsumer("<station-name>", "<consumer-name>", memphis.PullInterval(15*time.Second))
consumer, _ := conn.CreateConsumer("<station-name>", "<consumer-name>")

if err != nil {
fmt.Printf("Consumer creation failed: %v\n", err)
os.Exit(1)
}
for true {
messages, err := consumer.Fetch()

handler := func(msgs []*memphis.Msg, err error, ctx context.Context) {
if err != nil {
fmt.Printf("Fetch failed: %v\n", err)
return
if len(messages) == 0 {
continue
}

for _, msg := range msgs {
fmt.Println(string(msg.Data()))
msg.Ack()
headers := msg.GetHeaders()
fmt.Println(headers)
if err != nil{
fmt.Print(err)
return
}

var msg_map map[string]any
for _, message := range messages{
err = json.Unmarshal(message.Data(), &msg_map)
if err != nil{
fmt.Print(err)
continue
}

// Do something with the message

err = message.Ack()
if err != nil{
fmt.Print(err)
continue
}
}
}

consumer.Consume(handler)

// The program will close the connection after 30 seconds,
// the message handler may be called after the connection closed
// so the handler may receive a timeout error
time.Sleep(30 * time.Second)
}
}
47 changes: 25 additions & 22 deletions examples/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,33 +7,36 @@ import (
"github.com/memphisdev/memphis.go"
)

func main() {
conn, err := memphis.Connect("localhost", "root", memphis.ConnectionToken("<broker-token>"))
if err != nil {
fmt.Printf("Connection failed: %v", err)
os.Exit(1)
func main(){
conn, err := memphis.Connect(
"<memphis-host>",
"<memphis-username>",
memphis.AccountId(<memphis-accountID>),
memphis.Password(<memphis-password>),
)

if err != nil{
fmt.Print(err)
return
}
defer conn.Close()
p, err := conn.CreateProducer("<station-name>", "<producer-name>")

if err != nil {
fmt.Printf("Create Producer failed: %v", err)
os.Exit(1)
}
defer conn.Close()

hdrs := memphis.Headers{}
hdrs.New()
err = hdrs.Add("key", "value")
producer, err := conn.CreateProducer("<station-name>", "<producer-name>")

if err != nil {
fmt.Printf("Header failed: %v", err)
os.Exit(1)
if err != nil{
fmt.Print(err)
return
}

err = p.Produce([]byte("You have a message!"), memphis.MsgHeaders(hdrs))
message := make(map[string]any)

if err != nil {
fmt.Printf("Produce failed: %v", err)
os.Exit(1)
message["Hello"] = "World"
for i := 0; i < 3; i++{
err = producer.Produce(message)
}

if err != nil{
return
}
}
}