Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README_CN.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ Getty 是一个使用 Golang 开发的异步网络 I/O 库。它适用于 TCP、

如果您使用 WebSocket,您无需担心心跳请求/响应,因为 Getty 在 session.go 的 (Session)handleLoop 方法内通过发送和接收 WebSocket ping/pong 帧来处理此任务。您只需在 codec.go 的 (Codec)OnCron 方法内使用 session.go 的 (Session)GetActive 方法检查 WebSocket 会话是否已超时。

有关代码示例,请参阅 https://github.com/AlexStocks/getty-examples
有关代码示例,请参阅 https://github.com/AlexStocks/getty-examples

## 关于 Getty 中的网络传输

Expand Down
116 changes: 116 additions & 0 deletions transport/callback.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package getty

// callbackCommon represents a node in the callback linked list
// Each node contains handler identifier, key, callback function and pointer to next node
type callbackCommon struct {
handler interface{} // Handler identifier, used to identify the source or type of callback
key interface{} // Unique identifier key for callback, used in combination with handler
call func() // Actual callback function to be executed
next *callbackCommon // Pointer to next node, forming linked list structure
}

// callbacks is a singly linked list structure for managing multiple callback functions
// Supports dynamic addition, removal and execution of callbacks
type callbacks struct {
first *callbackCommon // Pointer to the first node of the linked list
last *callbackCommon // Pointer to the last node of the linked list, used for quick addition of new nodes
}

// Add adds a new callback function to the callback linked list
// Parameters:
// - handler: Handler identifier, can be any type
// - key: Unique identifier key for callback, used in combination with handler
// - callback: Callback function to be executed, ignored if nil
func (t *callbacks) Add(handler, key interface{}, callback func()) {
// Prevent adding empty callback function
if callback == nil {
return
}

// Create new callback node
newItem := &callbackCommon{handler, key, callback, nil}

if t.first == nil {
// If linked list is empty, new node becomes the first node
t.first = newItem
} else {
// Otherwise add new node to the end of linked list
t.last.next = newItem
}
// Update pointer to last node
t.last = newItem
}

// Remove removes the specified callback function from the callback linked list
// Parameters:
// - handler: Handler identifier of the callback to be removed
// - key: Unique identifier key of the callback to be removed
// Note: If no matching callback is found, this method has no effect
func (t *callbacks) Remove(handler, key interface{}) {
var prev *callbackCommon

// Traverse linked list to find the node to be removed
for callback := t.first; callback != nil; prev, callback = callback, callback.next {
// Found matching node
if callback.handler == handler && callback.key == key {
if t.first == callback {
// If it's the first node, update first pointer
t.first = callback.next
} else if prev != nil {
// If it's a middle node, update the next pointer of the previous node
prev.next = callback.next
}

if t.last == callback {
// If it's the last node, update last pointer
t.last = prev
}

// Return immediately after finding and removing
return
}
}
}

// Invoke executes all registered callback functions in the linked list
// Executes each callback in the order they were added
// Note: If a callback function is nil, it will be skipped
func (t *callbacks) Invoke() {
// Traverse the entire linked list starting from the head node
for callback := t.first; callback != nil; callback = callback.next {
// Ensure callback function is not nil before executing
if callback.call != nil {
callback.call()
}
}
}

// Count returns the number of callback functions in the linked list
// Return value: Total number of currently registered callback functions
func (t *callbacks) Count() int {
var count int

// Traverse linked list to count
for callback := t.first; callback != nil; callback = callback.next {
count++
}

return count
}
92 changes: 92 additions & 0 deletions transport/callback_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
package getty

import (
"testing"
)

func TestCallback(t *testing.T) {
// Test empty list
cb := &callbacks{}
if cb.Count() != 0 {
t.Errorf("Expected count for empty list is 0, but got %d", cb.Count())
}

// Test adding callback functions
var count, expected, remove, totalCount int
totalCount = 10
remove = 5

// Add multiple callback functions
for i := 1; i < totalCount; i++ {
expected = expected + i
func(ii int) {
cb.Add(ii, ii, func() { count = count + ii })
}(i)
}

// Verify count after adding
expectedCallbacks := totalCount - 1
if cb.Count() != expectedCallbacks {
t.Errorf("Expected callback count is %d, but got %d", expectedCallbacks, cb.Count())
}

// Test adding nil callback
cb.Add(remove, remove, nil)
if cb.Count() != expectedCallbacks {
t.Errorf("Expected count after adding nil callback is %d, but got %d", expectedCallbacks, cb.Count())
}

// Remove specified callback
cb.Remove(remove, remove)

// Try to remove non-existent callback
cb.Remove(remove+1, remove+2)

// Execute all callbacks
cb.Invoke()

// Verify execution result
expectedCount := expected - remove
if count != expectedCount {
t.Errorf("Expected execution result is %d, but got %d", expectedCount, count)
}

// Test string type handler and key
cb2 := &callbacks{}

// Add callbacks
cb2.Add("handler1", "key1", func() {})
cb2.Add("handler2", "key2", func() {})
cb2.Add("handler3", "key3", func() {})

if cb2.Count() != 3 {
t.Errorf("Expected callback count is 3, but got %d", cb2.Count())
}

// Remove middle callback
cb2.Remove("handler2", "key2")
if cb2.Count() != 2 {
t.Errorf("Expected count after removing middle callback is 2, but got %d", cb2.Count())
}

// Remove first callback
cb2.Remove("handler1", "key1")
if cb2.Count() != 1 {
t.Errorf("Expected count after removing first callback is 1, but got %d", cb2.Count())
}

// Remove last callback
cb2.Remove("handler3", "key3")
if cb2.Count() != 0 {
t.Errorf("Expected count after removing last callback is 0, but got %d", cb2.Count())
}

// Test removing non-existent callback
cb2.Add("handler1", "key1", func() {})
cb2.Remove("handler2", "key2") // Try to remove non-existent callback

// Should still have 1 callback
if cb2.Count() != 1 {
t.Errorf("Expected callback count is 1, but got %d", cb2.Count())
}
}
17 changes: 17 additions & 0 deletions transport/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,9 @@ type Session interface {
WriteBytes([]byte) (int, error)
WriteBytesArray(...[]byte) (int, error)
Close()

AddCloseCallback(handler, key any, callback CallBackFunc)
RemoveCloseCallback(handler, key any)
}

// getty base session
Expand Down Expand Up @@ -135,6 +138,10 @@ type session struct {
grNum uatomic.Int32
lock sync.RWMutex
packetLock sync.RWMutex

// callbacks
closeCallback callbacks
closeCallbackMutex sync.RWMutex
}

func newSession(endPoint EndPoint, conn Connection) *session {
Expand Down Expand Up @@ -861,6 +868,16 @@ func (s *session) stop() {
}
}
close(s.done)

go func() {
defer func() {
if r := recover(); r != nil {
log.Errorf("invokeCloseCallbacks panic: %v", r)
}
}()
s.invokeCloseCallbacks()
}()

clt, cltFound := s.GetAttribute(sessionClientKey).(*client)
ignoreReconnect, flagFound := s.GetAttribute(ignoreReconnectKey).(bool)
if cltFound && flagFound && !ignoreReconnect {
Expand Down
68 changes: 68 additions & 0 deletions transport/session_callback.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package getty

// AddCloseCallback adds a close callback function to the Session
//
// Parameters:
// - handler: handler identifier, used to identify the source or type of the callback
// - key: unique identifier key for the callback, used in combination with handler
// - f: callback function to be executed when the session is closed
//
// Notes:
// - If the session is already closed, this addition will be ignored
// - The combination of handler and key must be unique, otherwise it will override previous callbacks
// - Callback functions will be executed in the order they were added when the session closes
func (s *session) AddCloseCallback(handler, key any, f CallBackFunc) {
if s.IsClosed() {
return
}
s.closeCallbackMutex.Lock()
s.closeCallback.Add(handler, key, f)
s.closeCallbackMutex.Unlock()
}

// RemoveCloseCallback removes the specified Session close callback function
//
// Parameters:
// - handler: handler identifier of the callback to be removed
// - key: unique identifier key of the callback to be removed
//
// Return value: none
//
// Notes:
// - If the session is already closed, this removal operation will be ignored
// - If no matching callback is found, this operation will have no effect
// - The removal operation is thread-safe
func (s *session) RemoveCloseCallback(handler, key any) {
if s.IsClosed() {
return
}
s.closeCallbackMutex.Lock()
s.closeCallback.Remove(handler, key)
s.closeCallbackMutex.Unlock()
}

// invokeCloseCallbacks executes all registered close callback functions
//
// Function description:
// - Executes all registered close callbacks in the order they were added
// - Uses read lock to protect the callback list, ensuring concurrency safety
// - This method is typically called automatically when the session closes
//
// Notes:
// - This is an internal method, not recommended for external direct calls
// - If panic occurs during callback execution, it will be caught and logged
// - Callback functions should avoid long blocking operations, async processing is recommended for time-consuming tasks
func (s *session) invokeCloseCallbacks() {
s.closeCallbackMutex.RLock()
s.closeCallback.Invoke()
s.closeCallbackMutex.RUnlock()
}

// CallBackFunc defines the callback function type when Session closes
//
// Usage notes:
// - Callback function accepts no parameters
// - Callback function returns no values
// - Callback function should handle resource cleanup, state updates, etc.
// - It's recommended to avoid accessing closed session state in callback functions
type CallBackFunc func()
Loading
Loading