| 
 | 1 | +import { Kafka } from "kafkajs";  | 
 | 2 | +import constants from "./common/constants.mjs";  | 
 | 3 | + | 
1 | 4 | export default {  | 
2 | 5 |   type: "app",  | 
3 | 6 |   app: "kafka",  | 
4 |  | -  propDefinitions: {},  | 
 | 7 | +  propDefinitions: {  | 
 | 8 | +    topic: {  | 
 | 9 | +      type: "string",  | 
 | 10 | +      label: "Topic",  | 
 | 11 | +      description: "The topic to interact with.",  | 
 | 12 | +      options() {  | 
 | 13 | +        return this.listTopics();  | 
 | 14 | +      },  | 
 | 15 | +    },  | 
 | 16 | +  },  | 
5 | 17 |   methods: {  | 
6 |  | -    // this.$auth contains connected account data  | 
7 |  | -    authKeys() {  | 
8 |  | -      console.log(Object.keys(this.$auth));  | 
 | 18 | +    getBrokers() {  | 
 | 19 | +      const {  | 
 | 20 | +        host,  | 
 | 21 | +        port,  | 
 | 22 | +      } = this.$auth;  | 
 | 23 | +      return [  | 
 | 24 | +        `${host}:${port}`,  | 
 | 25 | +      ];  | 
 | 26 | +    },  | 
 | 27 | +    getClient() {  | 
 | 28 | +      return new Kafka({  | 
 | 29 | +        clientId: "Pipedream",  | 
 | 30 | +        brokers: this.getBrokers(),  | 
 | 31 | +      });  | 
 | 32 | +    },  | 
 | 33 | +    getApiClient(api, config) {  | 
 | 34 | +      return this.getClient()[api](config);  | 
 | 35 | +    },  | 
 | 36 | +    async withApi(fn, api = constants.API.ADMIN, config) {  | 
 | 37 | +      const apiClient = this.getApiClient(api, config);  | 
 | 38 | +      await apiClient.connect();  | 
 | 39 | +      try {  | 
 | 40 | +        return await fn(apiClient);  | 
 | 41 | +      } finally {  | 
 | 42 | +        await apiClient.disconnect();  | 
 | 43 | +      }  | 
 | 44 | +    },  | 
 | 45 | +    listTopics() {  | 
 | 46 | +      return this.withApi((admin) => admin.listTopics());  | 
 | 47 | +    },  | 
 | 48 | +    listGroups() {  | 
 | 49 | +      return this.withApi((admin) => admin.listGroups());  | 
 | 50 | +    },  | 
 | 51 | +    createTopics(args = {}) {  | 
 | 52 | +      return this.withApi((admin) => admin.createTopics(args));  | 
 | 53 | +    },  | 
 | 54 | +    deleteTopics(args = {}) {  | 
 | 55 | +      return this.withApi((admin) => admin.deleteTopics(args));  | 
 | 56 | +    },  | 
 | 57 | +    deleteGroups(args = {}) {  | 
 | 58 | +      return this.withApi((admin) => admin.deleteGroups(args));  | 
 | 59 | +    },  | 
 | 60 | +    sendMessages(args = {}) {  | 
 | 61 | +      return this.withApi((producer) => producer.send(args), constants.API.PRODUCER);  | 
 | 62 | +    },  | 
 | 63 | +    async messageListener({  | 
 | 64 | +      topic, fromBeginning = true, onMessage, groupId,  | 
 | 65 | +    } = {}) {  | 
 | 66 | +      const config = {  | 
 | 67 | +        groupId,  | 
 | 68 | +      };  | 
 | 69 | +      const consumer = this.getApiClient(constants.API.CONSUMER, config);  | 
 | 70 | +      await consumer.connect();  | 
 | 71 | +      await consumer.subscribe({  | 
 | 72 | +        topic,  | 
 | 73 | +        fromBeginning,  | 
 | 74 | +      });  | 
 | 75 | +      await consumer.run({  | 
 | 76 | +        eachMessage: onMessage,  | 
 | 77 | +      });  | 
 | 78 | +      return consumer;  | 
9 | 79 |     },  | 
10 | 80 |   },  | 
11 | 81 | };  | 
0 commit comments