haste_repository/pg/
fhir.rs

1use crate::{
2    fhir::{CachePolicy, FHIRRepository, ResourcePollingValue},
3    pg::{PGConnection, StoreError},
4    types::{FHIRMethod, SupportedFHIRVersions},
5    utilities,
6};
7use haste_fhir_client::request::HistoryRequest;
8use haste_fhir_model::r4::{
9    generated::resources::{Resource, ResourceType},
10    sqlx::{FHIRJson, FHIRJsonRef},
11};
12use haste_fhir_operation_error::OperationOutcomeError;
13use haste_jwt::{ProjectId, ResourceId, TenantId, VersionId, claims::UserTokenClaims};
14use moka::future::Cache;
15use sqlx::{Acquire, Postgres, QueryBuilder, Transaction};
16use std::sync::Arc;
17use tokio::sync::Mutex;
18
19#[derive(sqlx::FromRow, Debug)]
20struct ReturnSingularResource {
21    resource: FHIRJson<Resource>,
22}
23
24#[derive(sqlx::FromRow, Debug)]
25struct ReturnVersionedResource {
26    resource: FHIRJson<Resource>,
27    version_id: VersionId,
28}
29
30async fn read_version_ids_from_cache<'a>(
31    cache: &Cache<VersionId, Resource>,
32    version_ids: &'a [&VersionId],
33) -> (Vec<Resource>, Vec<&'a VersionId>) {
34    let mut remaining_version_ids = vec![];
35    let mut cached_resources = vec![];
36    for version_id in version_ids.iter() {
37        if let Some(resource) = cache.get(*version_id).await {
38            cached_resources.push(resource)
39        } else {
40            remaining_version_ids.push(*version_id);
41        }
42    }
43
44    (cached_resources, remaining_version_ids)
45}
46
47async fn create_transaction(
48    connection: &PGConnection,
49    is_updating_sequence: bool,
50) -> Result<Arc<Mutex<Transaction<'static, Postgres>>>, OperationOutcomeError> {
51    match connection {
52        PGConnection::Pool(pool, _cache) => {
53            let tx = if is_updating_sequence {
54                pool.begin_with(
55                    "BEGIN; SELECT register_sequence_transaction('resources_sequence_seq')",
56                )
57                .await
58                .map_err(StoreError::from)?
59            } else {
60                pool.begin().await.map_err(StoreError::from)?
61            };
62
63            Ok(Arc::new(Mutex::new(tx)))
64        }
65        PGConnection::Transaction(tx, _) => Ok(tx.clone()), // Transaction doesn't live long enough so cannot create.
66    }
67}
68
69async fn commit_transaction(
70    tx: Arc<Mutex<Transaction<'static, Postgres>>>,
71) -> Result<(), OperationOutcomeError> {
72    let conn = Mutex::into_inner(Arc::try_unwrap(tx).map_err(|e| {
73        println!("Error during commit: {:?}", e);
74        StoreError::FailedCommitTransaction
75    })?);
76
77    // Handle PgConnection connection
78    let res = conn.commit().await.map_err(StoreError::from)?;
79    Ok(res)
80}
81
82impl FHIRRepository for PGConnection {
83    async fn create(
84        &self,
85        tenant: &TenantId,
86        project: &ProjectId,
87        author: &UserTokenClaims,
88        fhir_version: &SupportedFHIRVersions,
89        resource: &mut Resource,
90    ) -> Result<Resource, OperationOutcomeError> {
91        match &self {
92            PGConnection::Pool(_pool, _) => {
93                let tx = create_transaction(self, true).await?;
94                let res = {
95                    let mut conn = tx.lock().await;
96                    let res =
97                        create(&mut *conn, tenant, project, author, fhir_version, resource).await?;
98                    res
99                };
100                commit_transaction(tx).await?;
101                Ok(res)
102            }
103            PGConnection::Transaction(tx, _) => {
104                let mut tx = tx.lock().await;
105                let res = create(&mut *tx, tenant, project, author, fhir_version, resource).await?;
106                Ok(res)
107            }
108        }
109    }
110
111    async fn delete(
112        &self,
113        tenant: &TenantId,
114        project: &ProjectId,
115        author: &UserTokenClaims,
116        fhir_version: &SupportedFHIRVersions,
117        resource: &mut Resource,
118        id: &str,
119    ) -> Result<Resource, OperationOutcomeError> {
120        match self {
121            PGConnection::Pool(_pool, _) => {
122                let tx = create_transaction(self, true).await?;
123                let res = {
124                    let mut conn = tx.lock().await;
125                    let res = delete(
126                        &mut *conn,
127                        tenant,
128                        project,
129                        author,
130                        fhir_version,
131                        resource,
132                        id,
133                    )
134                    .await?;
135                    res
136                };
137                commit_transaction(tx).await?;
138                Ok(res)
139            }
140            PGConnection::Transaction(tx, _) => {
141                let mut conn = tx.lock().await;
142                // Handle PgConnection connection
143                let res = delete(
144                    &mut *conn,
145                    tenant,
146                    project,
147                    author,
148                    fhir_version,
149                    resource,
150                    id,
151                )
152                .await?;
153                Ok(res)
154            }
155        }
156    }
157
158    async fn update(
159        &self,
160        tenant: &TenantId,
161        project: &ProjectId,
162        author: &UserTokenClaims,
163        fhir_version: &SupportedFHIRVersions,
164        resource: &mut Resource,
165        id: &str,
166    ) -> Result<Resource, OperationOutcomeError> {
167        match self {
168            PGConnection::Pool(_pool, _) => {
169                let tx = create_transaction(self, true).await?;
170                let res = {
171                    let mut conn = tx.lock().await;
172                    let res = update(
173                        &mut *conn,
174                        tenant,
175                        project,
176                        author,
177                        fhir_version,
178                        resource,
179                        id,
180                    )
181                    .await?;
182                    res
183                };
184
185                commit_transaction(tx).await?;
186                Ok(res)
187            }
188            PGConnection::Transaction(tx, _) => {
189                let mut conn = tx.lock().await;
190                // Handle PgConnection connection
191                let res = update(
192                    &mut *conn,
193                    tenant,
194                    project,
195                    author,
196                    fhir_version,
197                    resource,
198                    id,
199                )
200                .await?;
201                Ok(res)
202            }
203        }
204    }
205
206    async fn read_by_version_ids(
207        &self,
208        tenant_id: &TenantId,
209        project_id: &ProjectId,
210        version_ids: &[&VersionId],
211        cache_policy: CachePolicy,
212    ) -> Result<Vec<Resource>, OperationOutcomeError> {
213        if version_ids.is_empty() {
214            return Ok(vec![]);
215        }
216
217        let (cached_result, remaining_version_ids) =
218            read_version_ids_from_cache(self.cache(), &version_ids).await;
219
220        if remaining_version_ids.is_empty() {
221            return Ok(cached_result);
222        }
223
224        match self {
225            PGConnection::Pool(pool, cache) => {
226                let res = read_by_version_ids(pool, tenant_id, project_id, &remaining_version_ids)
227                    .await?;
228
229                if cache_policy == CachePolicy::Cache {
230                    for v in res.iter() {
231                        cache
232                            .insert(v.version_id.clone(), v.resource.0.clone())
233                            .await;
234                    }
235                }
236
237                Ok(cached_result
238                    .into_iter()
239                    .chain(res.into_iter().map(|r| r.resource.0))
240                    .collect::<Vec<_>>())
241            }
242            PGConnection::Transaction(tx, cache) => {
243                let mut conn = tx.lock().await;
244                // Handle PgConnection connection
245                let res =
246                    read_by_version_ids(&mut *conn, tenant_id, project_id, &remaining_version_ids)
247                        .await?;
248
249                if cache_policy == CachePolicy::Cache {
250                    for v in res.iter() {
251                        cache
252                            .insert(v.version_id.clone(), v.resource.0.clone())
253                            .await;
254                    }
255                }
256
257                Ok(cached_result
258                    .into_iter()
259                    .chain(res.into_iter().map(|r| r.resource.0))
260                    .collect::<Vec<_>>())
261            }
262        }
263    }
264
265    async fn read_latest(
266        &self,
267        tenant_id: &TenantId,
268        project_id: &ProjectId,
269        resource_type: &ResourceType,
270        resource_id: &ResourceId,
271    ) -> Result<Option<Resource>, OperationOutcomeError> {
272        match self {
273            PGConnection::Pool(pool, _) => {
274                let res =
275                    read_latest(pool, tenant_id, project_id, resource_type, resource_id).await?;
276                Ok(res)
277            }
278            PGConnection::Transaction(tx, _) => {
279                let mut conn = tx.lock().await;
280                // Handle PgConnection connection
281                let res = read_latest(
282                    &mut *conn,
283                    tenant_id,
284                    project_id,
285                    resource_type,
286                    resource_id,
287                )
288                .await?;
289                Ok(res)
290            }
291        }
292    }
293
294    async fn history(
295        &self,
296        tenant_id: &TenantId,
297        project_id: &ProjectId,
298        request: &HistoryRequest,
299    ) -> Result<Vec<Resource>, OperationOutcomeError> {
300        match self {
301            PGConnection::Pool(pool, _) => {
302                let res = history(pool, tenant_id, project_id, request).await?;
303                Ok(res)
304            }
305            PGConnection::Transaction(tx, _) => {
306                let mut conn = tx.lock().await;
307                // Handle PgConnection connection
308                let res = history(&mut *conn, tenant_id, project_id, request).await?;
309                Ok(res)
310            }
311        }
312    }
313
314    async fn get_sequence(
315        &self,
316        tenant_id: &TenantId,
317        sequence_id: u64,
318        count: Option<u64>,
319    ) -> Result<Vec<ResourcePollingValue>, OperationOutcomeError> {
320        match self {
321            PGConnection::Pool(pool, _) => {
322                let res = get_sequence(pool, tenant_id, sequence_id, count).await?;
323                Ok(res)
324            }
325            PGConnection::Transaction(tx, _) => {
326                let mut conn = tx.lock().await;
327
328                // Handle PgConnection connection
329                let res = get_sequence(&mut *conn, tenant_id, sequence_id, count).await?;
330                Ok(res)
331            }
332        }
333    }
334
335    fn in_transaction(&self) -> bool {
336        match self {
337            PGConnection::Transaction(_tx, _) => true,
338            _ => false,
339        }
340    }
341
342    async fn transaction<'a>(
343        &'a self,
344        is_updating_sequence: bool,
345    ) -> Result<Self, OperationOutcomeError> {
346        let tx = create_transaction(self, is_updating_sequence).await?;
347        Ok(PGConnection::Transaction(tx, self.cache().clone()))
348    }
349
350    async fn commit(self) -> Result<(), OperationOutcomeError> {
351        match self {
352            PGConnection::Pool(_pool, _) => Err(StoreError::NotTransaction.into()),
353            PGConnection::Transaction(tx, _) => commit_transaction(tx).await,
354        }
355    }
356
357    async fn rollback(self) -> Result<(), OperationOutcomeError> {
358        match self {
359            PGConnection::Pool(_pool, _) => Err(StoreError::NotTransaction.into()),
360            PGConnection::Transaction(tx, _) => {
361                let conn = Mutex::into_inner(
362                    Arc::try_unwrap(tx).map_err(|_e| StoreError::FailedCommitTransaction)?,
363                );
364
365                // Handle PgConnection connection
366                let res = conn.rollback().await.map_err(StoreError::from)?;
367                Ok(res)
368            }
369        }
370    }
371}
372
373fn create<'a, 'c, Connection: Acquire<'c, Database = Postgres> + Send + 'a>(
374    connection: Connection,
375    tenant: &'a TenantId,
376    project: &'a ProjectId,
377    author: &'a UserTokenClaims,
378    fhir_version: &'a SupportedFHIRVersions,
379    resource: &'a mut Resource,
380) -> impl Future<Output = Result<Resource, OperationOutcomeError>> + Send + 'a {
381    async move {
382        utilities::set_resource_id(resource, None)?;
383        utilities::set_version_id(resource)?;
384        let mut conn = connection.acquire().await.map_err(StoreError::SQLXError)?;
385        let result = sqlx::query_as!(
386                ReturnSingularResource,
387                r#"INSERT INTO resources (tenant, project, author_id, fhir_version, resource, deleted, request_method, author_type, fhir_method) 
388               VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9) 
389                RETURNING resource as "resource: FHIRJson<Resource>""#,
390                tenant.as_ref() as &str,
391                project.as_ref() as &str,
392                author.sub.as_ref() as &str,
393                // Useless cast so that macro has access to the type information.
394                // Otherwise it will not compile on type check.
395                fhir_version as &SupportedFHIRVersions,
396                &FHIRJsonRef(resource) as &FHIRJsonRef<'_ , Resource>,
397                false, // deleted
398                "POST",
399                author.resource_type.as_ref() as &str,
400                &FHIRMethod::Create as &FHIRMethod,
401            ).fetch_one(&mut *conn).await.map_err(StoreError::from)?;
402        Ok(result.resource.0)
403    }
404}
405
406fn delete<'a, 'c, Connection: Acquire<'c, Database = Postgres> + Send + 'a>(
407    connection: Connection,
408    tenant: &'a TenantId,
409    project: &'a ProjectId,
410    author: &'a UserTokenClaims,
411    fhir_version: &'a SupportedFHIRVersions,
412    resource: &'a mut Resource,
413    id: &'a str,
414) -> impl Future<Output = Result<Resource, OperationOutcomeError>> + Send + 'a {
415    async move {
416        utilities::set_resource_id(resource, Some(id.to_string()))?;
417        utilities::set_version_id(resource)?;
418        let mut conn = connection.acquire().await.map_err(StoreError::SQLXError)?;
419        let result = sqlx::query_as!(
420                ReturnSingularResource,
421                r#"INSERT INTO resources (tenant, project, author_id, fhir_version, resource, deleted, request_method, author_type, fhir_method) 
422                VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9) 
423                RETURNING resource as "resource: FHIRJson<Resource>""#,
424                tenant.as_ref() as &str,
425                project.as_ref() as &str,
426                author.sub.as_ref() as &str,
427                // Useless cast so that macro has access to the type information.
428                // Otherwise it will not compile on type check.
429                fhir_version as &SupportedFHIRVersions,
430                &FHIRJsonRef(resource) as &FHIRJsonRef<'_ , Resource>,
431                true, // deleted
432                "DELETE",
433                author.resource_type.as_ref() as &str,
434                &FHIRMethod::Delete as &FHIRMethod,
435            ).fetch_one(&mut *conn).await.map_err(StoreError::from)?;
436
437        Ok(result.resource.0)
438    }
439}
440
441fn update<'a, 'c, Connection: Acquire<'c, Database = Postgres> + Send + 'a>(
442    connection: Connection,
443    tenant: &'a TenantId,
444    project: &'a ProjectId,
445    author: &'a UserTokenClaims,
446    fhir_version: &'a SupportedFHIRVersions,
447    resource: &'a mut Resource,
448    id: &'a str,
449) -> impl Future<Output = Result<Resource, OperationOutcomeError>> + Send + 'a {
450    async move {
451        utilities::set_resource_id(resource, Some(id.to_string()))?;
452        utilities::set_version_id(resource)?;
453
454        let mut conn = connection.acquire().await.map_err(StoreError::SQLXError)?;
455
456        let query = sqlx::query_as!(
457            ReturnSingularResource,
458            r#"INSERT INTO resources (tenant, project, author_id, fhir_version, resource, deleted, request_method, author_type, fhir_method) 
459                VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9) 
460                RETURNING resource as "resource: FHIRJson<Resource>""#,
461            tenant.as_ref() as &str,
462            project.as_ref() as &str,
463            author.sub.as_ref() as &str,
464            // Useless cast so that macro has access to the type information.
465            // Otherwise it will not compile on type check.
466            fhir_version as &SupportedFHIRVersions,
467            &FHIRJsonRef(resource) as &FHIRJsonRef<'_, Resource>,
468            false, // deleted
469            "PUT",
470            author.resource_type.as_ref() as &str,
471            &FHIRMethod::Update as &FHIRMethod,
472        );
473
474        let result = query
475            .fetch_one(&mut *conn)
476            .await
477            .map_err(StoreError::from)?;
478
479        Ok(result.resource.0)
480    }
481}
482
483fn read_by_version_ids<'a, 'c, Connection: Acquire<'c, Database = Postgres> + Send + 'a>(
484    connection: Connection,
485    tenant_id: &'a TenantId,
486    project_id: &'a ProjectId,
487    version_ids: &'a Vec<&'a VersionId>,
488) -> impl Future<Output = Result<Vec<ReturnVersionedResource>, OperationOutcomeError>> + Send + 'a {
489    async move {
490        let mut conn = connection.acquire().await.map_err(StoreError::SQLXError)?;
491
492        let mut query_builder: QueryBuilder<Postgres> =
493            QueryBuilder::new(r#"SELECT resource, version_id FROM resources WHERE tenant = "#);
494
495        query_builder
496            .push_bind(tenant_id.as_ref())
497            .push(" AND project =")
498            .push_bind(project_id.as_ref());
499
500        query_builder.push(" AND version_id in (");
501
502        let mut separated = query_builder.separated(", ");
503        for version_id in version_ids.iter() {
504            separated.push_bind(version_id.as_ref());
505        }
506        separated.push_unseparated(")");
507
508        // To preserve sort order.
509        query_builder.push(" ORDER BY  array_position(array[");
510        let mut order_separator = query_builder.separated(", ");
511        for version_id in version_ids.iter() {
512            order_separator.push_bind(version_id.as_ref());
513        }
514        query_builder.push("], version_id)");
515
516        let query = query_builder.build_query_as();
517        let response: Vec<ReturnVersionedResource> = query
518            .fetch_all(&mut *conn)
519            .await
520            .map_err(StoreError::from)?;
521
522        Ok(response)
523    }
524}
525
526fn read_latest<'a, 'c, Connection: Acquire<'c, Database = Postgres> + Send + 'a>(
527    connection: Connection,
528    tenant_id: &'a TenantId,
529    project_id: &'a ProjectId,
530    resource_type: &'a ResourceType,
531    resource_id: &'a ResourceId,
532) -> impl Future<Output = Result<Option<Resource>, OperationOutcomeError>> + Send + 'a {
533    async move {
534        let mut conn = connection.acquire().await.map_err(StoreError::SQLXError)?;
535        let response = sqlx::query!(
536            r#"SELECT resource as "resource: FHIRJson<Resource>", deleted FROM resources WHERE tenant = $1 AND project = $2 AND id = $3 AND resource_type = $4 ORDER BY sequence DESC"#,
537            tenant_id.as_ref(),
538            project_id.as_ref(),
539            resource_id.as_ref(),
540            resource_type.as_ref(),
541        ).fetch_optional(&mut *conn).await.map_err(StoreError::from)?;
542
543        // For deletes entry will contain deleted = true.
544        // In that case return None.
545        if let Some(true) = response.as_ref().map(|r| r.deleted) {
546            Ok(None)
547        } else {
548            Ok(response.map(|r| r.resource.0))
549        }
550    }
551}
552
553fn history<'a, 'c, Connection: Acquire<'c, Database = Postgres> + Send + 'a>(
554    connection: Connection,
555    tenant_id: &'a TenantId,
556    project_id: &'a ProjectId,
557    history_request: &'a HistoryRequest,
558) -> impl Future<Output = Result<Vec<Resource>, OperationOutcomeError>> + Send + 'a {
559    async move {
560        let mut conn = connection.acquire().await.map_err(StoreError::from)?;
561        match history_request {
562            HistoryRequest::Instance(history_instance_request) => {
563                let response = sqlx::query_as!(ReturnSingularResource,
564                    r#"SELECT resource as "resource: FHIRJson<Resource>" FROM resources WHERE tenant = $1 AND project = $2 AND id = $3 AND resource_type = $4 ORDER BY sequence DESC LIMIT 100"#,
565                        tenant_id.as_ref()  as &str,
566                        project_id.as_ref() as &str,
567                        history_instance_request.id.as_ref() as &str,
568                        history_instance_request.resource_type.as_ref() as &str
569                    ).fetch_all(&mut *conn).await.map_err(StoreError::from)?;
570
571                Ok(response.into_iter().map(|r| r.resource.0).collect())
572            }
573            HistoryRequest::Type(history_type_request) => {
574                let response = sqlx::query_as!(ReturnSingularResource,
575                    r#"SELECT resource as "resource: FHIRJson<Resource>" FROM resources WHERE tenant = $1 AND project = $2 AND resource_type = $3 ORDER BY sequence DESC LIMIT 100"#,
576                        tenant_id.as_ref()  as &str,
577                        project_id.as_ref() as &str,
578                        history_type_request.resource_type.as_ref() as &str
579                    ).fetch_all(&mut *conn).await.map_err(StoreError::from)?;
580
581                Ok(response.into_iter().map(|r| r.resource.0).collect())
582            }
583            HistoryRequest::System(_request) => {
584                let response = sqlx::query_as!(ReturnSingularResource,
585                    r#"SELECT resource as "resource: FHIRJson<Resource>" FROM resources WHERE tenant = $1 AND project = $2 ORDER BY sequence DESC LIMIT 100"#,
586                        tenant_id.as_ref()  as &str,
587                        project_id.as_ref() as &str
588                    ).fetch_all(&mut *conn).await.map_err(StoreError::from)?;
589
590                Ok(response.into_iter().map(|r| r.resource.0).collect())
591            }
592        }
593    }
594}
595
596fn get_sequence<'a, 'c, Connection: Acquire<'c, Database = Postgres> + Send + 'a>(
597    connection: Connection,
598    tenant_id: &'a TenantId,
599    cur_sequence: u64,
600    count: Option<u64>,
601) -> impl Future<Output = Result<Vec<ResourcePollingValue>, OperationOutcomeError>> + Send + 'a {
602    async move {
603        let mut conn = connection.acquire().await.map_err(StoreError::from)?;
604        // Run as a transaction to ensure safe sequence retrieval.
605        // Run as seperate query.
606        // Isolation level must be set to allowe dirty reads from pg_locks.
607        // This is to ensure that we can read the safe sequence even if other transactions are in progress.
608        let safe_sequence =
609            sqlx::query!("SELECT max_safe_seq('resources_sequence_seq') as max_safe_seq")
610                .fetch_one(&mut *conn)
611                .await
612                .map_err(StoreError::from)?
613                .max_safe_seq
614                .unwrap_or(0);
615
616        let result = sqlx::query_as!(
617            ResourcePollingValue,
618            r#"SELECT  id as "id: ResourceId", 
619                       tenant as "tenant: TenantId", 
620                       project as "project: ProjectId", 
621                       version_id, 
622                       resource_type as "resource_type: ResourceType", 
623                       fhir_method as "fhir_method: FHIRMethod", 
624                       sequence, 
625                       resource as "resource: FHIRJson<Resource>"
626            FROM resources WHERE tenant = $1 AND sequence > $2 AND sequence <= $3 ORDER BY sequence LIMIT $4 "#,
627            tenant_id.as_ref() as &str,
628            cur_sequence as i64,
629            safe_sequence,
630            count.unwrap_or(100) as i64
631        )
632        .fetch_all(&mut *conn)
633        .await
634        .map_err(StoreError::from)?;
635
636        // if !result.is_empty() {
637        //     println!("safe_sequence: {:?}", safe_sequence);
638        // }
639
640        Ok(result)
641    }
642}