1use crate::errors::{AppError, AppResult};
2use crate::models::ProcurementType;
3use crate::utils::{format_duration, mb_from_bytes, round_two_decimals};
4use rayon::{prelude::*, ThreadPoolBuilder};
5use std::collections::{BTreeMap, HashSet};
6use std::fs::{self, File};
7use std::io::{copy, BufWriter, Write};
8use std::path::{Path, PathBuf};
9use std::sync::Arc;
10use std::time::Instant;
11use tracing::{debug, info, warn};
12use zip::ZipArchive;
13
14pub async fn extract_all_zips(
47 target_links: &BTreeMap<String, String>,
48 procurement_type: &ProcurementType,
49 config: &crate::config::ResolvedConfig,
50) -> AppResult<()> {
51 let extract_dir = procurement_type.extract_dir(config);
52 if !extract_dir.exists() {
53 return Err(AppError::IoError(format!(
54 "Directory does not exist: {}",
55 extract_dir.display()
56 )));
57 }
58
59 let capacity = target_links.len();
62 let mut zips_to_extract: Vec<PathBuf> = Vec::with_capacity(capacity);
63 let mut missing_zips = Vec::with_capacity(capacity);
64
65 for period in target_links.keys() {
66 let zip_path = extract_dir.join(format!("{period}.zip"));
67 if !zip_path.exists() {
68 missing_zips.push((period.clone(), zip_path));
69 continue;
70 }
71
72 let extract_dir_path = zip_path
74 .parent()
75 .ok_or_else(|| {
76 AppError::InvalidInput(format!(
77 "ZIP file has no parent directory: {}",
78 zip_path.display()
79 ))
80 })?
81 .join(period);
82
83 if !extract_dir_path.exists() {
84 zips_to_extract.push(zip_path);
85 }
86 }
87
88 let total_zips = zips_to_extract.len();
89 let skipped_count = target_links.len() - total_zips - missing_zips.len();
90
91 if total_zips == 0 {
92 info!(
93 total = target_links.len(),
94 skipped = skipped_count,
95 missing = missing_zips.len(),
96 "All ZIP files already extracted, skipping extraction"
97 );
98 return Ok(());
99 }
100
101 for (period, zip_path) in &missing_zips {
103 warn!(
104 zip_file = %zip_path.display(),
105 period = period,
106 "ZIP file not found, skipping"
107 );
108 }
109
110 info!(
111 total = total_zips,
112 skipped = skipped_count,
113 missing = missing_zips.len(),
114 "Starting extraction"
115 );
116
117 let start = Instant::now();
118
119 let cpu_count = std::thread::available_parallelism()
120 .map(|p| p.get())
121 .unwrap_or(1);
122 let thread_count = cpu_count.saturating_mul(2);
123 let rayon_pool = ThreadPoolBuilder::new()
124 .num_threads(thread_count)
125 .build()
126 .map_err(|e| AppError::IoError(format!("Failed to configure rayon thread pool: {e}")))?;
127
128 let results = tokio::task::spawn_blocking(move || {
130 rayon_pool.install(|| {
131 zips_to_extract
132 .par_iter()
133 .map(|zip_path| {
134 let result = extract_zip_sync(zip_path);
135 (zip_path.clone(), result)
136 })
137 .collect::<Vec<(PathBuf, AppResult<()>)>>()
138 })
139 })
140 .await
141 .map_err(|e| AppError::IoError(format!("Task join error: {e}")))?;
142
143 let mut errors = Vec::new();
145 let mut extracted_bytes = 0u64;
146 for (zip_path, result) in results {
147 if let Err(e) = result {
148 let error_msg = format!("Failed to extract {}: {}", zip_path.display(), e);
149 warn!(
150 zip_file = %zip_path.display(),
151 error = %e,
152 "Failed to extract ZIP file"
153 );
154 errors.push(error_msg);
155 continue;
156 }
157
158 if let Some(extract_dir) = extracted_dir_for_zip(&zip_path) {
159 extracted_bytes += directory_size(&extract_dir);
160 }
161 }
162
163 if !errors.is_empty() {
164 return Err(AppError::IoError(format!(
165 "Failed to extract {} ZIP file(s): {}",
166 errors.len(),
167 errors.join("; ")
168 )));
169 }
170
171 if skipped_count > 0 {
172 debug!(skipped = skipped_count, "Skipped already extracted files");
173 }
174
175 let elapsed = start.elapsed();
176 let elapsed_str = format_duration(elapsed);
177 let total_mb = mb_from_bytes(extracted_bytes);
178 let throughput = if elapsed.as_secs_f64() > 0.0 {
179 total_mb / elapsed.as_secs_f64()
180 } else {
181 total_mb
182 };
183 let size_mb = round_two_decimals(total_mb);
184 let throughput_mb_s = round_two_decimals(throughput);
185
186 info!(
187 extracted = total_zips,
188 skipped = skipped_count,
189 missing = missing_zips.len(),
190 elapsed = elapsed_str,
191 size_mb = size_mb,
192 throughput_mb_s = throughput_mb_s,
193 "Extraction completed"
194 );
195
196 Ok(())
197}
198
199fn extract_zip_sync(zip_path: &Path) -> AppResult<()> {
202 let zip_file_name = zip_path
203 .file_stem()
204 .and_then(|s| s.to_str())
205 .ok_or_else(|| {
206 AppError::InvalidInput(format!("Invalid ZIP file name: {}", zip_path.display()))
207 })?;
208
209 let extract_dir = zip_path
210 .parent()
211 .ok_or_else(|| {
212 AppError::InvalidInput(format!(
213 "ZIP file has no parent directory: {}",
214 zip_path.display()
215 ))
216 })?
217 .join(zip_file_name);
218
219 if extract_dir.exists() {
221 debug!(
222 zip_file = %zip_path.display(),
223 extract_dir = %extract_dir.display(),
224 "Skipping extraction, directory already exists"
225 );
226 return Ok(());
227 }
228
229 std::fs::create_dir_all(&extract_dir).map_err(|e| {
231 AppError::IoError(format!(
232 "Failed to create extraction directory {}: {}",
233 extract_dir.display(),
234 e
235 ))
236 })?;
237
238 let file = File::open(zip_path).map_err(|e| {
240 AppError::IoError(format!(
241 "Failed to open ZIP file {}: {}",
242 zip_path.display(),
243 e
244 ))
245 })?;
246
247 let mut archive = ZipArchive::new(file).map_err(|e| {
248 AppError::ParseError(format!(
249 "Failed to read ZIP archive {}: {}",
250 zip_path.display(),
251 e
252 ))
253 })?;
254
255 let mut entries = Vec::with_capacity(archive.len());
256 let mut created_dirs = HashSet::new();
257
258 for i in 0..archive.len() {
259 let file = archive.by_index(i).map_err(|e| {
260 AppError::ParseError(format!(
261 "Failed to read file {} from ZIP {}: {}",
262 i,
263 zip_path.display(),
264 e
265 ))
266 })?;
267
268 let out_path = match file.enclosed_name() {
269 Some(path) => extract_dir.join(path),
270 None => continue,
271 };
272
273 if file.name().ends_with('/') {
274 continue;
275 }
276
277 if let Some(parent) = out_path.parent() {
278 if created_dirs.insert(parent.to_path_buf()) {
279 std::fs::create_dir_all(parent).map_err(|e| {
280 AppError::IoError(format!(
281 "Failed to create directory {}: {}",
282 parent.display(),
283 e
284 ))
285 })?;
286 }
287 }
288
289 entries.push((i, out_path));
290 }
291
292 drop(archive);
293
294 let zip_path_arc = Arc::new(zip_path.to_path_buf());
295 entries
296 .par_iter()
297 .map(|(index, out_path)| {
298 let zip_path = zip_path_arc.clone();
299 let file = File::open(&*zip_path).map_err(|e| {
300 AppError::IoError(format!(
301 "Failed to open ZIP file {}: {}",
302 zip_path.display(),
303 e
304 ))
305 })?;
306
307 let mut archive = ZipArchive::new(file).map_err(|e| {
308 AppError::ParseError(format!(
309 "Failed to read ZIP archive {}: {}",
310 zip_path.display(),
311 e
312 ))
313 })?;
314
315 let mut file = archive.by_index(*index).map_err(|e| {
316 AppError::ParseError(format!(
317 "Failed to read file {} from ZIP {}: {}",
318 index,
319 zip_path.display(),
320 e
321 ))
322 })?;
323
324 let out_file = std::fs::File::create(out_path).map_err(|e| {
325 AppError::IoError(format!(
326 "Failed to create file {}: {}",
327 out_path.display(),
328 e
329 ))
330 })?;
331
332 let mut writer = BufWriter::with_capacity(32 * 1024, out_file);
333 copy(&mut file, &mut writer).map_err(|e| {
334 AppError::IoError(format!(
335 "Failed to copy file from ZIP {} to {}: {}",
336 zip_path.display(),
337 out_path.display(),
338 e
339 ))
340 })?;
341 writer.flush().map_err(|e| {
342 AppError::IoError(format!(
343 "Failed to flush file {}: {}",
344 out_path.display(),
345 e
346 ))
347 })?;
348
349 Ok(())
350 })
351 .collect::<AppResult<Vec<()>>>()?;
352
353 Ok(())
354}
355
/// Returns the directory an archive is extracted into: the archive's parent directory
/// joined with its file stem. Yields `None` when the path has no parent or no file stem.
fn extracted_dir_for_zip(zip_path: &Path) -> Option<PathBuf> {
    match (zip_path.parent(), zip_path.file_stem()) {
        (Some(dir), Some(stem)) => Some(dir.join(stem)),
        _ => None,
    }
}
361
/// Recursively sums the sizes (in bytes) of all files below `dir`.
///
/// Best effort: a directory that cannot be listed contributes 0, and entries whose
/// listing or metadata lookup fails are ignored.
fn directory_size(dir: &Path) -> u64 {
    let listing = match fs::read_dir(dir) {
        Ok(listing) => listing,
        Err(_) => return 0,
    };

    listing
        .flatten()
        .map(|entry| {
            let child = entry.path();
            if child.is_dir() {
                directory_size(&child)
            } else {
                entry.metadata().map_or(0, |meta| meta.len())
            }
        })
        .sum()
}
376
#[cfg(test)]
mod tests {
    use super::*;
    use std::fs;
    use tempfile::TempDir;

    /// The extraction directory is the archive's parent joined with its file stem.
    #[test]
    fn extracted_dir_for_zip_returns_parent_stem() {
        let expected = PathBuf::from("/tmp/data/202401");
        let actual = extracted_dir_for_zip(Path::new("/tmp/data/202401.zip"));
        assert_eq!(actual, Some(expected));
    }

    /// Files in the directory itself and in nested directories are both counted.
    #[test]
    fn directory_size_counts_nested_files() {
        let tmp = TempDir::new().unwrap();
        let root = tmp.path().join("dir");
        let nested = root.join("nested");
        fs::create_dir_all(&nested).unwrap();
        fs::write(root.join("file1.txt"), [0u8; 10]).unwrap();
        fs::write(nested.join("file2.txt"), [0u8; 20]).unwrap();

        assert_eq!(directory_size(&root), 30);
    }
}