|  | 
|  | 1 | +import { Readable } from "stream"; | 
|  | 2 | +import { fileTypeFromBuffer } from "file-type"; | 
|  | 3 | + | 
|  | 4 | +export default { | 
|  | 5 | +  props: { | 
|  | 6 | +    includeLink: { | 
|  | 7 | +      label: "Include Link", | 
|  | 8 | +      type: "boolean", | 
|  | 9 | +      description: "Upload file to your File Stash and emit temporary download link to the file. See [the docs](https://pipedream.com/docs/connect/components/files) to learn more about working with files in Pipedream.", | 
|  | 10 | +      default: false, | 
|  | 11 | +      optional: true, | 
|  | 12 | +    }, | 
|  | 13 | +    dir: { | 
|  | 14 | +      type: "dir", | 
|  | 15 | +      accessMode: "write", | 
|  | 16 | +      optional: true, | 
|  | 17 | +    }, | 
|  | 18 | +  }, | 
|  | 19 | +  methods: { | 
|  | 20 | +    async streamToBuffer(stream) { | 
|  | 21 | +      const chunks = []; | 
|  | 22 | +      for await (const chunk of stream) { | 
|  | 23 | +        chunks.push(chunk); | 
|  | 24 | +      } | 
|  | 25 | +      return Buffer.concat(chunks); | 
|  | 26 | +    }, | 
|  | 27 | +    async stashFile(item) { | 
|  | 28 | +      const { Body } = await this.getObject({ | 
|  | 29 | +        Bucket: item.bucket.name, | 
|  | 30 | +        Key: item.object.key.replace(/\+/g, " "), | 
|  | 31 | +      }); | 
|  | 32 | +      const filepath = `${item.bucket.name}/${item.object.key}`; | 
|  | 33 | +      const buffer = await this.streamToBuffer(Body); | 
|  | 34 | +      const type = await fileTypeFromBuffer(buffer); | 
|  | 35 | +      // Upload the attachment to the configured directory (File Stash) so it | 
|  | 36 | +      // can be accessed later. | 
|  | 37 | +      const file = await this.dir.open(filepath).fromReadableStream( | 
|  | 38 | +        Readable.from(buffer), | 
|  | 39 | +        type?.mime, | 
|  | 40 | +        buffer.length, | 
|  | 41 | +      ); | 
|  | 42 | +      // Return file details and temporary download link: | 
|  | 43 | +      // { path, get_url, s3Key, type } | 
|  | 44 | +      return await file.withoutPutUrl().withGetUrl(); | 
|  | 45 | +    }, | 
|  | 46 | +    async processEvent(event) { | 
|  | 47 | +      const { Message: rawMessage } = event.body; | 
|  | 48 | +      const { | 
|  | 49 | +        Records: s3Events = [], | 
|  | 50 | +        Event: eventType, | 
|  | 51 | +      } = JSON.parse(rawMessage); | 
|  | 52 | + | 
|  | 53 | +      if (eventType === "s3:TestEvent") { | 
|  | 54 | +        console.log("Received initial test event. Skipping..."); | 
|  | 55 | +        return; | 
|  | 56 | +      } | 
|  | 57 | + | 
|  | 58 | +      for (const s3Event of s3Events) { | 
|  | 59 | +        const meta = this.generateMeta(s3Event); | 
|  | 60 | +        let { s3: item } = s3Event; | 
|  | 61 | +        if (this.includeLink) { | 
|  | 62 | +          item.file = await this.stashFile(item); | 
|  | 63 | +        } | 
|  | 64 | +        this.$emit(item, meta); | 
|  | 65 | +      } | 
|  | 66 | +    }, | 
|  | 67 | +  }, | 
|  | 68 | +}; | 
0 commit comments