Skip to content

Commit d09cdcb

Browse files
committed
[Server] add unit transport protocol
1 parent d3aad37 commit d09cdcb

File tree

26 files changed

+1188
-3523
lines changed

26 files changed

+1188
-3523
lines changed

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
# unitdb [![GoDoc](https://godoc.org/github.com/unit-io/unitdb?status.svg)](https://pkg.go.dev/github.com/unit-io/unitdb) [![Go Report Card](https://goreportcard.com/badge/github.com/unit-io/unitdb)](https://goreportcard.com/report/github.com/unit-io/unitdb) [![Build Status](https://travis-ci.org/unit-io/unitdb.svg?branch=master)](https://travis-ci.org/unit-io/unitdb) [![Coverage Status](https://coveralls.io/repos/github/unit-io/unitdb/badge.svg?branch=master)](https://coveralls.io/github/unit-io/unitdb?branch=master)
22

3-
Unitdb is blazing fast specialized time-series database for microservices, IoT, and realtime internet connected devices. As Unitdb satisfy the requirements for low latency and binary messaging, it is a perfect time-series database for applications such as internet of things and internet connected devices.
3+
Unitdb is blazing fast specialized time-series database for microservices, IoT, and realtime internet connected devices. As Unitdb satisfy the requirements for low latency and binary messaging, it is a perfect time-series database for applications such as internet of things and internet connected devices. The Unitdb Server uses uTP (unit Transport Protocol) for the Client Server messaging. Read [uTP Specification](https://github.com/unit-io/unitdb/tree/master/docs/utp.md).
44

55
```
66
Don't forget to ⭐ this repo if you like Unitdb!

db.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ func Open(path string, opts ...Options) (*DB, error) {
5151
options := &_Options{}
5252
WithDefaultOptions().set(options)
5353
WithDefaultFlags().set(options)
54+
WithDefaultQueryOptions().set(options)
5455
for _, opt := range opts {
5556
if opt != nil {
5657
opt.set(options)

db_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,7 @@ func TestSimple(t *testing.T) {
9292
var ids [][]byte
9393

9494
entry := NewEntry(topic, nil)
95-
entry.WithContract(contract).WithTTL([]byte("1m"))
95+
entry.WithContract(contract).WithTTL("1m")
9696
for i = 0; i < n; i++ {
9797
messageID := db.NewID()
9898
entry.WithID(messageID)

docs/utp.md

Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
# Unit Transport Protocol (uTP)
2+
3+
## About uTP
4+
The uTP (unit Tranport Protocol) specification defines the Client Server message transport protocol. It is light weight, open, simple and designed for communication in Machine to Machine (M2M) and Internet connected devices (IoT) contexts. The uTP protocol runs over TCP/IP, WebSocket, GRPC or other network protocols that provide bi-directional connections.
5+
6+
## Message Flow
7+
An application transport the data by uTP across network, it contains payload data, delivey mode and topic with optional collection of properties.
8+
9+
### Client
10+
A Client opens the network connection to the Server using TCP/IP, WebSocket, GRPC or other bi-direction network protocols.
11+
- Pubslihes Application Mesasges to a topic that other Clients subscribes in.
12+
- Subscribes to a topic to receive Application Messages.
13+
- Unsubcribe to remove a topic subscription.
14+
- Closes the network connection to the Server.
15+
16+
### Server
17+
- Accepts the network connections from Clients.
18+
- Recieves and store Application Messages published by Clients.
19+
- Processes topic subscription requests from Clients.
20+
- Route Application Messages that match Client subscriptions.
21+
- Closes the network connection from the Client.
22+
23+
## Application Message
24+
The Application Messages are transported between Client and Server in the form of data Packets. A data Packet consist of Fixed Header and a uTP Message Type defined in the specification.
25+
26+
### Fixed Header
27+
Each uTP data packet contains a Fixed Header as shown below:
28+
29+
| Name | Type |
30+
| :--- | :--- |
31+
| MessageType | enum |
32+
| MessageLength | int32 |
33+
34+
### Message Type
35+
36+
| Name | Value | Direction of Flow |
37+
| :--- | :--- | :--- |
38+
| RERSERVED | 0 | Forbidden |
39+
| CONNECT | 1 | Client to Server |
40+
| CONNACK | 2 | Server to Client |
41+
| PUBLISH | 3 | Client to Server or Server to Client |
42+
| PUBNEW | 4 | Server to Client |
43+
| PUBRECEIVE | 5 | Client to Server |
44+
| PUBRECEIPT | 6 | Client to Server or Server to Client |
45+
| PUBCOMPLETE | 7 | Client to Server or Server to Client |
46+
| SUBSCRIBE | 8 | Client to Server |
47+
| SUBACK | 9 | Server to Client |
48+
| UNSUBSCRIBE | 10 | Client to Server |
49+
| UNSUBACK | 11 | Server to Client |
50+
| PINGREQ | 12 | Client to Server |
51+
| PINGRESP | 13 | Server to Client |
52+
| DISCONNECT | 14 | Client to Server or Server to Client |
53+
54+
## Delivery Mode
55+
The uTP delivers Application Messages as per the Delivery Mode defined in the publish and subscribe Message type. The Client and Server both can be a publisher or subcriber of messages.
56+
Th Delivery Mode in the oubound Application Message to the Client could differ from that of inbound Application Message.
57+
58+
### Express Delivery
59+
The Express Delivery Mode ensures that the Application Message arrives at the receiver at least once. An Express Delivery Mode PUBLISH Message has a Message Identifier and get the receipt acknowledgement by a PUBRECEIPT Message.
60+
61+
In the Express Delivery Mode, the sender
62+
- Must send a PUBLISH Message containing a Message Identifier with Delivery Mode Express
63+
- Must treat a PUBLISH Message as "unacknowledged" until it has received the corresponding PUBCOMPLETE Message from the receiver.
64+
65+
A sender is permitted to send further PUBLISH Messages with different Message Identifier while it is waiting to receive publish receipts.
66+
67+
In the Express Delivery Mode, the receiver
68+
- Must respond with a PUBCOMPLETE Message containing the Message Identifier from the incoming PUBLISH Message.
69+
70+
### Reliable Delivery
71+
The Reliable Delivery Mode ensures no duplication of Application Messages delivered to the receivers. There is an increased overhead associated to the Reliable Delivery Mode.
72+
73+
### Inbound Message Flow from a Client
74+
In the Reliable Delivery Mode, the sender
75+
- Must send a PUBLISH Message containing a Message Identifier with Delivery Mode Reliable
76+
- Must treat a PUBLISH Message as "unacknowledged" until it has received the corresponding PUBRECEIPT Message from the receiver.
77+
- Must send a PUBCOMPLETE Message when it receive a PUBRECEIPT Message from the receiver. This PUBCOMPLETE Message must contain the same Message Identifier from original PUBLISH Message.
78+
- Must NOT re-send the PUBLISH Message once it has received the corresponding PUBRECEIPT Message.
79+
80+
A sender is permitted to send further PUBLISH Messages with different Message Identifier while it is waiting to receive publish receipts.
81+
82+
In the Reliable Delivery Mode, the receiver
83+
- Must respond with a PUBRECEIPT containing the Message Identifier from the incoming PUBLISH Message.
84+
- Until it has received a corresponding PUBCOMPLETE Message, the receiver MUST acknowledge any subsequent PUBLISH Message with the same Message Identifier by sending a PUBRECEIPT Message.
85+
86+
### Outbound Message Flow to the Client
87+
In the Reliable Delivery Mode, the sender
88+
- Must send a PUBNEW Message containing a Message Identifier
89+
- Must send PUBLISH Message when it receive a PUBRECEIVE Packet from the receiver. This PUBLISH Message must conrain the same Message Identifier from PUBNEW Message.
90+
- Must treat a PUBLISH Message as "unacknowledged" until it has received the corresponding PUBRECEIPT Message from the receiver.
91+
- Must send a PUBCOMPLETE Message when it receive a PUBRECEIPT Message from the receiver. This PUBCOMPLETE Message must contain the same Message Identifier from original PUBNEW Message.
92+
- Must NOT re-send the PUBLISH Message once it has received the corresponding PUBRECEIPT Message.
93+
94+
A sender is permitted to send further PUBLISH Messages with different Message Identifier while it is waiting to receive publish receipts.
95+
96+
In the Reliable Delivery Mode, the receiver
97+
- Must respond with a PUBRECEIVE containing the Message Identifier from the incoming PUBNEW Message.
98+
- Must respond with a PUBRECEIPT Message when it receive a PUBLISH Message from the sender containing the Message Identifier from the incoming PUBNEW Message.
99+
- Until it has received a corresponding PUBCOMPLETE Message, the receiver MUST acknowledge any subsequent PUBLISH Message with the same Message Identifier by sending a PUBRECEIPT Message.
100+

entry.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -77,13 +77,13 @@ func (e *Entry) WithContract(contract uint32) *Entry {
7777
}
7878

7979
// WithTTL sets TTL for message expiry for the entry.
80-
func (e *Entry) WithTTL(ttl []byte) *Entry {
81-
val, err := strconv.ParseInt(unsafeToString(ttl), 10, 64)
80+
func (e *Entry) WithTTL(ttl string) *Entry {
81+
val, err := strconv.ParseInt(ttl, 10, 64)
8282
if err == nil {
8383
e.ExpiresAt = uint32(time.Now().Add(time.Duration(int(val)) * time.Second).Unix())
8484
}
8585
var duration time.Duration
86-
duration, _ = time.ParseDuration(unsafeToString(ttl))
86+
duration, _ = time.ParseDuration(ttl)
8787
e.ExpiresAt = uint32(time.Now().Add(duration).Unix())
8888
return e
8989
}

examples/pubsub/main.go

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@ var f unitdb.MessageHandler = func(client unitdb.Client, msg unitdb.Message) {
1616
}
1717

1818
func main() {
19-
// Create a new client to open a network connection using the protocol indicated in the URL.
2019
client, err := unitdb.NewClient(
2120
//"tcp://localhost:6060",
2221
// "ws://localhost:6080",
@@ -36,23 +35,23 @@ func main() {
3635
log.Fatalf("err: %s", err)
3736
}
3837

39-
r := client.Subscribe("teams.alpha.user1")
38+
r := client.Subscribe([]byte("teams.alpha.user1"), unitdb.WithLast("1h"))
4039
if _, err := r.Get(ctx, 1*time.Second); err != nil {
4140
fmt.Println(err)
4241
os.Exit(1)
4342
}
4443

4544
for i := 0; i < 5; i++ {
4645
msg := fmt.Sprintf("Hi #%d time!", i)
47-
r := client.Publish("teams.alpha.user1", msg)
46+
r := client.Publish([]byte("teams.alpha.user1"), []byte(msg), unitdb.WithTTL("1m"))
4847
if _, err := r.Get(ctx, 1*time.Second); err != nil {
4948
log.Fatalf("err: %s", err)
5049
}
5150
}
5251

5352
wait := time.NewTicker(1 * time.Second)
5453
<-wait.C
55-
r = client.Unsubscribe("teams.alpha.user1")
54+
r = client.Unsubscribe([]byte("teams.alpha.user1"))
5655
if _, err := r.Get(ctx, 1*time.Second); err != nil {
5756
fmt.Println(err)
5857
os.Exit(1)

options.go

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -163,6 +163,16 @@ func WithBatchWriteInterval(dur time.Duration) Options {
163163
})
164164
}
165165

166+
// WithDefaultQueryOptions will set some default values for Query operation.
167+
// defaultQueryLimit: 1000
168+
// maxQueryLimit: 100000
169+
func WithDefaultQueryOptions() Options {
170+
return newFuncOption(func(o *_Options) {
171+
o.queryOptions.defaultQueryLimit = 1000
172+
o.queryOptions.maxQueryLimit = 100000
173+
})
174+
}
175+
166176
// WithDefaultOptions will open DB with some default values.
167177
func WithDefaultOptions() Options {
168178
return newFuncOption(func(o *_Options) {
@@ -172,12 +182,6 @@ func WithDefaultOptions() Options {
172182
if o.syncDurationType == 0 {
173183
o.syncDurationType = time.Second
174184
}
175-
if o.queryOptions.defaultQueryLimit == 0 {
176-
o.queryOptions.defaultQueryLimit = 1000
177-
}
178-
if o.queryOptions.maxQueryLimit == 0 {
179-
o.queryOptions.maxQueryLimit = 100000
180-
}
181185
if o.bufferSize == 0 {
182186
o.bufferSize = 1 << 32 // maximum size of a buffer to use in bufferpool (4GB).
183187
}

query.go

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717
package unitdb
1818

1919
import (
20+
"time"
21+
2022
"github.com/unit-io/unitdb/message"
2123
)
2224

@@ -46,8 +48,11 @@ type (
4648

4749
// NewQuery creates a new query structure from the topic.
4850
func NewQuery(topic []byte) *Query {
51+
opts := &_Options{}
52+
WithDefaultQueryOptions().set(opts)
4953
return &Query{
50-
Topic: topic,
54+
internal: _InternalQuery{opts: &opts.queryOptions},
55+
Topic: topic,
5156
}
5257
}
5358

@@ -63,6 +68,25 @@ func (q *Query) WithLimit(limit int) *Query {
6368
return q
6469
}
6570

71+
// WithLast sets query duration to fetch stored messages.
72+
func (q *Query) WithLast(dur string) *Query {
73+
base := time.Now()
74+
duration, err := time.ParseDuration(dur)
75+
if err != nil {
76+
return q
77+
}
78+
// In case of last, include it to the query.
79+
q.internal.cutoff = base.Add(-duration).Unix()
80+
switch {
81+
case (q.Limit == 0):
82+
q.Limit = q.internal.opts.defaultQueryLimit
83+
case q.Limit > q.internal.opts.maxQueryLimit:
84+
q.Limit = q.internal.opts.maxQueryLimit
85+
}
86+
87+
return q
88+
}
89+
6690
func (q *Query) parse() error {
6791
if q.Contract == 0 {
6892
q.Contract = message.MasterContract

server/internal/cluster.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -317,11 +317,11 @@ func (c *Cluster) Master(msg *ClusterReq, rejected *bool) error {
317317

318318
switch msg.Type {
319319
case message.SUBSCRIBE:
320-
conn.handle(msg.MsgSub)
320+
conn.handler(msg.MsgSub)
321321
case message.UNSUBSCRIBE:
322-
conn.handle(msg.MsgUnsub)
322+
conn.handler(msg.MsgUnsub)
323323
case message.PUBLISH:
324-
conn.handle(msg.MsgPub)
324+
conn.handler(msg.MsgPub)
325325
}
326326
} else {
327327
// Reject the request: wrong signature, cluster is out of sync.

0 commit comments

Comments
 (0)