-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathdadjokes.js
More file actions
161 lines (145 loc) · 4.63 KB
/
dadjokes.js
File metadata and controls
161 lines (145 loc) · 4.63 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
require('dotenv').config();
const fs = require('fs');
const { parse } = require('csv-parse');
const { Client } = require('@opensearch-project/opensearch');
// Create the OpenSearch client.
// Connection settings come from the environment (dotenv loads .env above):
//   OPENSEARCH_NODE      - cluster URL; defaults to https://localhost:9200
//   OPENSEARCH_USERNAME  - basic-auth user name
//   OPENSEARCH_PASSWORD  - basic-auth password
// TLS certificate verification is disabled (rejectUnauthorized: false) so a
// local cluster with a self-signed certificate is accepted. Do not use this
// setting against a remote/production cluster.
// Note: the Node client hands `ssl` to Node's TLS layer, which only understands
// Node TLS options such as `rejectUnauthorized`; Python-client options like
// `verify_certs` have no effect here.
const client = new Client({
  node: process.env.OPENSEARCH_NODE || 'https://localhost:9200',
  auth: {
    username: process.env.OPENSEARCH_USERNAME,
    password: process.env.OPENSEARCH_PASSWORD
  },
  ssl: {
    rejectUnauthorized: false
  }
});
// Creates the 'dadjokes' index if it does not already exist.
// The index enables k-NN search and sends every incoming document through the
// 'dad-joke-pipeline' ingest pipeline by default. Mapped fields:
//   jid            - identifier column from the CSV (presumably a joke id),
//                    stored as an exact-match keyword. Documents are written
//                    with `jid` (see indexJokes), so that is the field mapped.
//   joke           - the joke text, stored as a keyword (exact match only).
//   joke-embedding - k-NN vector filled in by the ingest pipeline, compared
//                    with L2 distance. The 768 dimension must match the output
//                    size of the model behind MODEL_ID — confirm for your model.
// Errors are logged and rethrown to the caller.
async function createIndexIfNotExists() {
  try {
    const indexExists = await client.indices.exists({
      index: 'dadjokes'
    });
    if (indexExists.body) {
      console.log('Index already exists');
      return;
    }
    await client.indices.create({
      index: 'dadjokes',
      body: {
        settings: {
          "index.knn": true,
          "default_pipeline": "dad-joke-pipeline"
        },
        mappings: {
          properties: {
            jid: {
              type: 'keyword'
            },
            "joke-embedding": {
              type: 'knn_vector',
              dimension: 768,
              space_type: 'l2'
            },
            joke: {
              type: 'keyword'
            }
          }
        }
      }
    });
    console.log('Index created successfully');
  } catch (error) {
    console.error('Error creating index:', error);
    throw error;
  }
}
// Creates the 'dad-joke-pipeline' ingest pipeline unless it already exists.
// The pipeline runs a text_embedding processor that reads each document's
// `joke` field and writes the model output to `joke-embedding`, using the ML
// model whose id is in the MODEL_ID environment variable.
// Throws (after logging) if MODEL_ID is unset when the pipeline must be
// created, or if any OpenSearch call fails other than the 404 that means
// "pipeline not found".
async function createPipelineIfNotExists() {
  try {
    // getPipeline rejects with a 404 response error when the pipeline is
    // missing; treat that as "does not exist" and rethrow anything else.
    // Errors without `meta` (e.g. connection failures) are rethrown as-is
    // instead of crashing on the property access.
    const pipelineExists = await client.ingest.getPipeline({
      id: 'dad-joke-pipeline'
    }).catch(err => {
      if (err && err.meta && err.meta.statusCode === 404) {
        return false;
      }
      throw err;
    });
    if (pipelineExists) {
      console.log('Pipeline already exists');
      return;
    }
    // Fail early with a clear message rather than sending an undefined
    // model_id for the server to reject.
    const modelId = process.env.MODEL_ID;
    if (!modelId) {
      throw new Error('MODEL_ID must be set to create the dad-joke-pipeline ingest pipeline');
    }
    await client.ingest.putPipeline({
      id: 'dad-joke-pipeline',
      body: {
        description: "An NLP ingest pipeline",
        processors: [
          {
            text_embedding: {
              model_id: modelId,
              field_map: {
                joke: "joke-embedding"
              }
            }
          }
        ]
      }
    });
    console.log('Pipeline created successfully');
  } catch (error) {
    console.error('Error handling pipeline:', error);
    throw error;
  }
}
// Prepares the cluster before any documents are sent. Setup runs in a fixed
// order: the ingest pipeline first, then the index whose settings name that
// pipeline as its default_pipeline. A failure in either step is logged here
// and propagated to the caller.
async function initializeOpenSearch() {
  try {
    const setupSteps = [createPipelineIfNotExists, createIndexIfNotExists];
    for (const step of setupSteps) {
      await step();
    }
    console.log('OpenSearch initialization completed successfully');
  } catch (err) {
    console.error('Error during OpenSearch initialization:', err);
    throw err;
  }
}
// Sends one _bulk request and returns how many documents OpenSearch rejected.
// The _bulk API can succeed overall while individual items fail (the response
// then has `errors: true` and an `error` object on each failed item), so
// per-item results have to be inspected explicitly.
async function sendJokeBatch(operations) {
  const response = await client.bulk({ body: operations });
  const result = response && response.body;
  if (!result || !result.errors) {
    return 0;
  }
  const failed = (result.items || []).filter(item => item.index && item.index.error);
  if (failed.length > 0) {
    console.error(`\nBulk request rejected ${failed.length} document(s); first error:`, failed[0].index.error);
  }
  return failed.length;
}

// Sets up OpenSearch, then streams dad_jokes.csv (read from the current working
// directory, header row required) into the 'dadjokes' index in bulk batches.
// Each CSV row becomes a document { joke, jid } taken from the `joke` and `jid`
// columns. Progress is written to stdout; rejected documents are reported.
// Errors are logged and rethrown so the caller can detect a failed run.
async function indexJokes() {
  // Documents per _bulk request; each document adds two entries (action line
  // + source line) to the request body.
  const batchDocs = 2000;
  try {
    await initializeOpenSearch();
    const source = fs.createReadStream('dad_jokes.csv');
    const parser = source.pipe(parse({
      columns: true,
      skip_empty_lines: true
    }));
    // .pipe() does not forward errors from the source stream; without this a
    // missing/unreadable CSV would crash the process with an unhandled 'error'
    // event instead of rejecting the loop below.
    source.on('error', err => parser.destroy(err));

    let bulkBody = [];
    let count = 0;
    let failedCount = 0;
    for await (const record of parser) {
      bulkBody.push(
        { index: { _index: 'dadjokes' } },
        { joke: record.joke, jid: record.jid },
      );
      count++;
      if (bulkBody.length >= batchDocs * 2) {
        failedCount += await sendJokeBatch(bulkBody);
        bulkBody = [];
        // Pause between batches — presumably to avoid overloading the
        // embedding model the ingest pipeline runs on every document.
        await new Promise(resolve => setTimeout(resolve, 1000));
      }
      process.stdout.write(`\rProcessed ${count} jokes`);
    }
    if (bulkBody.length > 0) {
      failedCount += await sendJokeBatch(bulkBody);
    }
    console.log('\nFinished indexing jokes');
    if (failedCount > 0) {
      console.error(`${failedCount} of ${count} jokes failed to index`);
    }
  } catch (error) {
    console.error('Error indexing jokes:', error);
    throw error;
  }
}
// Run the indexing process. On failure, set a non-zero exit status so shell
// scripts and CI can detect it; release the client whether or not it succeeded.
indexJokes()
  .then(() => {
    console.log('Process completed');
  })
  .catch((error) => {
    console.error('Failed to complete the process:', error);
    process.exitCode = 1;
  })
  .finally(() => {
    client.close();
  });