Skip to content

Commit 4e41bc1

Browse files
authored
update notebooks (#869)
1 parent f2adc5c commit 4e41bc1

File tree

5 files changed

+566
-85
lines changed

5 files changed

+566
-85
lines changed
Lines changed: 162 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,162 @@
1+
{
2+
"cells": [
3+
{
4+
"cell_type": "markdown",
5+
"id": "3ea6bd8d",
6+
"metadata": {},
7+
"source": [
8+
"## Pipeline Example\n",
9+
"\n",
10+
"1. Create input and processor chain"
11+
]
12+
},
13+
{
14+
"cell_type": "code",
15+
"execution_count": 16,
16+
"id": "c2f1dd6d",
17+
"metadata": {},
18+
"outputs": [],
19+
"source": [
20+
"from logprep.factory import Factory\n",
21+
"from logprep.ng.abc.processor import Processor\n",
22+
"from logprep.ng.event.log_event import LogEvent\n",
23+
"from logprep.ng.event.event_state import EventStateType\n",
24+
"from logprep.ng.pipeline import Pipeline\n",
25+
"\n",
26+
"\n",
27+
"input_connector = iter(\n",
28+
" [\n",
29+
" LogEvent({\"message\": \"Log message 1\"}, original=b\"\", state=EventStateType.RECEIVED),\n",
30+
" LogEvent({\"message\": \"Log message 2\"}, original=b\"\", state=EventStateType.RECEIVED),\n",
31+
" LogEvent({\"user\": {\"name\": \"John Doe\"}}, original=b\"\", state=EventStateType.RECEIVED),\n",
32+
" ]\n",
33+
")\n",
34+
"\n",
35+
"def get_processors() -> list[Processor]:\n",
36+
" processors = [\n",
37+
" Factory.create(\n",
38+
" {\n",
39+
" \"processor\": {\n",
40+
" \"type\": \"ng_generic_adder\",\n",
41+
" \"rules\": [\n",
42+
" {\n",
43+
" \"filter\": \"*\",\n",
44+
" \"generic_adder\": {\"add\": {\"event.tags\": \"generic added tag\"}},\n",
45+
" }\n",
46+
" ],\n",
47+
" }\n",
48+
" }\n",
49+
" ),\n",
50+
" Factory.create(\n",
51+
" {\n",
52+
" \"pseudo_this\": {\n",
53+
" \"type\": \"ng_pseudonymizer\",\n",
54+
" \"pubkey_analyst\": \"../../../../../examples/exampledata/rules/pseudonymizer/example_analyst_pub.pem\",\n",
55+
" \"pubkey_depseudo\": \"../../../../../examples/exampledata/rules/pseudonymizer/example_depseudo_pub.pem\",\n",
56+
" \"regex_mapping\": \"../../../../../examples/exampledata/rules/pseudonymizer/regex_mapping.yml\",\n",
57+
" \"hash_salt\": \"a_secret_tasty_ingredient\",\n",
58+
" \"outputs\": [{\"opensearch\": \"pseudonyms\"}],\n",
59+
" \"rules\": [\n",
60+
" {\n",
61+
" \"filter\": \"user.name\",\n",
62+
" \"pseudonymizer\": {\n",
63+
" \"id\": \"pseudonymizer-1a3c69b2-5d54-4b6b-ab07-c7ddbea7917c\",\n",
64+
" \"mapping\": {\"user.name\": \"RE_WHOLE_FIELD\"},\n",
65+
" },\n",
66+
" }\n",
67+
" ],\n",
68+
" \"max_cached_pseudonyms\": 1000000,\n",
69+
" }\n",
70+
" }\n",
71+
" ),\n",
72+
" ]\n",
73+
" for processor in processors:\n",
74+
" processor.setup()\n",
75+
" return processors\n",
76+
"\n",
77+
"processors = get_processors()\n"
78+
]
79+
},
80+
{
81+
"cell_type": "markdown",
82+
"id": "ec064e0b",
83+
"metadata": {},
84+
"source": [
85+
"2. Create the pipeline"
86+
]
87+
},
88+
{
89+
"cell_type": "code",
90+
"execution_count": 17,
91+
"id": "454c2e8d",
92+
"metadata": {},
93+
"outputs": [],
94+
"source": [
95+
"\n",
96+
"pipeline = Pipeline(input_connector, processors)"
97+
]
98+
},
99+
{
100+
"cell_type": "markdown",
101+
"id": "47780fd6",
102+
"metadata": {},
103+
"source": [
104+
"3. Run the pipeline"
105+
]
106+
},
107+
{
108+
"cell_type": "code",
109+
"execution_count": 18,
110+
"id": "90dd7cfe",
111+
"metadata": {},
112+
"outputs": [
113+
{
114+
"name": "stdout",
115+
"output_type": "stream",
116+
"text": [
117+
"Processed event: {'message': 'Log message 1', 'event': {'tags': 'generic added tag'}}\n",
118+
"Event state: processed\n",
119+
"generated extra_data: []\n",
120+
"----------------------------------------\n",
121+
"Processed event: {'message': 'Log message 2', 'event': {'tags': 'generic added tag'}}\n",
122+
"Event state: processed\n",
123+
"generated extra_data: []\n",
124+
"----------------------------------------\n",
125+
"Processed event: {'user': {'name': '<pseudonym:12f7a0505314df2259513546a7e3da518098b6999443ff681a56fb752afa998b>'}, 'event': {'tags': 'generic added tag'}}\n",
126+
"Event state: processed\n",
127+
"generated extra_data: [PseudonymEvent(data={'pseudonym': '12f7a0505314df2259513546a7e3da518098b6999443ff681a56fb752afa998b', 'origin': 'KeHunzkJWuA27pZ8jicAHPXGoRYc27Ko+uQlQQTS9KT1V84dnApS0tCI3vTRhMawSD+ZTS+HRqt1nIYiKX4B3pVAlghK3PlOzVwxO6Gkktg12GNp++aTW5b7+aIClmHa8IoiBr/Nhg5ld9ctkmndbkm149zohKQlox67rellfEY=:XzZybCLehuWhWJD+JXDOQw==:DU6nasNCIDsMsc86gQWjES7k2Zmv++2gnuaXmX9DzwOjE2B5PY4pTvPJe54hvKn2RqL2IPX1q0cAjr5zWzexNEAKjcrNCyCjQRTUWgTLUhwC/Jx7COQrxjpfMEPOfDwVgdQHHMV7VJ+ErGf80ETFU0GD3jupBA0GyH5OJNr45qB3lVgUfwpHzazhMBQ2IRx2FpVYyymANecfeFjz/inWmxcrr6AueoM7lj4wJhzMVizcHmHEDzqw7Smo4Gv6DV2YGG/7HqpZiCF+ky5A7ukAf3reC3YzzsdCb/y5DH5/NJzPJKcR3Dio3W8TYQw/VP0jd9AwJlKxidrSCh342nLh0Q==:piBPxAWte9b2zMnjppX3uA==:Ht1eEIEr+xg='}, state=processed)]\n",
128+
"----------------------------------------\n"
129+
]
130+
}
131+
],
132+
"source": [
133+
"for event in pipeline:\n",
134+
" print(f\"Processed event: {event.data}\")\n",
135+
" print(f\"Event state: {event.state}\")\n",
136+
" print(f\"generated extra_data: {event.extra_data}\")\n",
137+
" print(\"-\" * 40)"
138+
]
139+
}
140+
],
141+
"metadata": {
142+
"kernelspec": {
143+
"display_name": ".venv",
144+
"language": "python",
145+
"name": "python3"
146+
},
147+
"language_info": {
148+
"codemirror_mode": {
149+
"name": "ipython",
150+
"version": 3
151+
},
152+
"file_extension": ".py",
153+
"mimetype": "text/x-python",
154+
"name": "python",
155+
"nbconvert_exporter": "python",
156+
"pygments_lexer": "ipython3",
157+
"version": "3.12.3"
158+
}
159+
},
160+
"nbformat": 4,
161+
"nbformat_minor": 5
162+
}

doc/source/development/notebooks/new_architecture_examples/7_processors.ipynb renamed to doc/source/development/notebooks/new_architecture_examples/8_processors.ipynb

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -147,7 +147,7 @@
147147
"source": [
148148
"print(f\"Event before processing: {json.dumps(event.data, indent=2)}\")\n",
149149
"\n",
150-
"# Pseudonymization\n",
150+
"# Predetection example\n",
151151
"config = {\n",
152152
" \"almighty pre_detector\": {\n",
153153
" \"type\": \"ng_pre_detector\",\n",
@@ -279,7 +279,7 @@
279279
"name": "python",
280280
"nbconvert_exporter": "python",
281281
"pygments_lexer": "ipython3",
282-
"version": "3.12.3"
282+
"version": "3.11.11"
283283
},
284284
"orig_nbformat": 4
285285
},
Lines changed: 147 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,147 @@
1+
{
2+
"cells": [
3+
{
4+
"cell_type": "markdown",
5+
"id": "3ea6bd8d",
6+
"metadata": {},
7+
"source": [
8+
"## Input Connectors -> ConfluentKafka Input\n",
9+
"\n",
10+
"1. Prepare the Kafka cluster"
11+
]
12+
},
13+
{
14+
"cell_type": "code",
15+
"execution_count": null,
16+
"id": "c7a86700",
17+
"metadata": {
18+
"vscode": {
19+
"languageId": "shellscript"
20+
}
21+
},
22+
"outputs": [],
23+
"source": [
24+
"%%bash\n",
25+
"# starting kafka container\n",
26+
"docker compose -f ../../../../../examples/compose/docker-compose.yml down -v \n",
27+
"docker compose -f ../../../../../examples/compose/docker-compose.yml up -d kafka\n",
28+
"# creating the topic\n",
29+
"docker exec -i kafka /bin/bash -c \"/opt/bitnami/kafka/bin/kafka-topics.sh --create --topic consumer --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1\"\n",
30+
"# waiting for kafka to be ready\n",
31+
"sleep 10\n"
32+
]
33+
},
34+
{
35+
"cell_type": "markdown",
36+
"id": "3c7a70d0",
37+
"metadata": {},
38+
"source": [
39+
"2. Produce messages"
40+
]
41+
},
42+
{
43+
"cell_type": "code",
44+
"execution_count": null,
45+
"id": "458b38c1",
46+
"metadata": {
47+
"vscode": {
48+
"languageId": "shellscript"
49+
}
50+
},
51+
"outputs": [],
52+
"source": [
53+
"%%bash\n",
54+
"# producing 3 events to kafka topic consumer\n",
55+
"docker exec -i kafka /bin/bash -c \"echo '{\\\"message\\\": \\\"the message\\\"}' | /opt/bitnami/kafka/bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic consumer \"\n",
56+
"# showing events in kafka\n",
57+
"# docker exec -i kafka /bin/bash -c \"/opt/bitnami/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic consumer --from-beginning --max-messages 10\"\n"
58+
]
59+
},
60+
{
61+
"cell_type": "markdown",
62+
"id": "3f58d924",
63+
"metadata": {},
64+
"source": [
65+
"3. Initialize the confluentkafka_input connector"
66+
]
67+
},
68+
{
69+
"cell_type": "code",
70+
"execution_count": null,
71+
"id": "e3cd8f43",
72+
"metadata": {},
73+
"outputs": [],
74+
"source": [
75+
"import sys\n",
76+
"import logging\n",
77+
"from logprep.factory import Factory\n",
78+
"from logprep.ng.connector.confluent_kafka.input import ConfluentKafkaInput\n",
79+
"\n",
80+
"# Configure logging\n",
81+
"logging.basicConfig(\n",
82+
" level=logging.DEBUG, \n",
83+
" stream=sys.stdout\n",
84+
")\n",
85+
"\n",
86+
"# Create a Kafka input connector configuration\n",
87+
"\n",
88+
"kafka_config = {\n",
89+
" \"kafka\": {\n",
90+
" \"type\": \"ng_confluentkafka_input\",\n",
91+
" \"topic\": \"consumer\",\n",
92+
" \"kafka_config\": {\n",
93+
" \"bootstrap.servers\": \"127.0.0.1:9092\",\n",
94+
" \"group.id\": \"cgroup3\",\n",
95+
" \"enable.auto.commit\": \"true\",\n",
96+
" \"auto.commit.interval.ms\": \"10000\",\n",
97+
" \"enable.auto.offset.store\": \"false\",\n",
98+
" \"queued.min.messages\": \"100000\",\n",
99+
" \"queued.max.messages.kbytes\": \"65536\",\n",
100+
" \"statistics.interval.ms\": \"60000\"\n",
101+
" }\n",
102+
" }\n",
103+
"}\n",
104+
"\n",
105+
"kafka_connector: ConfluentKafkaInput = Factory.create(kafka_config)\n",
106+
"\n",
107+
"# Start the connector\n",
108+
"kafka_connector.setup()\n",
109+
"\n",
110+
"# show the current backlog\n",
111+
"print(f\"{kafka_connector.event_backlog.backlog=}\")\n",
112+
"\n",
113+
"# Consume 3 messages from the Kafka topic\n",
114+
"event = next(kafka_connector(timeout=10))\n",
115+
"event = next(kafka_connector(timeout=10))\n",
116+
"event = next(kafka_connector(timeout=10))\n",
117+
"\n",
118+
"# show the backlog after consuming an event\n",
119+
"print(f\"{kafka_connector.event_backlog.backlog=}\")\n",
120+
"\n",
121+
"# Shut down the connector to close the producer\n",
122+
"kafka_connector.shut_down()\n"
123+
]
124+
}
125+
],
126+
"metadata": {
127+
"kernelspec": {
128+
"display_name": ".venv",
129+
"language": "python",
130+
"name": "python3"
131+
},
132+
"language_info": {
133+
"codemirror_mode": {
134+
"name": "ipython",
135+
"version": 3
136+
},
137+
"file_extension": ".py",
138+
"mimetype": "text/x-python",
139+
"name": "python",
140+
"nbconvert_exporter": "python",
141+
"pygments_lexer": "ipython3",
142+
"version": "3.12.3"
143+
}
144+
},
145+
"nbformat": 4,
146+
"nbformat_minor": 5
147+
}

0 commit comments

Comments (0)