Skip to content

Commit 8071391

Browse files
committed
Fix worker_threads support
Signed-off-by: Stephen Belanger <[email protected]>
1 parent a950a56 commit 8071391

26 files changed

+833
-329
lines changed

binding.gyp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
'src/connection.cc',
1616
'src/errors.cc',
1717
'src/kafka-consumer.cc',
18+
'src/per-isolate-data.cc',
1819
'src/producer.cc',
1920
'src/topic.cc',
2021
'src/workers.cc',

package-lock.json

Lines changed: 496 additions & 270 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

src/admin.cc

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
#include <vector>
1212
#include <math.h>
1313

14+
#include "src/per-isolate-data.h"
1415
#include "src/workers.h"
1516
#include "src/admin.h"
1617

@@ -33,6 +34,10 @@ AdminClient::AdminClient(Conf* gconfig):
3334
rkqu = NULL;
3435
}
3536

37+
void AdminClient::delete_instance(void* arg) {
38+
delete (static_cast<AdminClient*>(arg));
39+
}
40+
3641
AdminClient::~AdminClient() {
3742
Disconnect();
3843
}
@@ -90,8 +95,6 @@ Baton AdminClient::Disconnect() {
9095
return Baton(RdKafka::ERR_NO_ERROR);
9196
}
9297

93-
Nan::Persistent<v8::Function> AdminClient::constructor;
94-
9598
void AdminClient::Init(v8::Local<v8::Object> exports) {
9699
Nan::HandleScope scope;
97100

@@ -108,7 +111,7 @@ void AdminClient::Init(v8::Local<v8::Object> exports) {
108111
Nan::SetPrototypeMethod(tpl, "disconnect", NodeDisconnect);
109112
Nan::SetPrototypeMethod(tpl, "setToken", NodeSetToken);
110113

111-
constructor.Reset(
114+
PerIsolateData::For(v8::Isolate::GetCurrent())->AdminClientConstructor().Reset(
112115
(tpl->GetFunction(Nan::GetCurrentContext())).ToLocalChecked());
113116
Nan::Set(exports, Nan::New("AdminClient").ToLocalChecked(),
114117
tpl->GetFunction(Nan::GetCurrentContext()).ToLocalChecked());
@@ -155,7 +158,8 @@ v8::Local<v8::Object> AdminClient::NewInstance(v8::Local<v8::Value> arg) {
155158
const unsigned argc = 1;
156159

157160
v8::Local<v8::Value> argv[argc] = { arg };
158-
v8::Local<v8::Function> cons = Nan::New<v8::Function>(constructor);
161+
v8::Local<v8::Function> cons = Nan::New<v8::Function>(
162+
PerIsolateData::For(v8::Isolate::GetCurrent())->AdminClientConstructor());
159163
v8::Local<v8::Object> instance =
160164
Nan::NewInstance(cons, argc, argv).ToLocalChecked();
161165

src/admin.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@
1212

1313
#include <nan.h>
1414
#include <uv.h>
15-
#include <iostream>
1615
#include <string>
1716
#include <vector>
1817

@@ -59,6 +58,8 @@ class AdminClient : public Connection {
5958
explicit AdminClient(Conf* globalConfig);
6059
~AdminClient();
6160

61+
static void delete_instance(void* arg);
62+
6263
rd_kafka_queue_t* rkqu;
6364

6465
private:

src/binding.cc

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@
77
* of the MIT license. See the LICENSE.txt file for details.
88
*/
99

10-
#include <iostream>
1110
#include "src/binding.h"
1211

1312
using NodeKafka::Producer;
@@ -71,4 +70,4 @@ void Init(v8::Local<v8::Object> exports, v8::Local<v8::Value> m_, void* v_) {
7170
Nan::New(RdKafka::version_str().c_str()).ToLocalChecked());
7271
}
7372

74-
NODE_MODULE(kafka, Init)
73+
NODE_MODULE_CONTEXT_AWARE(kafka, Init)

src/callbacks.cc

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,14 +60,16 @@ Dispatcher::~Dispatcher() {
6060
callbacks[i].Reset();
6161
}
6262

63+
Deactivate();
64+
6365
uv_mutex_destroy(&async_lock);
6466
}
6567

6668
// Only run this if we aren't already listening
6769
void Dispatcher::Activate() {
6870
if (!async) {
6971
async = new uv_async_t;
70-
uv_async_init(uv_default_loop(), async, AsyncMessage_);
72+
uv_async_init(Nan::GetCurrentEventLoop(), async, AsyncMessage_);
7173

7274
async->data = this;
7375
}

src/common.h

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@
1212

1313
#include <nan.h>
1414

15-
#include <iostream>
1615
#include <string>
1716
#include <vector>
1817

src/config.cc

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
* of the MIT license. See the LICENSE.txt file for details.
88
*/
99

10+
#include <iostream>
1011
#include <string>
1112
#include <vector>
1213
#include <list>
@@ -36,7 +37,14 @@ void Conf::DumpConfig(std::list<std::string> *dump) {
3637

3738
Conf * Conf::create(RdKafka::Conf::ConfType type, v8::Local<v8::Object> object, std::string &errstr) { // NOLINT
3839
v8::Local<v8::Context> context = Nan::GetCurrentContext();
39-
Conf* rdconf = static_cast<Conf*>(RdKafka::Conf::create(type));
40+
Conf* rdconf = new Conf(type);
41+
42+
if (type == CONF_GLOBAL)
43+
rdconf->rk_conf_ = rd_kafka_conf_new();
44+
else
45+
rdconf->rkt_conf_ = rd_kafka_topic_conf_new();
46+
47+
// Conf* rdconf = static_cast<Conf*>(RdKafka::Conf::create(type));
4048

4149
v8::MaybeLocal<v8::Array> _property_names = object->GetOwnPropertyNames(
4250
Nan::GetCurrentContext());
@@ -150,6 +158,10 @@ Conf::~Conf() {
150158
if (m_rebalance_cb) {
151159
delete m_rebalance_cb;
152160
}
161+
162+
if (m_offset_commit_cb) {
163+
delete m_offset_commit_cb;
164+
}
153165
}
154166

155167
} // namespace NodeKafka

src/config.h

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,18 +11,20 @@
1111
#define SRC_CONFIG_H_
1212

1313
#include <nan.h>
14-
#include <iostream>
1514
#include <vector>
1615
#include <list>
1716
#include <string>
1817

1918
#include "rdkafkacpp.h"
19+
#include "rdkafkacpp_int.h"
2020
#include "src/common.h"
2121
#include "src/callbacks.h"
2222

2323
namespace NodeKafka {
2424

25-
class Conf : public RdKafka::Conf {
25+
class Conf : public RdKafka::ConfImpl {
26+
private:
27+
Conf(RdKafka::Conf::ConfType type) : RdKafka::ConfImpl(type) {} // NOLINT
2628
public:
2729
~Conf();
2830

src/connection.cc

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,9 +54,16 @@ Connection::Connection(Conf* gconfig, Conf* tconfig):
5454
// Perhaps node new methods should report this as an error? But there
5555
// isn't anything the user can do about it.
5656
m_gconfig->set("event_cb", &m_event_cb, errstr);
57+
58+
node::AddEnvironmentCleanupHook(v8::Isolate::GetCurrent(), delete_instance, this);
5759
}
5860

61+
void Connection::delete_instance(void* arg) {
62+
delete (static_cast<Connection*>(arg));
63+
}
64+
5965
Connection::~Connection() {
66+
node::RemoveEnvironmentCleanupHook(v8::Isolate::GetCurrent(), delete_instance, this);
6067
uv_rwlock_destroy(&m_connection_lock);
6168

6269
if (m_tconfig) {

0 commit comments

Comments
 (0)