1use base64::{Engine as _, engine::general_purpose};
2use chrono::Utc;
3use futures::{StreamExt as _, stream::FuturesOrdered};
4use haste_fhir_client::FHIRClient;
5use haste_fhir_generated_ops::generated::ViewDefinitionRun;
6use haste_fhir_model::r4::{
7 self,
8 generated::{
9 resources::{Binary, Resource, ResourceType, ViewDefinition},
10 terminology::{IssueType, OutputFormatCodes},
11 types::{FHIRBase64Binary, FHIRBoolean},
12 },
13};
14use haste_fhir_operation_error::OperationOutcomeError;
15use haste_fhirpath::{Config, FPEngine};
16use haste_reflect::MetaValue;
17use itertools::Itertools as _;
18use ordermap::OrderMap;
19use serde::{Deserialize, Serialize};
20use std::{borrow::Cow, collections::HashMap, sync::Arc};
21
22use crate::conversions::primitives::PrimitiveValue;
23
24mod conversions;
25mod output;
26
27async fn resolve_view_definition<
28 'a,
29 CTX: Send + Sync + Clone + 'static,
30 Client: FHIRClient<CTX, OperationOutcomeError> + Send + Sync + 'static,
31>(
32 context: CTX,
33 client: &Client,
34 input: &'a ViewDefinitionRun::Input,
35) -> Result<Cow<'a, ViewDefinition>, OperationOutcomeError> {
36 if let Some(view_definition) = &input.viewResource {
37 Ok(Cow::Borrowed(view_definition))
38 } else if let Some(view_definition_reference) = input.viewReference.as_ref() {
39 let view_definition_reference = view_definition_reference
40 .reference
41 .as_ref()
42 .ok_or_else(|| {
43 OperationOutcomeError::error(
44 IssueType::Invalid(None),
45 "viewReference.reference is required".to_string(),
46 )
47 })?
48 .value
49 .as_ref()
50 .ok_or_else(|| {
51 OperationOutcomeError::error(
52 IssueType::Invalid(None),
53 "viewReference.reference.value is required".to_string(),
54 )
55 })?;
56
57 let reference_pieces = view_definition_reference.split('/').collect::<Vec<_>>();
58
59 let view_definition_id = reference_pieces
60 .last()
61 .ok_or_else(|| {
62 OperationOutcomeError::error(
63 IssueType::Invalid(None),
64 "Invalid viewReference.reference format".to_string(),
65 )
66 })?
67 .to_string();
68
69 let result = client
70 .read(
71 context,
72 ResourceType::ViewDefinition,
73 view_definition_id.clone(),
74 )
75 .await?
76 .ok_or_else(|| {
77 OperationOutcomeError::error(
78 IssueType::NotFound(None),
79 format!(
80 "ViewDefinition not found with id '{:?}'",
81 view_definition_id
82 ),
83 )
84 })?;
85
86 if let Resource::ViewDefinition(view_definition) = result {
87 Ok(Cow::Owned(view_definition))
88 } else {
89 Err(OperationOutcomeError::error(
90 IssueType::Invalid(None),
91 "Referenced resource is not a ViewDefinition".to_string(),
92 ))
93 }
94 } else {
95 Err(OperationOutcomeError::error(
96 IssueType::Invalid(None),
97 "Either viewResource or viewReference must be provided".to_string(),
98 ))
99 }
100}
101
102async fn get_resources_to_process<
103 CTX: Send + Sync + Clone + 'static,
104 Client: FHIRClient<CTX, OperationOutcomeError> + Send + Sync + 'static,
105>(
106 context: CTX,
107 client: &Client,
108 view_definition: &ViewDefinition,
109 input: &ViewDefinitionRun::Input,
110) -> Result<Vec<Resource>, OperationOutcomeError> {
111 if let Some(input_resources) = input.resource.clone() {
112 Ok(input_resources)
113 } else {
114 let since = input
115 ._since
116 .as_ref()
117 .and_then(|since| since.value.clone())
118 .unwrap_or(r4::datetime::Instant::Iso8601(Utc::now()));
119
120 let Some(resource_type): Option<String> = view_definition.resource.as_ref().into() else {
121 return Err(OperationOutcomeError::error(
122 IssueType::Invalid(None),
123 "ViewDefinition.resource is required".to_string(),
124 ));
125 };
126
127 let resource_type = ResourceType::try_from(resource_type).map_err(|e| {
128 OperationOutcomeError::error(
129 IssueType::Invalid(None),
130 format!("Invalid resource type: {}", e),
131 )
132 })?;
133
134 let result = client
135 .history_type(
136 context,
137 resource_type,
138 vec![
139 ("_since".to_string(), vec![since.to_string()]),
140 ("_count".to_string(), vec!["1000".to_string()]),
141 ]
142 .into(),
143 )
144 .await?;
145
146 Ok(vec![Resource::Bundle(result)])
147 }
148}
149
150fn build_hashmap_fp_variables<'a>(
151 viewdefinition: &'a ViewDefinition,
152) -> HashMap<String, &'a dyn MetaValue> {
153 let mut hashmap = HashMap::new();
154
155 if let Some(constants) = &viewdefinition.constant {
156 for constant in constants {
157 if let Some(name) = &constant.name.value.as_ref() {
158 hashmap.insert((*name).clone(), &constant.value as &dyn MetaValue);
159 }
160 }
161 }
162
163 hashmap
164}
165
166fn cartesian_product(
167 select_statement_results: Vec<Vec<OrderMap<String, OutputResults>>>,
168) -> Vec<OrderMap<String, OutputResults>> {
169 let mut output_results = Vec::new();
170
171 for combination in select_statement_results
172 .into_iter()
173 .multi_cartesian_product()
174 {
175 let mut combined_result = OrderMap::new();
176
177 for result in combination {
178 for (key, value) in result {
179 combined_result.insert(key, value);
180 }
181 }
182
183 output_results.push(combined_result);
184 }
185
186 output_results
187}
188
/// Value held by one column of an output row.
///
/// Serialized untagged, i.e. as the inner value without a variant wrapper.
#[derive(Debug, Clone, Deserialize, Serialize)]
#[serde(untagged)]
enum OutputResults {
    /// Single value (or `None`) for a column that is not marked as a collection.
    Scalar(Option<PrimitiveValue>),
    /// Every value produced for a column that is marked as a collection.
    Collection(Vec<Option<PrimitiveValue>>),
}
196
197async fn process_resource<
198 CTX: Send + Sync + Clone + 'static,
199 Client: FHIRClient<CTX, OperationOutcomeError> + Send + Sync + 'static,
200>(
201 _context: CTX,
202 _client: Arc<Client>,
203 variables: Arc<HashMap<String, &dyn MetaValue>>,
204 view_definition: &ViewDefinition,
205 input: Box<Resource>,
206) -> Result<Vec<OrderMap<String, OutputResults>>, OperationOutcomeError> {
207 let fp_engine = FPEngine::new();
208
209 let mut select_statement_results = Vec::with_capacity(view_definition.select.len());
210
211 for select_statement in view_definition.select.iter() {
212 let fp_config = Arc::new(
213 Config::builder()
214 .with_variable_resolver(haste_fhirpath::ExternalConstantResolver::Variable(
215 variables.clone(),
216 ))
217 .with_resource_id(input.id().clone().unwrap_or("".to_string())),
218 );
219
220 let mut iterable_context = None;
221 let mut set_null = false;
222
223 if let Some(for_each_fp) = select_statement
224 .forEach
225 .as_ref()
226 .and_then(|f| f.value.as_ref())
227 {
228 iterable_context = Some(vec![
229 fp_engine
230 .evaluate_with_config(for_each_fp, vec![&input], fp_config.clone())
231 .await
232 .map_err(|e| {
233 OperationOutcomeError::error(
234 IssueType::Exception(None),
235 format!("Error evaluating forEach expression: {}", e),
236 )
237 })?,
238 ]);
239 } else if let Some(for_each_or_null_fp) = select_statement
240 .forEachOrNull
241 .as_ref()
242 .and_then(|f| f.value.as_ref())
243 {
244 iterable_context = Some(vec![
245 fp_engine
246 .evaluate_with_config(for_each_or_null_fp, vec![&input], fp_config.clone())
247 .await
248 .map_err(|e| {
249 OperationOutcomeError::error(
250 IssueType::Exception(None),
251 format!("Error evaluating forEachOrNull expression: {}", e),
252 )
253 })?,
254 ]);
255 set_null = true;
256 } else if let Some(_repeat) = select_statement
257 .repeat
258 .as_ref()
259 .map(|r| r.iter().filter_map(|r| r.value.as_ref()))
260 {
261 let mut repeat_fps = vec![];
262 for repeat_fp in _repeat {
263 let repeat = format!("$this.repeat({})", repeat_fp);
264 repeat_fps.push(
265 fp_engine
266 .evaluate_with_config(&repeat, vec![&input], fp_config.clone())
267 .await
268 .map_err(|e| {
269 OperationOutcomeError::error(
270 IssueType::Exception(None),
271 format!("Error evaluating repeat expression: {}", e),
272 )
273 })?,
274 );
275 }
276
277 iterable_context = Some(repeat_fps);
278 }
279
280 let select_context: Vec<&dyn MetaValue> = if let Some(iterable) = iterable_context.as_ref()
281 {
282 iterable
283 .iter()
284 .flat_map(|item| item.iter())
285 .collect::<Vec<&dyn MetaValue>>()
286 } else {
287 vec![&input]
288 };
289
290 let mut select_results = Vec::with_capacity(select_context.len());
291
292 if set_null && select_context.is_empty() {
293 let mut output_result = OrderMap::new();
294 for column in select_statement.column.as_ref().into_iter().flatten() {
295 let Some(name) = column.name.value.as_ref().map(|n| n.as_str()) else {
296 return Err(OperationOutcomeError::error(
297 IssueType::Invalid(None),
298 "Column name is required".to_string(),
299 ));
300 };
301 output_result.insert(name.to_string(), OutputResults::Scalar(None));
302 }
303 select_results.push(output_result);
304 }
305
306 for context in select_context {
307 let mut output_result = OrderMap::new();
308 for column in select_statement.column.as_ref().into_iter().flatten() {
309 let Some(path) = column.path.value.as_ref().map(|p| p.as_str()) else {
310 return Err(OperationOutcomeError::error(
311 IssueType::Invalid(None),
312 "Column path is required".to_string(),
313 ));
314 };
315
316 let Some(name) = column.name.value.as_ref().map(|n| n.as_str()) else {
317 return Err(OperationOutcomeError::error(
318 IssueType::Invalid(None),
319 "Column name is required".to_string(),
320 ));
321 };
322
323 let result = fp_engine
324 .evaluate_with_config(path, vec![context; 1], fp_config.clone())
325 .await
326 .map_err(|e| {
327 OperationOutcomeError::error(
328 IssueType::Exception(None),
329 format!("Error evaluating expression: {}", e),
330 )
331 })?;
332
333 let column_type = column
334 .type_
335 .as_ref()
336 .and_then(|t| t.value.as_ref())
337 .map(|t| t.as_str())
338 .unwrap_or(
340 result
343 .iter()
344 .peekable()
345 .next()
346 .map(|v| v.fhir_type())
347 .unwrap_or("string"),
348 );
349
350 let mut column_result = result
351 .iter()
352 .map(|value| conversions::primitives::convert_meta_value(column_type, value))
353 .collect::<Result<Vec<Option<PrimitiveValue>>, OperationOutcomeError>>()?;
354
355 let is_collection = column
356 .collection
357 .as_ref()
358 .and_then(|c| c.value)
359 .unwrap_or(false);
360
361 let insert_value = if is_collection {
362 OutputResults::Collection(column_result)
363 } else {
364 if column_result.len() > 1 {
365 return Err(OperationOutcomeError::error(
366 IssueType::Invalid(None),
367 "Column result is a collection but the column is not marked as a collection"
368 .to_string(),
369 ));
370 }
371
372 let mut singular_value = None;
373
374 if let Some(first_value) = column_result.get_mut(0) {
375 std::mem::swap(&mut singular_value, first_value);
376 }
377
378 OutputResults::Scalar(singular_value)
379 };
380
381 output_result.insert(name.to_string(), insert_value);
382 }
383 select_results.push(output_result);
384 }
385
386 select_statement_results.push(select_results);
387 }
388
389 let output_results = cartesian_product(select_statement_results);
390
391 Ok(output_results)
392}
393
394fn flatten_results(resource: Vec<Resource>) -> Vec<Box<Resource>> {
395 let mut resources = Vec::new();
396 for resource in resource {
397 match resource {
398 Resource::Bundle(bundle) => {
399 for entry in bundle.entry.into_iter().flatten() {
400 if let Some(resource) = entry.resource {
401 resources.push(resource);
402 }
403 }
404 }
405 _ => {
406 resources.push(Box::new(resource));
407 }
408 }
409 }
410
411 resources
412}
413
414async fn passes_where_clauses(
415 fp_engine: &FPEngine,
416 variables: Arc<HashMap<String, &dyn MetaValue>>,
417 where_clauses: &[&str],
418 resource: &Resource,
419) -> Result<bool, OperationOutcomeError> {
420 for where_clause in where_clauses {
421 let result = fp_engine
422 .evaluate_with_config(
423 where_clause,
424 vec![resource],
425 Arc::new(Config::builder().with_variable_resolver(
426 haste_fhirpath::ExternalConstantResolver::Variable(variables.clone()),
427 )),
428 )
429 .await
430 .map_err(|e| {
431 OperationOutcomeError::error(
432 IssueType::Exception(None),
433 format!("Error evaluating where clause expression: {}", e),
434 )
435 })?;
436
437 let bool_results = result
438 .iter()
439 .map(|v| match v.fhir_type() {
440 "boolean" => Ok(v
441 .as_any()
442 .downcast_ref::<FHIRBoolean>()
443 .and_then(|b| b.value.as_ref())
444 .unwrap_or(&false)),
445 "http://hl7.org/fhirpath/System.Boolean" => {
446 Ok(v.as_any().downcast_ref::<bool>().unwrap_or(&false))
447 }
448 _ => Err(OperationOutcomeError::error(
449 IssueType::Invalid(None),
450 format!(
451 "Where clause expression must evaluate to a boolean, got: {}",
452 v.fhir_type()
453 ),
454 )),
455 })
456 .collect::<Result<Vec<_>, _>>()?;
457
458 if bool_results.iter().any(|v| **v == false) {
459 return Ok(false);
460 }
461 }
462
463 Ok(true)
464}
465
466async fn process_view_definition<
467 CTX: Send + Sync + Clone + 'static,
468 Client: FHIRClient<CTX, OperationOutcomeError> + Send + Sync + 'static,
469>(
470 context: CTX,
471 output_format: &OutputFormatCodes,
472 client: Arc<Client>,
473 view_definition: &ViewDefinition,
474 input: &ViewDefinitionRun::Input,
475) -> Result<Binary, OperationOutcomeError> {
476 let variables = Arc::new(build_hashmap_fp_variables(view_definition));
477 let _limit = input
478 ._limit
479 .as_ref()
480 .and_then(|limit| limit.value.clone())
481 .unwrap_or(1000);
482
483 let input_ = flatten_results(
484 get_resources_to_process(context.clone(), client.as_ref(), view_definition, input).await?,
485 );
486
487 let mut tasks = FuturesOrdered::new();
488
489 let where_clauses = view_definition
490 .where_
491 .as_ref()
492 .map(|w| Cow::Borrowed(w))
493 .unwrap_or_else(|| Cow::Owned(vec![]));
494
495 let where_fp_clauses = where_clauses
496 .iter()
497 .filter_map(|w| w.path.value.as_ref())
498 .map(|s| s.as_str())
499 .collect::<Vec<_>>();
500
501 for resource in input_ {
502 if passes_where_clauses(
503 &FPEngine::new(),
504 variables.clone(),
505 where_fp_clauses.as_slice(),
506 resource.as_ref(),
507 )
508 .await?
509 {
510 tasks.push_back(async {
511 process_resource(
512 context.clone(),
513 client.clone(),
514 variables.clone(),
515 view_definition,
516 resource,
517 )
518 .await
519 });
520 }
521 }
522
523 let mut results = Vec::with_capacity(tasks.len());
524
525 while let Some(result) = tasks.next().await {
526 results.push(result?);
527 }
528
529 let results = results.into_iter().flatten().collect::<Vec<_>>();
530
531 match output_format {
532 OutputFormatCodes::Csv(_) => {
533 let data = output::csv::csv(results)?;
534
535 let base64_string: String = general_purpose::STANDARD.encode(&data);
536
537 Ok(Binary {
538 data: Some(Box::new(FHIRBase64Binary {
539 value: Some(base64_string),
540 ..Default::default()
541 })),
542 ..Default::default()
543 })
544 }
545 OutputFormatCodes::Json(_) => {
546 let data = output::json::json(results)?;
547
548 let base64_string: String = general_purpose::STANDARD.encode(&data);
549
550 Ok(Binary {
551 data: Some(Box::new(FHIRBase64Binary {
552 value: Some(base64_string),
553 ..Default::default()
554 })),
555 ..Default::default()
556 })
557 }
558 OutputFormatCodes::Ndjson(_) => {
559 let data = output::ndjson::ndjson(results)?;
560 let base64_string: String = general_purpose::STANDARD.encode(&data);
561
562 Ok(Binary {
563 data: Some(Box::new(FHIRBase64Binary {
564 value: Some(base64_string),
565 ..Default::default()
566 })),
567 ..Default::default()
568 })
569 }
570 _ => Err(OperationOutcomeError::error(
571 IssueType::NotSupported(None),
572 format!("Output format '{:?}' is not supported", output_format),
573 )),
574 }
575}
576
577pub async fn view_definition_run<
578 CTX: Send + Sync + Clone + 'static,
579 Client: FHIRClient<CTX, OperationOutcomeError> + Send + Sync + 'static,
580>(
581 context: CTX,
582 client: Arc<Client>,
583 input: &ViewDefinitionRun::Input,
584) -> Result<ViewDefinitionRun::Output, OperationOutcomeError> {
585 let output_format = input
586 ._format
587 .as_ref()
588 .and_then(|v| v.value.as_ref())
589 .and_then(|s| OutputFormatCodes::try_from(s.to_string()).ok())
590 .unwrap_or(OutputFormatCodes::Csv(None));
591
592 let view_definition =
593 Arc::new(resolve_view_definition(context.clone(), client.as_ref(), &input).await?);
594
595 let output = process_view_definition(
596 context,
597 &output_format,
598 client,
599 view_definition.as_ref(),
600 &input,
601 )
602 .await?;
603
604 Ok(ViewDefinitionRun::Output { return_: output })
605}