Conversation

@binLep commented Jul 22, 2025

To support the compression algorithm over HTTP, the response encoding format needs to be set when uploading.
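
For context, the scenario is uploading an object that was compressed locally and attaching the matching header; with stock s3cmd this can be expressed via the existing --add-header option (bucket and file names below are placeholders):

s3cmd put --add-header="Content-Encoding: deflate" file.deflate s3://example-bucket/path/file.deflate
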
@mludvig (Contributor) commented Oct 15, 2025

Can you provide a usecase / testcase that this patch fixes please?

@binLep (Author) commented Jan 8, 2026

> Can you provide a usecase / testcase that this patch fixes please?


We can use any tool that supports the deflate algorithm to view the uploaded files.
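
For illustration, a minimal sketch of such a viewer in Python (the file name is a placeholder; -15 selects a raw deflate stream, matching the compressor in the script below):

import zlib

# placeholder: an object fetched from the bucket that was uploaded by the script below
with open("downloaded.deflate", "rb") as f:
    data = zlib.decompress(f.read(), -15)  # -15 = raw deflate, no zlib header/trailer
print(data[:200])  # show the first bytes of the decompressed content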


The following is the code:

# -*- coding: utf-8 -*-
import os
import sys
import time
import errno
import shutil
import socket
import tempfile
import argparse
import datetime

from copy import copy
from logging import debug, info, warning, error

try:
    import S3.ExitCodes
    import S3.Exceptions
    import S3.S3
    import S3.Config
    import S3.FileLists
    import S3.S3Uri
    import S3.Utils
except Exception as e:
    error(u"S3 error: Error loading some components of s3cmd (Import Error)")
    sys.exit(1)


def _build_attr_header(src_obj, src_relative_name, md5=None):
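    # Mirror s3cmd's preserve_attrs behaviour: pack the selected file metadata
    # into the 'x-amz-meta-s3cmd-attrs' header (falling back to uid/gid when a
    # user or group name cannot be resolved).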
    cfg = S3.Config.Config()
    attrs = {}
    if cfg.preserve_attrs:
        for attr in cfg.preserve_attrs_list:
            val = None
            if attr == 'uname':
                try:
                    val = S3.Utils.urlencode_string(S3.Utils.getpwuid_username(src_obj['uid']), unicode_output=True)
                except (KeyError, TypeError):
                    attr = "uid"
                    val = src_obj.get('uid')
                    if val:
                        warning(u"%s: Owner username not known. Storing UID=%d instead." % (src_relative_name, val))
            elif attr == 'gname':
                try:
                    val = S3.Utils.urlencode_string(S3.Utils.getgrgid_grpname(src_obj.get('gid')), unicode_output=True)
                except (KeyError, TypeError):
                    attr = "gid"
                    val = src_obj.get('gid')
                    if val:
                        warning(u"%s: Owner groupname not known. Storing GID=%d instead." % (src_relative_name, val))
            elif attr != "md5":
                try:
                    val = getattr(src_obj['sr'], 'st_' + attr)
                except Exception:
                    val = None
            if val is not None:
                attrs[attr] = val

    if 'md5' in cfg.preserve_attrs_list and md5:
        attrs['md5'] = md5

    if attrs:
        attr_str_list = []
        for k in sorted(attrs.keys()):
            attr_str_list.append(u"%s:%s" % (k, attrs[k]))
        attr_header = {'x-amz-meta-s3cmd-attrs': u'/'.join(attr_str_list)}
    else:
        attr_header = {}

    return attr_header


def output(message):
    sys.stdout.write(message + "\n")
    sys.stdout.flush()


def cmd_object_put(_cfg: S3.Config.Config, time_start: datetime.datetime, _args: list) -> int:
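    # Upload every local file in _args to the destination S3 URI and print the
    # wall-clock interval plus the per-request elapsed time for each object.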
    s3 = S3.S3.S3(_cfg)
    if len(_args) == 0:
        raise S3.Exceptions.ParameterError(
            "Nothing to upload. Expecting a local file or directory and a S3 URI destination.")

    destination_base_uri = S3.S3Uri.S3Uri(_args.pop())
    if destination_base_uri.type != 's3':
        raise S3.Exceptions.ParameterError("Destination must be S3Uri. Got: %s" % destination_base_uri)
    destination_base = destination_base_uri.uri()

    if len(_args) == 0:
        raise S3.Exceptions.ParameterError("Nothing to upload. Expecting a local file or directory.")

    local_list, single_file_local, exclude_list, total_size_local = S3.FileLists.fetch_local_list(
        _args, is_src=True, with_dirs=_cfg.keep_dirs)

    local_count = len(local_list)

    info(u"Summary: %d local files to upload" % local_count)

    if local_count == 0:
        raise S3.Exceptions.ParameterError("Nothing to upload.")

    if local_count > 0:
        if not single_file_local and '-' in local_list.keys():
            raise S3.Exceptions.ParameterError("Cannot specify multiple local files if uploading from '-' (ie stdin)")
        elif single_file_local and list(local_list.keys())[0] == "-" and destination_base.endswith("/"):
            raise S3.Exceptions.ParameterError("Destination S3 URI must not end with '/' when uploading from stdin.")
        elif not destination_base.endswith("/"):
            if not single_file_local:
                raise S3.Exceptions.ParameterError(
                    "Destination S3 URI must end with '/' (ie must refer to a directory on the remote side).")
            local_list[list(local_list.keys())[0]]['remote_uri'] = destination_base
        else:
            for key in local_list:
                local_list[key]['remote_uri'] = destination_base + key

    seq = 0
    ret = S3.ExitCodes.EX_OK
    for key in local_list:
        seq += 1

        uri_final = S3.S3Uri.S3Uri(local_list[key]['remote_uri'])
        try:
            src_md5 = local_list.get_md5(key)
        except IOError:
            src_md5 = None

        extra_headers = copy(_cfg.extra_headers)
        full_name_orig = local_list[key]['full_name']
        full_name = full_name_orig
        seq_label = "[%d of %d]" % (seq, local_count)
        attr_header = _build_attr_header(local_list[key], key, src_md5)
        debug(u"attr_header: %s" % attr_header)
        extra_headers.update(attr_header)
        try:
            response = s3.object_put(full_name, uri_final, extra_headers, extra_label=seq_label)
            
            # Method 2: datetime-based timing of the upload
            delta = datetime.datetime.now() - time_start
            elapsed = response['elapsed']
            print(f"time interval: {delta.total_seconds()*1000:.3f} ms; elapsed: {elapsed*1000:.3f} ms")
        except S3.Exceptions.S3UploadError as exc:
            error(u"Upload of '%s' failed too many times (Last reason: %s)" % (full_name_orig, exc))
            ret = S3.ExitCodes.EX_PARTIAL
            continue
        except S3.Exceptions.InvalidFileError as exc:
            error(u"Upload of '%s' is not possible (Reason: %s)" % (full_name_orig, exc))
            ret = S3.ExitCodes.EX_PARTIAL
            continue
    return ret


def s3cmd(file_path: str, time_start: datetime.datetime, extra_headers: dict) -> int:
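    # Configure s3cmd programmatically and run the upload, mapping the library
    # exceptions onto s3cmd-style exit codes ({your_ak}, {your_sk}, {your_host}
    # are placeholders to fill in).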
    try:
        cfg = S3.Config.Config()
        cfg.access_key = "{your_ak}"
        cfg.secret_key = "{your_sk}"
        cfg.host_base = "{your_host}"
        cfg.host_bucket = "{}/%(bucket)s".format(cfg.host_base)
        cfg.use_https = False
        cfg.encoding = "utf-8"
        cfg.extra_headers.update(extra_headers)
        
        args = []
        args.append(file_path)
        args.append("s3://path1/path2/")
        args = [S3.Utils.unicodise(arg) for arg in args]
        rc = cmd_object_put(cfg, time_start, args)
        return rc
    except ImportError as e:
        error(u"S3 error: %s" % e)
        return S3.ExitCodes.EX_GENERAL

    except (S3.Exceptions.ParameterError, S3.Exceptions.InvalidFileError) as e:
        error(u"Parameter problem: %s" % e)
        return S3.ExitCodes.EX_USAGE

    except (S3.Exceptions.S3DownloadError, S3.Exceptions.S3UploadError, S3.Exceptions.S3RequestError) as e:
        error(u"S3 Temporary Error: %s.  Please try again later." % e)
        return S3.ExitCodes.EX_TEMPFAIL

    except S3.Exceptions.S3Error as e:
        error(u"S3 error: %s" % e)
        return e.get_error_code()

    except (S3.Exceptions.S3Exception, S3.Exceptions.S3ResponseError, S3.Exceptions.CloudFrontError) as e:
        error(u"S3 error: %s" % e)
        return S3.ExitCodes.EX_SOFTWARE

    except SystemExit as e:
        return e.code

    except KeyboardInterrupt:
        return S3.ExitCodes.EX_BREAK

    except (S3.Exceptions.S3SSLError, S3.Exceptions.S3SSLCertificateError) as e:
        error("SSL certificate verification failure: %s" % e)
        return S3.ExitCodes.EX_ACCESSDENIED

    except ConnectionRefusedError as e:
        error("Could not connect to server: %s" % e)
        return S3.ExitCodes.EX_CONNECTIONREFUSED

    except socket.gaierror as e:
        error(e)
        error("Connection Error: Error resolving a server hostname.\n"
              "Please check the servers address specified in 'host_base', 'host_bucket', 'cloudfront_host', 'website_endpoint'")
        return S3.ExitCodes.EX_IOERR

    except IOError as e:
        if e.errno in (errno.ECONNREFUSED, errno.EHOSTUNREACH):
            error("Could not connect to server: %s" % e)
            return S3.ExitCodes.EX_CONNECTIONREFUSED

        if e.errno == errno.EPIPE:
            return S3.ExitCodes.EX_IOERR

        return S3.ExitCodes.EX_IOERR

    except MemoryError:
        msg = """
MemoryError!  You have exceeded the amount of memory available for this process.
This usually occurs when syncing >750,000 files on a 32-bit python instance.
The solutions to this are:
1) sync several smaller subtrees; or
2) use a 64-bit python on a 64-bit OS with >8GB RAM
        """
        sys.stderr.write(msg)
        return S3.ExitCodes.EX_OSERR

    except UnicodeEncodeError as e:
        lang = S3.Utils.unicodise_s(os.getenv("LANG", "NOTSET"), 'ascii')
        msg = """
You have encountered a UnicodeEncodeError.  Your environment
variable LANG=%s may not specify a Unicode encoding (e.g. UTF-8).
Please set LANG=en_US.UTF-8 or similar in your environment before
invoking s3cmd.
        """ % lang
        error(u"S3 error: %s" % e)
        return S3.ExitCodes.EX_GENERAL

    except Exception as e:
        error(u"S3 error: %s" % e)
        return S3.ExitCodes.EX_GENERAL


import watchdog.events
import watchdog.observers
import zlib
import gzip


def compress_deflate(inp_file, chunk_size=1024 * 1024):
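    # level=9: maximum compression; wbits=-15: raw deflate stream with no zlib
    # header/trailer, which is what this script labels "Content-Encoding: deflate".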
    obj = zlib.compressobj(level=9, wbits=-15)
    
    with open(inp_file, 'rb') as src, tempfile.NamedTemporaryFile(delete=False, suffix='.deflate') as tmp:
        
        while True:
            chunk = src.read(chunk_size)
            if not chunk:
                break
            tmp.write(obj.compress(chunk))
            
        tmp.write(obj.flush())
    return tmp.name


def compress_gzip(inp_file):
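    # Write a standard gzip container, matching "Content-Encoding: gzip".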
    with tempfile.NamedTemporaryFile(delete=False, suffix='.gz') as tmp:
        with open(inp_file, 'rb') as src, gzip.open(tmp.name, 'wb') as dst:
            shutil.copyfileobj(src, dst)
    return tmp.name


class FileEventHandler(watchdog.events.FileSystemEventHandler):

    def __init__(self):
        watchdog.events.FileSystemEventHandler.__init__(self)
        pass

    def on_created(self, event):
        # Only handle regular files; watchdog also reports directory creation.
        if event.is_directory:
            return
        filepath = event.src_path
        if isinstance(filepath, bytes):
            filepath = filepath.decode()
        
        tmppath = None
        
        time_start = datetime.datetime.now()
        # Compress the new file into a temporary file before uploading.
        # Toggle: True uses raw deflate, False uses gzip.
        if True:
            tmppath = compress_deflate(filepath)
            headers = {
                "content-encoding": "deflate"
            }
        else:
            tmppath = compress_gzip(filepath)
            headers = {
                "content-encoding": "gzip"
            }
        
        if tmppath is not None:
            print(tmppath)
            try:
                s3cmd(tmppath, time_start, headers)
            except Exception as e:
                error(u"on_created: %s" % e)
        
            os.remove(tmppath)


if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument("-p", "--path", help="要监控的文件夹路径")
    args = parser.parse_args()
    g_path = args.path
    if g_path is None:
        parser.print_help()
        sys.exit(-1)
    if not os.path.exists(g_path):
        error(u"目录不存在" % g_path)
        sys.exit(-2)
    if not os.path.isdir(g_path):
        error(u"路径名不是文件夹" % g_path)
        sys.exit(-3)
    observer = watchdog.observers.Observer()
    observer.schedule(FileEventHandler(), g_path, recursive=True)
    observer.start()

    try:
        while True:
            time.sleep(0.1)
    except KeyboardInterrupt:
        observer.stop()
    observer.join()
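
To try it out, save the script (the file name watch_and_upload.py below is arbitrary) and point it at a directory to monitor:

python watch_and_upload.py -p /path/to/watch

Every file created under that path is then compressed into a temporary file and uploaded with the matching Content-Encoding header.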
