// haste_repository/pg/fhir.rs

1use crate::{
2    fhir::{CachePolicy, FHIRRepository, ResourceHistoryValue},
3    pg::{
4        PGConnection, StoreError,
5        utilities::{commit_transaction, create_transaction},
6    },
7    types::{FHIRMethod, SupportedFHIRVersions},
8    utilities,
9};
10use haste_fhir_client::{
11    request::HistoryRequest,
12    url::{ParsedParameter, ParsedParameters},
13};
14use haste_fhir_model::r4::{
15    datetime::parse_datetime,
16    generated::{
17        resources::{Resource, ResourceType},
18        terminology::IssueType,
19    },
20    sqlx::{FHIRJson, FHIRJsonRef},
21};
22use haste_fhir_operation_error::OperationOutcomeError;
23use haste_jwt::{ProjectId, ResourceId, TenantId, VersionId, claims::UserTokenClaims};
24use moka::future::Cache;
25use sqlx::{Acquire, Postgres, QueryBuilder, query_builder::Separated};
26use std::sync::Arc;
27use tokio::sync::Mutex;
28
29#[derive(sqlx::FromRow, Debug)]
30struct ReturnSingularResource {
31    resource: FHIRJson<Resource>,
32}
33
34#[derive(sqlx::FromRow, Debug)]
35struct ReturnVersionedResource {
36    resource: FHIRJson<Resource>,
37    version_id: VersionId,
38}
39
40#[derive(sqlx::FromRow, Debug)]
41struct HistoryValue {
42    pub resource: FHIRJson<Resource>,
43    pub request_method: String,
44}
45
46async fn read_version_ids_from_cache<'a>(
47    cache: &Cache<VersionId, Resource>,
48    version_ids: &'a [&VersionId],
49) -> (Vec<Resource>, Vec<&'a VersionId>) {
50    let mut remaining_version_ids = vec![];
51    let mut cached_resources = vec![];
52    for version_id in version_ids.iter() {
53        if let Some(resource) = cache.get(*version_id).await {
54            cached_resources.push(resource)
55        } else {
56            remaining_version_ids.push(*version_id);
57        }
58    }
59
60    (cached_resources, remaining_version_ids)
61}
62
63impl FHIRRepository for PGConnection {
64    async fn create(
65        &self,
66        tenant: &TenantId,
67        project: &ProjectId,
68        author: &UserTokenClaims,
69        fhir_version: &SupportedFHIRVersions,
70        resource: &mut Resource,
71    ) -> Result<Resource, OperationOutcomeError> {
72        match &self {
73            PGConnection::Pool(_pool, _) => {
74                let tx = create_transaction(self, true).await?;
75                let res = {
76                    let mut conn = tx.lock().await;
77                    let res =
78                        create(&mut *conn, tenant, project, author, fhir_version, resource).await?;
79                    res
80                };
81                commit_transaction(tx).await?;
82                Ok(res)
83            }
84            PGConnection::Transaction(tx, _) => {
85                let mut tx = tx.lock().await;
86                let res = create(&mut *tx, tenant, project, author, fhir_version, resource).await?;
87                Ok(res)
88            }
89        }
90    }
91
92    async fn delete(
93        &self,
94        tenant: &TenantId,
95        project: &ProjectId,
96        author: &UserTokenClaims,
97        fhir_version: &SupportedFHIRVersions,
98        resource: &mut Resource,
99        id: &str,
100    ) -> Result<Resource, OperationOutcomeError> {
101        match self {
102            PGConnection::Pool(_pool, _) => {
103                let tx = create_transaction(self, true).await?;
104                let res = {
105                    let mut conn = tx.lock().await;
106                    let res = delete(
107                        &mut *conn,
108                        tenant,
109                        project,
110                        author,
111                        fhir_version,
112                        resource,
113                        id,
114                    )
115                    .await?;
116                    res
117                };
118                commit_transaction(tx).await?;
119                Ok(res)
120            }
121            PGConnection::Transaction(tx, _) => {
122                let mut conn = tx.lock().await;
123                // Handle PgConnection connection
124                let res = delete(
125                    &mut *conn,
126                    tenant,
127                    project,
128                    author,
129                    fhir_version,
130                    resource,
131                    id,
132                )
133                .await?;
134                Ok(res)
135            }
136        }
137    }
138
139    async fn update(
140        &self,
141        tenant: &TenantId,
142        project: &ProjectId,
143        author: &UserTokenClaims,
144        fhir_version: &SupportedFHIRVersions,
145        resource: &mut Resource,
146        id: &str,
147    ) -> Result<Resource, OperationOutcomeError> {
148        match self {
149            PGConnection::Pool(_pool, _) => {
150                let tx = create_transaction(self, true).await?;
151                let res = {
152                    let mut conn = tx.lock().await;
153                    let res = update(
154                        &mut *conn,
155                        tenant,
156                        project,
157                        author,
158                        fhir_version,
159                        resource,
160                        id,
161                    )
162                    .await?;
163                    res
164                };
165
166                commit_transaction(tx).await?;
167                Ok(res)
168            }
169            PGConnection::Transaction(tx, _) => {
170                let mut conn = tx.lock().await;
171                // Handle PgConnection connection
172                let res = update(
173                    &mut *conn,
174                    tenant,
175                    project,
176                    author,
177                    fhir_version,
178                    resource,
179                    id,
180                )
181                .await?;
182                Ok(res)
183            }
184        }
185    }
186
187    async fn read_by_version_ids(
188        &self,
189        tenant_id: &TenantId,
190        project_id: &ProjectId,
191        version_ids: &[&VersionId],
192        cache_policy: CachePolicy,
193    ) -> Result<Vec<Resource>, OperationOutcomeError> {
194        if version_ids.is_empty() {
195            return Ok(vec![]);
196        }
197
198        let (cached_result, remaining_version_ids) =
199            read_version_ids_from_cache(self.cache(), &version_ids).await;
200
201        if remaining_version_ids.is_empty() {
202            return Ok(cached_result);
203        }
204
205        match self {
206            PGConnection::Pool(pool, cache) => {
207                let res = read_by_version_ids(pool, tenant_id, project_id, &remaining_version_ids)
208                    .await?;
209
210                if cache_policy == CachePolicy::Cache {
211                    for v in res.iter() {
212                        cache
213                            .insert(v.version_id.clone(), v.resource.0.clone())
214                            .await;
215                    }
216                }
217
218                Ok(cached_result
219                    .into_iter()
220                    .chain(res.into_iter().map(|r| r.resource.0))
221                    .collect::<Vec<_>>())
222            }
223            PGConnection::Transaction(tx, cache) => {
224                let mut conn = tx.lock().await;
225                // Handle PgConnection connection
226                let res =
227                    read_by_version_ids(&mut *conn, tenant_id, project_id, &remaining_version_ids)
228                        .await?;
229
230                if cache_policy == CachePolicy::Cache {
231                    for v in res.iter() {
232                        cache
233                            .insert(v.version_id.clone(), v.resource.0.clone())
234                            .await;
235                    }
236                }
237
238                Ok(cached_result
239                    .into_iter()
240                    .chain(res.into_iter().map(|r| r.resource.0))
241                    .collect::<Vec<_>>())
242            }
243        }
244    }
245
246    async fn read_latest(
247        &self,
248        tenant_id: &TenantId,
249        project_id: &ProjectId,
250        resource_type: &ResourceType,
251        resource_id: &ResourceId,
252    ) -> Result<Option<Resource>, OperationOutcomeError> {
253        match self {
254            PGConnection::Pool(pool, _) => {
255                let res =
256                    read_latest(pool, tenant_id, project_id, resource_type, resource_id).await?;
257                Ok(res)
258            }
259            PGConnection::Transaction(tx, _) => {
260                let mut conn = tx.lock().await;
261                // Handle PgConnection connection
262                let res = read_latest(
263                    &mut *conn,
264                    tenant_id,
265                    project_id,
266                    resource_type,
267                    resource_id,
268                )
269                .await?;
270                Ok(res)
271            }
272        }
273    }
274
275    async fn history(
276        &self,
277        tenant_id: &TenantId,
278        project_id: &ProjectId,
279        request: &HistoryRequest,
280    ) -> Result<Vec<ResourceHistoryValue>, OperationOutcomeError> {
281        match self {
282            PGConnection::Pool(pool, _) => {
283                let res = history(pool, tenant_id, project_id, request).await?;
284                Ok(res)
285            }
286            PGConnection::Transaction(tx, _) => {
287                let mut conn = tx.lock().await;
288                // Handle PgConnection connection
289                let res = history(&mut *conn, tenant_id, project_id, request).await?;
290                Ok(res)
291            }
292        }
293    }
294
295    fn in_transaction(&self) -> bool {
296        match self {
297            PGConnection::Transaction(_tx, _) => true,
298            _ => false,
299        }
300    }
301
302    async fn transaction<'a>(
303        &'a self,
304        is_updating_sequence: bool,
305    ) -> Result<Self, OperationOutcomeError> {
306        let tx = create_transaction(self, is_updating_sequence).await?;
307        Ok(PGConnection::Transaction(tx, self.cache().clone()))
308    }
309
310    async fn commit(self) -> Result<(), OperationOutcomeError> {
311        match self {
312            PGConnection::Pool(_pool, _) => Err(StoreError::NotTransaction.into()),
313            PGConnection::Transaction(tx, _) => commit_transaction(tx).await,
314        }
315    }
316
317    async fn rollback(self) -> Result<(), OperationOutcomeError> {
318        match self {
319            PGConnection::Pool(_pool, _) => Err(StoreError::NotTransaction.into()),
320            PGConnection::Transaction(tx, _) => {
321                let conn = Mutex::into_inner(
322                    Arc::try_unwrap(tx).map_err(|_e| StoreError::FailedCommitTransaction)?,
323                );
324
325                // Handle PgConnection connection
326                let res = conn.rollback().await.map_err(StoreError::from)?;
327                Ok(res)
328            }
329        }
330    }
331}
332
333fn create<'a, 'c, Connection: Acquire<'c, Database = Postgres> + Send + 'a>(
334    connection: Connection,
335    tenant: &'a TenantId,
336    project: &'a ProjectId,
337    author: &'a UserTokenClaims,
338    fhir_version: &'a SupportedFHIRVersions,
339    resource: &'a mut Resource,
340) -> impl Future<Output = Result<Resource, OperationOutcomeError>> + Send + 'a {
341    async move {
342        utilities::set_resource_id(resource, None)?;
343        utilities::set_version_id(resource)?;
344        let mut conn = connection.acquire().await.map_err(StoreError::SQLXError)?;
345        let result = sqlx::query_as!(
346                ReturnSingularResource,
347                r#"INSERT INTO resources (tenant, project, author_id, fhir_version, resource, deleted, request_method, author_type, fhir_method) 
348               VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9) 
349                RETURNING resource as "resource: FHIRJson<Resource>""#,
350                tenant.as_ref() as &str,
351                project.as_ref() as &str,
352                author.sub.as_ref() as &str,
353                // Useless cast so that macro has access to the type information.
354                // Otherwise it will not compile on type check.
355                fhir_version as &SupportedFHIRVersions,
356                &FHIRJsonRef(resource) as &FHIRJsonRef<'_ , Resource>,
357                false, // deleted
358                "POST",
359                author.resource_type.as_ref() as &str,
360                &FHIRMethod::Create as &FHIRMethod,
361            ).fetch_one(&mut *conn).await.map_err(StoreError::from)?;
362        Ok(result.resource.0)
363    }
364}
365
366fn delete<'a, 'c, Connection: Acquire<'c, Database = Postgres> + Send + 'a>(
367    connection: Connection,
368    tenant: &'a TenantId,
369    project: &'a ProjectId,
370    author: &'a UserTokenClaims,
371    fhir_version: &'a SupportedFHIRVersions,
372    resource: &'a mut Resource,
373    id: &'a str,
374) -> impl Future<Output = Result<Resource, OperationOutcomeError>> + Send + 'a {
375    async move {
376        utilities::set_resource_id(resource, Some(id.to_string()))?;
377        utilities::set_version_id(resource)?;
378        let mut conn = connection.acquire().await.map_err(StoreError::SQLXError)?;
379        let result = sqlx::query_as!(
380                ReturnSingularResource,
381                r#"INSERT INTO resources (tenant, project, author_id, fhir_version, resource, deleted, request_method, author_type, fhir_method) 
382                VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9) 
383                RETURNING resource as "resource: FHIRJson<Resource>""#,
384                tenant.as_ref() as &str,
385                project.as_ref() as &str,
386                author.sub.as_ref() as &str,
387                // Useless cast so that macro has access to the type information.
388                // Otherwise it will not compile on type check.
389                fhir_version as &SupportedFHIRVersions,
390                &FHIRJsonRef(resource) as &FHIRJsonRef<'_ , Resource>,
391                true, // deleted
392                "DELETE",
393                author.resource_type.as_ref() as &str,
394                &FHIRMethod::Delete as &FHIRMethod,
395            ).fetch_one(&mut *conn).await.map_err(StoreError::from)?;
396
397        Ok(result.resource.0)
398    }
399}
400
401fn update<'a, 'c, Connection: Acquire<'c, Database = Postgres> + Send + 'a>(
402    connection: Connection,
403    tenant: &'a TenantId,
404    project: &'a ProjectId,
405    author: &'a UserTokenClaims,
406    fhir_version: &'a SupportedFHIRVersions,
407    resource: &'a mut Resource,
408    id: &'a str,
409) -> impl Future<Output = Result<Resource, OperationOutcomeError>> + Send + 'a {
410    async move {
411        utilities::set_resource_id(resource, Some(id.to_string()))?;
412        utilities::set_version_id(resource)?;
413
414        let mut conn = connection.acquire().await.map_err(StoreError::SQLXError)?;
415
416        let query = sqlx::query_as!(
417            ReturnSingularResource,
418            r#"INSERT INTO resources (tenant, project, author_id, fhir_version, resource, deleted, request_method, author_type, fhir_method) 
419                VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9) 
420                RETURNING resource as "resource: FHIRJson<Resource>""#,
421            tenant.as_ref() as &str,
422            project.as_ref() as &str,
423            author.sub.as_ref() as &str,
424            // Useless cast so that macro has access to the type information.
425            // Otherwise it will not compile on type check.
426            fhir_version as &SupportedFHIRVersions,
427            &FHIRJsonRef(resource) as &FHIRJsonRef<'_, Resource>,
428            false, // deleted
429            "PUT",
430            author.resource_type.as_ref() as &str,
431            &FHIRMethod::Update as &FHIRMethod,
432        );
433
434        let result = query
435            .fetch_one(&mut *conn)
436            .await
437            .map_err(StoreError::from)?;
438
439        Ok(result.resource.0)
440    }
441}
442
443fn read_by_version_ids<'a, 'c, Connection: Acquire<'c, Database = Postgres> + Send + 'a>(
444    connection: Connection,
445    tenant_id: &'a TenantId,
446    project_id: &'a ProjectId,
447    version_ids: &'a Vec<&'a VersionId>,
448) -> impl Future<Output = Result<Vec<ReturnVersionedResource>, OperationOutcomeError>> + Send + 'a {
449    async move {
450        let mut conn = connection.acquire().await.map_err(StoreError::SQLXError)?;
451
452        let mut query_builder: QueryBuilder<Postgres> =
453            QueryBuilder::new(r#"SELECT resource, version_id FROM resources WHERE tenant = "#);
454
455        query_builder
456            .push_bind(tenant_id.as_ref())
457            .push(" AND project =")
458            .push_bind(project_id.as_ref());
459
460        query_builder.push(" AND version_id in (");
461
462        let mut separated = query_builder.separated(", ");
463        for version_id in version_ids.iter() {
464            separated.push_bind(version_id.as_ref());
465        }
466        separated.push_unseparated(")");
467
468        // To preserve sort order.
469        query_builder.push(" ORDER BY  array_position(array[");
470        let mut order_separator = query_builder.separated(", ");
471        for version_id in version_ids.iter() {
472            order_separator.push_bind(version_id.as_ref());
473        }
474        query_builder.push("], version_id)");
475
476        let query = query_builder.build_query_as();
477        let response: Vec<ReturnVersionedResource> = query
478            .fetch_all(&mut *conn)
479            .await
480            .map_err(StoreError::from)?;
481
482        Ok(response)
483    }
484}
485
486fn read_latest<'a, 'c, Connection: Acquire<'c, Database = Postgres> + Send + 'a>(
487    connection: Connection,
488    tenant_id: &'a TenantId,
489    project_id: &'a ProjectId,
490    resource_type: &'a ResourceType,
491    resource_id: &'a ResourceId,
492) -> impl Future<Output = Result<Option<Resource>, OperationOutcomeError>> + Send + 'a {
493    async move {
494        let mut conn = connection.acquire().await.map_err(StoreError::SQLXError)?;
495        let response = sqlx::query!(
496            r#"SELECT resource as "resource: FHIRJson<Resource>", deleted FROM resources WHERE tenant = $1 AND project = $2 AND id = $3 AND resource_type = $4 ORDER BY sequence DESC LIMIT 1"#,
497            tenant_id.as_ref(),
498            project_id.as_ref(),
499            resource_id.as_ref(),
500            resource_type.as_ref(),
501        ).fetch_optional(&mut *conn).await.map_err(StoreError::from)?;
502
503        // For deletes entry will contain deleted = true.
504        // In that case return None.
505        if let Some(true) = response.as_ref().map(|r| r.deleted) {
506            Ok(None)
507        } else {
508            Ok(response.map(|r| r.resource.0))
509        }
510    }
511}
512
513fn process_history_parameters<'a>(
514    parameters: &'a ParsedParameters,
515    clauses: &mut Separated<'_, 'a, Postgres, &str>,
516) -> Result<(), OperationOutcomeError> {
517    for parameter in parameters.parameters() {
518        match parameter {
519            ParsedParameter::Result(result_param) => match result_param.name.as_str() {
520                "_since" => {
521                    if let Some(value) = &result_param.value.get(0) {
522                        let date_time = parse_datetime(value.as_str()).map_err(|e| {
523                            OperationOutcomeError::fatal(
524                                IssueType::Invalid(None),
525                                format!("Invalid _since parameter datetime: {:?}", e),
526                            )
527                        })?;
528
529                        clauses.push(" created_at >= ").push_bind_unseparated(
530                            chrono::DateTime::try_from(date_time).map_err(|e| {
531                                OperationOutcomeError::fatal(
532                                    IssueType::Invalid(None),
533                                    format!("Invalid _since parameter datetime: {:?}", e),
534                                )
535                            })?,
536                        );
537                    }
538                }
539                "_count" | "_offset" => {
540                    // Ignore offset and count parameter as these parameters are held separately and not used in the where clause.
541                }
542                _ => {}
543            },
544            _ => {
545                return Err(OperationOutcomeError::fatal(
546                    IssueType::NotSupported(None),
547                    format!(
548                        "Parameter '{}' is not supported for history requests.",
549                        parameter.name()
550                    ),
551                ));
552            }
553        }
554    }
555
556    Ok(())
557}
558
559fn history<'a, 'c, Connection: Acquire<'c, Database = Postgres> + Send + 'a>(
560    connection: Connection,
561    tenant: &'a TenantId,
562    project: &'a ProjectId,
563    history_request: &'a HistoryRequest,
564) -> impl Future<Output = Result<Vec<ResourceHistoryValue>, OperationOutcomeError>> + Send + 'a {
565    async move {
566        let mut conn = connection.acquire().await.map_err(StoreError::from)?;
567
568        let mut query_builder: QueryBuilder<Postgres> =
569            QueryBuilder::new(r#"SELECT resource, request_method FROM resources WHERE  "#);
570
571        let mut clauses = query_builder.separated(" AND ");
572        clauses
573            .push(" tenant = ")
574            .push_bind_unseparated(tenant.as_ref())
575            .push(" project = ")
576            .push_bind_unseparated(project.as_ref());
577
578        let history_parameters = match history_request {
579            HistoryRequest::Instance(history_instance_request) => {
580                &history_instance_request.parameters
581            }
582            HistoryRequest::Type(history_type_request) => &history_type_request.parameters,
583            HistoryRequest::System(system_request) => &system_request.parameters,
584        };
585
586        process_history_parameters(&history_parameters, &mut clauses)?;
587
588        match history_request {
589            HistoryRequest::Instance(history_instance_request) => {
590                clauses
591                    .push(" resource_type = ")
592                    .push_bind_unseparated(history_instance_request.resource_type.as_ref())
593                    .push(" id = ")
594                    .push_bind_unseparated(&history_instance_request.id);
595            }
596            HistoryRequest::Type(history_type_request) => {
597                clauses
598                    .push(" resource_type = ")
599                    .push_bind_unseparated(history_type_request.resource_type.as_ref());
600            }
601            HistoryRequest::System(_request) => {}
602        };
603
604        let count =
605            if let Some(ParsedParameter::Result(count_param)) = history_parameters.get("_count") {
606                // Enforce a maximum count of 100 to prevent abuse and performance issues.
607                std::cmp::min(
608                    100,
609                    count_param
610                        .value
611                        .get(0)
612                        .and_then(|v| v.parse::<usize>().ok())
613                        .unwrap_or(100),
614                )
615            } else {
616                100
617            };
618
619        query_builder
620            .push(" ORDER BY sequence DESC LIMIT ")
621            .push_bind(count as i64);
622
623        if let Some(ParsedParameter::Result(offset_param)) = history_parameters.get("_offset") {
624            let offset = std::cmp::max(
625                offset_param
626                    .value
627                    .get(0)
628                    .and_then(|v| v.parse::<usize>().ok())
629                    .unwrap_or(0),
630                0,
631            );
632
633            query_builder.push(" OFFSET ").push_bind(offset as i64);
634        }
635
636        let query = query_builder.build_query_as();
637
638        let result: Vec<HistoryValue> = query
639            .fetch_all(&mut *conn)
640            .await
641            .map_err(StoreError::from)?;
642
643        Ok(result
644            .into_iter()
645            .map(|r| ResourceHistoryValue {
646                resource: r.resource.0,
647                request_method: r.request_method,
648            })
649            .collect::<Vec<_>>())
650    }
651}