// Source: haste_repository/pg/fhir.rs

1use crate::{
2    fhir::{CachePolicy, FHIRRepository, ResourcePollingValue},
3    pg::{
4        PGConnection, StoreError,
5        utilities::{commit_transaction, create_transaction},
6    },
7    types::{FHIRMethod, SupportedFHIRVersions},
8    utilities,
9};
10use haste_fhir_client::{
11    request::HistoryRequest,
12    url::{ParsedParameter, ParsedParameters},
13};
14use haste_fhir_model::r4::{
15    datetime::parse_datetime,
16    generated::{
17        resources::{Resource, ResourceType},
18        terminology::IssueType,
19    },
20    sqlx::{FHIRJson, FHIRJsonRef},
21};
22use haste_fhir_operation_error::OperationOutcomeError;
23use haste_jwt::{ProjectId, ResourceId, TenantId, VersionId, claims::UserTokenClaims};
24use moka::future::Cache;
25use sqlx::{Acquire, Postgres, QueryBuilder, query_builder::Separated};
26use std::sync::Arc;
27use tokio::sync::Mutex;
28
29#[derive(sqlx::FromRow, Debug)]
30struct ReturnSingularResource {
31    resource: FHIRJson<Resource>,
32}
33
34#[derive(sqlx::FromRow, Debug)]
35struct ReturnVersionedResource {
36    resource: FHIRJson<Resource>,
37    version_id: VersionId,
38}
39
40async fn read_version_ids_from_cache<'a>(
41    cache: &Cache<VersionId, Resource>,
42    version_ids: &'a [&VersionId],
43) -> (Vec<Resource>, Vec<&'a VersionId>) {
44    let mut remaining_version_ids = vec![];
45    let mut cached_resources = vec![];
46    for version_id in version_ids.iter() {
47        if let Some(resource) = cache.get(*version_id).await {
48            cached_resources.push(resource)
49        } else {
50            remaining_version_ids.push(*version_id);
51        }
52    }
53
54    (cached_resources, remaining_version_ids)
55}
56
57impl FHIRRepository for PGConnection {
58    async fn create(
59        &self,
60        tenant: &TenantId,
61        project: &ProjectId,
62        author: &UserTokenClaims,
63        fhir_version: &SupportedFHIRVersions,
64        resource: &mut Resource,
65    ) -> Result<Resource, OperationOutcomeError> {
66        match &self {
67            PGConnection::Pool(_pool, _) => {
68                let tx = create_transaction(self, true).await?;
69                let res = {
70                    let mut conn = tx.lock().await;
71                    let res =
72                        create(&mut *conn, tenant, project, author, fhir_version, resource).await?;
73                    res
74                };
75                commit_transaction(tx).await?;
76                Ok(res)
77            }
78            PGConnection::Transaction(tx, _) => {
79                let mut tx = tx.lock().await;
80                let res = create(&mut *tx, tenant, project, author, fhir_version, resource).await?;
81                Ok(res)
82            }
83        }
84    }
85
86    async fn delete(
87        &self,
88        tenant: &TenantId,
89        project: &ProjectId,
90        author: &UserTokenClaims,
91        fhir_version: &SupportedFHIRVersions,
92        resource: &mut Resource,
93        id: &str,
94    ) -> Result<Resource, OperationOutcomeError> {
95        match self {
96            PGConnection::Pool(_pool, _) => {
97                let tx = create_transaction(self, true).await?;
98                let res = {
99                    let mut conn = tx.lock().await;
100                    let res = delete(
101                        &mut *conn,
102                        tenant,
103                        project,
104                        author,
105                        fhir_version,
106                        resource,
107                        id,
108                    )
109                    .await?;
110                    res
111                };
112                commit_transaction(tx).await?;
113                Ok(res)
114            }
115            PGConnection::Transaction(tx, _) => {
116                let mut conn = tx.lock().await;
117                // Handle PgConnection connection
118                let res = delete(
119                    &mut *conn,
120                    tenant,
121                    project,
122                    author,
123                    fhir_version,
124                    resource,
125                    id,
126                )
127                .await?;
128                Ok(res)
129            }
130        }
131    }
132
133    async fn update(
134        &self,
135        tenant: &TenantId,
136        project: &ProjectId,
137        author: &UserTokenClaims,
138        fhir_version: &SupportedFHIRVersions,
139        resource: &mut Resource,
140        id: &str,
141    ) -> Result<Resource, OperationOutcomeError> {
142        match self {
143            PGConnection::Pool(_pool, _) => {
144                let tx = create_transaction(self, true).await?;
145                let res = {
146                    let mut conn = tx.lock().await;
147                    let res = update(
148                        &mut *conn,
149                        tenant,
150                        project,
151                        author,
152                        fhir_version,
153                        resource,
154                        id,
155                    )
156                    .await?;
157                    res
158                };
159
160                commit_transaction(tx).await?;
161                Ok(res)
162            }
163            PGConnection::Transaction(tx, _) => {
164                let mut conn = tx.lock().await;
165                // Handle PgConnection connection
166                let res = update(
167                    &mut *conn,
168                    tenant,
169                    project,
170                    author,
171                    fhir_version,
172                    resource,
173                    id,
174                )
175                .await?;
176                Ok(res)
177            }
178        }
179    }
180
181    async fn read_by_version_ids(
182        &self,
183        tenant_id: &TenantId,
184        project_id: &ProjectId,
185        version_ids: &[&VersionId],
186        cache_policy: CachePolicy,
187    ) -> Result<Vec<Resource>, OperationOutcomeError> {
188        if version_ids.is_empty() {
189            return Ok(vec![]);
190        }
191
192        let (cached_result, remaining_version_ids) =
193            read_version_ids_from_cache(self.cache(), &version_ids).await;
194
195        if remaining_version_ids.is_empty() {
196            return Ok(cached_result);
197        }
198
199        match self {
200            PGConnection::Pool(pool, cache) => {
201                let res = read_by_version_ids(pool, tenant_id, project_id, &remaining_version_ids)
202                    .await?;
203
204                if cache_policy == CachePolicy::Cache {
205                    for v in res.iter() {
206                        cache
207                            .insert(v.version_id.clone(), v.resource.0.clone())
208                            .await;
209                    }
210                }
211
212                Ok(cached_result
213                    .into_iter()
214                    .chain(res.into_iter().map(|r| r.resource.0))
215                    .collect::<Vec<_>>())
216            }
217            PGConnection::Transaction(tx, cache) => {
218                let mut conn = tx.lock().await;
219                // Handle PgConnection connection
220                let res =
221                    read_by_version_ids(&mut *conn, tenant_id, project_id, &remaining_version_ids)
222                        .await?;
223
224                if cache_policy == CachePolicy::Cache {
225                    for v in res.iter() {
226                        cache
227                            .insert(v.version_id.clone(), v.resource.0.clone())
228                            .await;
229                    }
230                }
231
232                Ok(cached_result
233                    .into_iter()
234                    .chain(res.into_iter().map(|r| r.resource.0))
235                    .collect::<Vec<_>>())
236            }
237        }
238    }
239
240    async fn read_latest(
241        &self,
242        tenant_id: &TenantId,
243        project_id: &ProjectId,
244        resource_type: &ResourceType,
245        resource_id: &ResourceId,
246    ) -> Result<Option<Resource>, OperationOutcomeError> {
247        match self {
248            PGConnection::Pool(pool, _) => {
249                let res =
250                    read_latest(pool, tenant_id, project_id, resource_type, resource_id).await?;
251                Ok(res)
252            }
253            PGConnection::Transaction(tx, _) => {
254                let mut conn = tx.lock().await;
255                // Handle PgConnection connection
256                let res = read_latest(
257                    &mut *conn,
258                    tenant_id,
259                    project_id,
260                    resource_type,
261                    resource_id,
262                )
263                .await?;
264                Ok(res)
265            }
266        }
267    }
268
269    async fn history(
270        &self,
271        tenant_id: &TenantId,
272        project_id: &ProjectId,
273        request: &HistoryRequest,
274    ) -> Result<Vec<Resource>, OperationOutcomeError> {
275        match self {
276            PGConnection::Pool(pool, _) => {
277                let res = history(pool, tenant_id, project_id, request).await?;
278                Ok(res)
279            }
280            PGConnection::Transaction(tx, _) => {
281                let mut conn = tx.lock().await;
282                // Handle PgConnection connection
283                let res = history(&mut *conn, tenant_id, project_id, request).await?;
284                Ok(res)
285            }
286        }
287    }
288
289    async fn get_sequence(
290        &self,
291        tenant_id: &TenantId,
292        sequence_id: u64,
293        count: Option<u64>,
294    ) -> Result<Vec<ResourcePollingValue>, OperationOutcomeError> {
295        match self {
296            PGConnection::Pool(pool, _) => {
297                let res = get_sequence(pool, tenant_id, sequence_id, count).await?;
298                Ok(res)
299            }
300            PGConnection::Transaction(tx, _) => {
301                let mut conn = tx.lock().await;
302
303                // Handle PgConnection connection
304                let res = get_sequence(&mut *conn, tenant_id, sequence_id, count).await?;
305                Ok(res)
306            }
307        }
308    }
309
310    fn in_transaction(&self) -> bool {
311        match self {
312            PGConnection::Transaction(_tx, _) => true,
313            _ => false,
314        }
315    }
316
317    async fn transaction<'a>(
318        &'a self,
319        is_updating_sequence: bool,
320    ) -> Result<Self, OperationOutcomeError> {
321        let tx = create_transaction(self, is_updating_sequence).await?;
322        Ok(PGConnection::Transaction(tx, self.cache().clone()))
323    }
324
325    async fn commit(self) -> Result<(), OperationOutcomeError> {
326        match self {
327            PGConnection::Pool(_pool, _) => Err(StoreError::NotTransaction.into()),
328            PGConnection::Transaction(tx, _) => commit_transaction(tx).await,
329        }
330    }
331
332    async fn rollback(self) -> Result<(), OperationOutcomeError> {
333        match self {
334            PGConnection::Pool(_pool, _) => Err(StoreError::NotTransaction.into()),
335            PGConnection::Transaction(tx, _) => {
336                let conn = Mutex::into_inner(
337                    Arc::try_unwrap(tx).map_err(|_e| StoreError::FailedCommitTransaction)?,
338                );
339
340                // Handle PgConnection connection
341                let res = conn.rollback().await.map_err(StoreError::from)?;
342                Ok(res)
343            }
344        }
345    }
346}
347
348fn create<'a, 'c, Connection: Acquire<'c, Database = Postgres> + Send + 'a>(
349    connection: Connection,
350    tenant: &'a TenantId,
351    project: &'a ProjectId,
352    author: &'a UserTokenClaims,
353    fhir_version: &'a SupportedFHIRVersions,
354    resource: &'a mut Resource,
355) -> impl Future<Output = Result<Resource, OperationOutcomeError>> + Send + 'a {
356    async move {
357        utilities::set_resource_id(resource, None)?;
358        utilities::set_version_id(resource)?;
359        let mut conn = connection.acquire().await.map_err(StoreError::SQLXError)?;
360        let result = sqlx::query_as!(
361                ReturnSingularResource,
362                r#"INSERT INTO resources (tenant, project, author_id, fhir_version, resource, deleted, request_method, author_type, fhir_method) 
363               VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9) 
364                RETURNING resource as "resource: FHIRJson<Resource>""#,
365                tenant.as_ref() as &str,
366                project.as_ref() as &str,
367                author.sub.as_ref() as &str,
368                // Useless cast so that macro has access to the type information.
369                // Otherwise it will not compile on type check.
370                fhir_version as &SupportedFHIRVersions,
371                &FHIRJsonRef(resource) as &FHIRJsonRef<'_ , Resource>,
372                false, // deleted
373                "POST",
374                author.resource_type.as_ref() as &str,
375                &FHIRMethod::Create as &FHIRMethod,
376            ).fetch_one(&mut *conn).await.map_err(StoreError::from)?;
377        Ok(result.resource.0)
378    }
379}
380
381fn delete<'a, 'c, Connection: Acquire<'c, Database = Postgres> + Send + 'a>(
382    connection: Connection,
383    tenant: &'a TenantId,
384    project: &'a ProjectId,
385    author: &'a UserTokenClaims,
386    fhir_version: &'a SupportedFHIRVersions,
387    resource: &'a mut Resource,
388    id: &'a str,
389) -> impl Future<Output = Result<Resource, OperationOutcomeError>> + Send + 'a {
390    async move {
391        utilities::set_resource_id(resource, Some(id.to_string()))?;
392        utilities::set_version_id(resource)?;
393        let mut conn = connection.acquire().await.map_err(StoreError::SQLXError)?;
394        let result = sqlx::query_as!(
395                ReturnSingularResource,
396                r#"INSERT INTO resources (tenant, project, author_id, fhir_version, resource, deleted, request_method, author_type, fhir_method) 
397                VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9) 
398                RETURNING resource as "resource: FHIRJson<Resource>""#,
399                tenant.as_ref() as &str,
400                project.as_ref() as &str,
401                author.sub.as_ref() as &str,
402                // Useless cast so that macro has access to the type information.
403                // Otherwise it will not compile on type check.
404                fhir_version as &SupportedFHIRVersions,
405                &FHIRJsonRef(resource) as &FHIRJsonRef<'_ , Resource>,
406                true, // deleted
407                "DELETE",
408                author.resource_type.as_ref() as &str,
409                &FHIRMethod::Delete as &FHIRMethod,
410            ).fetch_one(&mut *conn).await.map_err(StoreError::from)?;
411
412        Ok(result.resource.0)
413    }
414}
415
416fn update<'a, 'c, Connection: Acquire<'c, Database = Postgres> + Send + 'a>(
417    connection: Connection,
418    tenant: &'a TenantId,
419    project: &'a ProjectId,
420    author: &'a UserTokenClaims,
421    fhir_version: &'a SupportedFHIRVersions,
422    resource: &'a mut Resource,
423    id: &'a str,
424) -> impl Future<Output = Result<Resource, OperationOutcomeError>> + Send + 'a {
425    async move {
426        utilities::set_resource_id(resource, Some(id.to_string()))?;
427        utilities::set_version_id(resource)?;
428
429        let mut conn = connection.acquire().await.map_err(StoreError::SQLXError)?;
430
431        let query = sqlx::query_as!(
432            ReturnSingularResource,
433            r#"INSERT INTO resources (tenant, project, author_id, fhir_version, resource, deleted, request_method, author_type, fhir_method) 
434                VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9) 
435                RETURNING resource as "resource: FHIRJson<Resource>""#,
436            tenant.as_ref() as &str,
437            project.as_ref() as &str,
438            author.sub.as_ref() as &str,
439            // Useless cast so that macro has access to the type information.
440            // Otherwise it will not compile on type check.
441            fhir_version as &SupportedFHIRVersions,
442            &FHIRJsonRef(resource) as &FHIRJsonRef<'_, Resource>,
443            false, // deleted
444            "PUT",
445            author.resource_type.as_ref() as &str,
446            &FHIRMethod::Update as &FHIRMethod,
447        );
448
449        let result = query
450            .fetch_one(&mut *conn)
451            .await
452            .map_err(StoreError::from)?;
453
454        Ok(result.resource.0)
455    }
456}
457
458fn read_by_version_ids<'a, 'c, Connection: Acquire<'c, Database = Postgres> + Send + 'a>(
459    connection: Connection,
460    tenant_id: &'a TenantId,
461    project_id: &'a ProjectId,
462    version_ids: &'a Vec<&'a VersionId>,
463) -> impl Future<Output = Result<Vec<ReturnVersionedResource>, OperationOutcomeError>> + Send + 'a {
464    async move {
465        let mut conn = connection.acquire().await.map_err(StoreError::SQLXError)?;
466
467        let mut query_builder: QueryBuilder<Postgres> =
468            QueryBuilder::new(r#"SELECT resource, version_id FROM resources WHERE tenant = "#);
469
470        query_builder
471            .push_bind(tenant_id.as_ref())
472            .push(" AND project =")
473            .push_bind(project_id.as_ref());
474
475        query_builder.push(" AND version_id in (");
476
477        let mut separated = query_builder.separated(", ");
478        for version_id in version_ids.iter() {
479            separated.push_bind(version_id.as_ref());
480        }
481        separated.push_unseparated(")");
482
483        // To preserve sort order.
484        query_builder.push(" ORDER BY  array_position(array[");
485        let mut order_separator = query_builder.separated(", ");
486        for version_id in version_ids.iter() {
487            order_separator.push_bind(version_id.as_ref());
488        }
489        query_builder.push("], version_id)");
490
491        let query = query_builder.build_query_as();
492        let response: Vec<ReturnVersionedResource> = query
493            .fetch_all(&mut *conn)
494            .await
495            .map_err(StoreError::from)?;
496
497        Ok(response)
498    }
499}
500
501fn read_latest<'a, 'c, Connection: Acquire<'c, Database = Postgres> + Send + 'a>(
502    connection: Connection,
503    tenant_id: &'a TenantId,
504    project_id: &'a ProjectId,
505    resource_type: &'a ResourceType,
506    resource_id: &'a ResourceId,
507) -> impl Future<Output = Result<Option<Resource>, OperationOutcomeError>> + Send + 'a {
508    async move {
509        let mut conn = connection.acquire().await.map_err(StoreError::SQLXError)?;
510        let response = sqlx::query!(
511            r#"SELECT resource as "resource: FHIRJson<Resource>", deleted FROM resources WHERE tenant = $1 AND project = $2 AND id = $3 AND resource_type = $4 ORDER BY sequence DESC"#,
512            tenant_id.as_ref(),
513            project_id.as_ref(),
514            resource_id.as_ref(),
515            resource_type.as_ref(),
516        ).fetch_optional(&mut *conn).await.map_err(StoreError::from)?;
517
518        // For deletes entry will contain deleted = true.
519        // In that case return None.
520        if let Some(true) = response.as_ref().map(|r| r.deleted) {
521            Ok(None)
522        } else {
523            Ok(response.map(|r| r.resource.0))
524        }
525    }
526}
527
528fn process_history_parameters<'a>(
529    parameters: &'a ParsedParameters,
530    clauses: &mut Separated<'_, 'a, Postgres, &str>,
531) -> Result<(), OperationOutcomeError> {
532    for parameter in parameters.parameters() {
533        match parameter {
534            ParsedParameter::Result(result_param) => match result_param.name.as_str() {
535                "_since" => {
536                    if let Some(value) = &result_param.value.get(0) {
537                        let date_time = parse_datetime(value.as_str()).map_err(|e| {
538                            OperationOutcomeError::fatal(
539                                IssueType::Invalid(None),
540                                format!("Invalid _since parameter datetime: {:?}", e),
541                            )
542                        })?;
543
544                        clauses.push(" created_at >= ").push_bind_unseparated(
545                            chrono::DateTime::try_from(date_time).map_err(|e| {
546                                OperationOutcomeError::fatal(
547                                    IssueType::Invalid(None),
548                                    format!("Invalid _since parameter datetime: {:?}", e),
549                                )
550                            })?,
551                        );
552                    }
553                }
554                _ => {}
555            },
556            _ => {
557                return Err(OperationOutcomeError::fatal(
558                    IssueType::NotSupported(None),
559                    format!(
560                        "Parameter '{}' is not supported for history requests.",
561                        parameter.name()
562                    ),
563                ));
564            }
565        }
566    }
567
568    Ok(())
569}
570
571fn history<'a, 'c, Connection: Acquire<'c, Database = Postgres> + Send + 'a>(
572    connection: Connection,
573    tenant: &'a TenantId,
574    project: &'a ProjectId,
575    history_request: &'a HistoryRequest,
576) -> impl Future<Output = Result<Vec<Resource>, OperationOutcomeError>> + Send + 'a {
577    async move {
578        let mut conn = connection.acquire().await.map_err(StoreError::from)?;
579
580        let mut query_builder: QueryBuilder<Postgres> =
581            QueryBuilder::new(r#"SELECT resource FROM resources WHERE  "#);
582
583        let mut clauses = query_builder.separated(" AND ");
584        clauses
585            .push(" tenant = ")
586            .push_bind_unseparated(tenant.as_ref())
587            .push(" project = ")
588            .push_bind_unseparated(project.as_ref());
589
590        let history_parameters = match history_request {
591            HistoryRequest::Instance(history_instance_request) => {
592                &history_instance_request.parameters
593            }
594            HistoryRequest::Type(history_type_request) => &history_type_request.parameters,
595            HistoryRequest::System(system_request) => &system_request.parameters,
596        };
597
598        process_history_parameters(&history_parameters, &mut clauses)?;
599
600        match history_request {
601            HistoryRequest::Instance(history_instance_request) => {
602                clauses
603                    .push(" resource_type = ")
604                    .push_bind_unseparated(history_instance_request.resource_type.as_ref())
605                    .push(" id = ")
606                    .push_bind_unseparated(&history_instance_request.id);
607            }
608            HistoryRequest::Type(history_type_request) => {
609                clauses
610                    .push(" resource_type = ")
611                    .push_bind_unseparated(history_type_request.resource_type.as_ref());
612            }
613            HistoryRequest::System(_request) => {}
614        };
615
616        query_builder.push(" ORDER BY sequence DESC LIMIT 100");
617
618        let query = query_builder.build_query_as();
619
620        let result: Vec<ReturnSingularResource> = query
621            .fetch_all(&mut *conn)
622            .await
623            .map_err(StoreError::from)?;
624
625        Ok(result.into_iter().map(|r| r.resource.0).collect::<Vec<_>>())
626    }
627}
628
629fn get_sequence<'a, 'c, Connection: Acquire<'c, Database = Postgres> + Send + 'a>(
630    connection: Connection,
631    tenant_id: &'a TenantId,
632    cur_sequence: u64,
633    count: Option<u64>,
634) -> impl Future<Output = Result<Vec<ResourcePollingValue>, OperationOutcomeError>> + Send + 'a {
635    async move {
636        let mut conn = connection.acquire().await.map_err(StoreError::from)?;
637        // Run as a transaction to ensure safe sequence retrieval.
638        // Run as seperate query.
639        // Isolation level must be set to allowe dirty reads from pg_locks.
640        // This is to ensure that we can read the safe sequence even if other transactions are in progress.
641        let safe_sequence =
642            sqlx::query!("SELECT max_safe_seq('resources_sequence_seq') as max_safe_seq")
643                .fetch_one(&mut *conn)
644                .await
645                .map_err(StoreError::from)?
646                .max_safe_seq
647                .unwrap_or(0);
648
649        let result = sqlx::query_as!(
650            ResourcePollingValue,
651            r#"SELECT  id as "id: ResourceId", 
652                       tenant as "tenant: TenantId", 
653                       project as "project: ProjectId", 
654                       version_id, 
655                       resource_type as "resource_type: ResourceType", 
656                       fhir_method as "fhir_method: FHIRMethod", 
657                       sequence, 
658                       resource as "resource: FHIRJson<Resource>"
659            FROM resources WHERE tenant = $1 AND sequence > $2 AND sequence <= $3 ORDER BY sequence LIMIT $4 "#,
660            tenant_id.as_ref() as &str,
661            cur_sequence as i64,
662            safe_sequence,
663            count.unwrap_or(100) as i64
664        )
665        .fetch_all(&mut *conn)
666        .await
667        .map_err(StoreError::from)?;
668
669        // if !result.is_empty() {
670        //     println!("safe_sequence: {:?}", safe_sequence);
671        // }
672
673        Ok(result)
674    }
675}