haste_repository/pg/
tenant.rs

1use crate::{
2    admin::TenantAuthAdmin,
3    pg::{PGConnection, StoreError},
4    types::tenant::{CreateTenant, Tenant, TenantSearchClaims},
5    utilities::{generate_id, validate_id},
6};
7use haste_fhir_operation_error::OperationOutcomeError;
8use haste_jwt::TenantId;
9use sqlx::{Acquire, Postgres, QueryBuilder};
10
11fn create_tenant<'a, 'c, Connection: Acquire<'c, Database = Postgres> + Send + 'a>(
12    connection: Connection,
13    tenant: CreateTenant,
14) -> impl Future<Output = Result<Tenant, OperationOutcomeError>> + Send + 'a {
15    async move {
16        let mut conn = connection.acquire().await.map_err(StoreError::SQLXError)?;
17        let id = tenant.id.unwrap_or(TenantId::new(generate_id(None)));
18        validate_id(id.as_ref())?;
19
20        let tenant = sqlx::query_as!(
21            Tenant,
22            r#"INSERT INTO tenants (id, subscription_tier) VALUES ($1, $2) RETURNING id as "id: TenantId", subscription_tier"#,
23            id as TenantId,
24            tenant.subscription_tier.unwrap_or("free".to_string())
25        )
26        .fetch_one(&mut *conn)
27        .await
28        .map_err(StoreError::SQLXError)?;
29
30        Ok(tenant)
31    }
32}
33
34fn read_tenant<'a, 'c, Connection: Acquire<'c, Database = Postgres> + Send + 'a>(
35    connection: Connection,
36    id: &'a str,
37) -> impl Future<Output = Result<Option<Tenant>, OperationOutcomeError>> + Send + 'a {
38    async move {
39        let mut conn = connection.acquire().await.map_err(StoreError::SQLXError)?;
40        let tenant = sqlx::query_as!(
41            Tenant,
42            r#"SELECT id as "id: TenantId", subscription_tier FROM tenants where id = $1"#,
43            id
44        )
45        .fetch_optional(&mut *conn)
46        .await
47        .map_err(StoreError::SQLXError)?;
48
49        Ok(tenant)
50    }
51}
52
53fn update_tenant<'a, 'c, Connection: Acquire<'c, Database = Postgres> + Send + 'a>(
54    connection: Connection,
55    tenant: Tenant,
56) -> impl Future<Output = Result<Tenant, OperationOutcomeError>> + Send + 'a {
57    async move {
58        let mut conn = connection.acquire().await.map_err(StoreError::SQLXError)?;
59        let updated_tenant = sqlx::query_as!(
60            Tenant,
61            r#"UPDATE tenants SET subscription_tier = $1 WHERE id = $2 RETURNING id as "id: TenantId", subscription_tier"#,
62            tenant.subscription_tier,
63            tenant.id as TenantId,
64        )
65        .fetch_one(&mut *conn)
66        .await
67        .map_err(StoreError::SQLXError)?;
68
69        Ok(updated_tenant)
70    }
71}
72
73fn delete_tenant<'a, 'c, Connection: Acquire<'c, Database = Postgres> + Send + 'a>(
74    connection: Connection,
75    id: &'a str,
76) -> impl Future<Output = Result<(), OperationOutcomeError>> + Send + 'a {
77    async move {
78        let mut conn = connection.acquire().await.map_err(StoreError::SQLXError)?;
79        let _deleted_tenant = sqlx::query_as!(
80            Tenant,
81            r#"DELETE FROM tenants WHERE id = $1 RETURNING id as "id: TenantId", subscription_tier"#,
82            id
83        )
84        .fetch_optional(&mut *conn)
85        .await
86        .map_err(StoreError::SQLXError)?;
87
88        Ok(())
89    }
90}
91
92fn search_tenant<'a, 'c, Connection: Acquire<'c, Database = Postgres> + Send + 'a>(
93    connection: Connection,
94    clauses: &'a TenantSearchClaims,
95) -> impl Future<Output = Result<Vec<Tenant>, OperationOutcomeError>> + Send + 'a {
96    async move {
97        let mut conn = connection.acquire().await.map_err(StoreError::SQLXError)?;
98        let mut query_builder: QueryBuilder<Postgres> =
99            QueryBuilder::new(r#"SELECT id, subscription_tier FROM tenants WHERE "#);
100
101        if let Some(subscription_tier) = clauses.subscription_tier.as_ref() {
102            query_builder
103                .push(" subscription_tier = ")
104                .push_bind(subscription_tier);
105        }
106
107        let query = query_builder.build_query_as();
108
109        let tenants: Vec<Tenant> = query
110            .fetch_all(&mut *conn)
111            .await
112            .map_err(StoreError::from)?;
113
114        Ok(tenants)
115    }
116}
117
118impl<Key: AsRef<str> + Send + Sync>
119    TenantAuthAdmin<CreateTenant, Tenant, TenantSearchClaims, Tenant, Key> for PGConnection
120{
121    async fn create(
122        &self,
123        _tenant: &TenantId,
124        new_tenant: CreateTenant,
125    ) -> Result<Tenant, OperationOutcomeError> {
126        match self {
127            PGConnection::Pool(pool, _) => {
128                let res = create_tenant(pool, new_tenant).await?;
129                Ok(res)
130            }
131            PGConnection::Transaction(tx, _) => {
132                let mut tx = tx.lock().await;
133                let res = create_tenant(&mut *tx, new_tenant).await?;
134                Ok(res)
135            }
136        }
137    }
138
139    async fn read(
140        &self,
141        _tenant: &TenantId,
142        id: &Key,
143    ) -> Result<Option<Tenant>, haste_fhir_operation_error::OperationOutcomeError> {
144        match self {
145            PGConnection::Pool(pool, _) => {
146                let res = read_tenant(pool, id.as_ref()).await?;
147                Ok(res)
148            }
149            PGConnection::Transaction(tx, _) => {
150                let mut tx = tx.lock().await;
151                let res = read_tenant(&mut *tx, id.as_ref()).await?;
152                Ok(res)
153            }
154        }
155    }
156
157    async fn update(
158        &self,
159        _tenant: &TenantId,
160        model: Tenant,
161    ) -> Result<Tenant, haste_fhir_operation_error::OperationOutcomeError> {
162        match self {
163            PGConnection::Pool(pool, _) => {
164                let res = update_tenant(pool, model).await?;
165                Ok(res)
166            }
167            PGConnection::Transaction(tx, _) => {
168                let mut tx = tx.lock().await;
169                let res = update_tenant(&mut *tx, model).await?;
170                Ok(res)
171            }
172        }
173    }
174
175    async fn delete(
176        &self,
177        _tenant: &TenantId,
178        id: &Key,
179    ) -> Result<(), haste_fhir_operation_error::OperationOutcomeError> {
180        match self {
181            PGConnection::Pool(pool, _) => {
182                let res = delete_tenant(pool, id.as_ref()).await?;
183                Ok(res)
184            }
185            PGConnection::Transaction(tx, _) => {
186                let mut tx = tx.lock().await;
187                let res = delete_tenant(&mut *tx, id.as_ref()).await?;
188                Ok(res)
189            }
190        }
191    }
192
193    async fn search(
194        &self,
195        _tenant: &TenantId,
196        claims: &TenantSearchClaims,
197    ) -> Result<Vec<Tenant>, OperationOutcomeError> {
198        match self {
199            PGConnection::Pool(pool, _) => {
200                let res = search_tenant(pool, claims).await?;
201                Ok(res)
202            }
203            PGConnection::Transaction(tx, _) => {
204                let mut tx = tx.lock().await;
205                let res = search_tenant(&mut *tx, claims).await?;
206                Ok(res)
207            }
208        }
209    }
210}