Skip to content

Commit 65ccbc3

Browse files
committed
feat(addon): Kafka Subscriber Peer & Kafka Producer Peer.
1 parent 23f5aae commit 65ccbc3

File tree

4 files changed

+499
-0
lines changed

4 files changed

+499
-0
lines changed
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
#pragma once
2+
#include <godot_cpp/godot.hpp>
3+
4+
void initialize_gdextension_types(godot::ModuleInitializationLevel p_level);
5+
void uninitialize_gdextension_types(godot::ModuleInitializationLevel p_level);

project/includes/kafka_peer.hpp

Lines changed: 152 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,152 @@
1+
#pragma once
2+
3+
#include <kafkalib.hpp>
4+
5+
#include <godot_cpp/classes/packet_peer.hpp>
6+
#include <godot_cpp/classes/resource.hpp>
7+
8+
class Kafka : public godot::RefCounted
9+
{
10+
GDCLASS(Kafka, godot::RefCounted);
11+
protected:
12+
static void _bind_methods();
13+
public:
14+
enum Severity {
15+
SEVERITY_EMERGENCY = RdKafka::Severity::EVENT_SEVERITY_EMERG,
16+
SEVERITY_ALERT = RdKafka::Severity::EVENT_SEVERITY_ALERT,
17+
SEVERITY_CRITICAL = RdKafka::Severity::EVENT_SEVERITY_CRITICAL,
18+
SEVERITY_ERROR = RdKafka::Severity::EVENT_SEVERITY_ERROR,
19+
SEVERITY_WARNING = RdKafka::Severity::EVENT_SEVERITY_WARNING,
20+
SEVERITY_NOTICE = RdKafka::Severity::EVENT_SEVERITY_NOTICE,
21+
SEVERITY_INFO = RdKafka::Severity::EVENT_SEVERITY_INFO,
22+
SEVERITY_DEBUG = RdKafka::Severity::EVENT_SEVERITY_DEBUG
23+
};
24+
enum DebugFlags {
25+
DEBUG_NONE = (1 << 0), // 0
26+
DEBUG_GENERIC = (1 << 1), // 1
27+
DEBUG_BROKER = (1 << 2), // 2
28+
DEBUG_TOPIC = (1 << 3), // 4
29+
DEBUG_METADATA = (1 << 4), // 8
30+
DEBUG_FEATURE = (1 << 5), // 16
31+
DEBUG_QUEUE = (1 << 6), // 32
32+
DEBUG_MSG = (1 << 7), // 64
33+
DEBUG_PROTOCOL = (1 << 8), // 128
34+
DEBUG_CGRP = (1 << 9), // 256
35+
DEBUG_SECURITY = (1 << 10), // 512
36+
DEBUG_FETCH = (1 << 11), // 1024
37+
DEBUG_INTERCEPTOR = (1 << 12), // 2048
38+
DEBUG_PLUGIN = (1 << 13), // 4096
39+
DEBUG_CONSUMER = (1 << 14), // 8192
40+
DEBUG_ADMIN = (1 << 15), // 16384
41+
DEBUG_EOS = (1 << 16), // 32768
42+
DEBUG_MOCK = (1 << 17), // 65536
43+
DEBUG_ASSIGNOR = (1 << 18), // 131072
44+
DEBUG_CONF = (1 << 19), // 262144
45+
DEBUG_TELEMETRY = (1 << 20), // 524288
46+
// All debug flags combined
47+
DEBUG_ALL = (1 << 21) - 1 // 1048575
48+
};
49+
enum Offset
50+
{
51+
OFFSET_BEGINNING = RdKafka::OffsetSpec_t::OFFSET_BEGINNING,
52+
OFFSET_END = RdKafka::OffsetSpec_t::OFFSET_END
53+
};
54+
};
55+
56+
VARIANT_ENUM_CAST(Kafka::Severity);
57+
VARIANT_BITFIELD_CAST(Kafka::DebugFlags);
58+
VARIANT_ENUM_CAST(Kafka::Offset);
59+
60+
class KafkaSubscriberPeerConfiguration final : public godot::Resource
61+
{
62+
GDCLASS(KafkaSubscriberPeerConfiguration, godot::Resource);
63+
protected:
64+
static void _bind_methods();
65+
public:
66+
KafkaSubscriberPeerConfiguration() {}
67+
~KafkaSubscriberPeerConfiguration() {}
68+
69+
godot::String get_brokers() const;
70+
void set_brokers(godot::String &p_brokers);
71+
72+
godot::String get_group_id() const;
73+
void set_group_id(godot::String &p_group_id);
74+
75+
bool is_group_generate_unique() const;
76+
void set_group_generate_unique(bool p_generate_unique);
77+
78+
godot::PackedStringArray get_topics() const;
79+
void set_topics(godot::PackedStringArray &p_topics);
80+
81+
Kafka::Offset get_offset() const;
82+
void set_offset(Kafka::Offset p_offset);
83+
84+
bool is_auto_commit() const;
85+
void set_auto_commit(bool p_auto_commit);
86+
87+
bool is_partition_eof_enabled() const;
88+
void set_partition_eof_enabled(bool p_partition_eof_enabled);
89+
90+
Kafka::DebugFlags get_debug_flags() const;
91+
void set_debug_flags(Kafka::DebugFlags p_debug_flags);
92+
93+
Kafka::Severity get_log_severity() const;
94+
void set_log_severity(Kafka::Severity p_log_severity);
95+
96+
int get_session_timeout() const;
97+
void set_session_timeout(int p_session_timeout);
98+
99+
int get_max_poll_interval() const;
100+
void set_max_poll_interval(int p_max_poll_interval);
101+
private:
102+
GodotStreaming::KafkaSubscriberMetadata m_subscriber_metadata;
103+
};
104+
105+
class KafkaPublisherPeerConfiguration final : public godot::Resource
106+
{
107+
GDCLASS(KafkaPublisherPeerConfiguration, godot::Resource);
108+
protected:
109+
static void _bind_methods();
110+
public:
111+
KafkaPublisherPeerConfiguration() {}
112+
~KafkaPublisherPeerConfiguration() {}
113+
const godot::String get_brokers() const;
114+
private:
115+
GodotStreaming::KafkaPublisherMetadata m_publisher_metadata;
116+
};
117+
118+
class KafkaSubscriberPeer final : public godot::PacketPeer
119+
{
120+
GDCLASS(KafkaSubscriberPeer, godot::PacketPeer);
121+
protected:
122+
static void _bind_methods();
123+
public:
124+
static KafkaSubscriberPeer *create(const godot::Ref<KafkaSubscriberPeerConfiguration> &p_configuration);
125+
126+
KafkaSubscriberPeer() {}
127+
~KafkaSubscriberPeer() {}
128+
129+
const godot::Error Subscribe(const godot::String &p_topic_name) const;
130+
131+
const godot::Error Receive(godot::PackedByteArray &data) const;
132+
private:
133+
std::unique_ptr<GodotStreaming::KafkaSubscriber> m_subscriber = nullptr;
134+
};
135+
136+
class KafkaPublisherPeer final : public godot::PacketPeer
137+
{
138+
GDCLASS(KafkaPublisherPeer, godot::PacketPeer);
139+
protected:
140+
static void _bind_methods();
141+
public:
142+
static KafkaPublisherPeer *create(const godot::Ref<KafkaPublisherPeerConfiguration> &p_configuration);
143+
144+
KafkaPublisherPeer() {}
145+
~KafkaPublisherPeer() {}
146+
147+
const godot::Error Publish(const godot::String &p_topic_name, const godot::PackedByteArray &p_data, const godot::String &p_key = "") const;
148+
const godot::Error Purge();
149+
const godot::Error Flush(const uint32_t p_timeout_ms) const;
150+
private:
151+
std::unique_ptr<GodotStreaming::KafkaPublisher> m_publisher = nullptr;
152+
};

project/src/godot_register.cpp

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
#include "godot_register.hpp"
2+
#include <gdextension_interface.h>
3+
#include <godot_cpp/core/class_db.hpp>
4+
#include <godot_cpp/core/defs.hpp>
5+
#include <godot_cpp/godot.hpp>
6+
#include <godot_cpp/variant/utility_functions.hpp>
7+
8+
void initialize_gdextension_types(godot::ModuleInitializationLevel p_level) {
9+
if (p_level != godot::MODULE_INITIALIZATION_LEVEL_SCENE) {
10+
return;
11+
}
12+
13+
// Register the class
14+
// GDREGISTER_CLASS(KafkaMultiplayerPeer);
15+
// GDREGISTER_CLASS(KafkaSubscriberMetadata);
16+
// GDREGISTER_CLASS(KafkaPublisherMetadata);
17+
}
18+
19+
void uninitialize_gdextension_types(godot::ModuleInitializationLevel p_level) {
20+
if (p_level != godot::MODULE_INITIALIZATION_LEVEL_SCENE) {
21+
return;
22+
}
23+
}
24+
25+
extern "C"
26+
{
27+
// Initialization
28+
GDExtensionBool GDE_EXPORT GDExtensionInit(GDExtensionInterfaceGetProcAddress p_get_proc_address, GDExtensionClassLibraryPtr p_library, GDExtensionInitialization *r_initialization)
29+
{
30+
godot::GDExtensionBinding::InitObject init_obj(p_get_proc_address, p_library, r_initialization);
31+
init_obj.register_initializer(initialize_gdextension_types);
32+
init_obj.register_terminator(uninitialize_gdextension_types);
33+
init_obj.set_minimum_library_initialization_level(godot::MODULE_INITIALIZATION_LEVEL_SCENE);
34+
35+
return init_obj.init();
36+
}
37+
}

0 commit comments

Comments
 (0)