haste_repository/pg/
sequence.rs1use haste_fhir_model::r4::{
2 generated::resources::{Resource, ResourceType},
3 sqlx::FHIRJson,
4};
5use haste_fhir_operation_error::OperationOutcomeError;
6use haste_jwt::{ProjectId, ResourceId, TenantId};
7use sqlx::{Acquire, Postgres};
8
9use crate::{
10 pg::{PGConnection, StoreError},
11 sequence::{ResourcePollingValue, ResourceSequential},
12 types::FHIRMethod,
13};
14
15fn get_sequence<'a, 'c, Connection: Acquire<'c, Database = Postgres> + Send + 'a>(
16 connection: Connection,
17 tenant_id: &'a TenantId,
18 cur_sequence: u64,
19 count: Option<u64>,
20) -> impl Future<Output = Result<Vec<ResourcePollingValue>, OperationOutcomeError>> + Send + 'a {
21 async move {
22 let mut conn = connection.acquire().await.map_err(StoreError::from)?;
23 let safe_sequence =
28 sqlx::query!("SELECT max_safe_seq('resources_sequence_seq') as max_safe_seq")
29 .fetch_one(&mut *conn)
30 .await
31 .map_err(StoreError::from)?
32 .max_safe_seq
33 .unwrap_or(0);
34
35 let result = sqlx::query_as!(
36 ResourcePollingValue,
37 r#"SELECT id as "id: ResourceId",
38 tenant as "tenant: TenantId",
39 project as "project: ProjectId",
40 version_id,
41 resource_type as "resource_type: ResourceType",
42 fhir_method as "fhir_method: FHIRMethod",
43 sequence,
44 resource as "resource: FHIRJson<Resource>"
45 FROM resources WHERE tenant = $1 AND sequence > $2 AND sequence <= $3 ORDER BY sequence LIMIT $4 "#,
46 tenant_id.as_ref() as &str,
47 cur_sequence as i64,
48 safe_sequence,
49 count.unwrap_or(100) as i64
50 )
51 .fetch_all(&mut *conn)
52 .await
53 .map_err(StoreError::from)?;
54
55 Ok(result)
60 }
61}
62
63impl ResourceSequential for PGConnection {
64 async fn get_sequence(
65 &self,
66 tenant_id: &TenantId,
67 sequence_id: u64,
68 count: Option<u64>,
69 ) -> Result<Vec<ResourcePollingValue>, OperationOutcomeError> {
70 match self {
71 PGConnection::Pool(pool, _) => {
72 let res = get_sequence(pool, tenant_id, sequence_id, count).await?;
73 Ok(res)
74 }
75 PGConnection::Transaction(tx, _) => {
76 let mut conn = tx.lock().await;
77
78 let res = get_sequence(&mut *conn, tenant_id, sequence_id, count).await?;
80 Ok(res)
81 }
82 }
83 }
84}