8
8
"""
9
9
import itertools
10
10
import os
11
+ import re
11
12
import shutil
12
13
import sys
13
14
import tempfile
@@ -47,11 +48,11 @@ def __init__(self, logger=None, error_mode=ErrorMode.TruncTrace):
47
48
self .extract_file_rpm : {".rpm" },
48
49
self .extract_file_deb : {".deb" , ".ipk" },
49
50
self .extract_file_cab : {".cab" },
51
+ self .extract_file_apk : {".apk" },
50
52
self .extract_file_zip : {
51
53
".exe" ,
52
54
".zip" ,
53
55
".jar" ,
54
- ".apk" ,
55
56
".msi" ,
56
57
".egg" ,
57
58
".whl" ,
@@ -78,13 +79,13 @@ async def extract_file_rpm(self, filename, extraction_path):
78
79
if not await aio_inpath ("rpm2cpio" ) or not await aio_inpath ("cpio" ):
79
80
await rpmextract ("-xC" , extraction_path , filename )
80
81
else :
81
- stdout , stderr = await aio_run_command (["rpm2cpio" , filename ])
82
+ stdout , stderr , _ = await aio_run_command (["rpm2cpio" , filename ])
82
83
if stderr or not stdout :
83
84
return 1
84
85
cpio_path = os .path .join (extraction_path , "data.cpio" )
85
86
async with FileIO (cpio_path , "wb" ) as f :
86
87
await f .write (stdout )
87
- stdout , stderr = await aio_run_command (
88
+ stdout , stderr , _ = await aio_run_command (
88
89
["cpio" , "-idm" , "--file" , cpio_path ]
89
90
)
90
91
if stdout or not stderr :
@@ -94,13 +95,13 @@ async def extract_file_rpm(self, filename, extraction_path):
94
95
with ErrorHandler (mode = self .error_mode , logger = self .logger ):
95
96
raise Exception ("7z is required to extract rpm files" )
96
97
else :
97
- stdout , stderr = await aio_run_command (["7z" , "x" , filename ])
98
+ stdout , stderr , _ = await aio_run_command (["7z" , "x" , filename ])
98
99
if stderr or not stdout :
99
100
return 1
100
101
filenames = await aio_glob (os .path .join (extraction_path , "*.cpio" ))
101
102
filename = filenames [0 ]
102
103
103
- stdout , stderr = await aio_run_command (["7z" , "x" , filename ])
104
+ stdout , stderr , _ = await aio_run_command (["7z" , "x" , filename ])
104
105
if stderr or not stdout :
105
106
return 1
106
107
return 0
@@ -111,45 +112,82 @@ async def extract_file_deb(self, filename, extraction_path):
111
112
with ErrorHandler (mode = self .error_mode , logger = self .logger ):
112
113
raise Exception ("'ar' is required to extract deb files" )
113
114
else :
114
- stdout , stderr = await aio_run_command (["ar" , "x" , filename ])
115
+ stdout , stderr , _ = await aio_run_command (["ar" , "x" , filename ])
115
116
if stderr :
116
117
return 1
117
118
datafile = await aio_glob (os .path .join (extraction_path , "data.tar.*" ))
118
119
with ErrorHandler (mode = ErrorMode .Ignore ) as e :
119
120
await aio_unpack_archive (datafile [0 ], extraction_path )
120
121
return e .exit_code
121
122
123
+ async def extract_file_apk (self , filename , extraction_path ):
124
+ """Check whether it is alpine or android package"""
125
+
126
+ is_tar = True
127
+ process_can_fail = True
128
+ if await aio_inpath ("unzip" ):
129
+ stdout , stderr , return_code = await aio_run_command (
130
+ ["unzip" , "-l" , filename ], process_can_fail
131
+ )
132
+ if return_code == 0 :
133
+ is_tar = False
134
+ elif await aio_inpath ("7z" ):
135
+ stdout , stderr , return_code = await aio_run_command (
136
+ ["7z" , "t" , filename ], process_can_fail
137
+ )
138
+ if re .search (b"Type = Zip" , stdout ):
139
+ is_tar = False
140
+ elif await aio_inpath ("zipinfo" ):
141
+ stdout , stderr , return_code = await aio_run_command (
142
+ ["zipinfo" , filename ], process_can_fail
143
+ )
144
+ if return_code == 0 :
145
+ is_tar = False
146
+ elif await aio_inpath ("file" ):
147
+ stdout , stderr , return_code = await aio_run_command (
148
+ ["file" , filename ], process_can_fail
149
+ )
150
+ if re .search (b"Zip archive data" , stdout ):
151
+ is_tar = False
152
+ if is_tar :
153
+ self .logger .debug (f"Extracting { filename } as a tar.gzip file" )
154
+ with ErrorHandler (mode = ErrorMode .Ignore ) as e :
155
+ await aio_unpack_archive (filename , extraction_path , format = "gztar" )
156
+ return e .exit_code
157
+ else :
158
+ return await self .extract_file_zip (filename , extraction_path )
159
+
122
160
@staticmethod
123
161
async def extract_file_cab (filename , extraction_path ):
124
162
"""Extract cab files"""
125
163
if sys .platform .startswith ("linux" ):
126
164
if not await aio_inpath ("cabextract" ):
127
165
raise Exception ("'cabextract' is required to extract cab files" )
128
166
else :
129
- stdout , stderr = await aio_run_command (
167
+ stdout , stderr , _ = await aio_run_command (
130
168
["cabextract" , "-d" , extraction_path , filename ]
131
169
)
132
170
if stderr or not stdout :
133
171
return 1
134
172
else :
135
- stdout , stderr = await aio_run_command (
173
+ stdout , stderr , _ = await aio_run_command (
136
174
["Expand" , filename , "-R -F:*" , extraction_path ]
137
175
)
138
176
if stderr or not stdout :
139
177
return 1
140
178
return 0
141
179
142
180
@staticmethod
143
- async def extract_file_zip (filename , extraction_path ):
181
+ async def extract_file_zip (filename , extraction_path , process_can_fail = False ):
144
182
"""Extract zip files"""
145
183
if await aio_inpath ("unzip" ):
146
- stdout , stderr = await aio_run_command (
184
+ stdout , stderr , _ = await aio_run_command (
147
185
["unzip" , "-n" , "-d" , extraction_path , filename ]
148
186
)
149
187
if stderr or not stdout :
150
188
return 1
151
189
elif await aio_inpath ("7z" ):
152
- stdout , stderr = await aio_run_command (["7z" , "x" , filename ])
190
+ stdout , stderr , _ = await aio_run_command (["7z" , "x" , filename ])
153
191
if stderr or not stdout :
154
192
return 1
155
193
else :
0 commit comments