sppd_cli/parser/parquet_writer.rs

use crate::errors::{AppError, AppResult};
use crate::models::Entry;
use crate::utils::{format_duration, mb_from_bytes, round_two_decimals};
use futures::stream::{self, StreamExt, TryStreamExt};
use polars::lazy::prelude::{LazyFrame, ScanArgsParquet};
use polars::prelude::*;
use rayon::prelude::*;
use std::collections::BTreeMap;
use std::fs::{self as std_fs, File};
use std::path::PathBuf;
use std::time::Instant;
use tokio::fs as tokio_fs;
use tracing::info;

use super::file_finder::find_xmls;
use super::xml_parser::parse_xml_bytes;

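/// Converts parsed `Entry` values into a six-column `DataFrame`.
///
/// An empty input still produces the full schema, so downstream Parquet
/// writes always see consistent columns.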
fn entries_to_dataframe(entries: Vec<Entry>) -> AppResult<DataFrame> {
    if entries.is_empty() {
        return DataFrame::new(vec![
            Series::new("id", Vec::<Option<String>>::new()),
            Series::new("title", Vec::<Option<String>>::new()),
            Series::new("link", Vec::<Option<String>>::new()),
            Series::new("summary", Vec::<Option<String>>::new()),
            Series::new("updated", Vec::<Option<String>>::new()),
            Series::new("contract_folder_status", Vec::<Option<String>>::new()),
        ])
        .map_err(|e| AppError::ParseError(format!("Failed to create DataFrame: {e}")));
    }

    let len = entries.len();
    let mut ids = Vec::with_capacity(len);
    let mut titles = Vec::with_capacity(len);
    let mut links = Vec::with_capacity(len);
    let mut summaries = Vec::with_capacity(len);
    let mut updateds = Vec::with_capacity(len);
    let mut contract_folder_statuses = Vec::with_capacity(len);

    for entry in entries {
        ids.push(entry.id);
        titles.push(entry.title);
        links.push(entry.link);
        summaries.push(entry.summary);
        updateds.push(entry.updated);
        contract_folder_statuses.push(entry.contract_folder_status);
    }

    DataFrame::new(vec![
        Series::new("id", ids),
        Series::new("title", titles),
        Series::new("link", links),
        Series::new("summary", summaries),
        Series::new("updated", updateds),
        Series::new("contract_folder_status", contract_folder_statuses),
    ])
    .map_err(|e| AppError::ParseError(format!("Failed to create DataFrame: {e}")))
}

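/// Reads the given XML files concurrently, preserving input order.
///
/// `buffered` caps in-flight reads at `concurrency` (clamped to at least 1),
/// and `try_collect` propagates the first I/O error encountered.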
async fn read_xml_contents(paths: &[PathBuf], concurrency: usize) -> AppResult<Vec<Vec<u8>>> {
    let read_concurrency = concurrency.max(1);
    stream::iter(paths.iter().cloned())
        .map(|path| async move {
            tokio_fs::read(&path)
                .await
                .map_err(|e| AppError::IoError(format!("Failed to read XML file {path:?}: {e}")))
        })
        .buffered(read_concurrency)
        .try_collect()
        .await
}

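/// Parses the extracted XML files for every subdirectory named in
/// `target_links` and writes the results as Parquet.
///
/// Files are read asynchronously in chunks of `batch_size`, parsed in
/// parallel on the rayon pool, and written as per-chunk batch files; when
/// `config.concat_batches` is set, the batches are merged into a single
/// `<subdir>.parquet` per period.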
pub async fn parse_xmls(
    target_links: &BTreeMap<String, String>,
    procurement_type: &crate::models::ProcurementType,
    batch_size: usize,
    config: &crate::config::ResolvedConfig,
) -> AppResult<()> {
    let extract_dir = procurement_type.extract_dir(config);
    let parquet_dir = procurement_type.parquet_dir(config);

    std_fs::create_dir_all(&parquet_dir)
        .map_err(|e| AppError::IoError(format!("Failed to create parquet directory: {e}")))?;

    let subdirs = find_xmls(&extract_dir)?;

    let subdirs_to_process: Vec<_> = subdirs
        .into_iter()
        .filter(|(subdir_name, _)| target_links.contains_key(subdir_name))
        .collect();

    let total_subdirs = subdirs_to_process.len();

    if total_subdirs == 0 {
        info!("No matching subdirectories found for parsing");
        return Ok(());
    }

    let total_xml_files: usize = subdirs_to_process
        .iter()
        .map(|(_, files)| files.len())
        .sum();

    let start = Instant::now();
    let mut total_parquet_bytes = 0u64;
    let mut total_parquet_files = 0usize;

    info!(total = total_subdirs, "Starting XML parsing");

    let mut processed_count = 0;
    let mut skipped_count = 0;

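    // Each subdirectory corresponds to one period; its files are processed
    // in chunks so peak memory stays bounded by `batch_size`.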
    for (subdir_name, xml_files) in subdirs_to_process {
        let chunk_size = batch_size.max(1);
        let mut has_entries = false;
        let mut batch_index = 0;
        let period_dir = parquet_dir.join(&subdir_name);
        let mut period_dir_created = false;
        let mut batch_paths: Vec<PathBuf> = Vec::new();

        for xml_chunk in xml_files.chunks(chunk_size) {
            let xml_contents = read_xml_contents(xml_chunk, config.read_concurrency).await?;
            let parsed_entry_batches: Vec<Vec<Entry>> = xml_contents
                .par_iter()
                .map(|content| parse_xml_bytes(content))
                .collect::<AppResult<Vec<_>>>()?;

            let mut chunk_entries = Vec::new();
            for mut entries in parsed_entry_batches {
                if entries.is_empty() {
                    continue;
                }
                chunk_entries.append(&mut entries);
            }

            if chunk_entries.is_empty() {
                continue;
            }

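            // The period directory is (re)created lazily on the first
            // non-empty chunk, wiping any output left over from a previous run.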
            if !period_dir_created {
                if period_dir.exists() {
                    std_fs::remove_dir_all(&period_dir).map_err(|e| {
                        AppError::IoError(format!(
                            "Failed to remove previous parquet directory {period_dir:?}: {e}"
                        ))
                    })?;
                }
                std_fs::create_dir_all(&period_dir).map_err(|e| {
                    AppError::IoError(format!(
                        "Failed to create parquet period directory {period_dir:?}: {e}"
                    ))
                })?;
                period_dir_created = true;
            }

            has_entries = true;
            let mut chunk_df = entries_to_dataframe(chunk_entries)?;
            let batch_path = period_dir.join(format!("batch_{batch_index}.parquet"));
            let mut file = File::create(&batch_path).map_err(|e| {
                AppError::IoError(format!(
                    "Failed to create Parquet batch file {batch_path:?}: {e}"
                ))
            })?;

            ParquetWriter::new(&mut file)
                .finish(&mut chunk_df)
                .map_err(|e| AppError::ParseError(format!("Failed to write Parquet batch: {e}")))?;

            batch_paths.push(batch_path);
            batch_index += 1;
        }

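        // A subdirectory whose XML files yielded no entries counts as
        // skipped and leaves no output behind.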
        if !has_entries {
            skipped_count += 1;
            if period_dir_created {
                std_fs::remove_dir_all(&period_dir).map_err(|e| {
                    AppError::IoError(format!(
                        "Failed to remove empty parquet directory {period_dir:?}: {e}"
                    ))
                })?;
            }
            continue;
        }

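        // With `concat_batches` enabled, the batch files are combined via a
        // lazy glob scan into one Parquet file per period and the temporary
        // batch directory is removed; otherwise the batch files themselves
        // are the output.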
        let mut output_paths = Vec::new();
        if config.concat_batches {
            let glob_path = period_dir.join("batch_*.parquet");
            let glob_str = glob_path.to_string_lossy().into_owned();
            let mut combined = LazyFrame::scan_parquet(&glob_str, ScanArgsParquet::default())
                .map_err(|e| {
                    AppError::ParseError(format!(
                        "Failed to scan parquet batches for {subdir_name}: {e}"
                    ))
                })?
                .collect()
                .map_err(|e| {
                    AppError::ParseError(format!(
                        "Failed to collect combined DataFrame for {subdir_name}: {e}"
                    ))
                })?;

            let final_path = parquet_dir.join(format!("{subdir_name}.parquet"));
            let mut final_file = File::create(&final_path).map_err(|e| {
                AppError::IoError(format!(
                    "Failed to create final Parquet file {final_path:?}: {e}"
                ))
            })?;

            ParquetWriter::new(&mut final_file)
                .finish(&mut combined)
                .map_err(|e| {
                    AppError::ParseError(format!("Failed to write final Parquet file: {e}"))
                })?;

            output_paths.push(final_path);
            std_fs::remove_dir_all(&period_dir).map_err(|e| {
                AppError::IoError(format!(
                    "Failed to remove temporary parquet directory {period_dir:?}: {e}"
                ))
            })?;
        } else {
            output_paths.extend(batch_paths.iter().cloned());
        }

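        // Output size and file counts are taken from the final artifacts
        // (the merged file, or the individual batches) for the summary log.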
        for output_path in output_paths {
            let metadata = std_fs::metadata(&output_path).map_err(|e| {
                AppError::IoError(format!(
                    "Failed to read Parquet file metadata {output_path:?}: {e}"
                ))
            })?;
            total_parquet_bytes += metadata.len();
            total_parquet_files += 1;
        }

        processed_count += 1;
    }

    let elapsed = start.elapsed();
    let elapsed_str = format_duration(elapsed);
    let total_mb = mb_from_bytes(total_parquet_bytes);
    let throughput = if elapsed.as_secs_f64() > 0.0 {
        total_mb / elapsed.as_secs_f64()
    } else {
        total_mb
    };
    let size_mb = round_two_decimals(total_mb);
    let throughput_mb_s = round_two_decimals(throughput);

    info!(
        processed = processed_count,
        skipped = skipped_count,
        xml_files = total_xml_files,
        parquet_files = total_parquet_files,
        elapsed = elapsed_str,
        output_size_mb = size_mb,
        throughput_mb_s = throughput_mb_s,
        "Parsing completed"
    );

    Ok(())
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn entries_to_dataframe_empty_yields_zero_rows() {
        let df = entries_to_dataframe(vec![]).unwrap();
        assert_eq!(df.height(), 0);
        assert_eq!(df.width(), 6);
    }

    #[test]
    fn entries_to_dataframe_single_entry() {
        let entry = Entry {
            id: Some("id".to_string()),
            title: Some("title".to_string()),
            link: Some("link".to_string()),
            summary: Some("summary".to_string()),
            updated: Some("2023-01-01".to_string()),
            contract_folder_status: Some("{}".to_string()),
        };

        let df = entries_to_dataframe(vec![entry]).unwrap();
        assert_eq!(df.height(), 1);
        let value = df.column("id").unwrap().get(0).unwrap();
        assert_eq!(value, AnyValue::String("id"));
    }
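
    // A sketch of an additional case, assuming `Entry`'s fields are all
    // `Option<String>` as in the test above: fields left as `None` should
    // surface as Polars nulls rather than being dropped.
    #[test]
    fn entries_to_dataframe_none_fields_become_nulls() {
        let entry = Entry {
            id: Some("id".to_string()),
            title: None,
            link: None,
            summary: None,
            updated: None,
            contract_folder_status: None,
        };

        let df = entries_to_dataframe(vec![entry]).unwrap();
        assert_eq!(df.height(), 1);
        let title = df.column("title").unwrap().get(0).unwrap();
        assert_eq!(title, AnyValue::Null);
    }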
}