Skip to main content

haste_repository/pg/
sequence.rs

1use haste_fhir_model::r4::{
2    generated::resources::{Resource, ResourceType},
3    sqlx::FHIRJson,
4};
5use haste_fhir_operation_error::OperationOutcomeError;
6use haste_jwt::{ProjectId, ResourceId, TenantId};
7use sqlx::{Acquire, Postgres};
8
9use crate::{
10    pg::{PGConnection, StoreError},
11    sequence::{ResourcePollingValue, ResourceSequential},
12    types::FHIRMethod,
13};
14
15fn get_sequence<'a, 'c, Connection: Acquire<'c, Database = Postgres> + Send + 'a>(
16    connection: Connection,
17    tenant_id: &'a TenantId,
18    cur_sequence: u64,
19    count: Option<u64>,
20) -> impl Future<Output = Result<Vec<ResourcePollingValue>, OperationOutcomeError>> + Send + 'a {
21    async move {
22        let mut conn = connection.acquire().await.map_err(StoreError::from)?;
23        // Run as a transaction to ensure safe sequence retrieval.
24        // Run as seperate query.
25        // Isolation level must be set to allowe dirty reads from pg_locks.
26        // This is to ensure that we can read the safe sequence even if other transactions are in progress.
27        let safe_sequence =
28            sqlx::query!("SELECT max_safe_seq('resources_sequence_seq') as max_safe_seq")
29                .fetch_one(&mut *conn)
30                .await
31                .map_err(StoreError::from)?
32                .max_safe_seq
33                .unwrap_or(0);
34
35        let result = sqlx::query_as!(
36            ResourcePollingValue,
37            r#"SELECT  id as "id: ResourceId", 
38                       tenant as "tenant: TenantId", 
39                       project as "project: ProjectId", 
40                       version_id, 
41                       resource_type as "resource_type: ResourceType", 
42                       fhir_method as "fhir_method: FHIRMethod", 
43                       sequence, 
44                       resource as "resource: FHIRJson<Resource>"
45            FROM resources WHERE tenant = $1 AND sequence > $2 AND sequence <= $3 ORDER BY sequence LIMIT $4 "#,
46            tenant_id.as_ref() as &str,
47            cur_sequence as i64,
48            safe_sequence,
49            count.unwrap_or(100) as i64
50        )
51        .fetch_all(&mut *conn)
52        .await
53        .map_err(StoreError::from)?;
54
55        // if !result.is_empty() {
56        //     println!("safe_sequence: {:?}", safe_sequence);
57        // }
58
59        Ok(result)
60    }
61}
62
63impl ResourceSequential for PGConnection {
64    async fn get_sequence(
65        &self,
66        tenant_id: &TenantId,
67        sequence_id: u64,
68        count: Option<u64>,
69    ) -> Result<Vec<ResourcePollingValue>, OperationOutcomeError> {
70        match self {
71            PGConnection::Pool(pool, _) => {
72                let res = get_sequence(pool, tenant_id, sequence_id, count).await?;
73                Ok(res)
74            }
75            PGConnection::Transaction(tx, _) => {
76                let mut conn = tx.lock().await;
77
78                // Handle PgConnection connection
79                let res = get_sequence(&mut *conn, tenant_id, sequence_id, count).await?;
80                Ok(res)
81            }
82        }
83    }
84}