-
Hey @suityou01 👋 You may find the correct options here: https://roadrunner.dev/docs/queues-kafka/current/en
-
This makes sense. I have the worker package installed and yet I get this error
{
    "require": {
        "spiral/roadrunner-cli": "^2.6",
        "spiral/roadrunner-http": "^3.3",
        "nyholm/psr7": "^1.8",
        "spiral/roadrunner-worker": "^3.3",
        "spiral/roadrunner-jobs": "^4.3",
        "spiral/roadrunner": "^2023.3"
    }
}
<?php
use Spiral\RoadRunner\Environment;
use Spiral\RoadRunner\Environment\Mode;
// 1. Using global env variable
$isJobsMode = $_SERVER['RR_MODE'] === 'jobs';
// 2. Using RoadRunner's API
$env = Environment::fromGlobals();
$isJobsMode = $env->getMode() === Mode::MODE_JOBS;
$consumer = new Consumer();
/** @var Spiral\RoadRunner\Jobs\Task\ReceivedTaskInterface $task */
while ($task = $consumer->waitTask()) {
    var_dump($task);
}
Any ideas?
-
Yep, problem exists between chair and monitor. :-) Working code looks like:
<?php
use Spiral\RoadRunner\Environment;
use Spiral\RoadRunner\Environment\Mode;
use Spiral\RoadRunner\Jobs\Consumer;
use Spiral\RoadRunner\Jobs\Task\ReceivedTaskInterface;
include "vendor/autoload.php";
// 1. Using global env variable
$isJobsMode = $_SERVER['RR_MODE'] === 'jobs';
// 2. Using RoadRunner's API
$env = Environment::fromGlobals();
$isJobsMode = $env->getMode() === Mode::MODE_JOBS;
$consumer = new Consumer();
/** @var Spiral\RoadRunner\Jobs\Task\ReceivedTaskInterface $task */
while ($task = $consumer->waitTask()) {
    var_dump($task);
}
-
Not getting any output when the messages are flowing through kafka though...
version: '3'
rpc:
  listen: tcp://127.0.0.1:6001
server:
  command: "php consumer.php"
#http:
#  address: "0.0.0.0:8080"
kafka:
  brokers: ["broker:9092"]
jobs:
  num_pollers: 1
  pipeline_size: 100
  pool:
    num_workers: 1
    max_jobs: 0
    allocate_timeout: 60s
    destroy_timeout: 60s
  pipelines:
    kafka:
      driver: kafka
      config:
        priority: 1
        brokers: [ broker:9092 ]
        consumer_options:
          topics: [ "feeds", "feeds.feeds.unit", "^[a-zA-Z0-9._-]+$" ]
          consume_regexp: true
logs:
  level: error
-
So here is what I get in my container log:
2023-12-21T12:38:20+0000 DEBUG rpc plugin was started {"address": "tcp://127.0.0.1:6001", "list of the plugins with RPC methods:": ["lock", "resetter", "informer", "jobs", "app"]}
2023-12-21T12:38:20+0000 DEBUG jobs initializing driver {"pipeline": "kafka", "driver": "kafka"}
2023-12-21T12:38:20+0000 DEBUG kafka ping kafka: ok {"driver": "kafka", "pipeline": "kafka"}
2023-12-21T12:38:20+0000 DEBUG jobs driver ready {"pipeline": "kafka", "driver": "kafka", "start": "2023-12-21T12:38:20+0000", "elapsed": "3.843019ms"}
2023-12-21T12:38:20+0000 DEBUG server worker is allocated {"pid": 16, "internal_event_name": "EventWorkerConstruct"}
2023-12-21T12:38:20+0000 DEBUG jobs exited from jobs pipeline processor
2023-12-21T12:38:20+0000 DEBUG jobs exited from jobs pipeline processor
2023-12-21T12:38:20+0000 DEBUG jobs exited from jobs pipeline processor
2023-12-21T12:38:20+0000 DEBUG jobs exited from jobs pipeline processor
2023-12-21T12:38:20+0000 DEBUG jobs exited from jobs pipeline processor
2023-12-21T12:38:20+0000 DEBUG jobs exited from jobs pipeline processor
2023-12-21T12:38:20+0000 DEBUG jobs exited from jobs pipeline processor
2023-12-21T12:38:20+0000 DEBUG jobs exited from jobs pipeline processor
2023-12-21T12:38:20+0000 DEBUG jobs exited from jobs pipeline processor
2023-12-21T12:38:20+0000 DEBUG jobs exited from jobs pipeline processor
[INFO] RoadRunner server started; version: 2023.3.8, buildtime: 2023-12-14T16:05:26+0000
[INFO] sdnotify: not notified
Loop now looks like
/** @var Spiral\RoadRunner\Jobs\Task\ReceivedTaskInterface $task */
while ($task = $consumer->waitTask()) {
    var_dump($task);
    $task->complete();
}
Config now looks like
version: '3'
rpc:
  listen: tcp://127.0.0.1:6001
server:
  command: "php consumer.php"
#http:
#  address: "0.0.0.0:8080"
kafka:
  brokers: ["broker:9092"]
jobs:
  num_pollers: 1
  pipeline_size: 100
  pool:
    num_workers: 1
    max_jobs: 0
    allocate_timeout: 60s
    destroy_timeout: 60s
  pipelines:
    kafka:
      driver: kafka
      config:
        priority: 1
        consumer_options:
          topics: [ "feeds", "feeds.feeds.unit", "^[a-zA-Z0-9._-]+$" ]
          consume_regexp: true
logs:
  level: debug
Maybe something is coming through but I cannot see the output from var_dump if it is.
-
I think those debug messages are from startup, so it seems no messages are coming through.
-
Kafka does not report it as a consumer, so it looks like the handshake is failing. How do I get more output from this black box?
-
OK here is the latest. It is not collecting any messages from kafka.
version: '3'
rpc:
  listen: tcp://127.0.0.1:6001
server:
  command: "php consumer.php"
#http:
#  address: "0.0.0.0:8080"
kafka:
  brokers: ["broker:9092"]
jobs:
  num_pollers: 1
  pipeline_size: 100
  pool:
    num_workers: 1
    max_jobs: 0
    allocate_timeout: 60s
    destroy_timeout: 60s
  pipelines:
    kafka:
      driver: kafka
      config:
        priority: 1
        auto_create_topics_enable: true
        consumer_options:
          topics: [ "feeds", "feeds.feeds.unit", "^[a-zA-Z0-9._-]+$" ]
          consume_regexp: true
        group_options:
          group_id: connect-debezium
  consume:
    [
      "kafka"
    ]
logs:
  level: debug
  output: stdout
I have a sink connector which sinks the messages to Elasticsearch, so I know that Kafka is working. I can see the messages through kafka-ui. Nothing is reaching the Kafka driver in RoadRunner. I have no clue what else to do to get this to work. I think I have followed all your instructions very well. I really want to use this product in my project, but if I cannot get it to work, or get help to get it to work, then I don't think it's ready for general use.
-
An error message!
2023-12-21T15:38:32+0000 DEBUG rpc plugin was started {"address": "tcp://127.0.0.1:6001", "list of the plugins with RPC methods:": ["lock", "app", "informer", "jobs", "resetter"]}
2023-12-21T15:38:32+0000 DEBUG jobs initializing driver {"pipeline": "kafka", "driver": "kafka"}
2023-12-21T15:38:32+0000 DEBUG kafka ping kafka: ok {"driver": "kafka", "pipeline": "kafka"}
2023-12-21T15:38:32+0000 DEBUG jobs driver ready {"pipeline": "kafka", "driver": "kafka", "start": "2023-12-21T15:38:32+0000", "elapsed": "5.560543ms"}
2023-12-21T15:38:32+0000 DEBUG kafka pipeline was started {"driver": "kafka", "pipeline": "kafka", "start": "2023-12-21T15:38:32+0000", "elapsed": "3.234µs"}
2023-12-21T15:38:32+0000 ERROR kafka non-retriable consumer error {"topic": "", "partition": 0, "code": 23, "description": "The group member's supported protocols are incompatible with those of existing members or first group member tried to join with empty protocol type or empty protocol list.", "message": "INCONSISTENT_GROUP_PROTOCOL"}
2023-12-21T15:38:32+0000 DEBUG kafka kafka listener stopped
2023-12-21T15:38:32+0000 ERROR kafka listener error {"error": "unable to join group session: INCONSISTENT_GROUP_PROTOCOL: The group member's supported protocols are incompatible with those of existing members or first group member tried to join with empty protocol type or empty protocol list."}
2023-12-21T15:38:32+0000 DEBUG kafka pipeline was stopped {"driver": "kafka", "pipeline": "kafka", "start": "2023-12-21T15:38:32+0000", "elapsed": "3.450471ms"}
2023-12-21T15:38:32+0000 DEBUG server worker is allocated {"pid": 16, "internal_event_name": "EventWorkerConstruct"}
2023-12-21T15:38:32+0000 DEBUG jobs exited from jobs pipeline processor
2023-12-21T15:38:32+0000 DEBUG jobs exited from jobs pipeline processor
2023-12-21T15:38:32+0000 DEBUG jobs exited from jobs pipeline processor
2023-12-21T15:38:32+0000 DEBUG jobs exited from jobs pipeline processor
2023-12-21T15:38:32+0000 DEBUG jobs exited from jobs pipeline processor
2023-12-21T15:38:32+0000 DEBUG jobs exited from jobs pipeline processor
2023-12-21T15:38:32+0000 DEBUG jobs exited from jobs pipeline processor
2023-12-21T15:38:32+0000 DEBUG jobs exited from jobs pipeline processor
2023-12-21T15:38:32+0000 DEBUG jobs exited from jobs pipeline processor
2023-12-21T15:38:32+0000 DEBUG jobs exited from jobs pipeline processor
[INFO] RoadRunner server started; version: 2023.3.8, buildtime: 2023-12-14T16:05:26+0000
[INFO] sdnotify: not notified
Process docker container logs smart_planning_engine finished
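One possible reading of the INCONSISTENT_GROUP_PROTOCOL error, offered as an assumption rather than a confirmed diagnosis: its description says the joining member's protocols are incompatible with those of existing members of the group. The config sets group_id to connect-debezium, and a name of that form may already belong to a Kafka Connect connector, such as the sink connector mentioned earlier. A minimal sketch of the pipeline with a dedicated group ID follows; the name roadrunner-feeds is hypothetical and does not come from this thread.

```yaml
jobs:
  pipelines:
    kafka:
      driver: kafka
      config:
        priority: 1
        auto_create_topics_enable: true
        consumer_options:
          topics: [ "feeds", "feeds.feeds.unit", "^[a-zA-Z0-9._-]+$" ]
          consume_regexp: true
        group_options:
          # Hypothetical group name (not from the thread): pick an ID that no
          # other consumer or Kafka Connect connector is already using.
          group_id: roadrunner-feeds
  consume: [ "kafka" ]
```

If the error goes away with a fresh group ID, that would suggest the original group was shared with members using a different protocol. If it persists, the cause lies elsewhere.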
-
I am having problems just getting the basic config set up.
Error: "Did not find expected key by parsing a block mapping"
NB: The example has a key called topic at the jobs.pipelines.queue-name.config level, which is confusing,
e.g.
So we have to specify a topic here (a single topic, but which one?),
and for the consumer we have to specify a topics key.
How does this work?