Skip to main content

haste_repository/pg/
tenant.rs

1use crate::{
2    admin::TenantAuthAdmin,
3    pg::{PGConnection, StoreError},
4    types::tenant::{CreateTenant, Tenant, TenantSearchClaims},
5    utilities::{generate_id, validate_id},
6};
7use haste_fhir_operation_error::OperationOutcomeError;
8use haste_jwt::TenantId;
9use sqlx::{Acquire, Postgres, QueryBuilder};
10
11fn create_tenant<'a, 'c, Connection: Acquire<'c, Database = Postgres> + Send + 'a>(
12    connection: Connection,
13    tenant: CreateTenant,
14) -> impl Future<Output = Result<Tenant, OperationOutcomeError>> + Send + 'a {
15    async move {
16        let mut conn = connection.acquire().await.map_err(StoreError::SQLXError)?;
17        let id = tenant.id.unwrap_or(TenantId::new(generate_id(None)));
18        validate_id(id.as_ref())?;
19
20        let result = sqlx::query_as!(
21            Tenant,
22            r#"INSERT INTO tenants (id, subscription_tier) VALUES ($1, $2) RETURNING id as "id: TenantId", subscription_tier"#,
23            id as TenantId,
24            tenant.subscription_tier.unwrap_or("free".to_string())
25        )
26        .fetch_one(&mut *conn)
27        .await;
28
29        if let Err(res) = result.as_ref()
30            && let sqlx::Error::Database(db_error) = res
31            && db_error.code().as_deref() == Some("23505")
32        {
33            println!("Duplicate tenant ID detected");
34            Err(StoreError::Duplicate.into())
35        } else {
36            Ok(result.map_err(StoreError::SQLXError)?)
37        }
38    }
39}
40
41fn read_tenant<'a, 'c, Connection: Acquire<'c, Database = Postgres> + Send + 'a>(
42    connection: Connection,
43    id: &'a str,
44) -> impl Future<Output = Result<Option<Tenant>, OperationOutcomeError>> + Send + 'a {
45    async move {
46        let mut conn = connection.acquire().await.map_err(StoreError::SQLXError)?;
47        let tenant = sqlx::query_as!(
48            Tenant,
49            r#"SELECT id as "id: TenantId", subscription_tier FROM tenants where id = $1"#,
50            id
51        )
52        .fetch_optional(&mut *conn)
53        .await
54        .map_err(StoreError::SQLXError)?;
55
56        Ok(tenant)
57    }
58}
59
60fn update_tenant<'a, 'c, Connection: Acquire<'c, Database = Postgres> + Send + 'a>(
61    connection: Connection,
62    tenant: Tenant,
63) -> impl Future<Output = Result<Tenant, OperationOutcomeError>> + Send + 'a {
64    async move {
65        let mut conn = connection.acquire().await.map_err(StoreError::SQLXError)?;
66        let updated_tenant = sqlx::query_as!(
67            Tenant,
68            r#"UPDATE tenants SET subscription_tier = $1 WHERE id = $2 RETURNING id as "id: TenantId", subscription_tier"#,
69            tenant.subscription_tier,
70            tenant.id as TenantId,
71        )
72        .fetch_one(&mut *conn)
73        .await
74        .map_err(StoreError::SQLXError)?;
75
76        Ok(updated_tenant)
77    }
78}
79
80fn delete_tenant<'a, 'c, Connection: Acquire<'c, Database = Postgres> + Send + 'a>(
81    connection: Connection,
82    id: &'a str,
83) -> impl Future<Output = Result<(), OperationOutcomeError>> + Send + 'a {
84    async move {
85        let mut conn = connection.acquire().await.map_err(StoreError::SQLXError)?;
86        let _deleted_tenant = sqlx::query_as!(
87            Tenant,
88            r#"DELETE FROM tenants WHERE id = $1 RETURNING id as "id: TenantId", subscription_tier"#,
89            id
90        )
91        .fetch_optional(&mut *conn)
92        .await
93        .map_err(StoreError::SQLXError)?;
94
95        Ok(())
96    }
97}
98
99fn search_tenant<'a, 'c, Connection: Acquire<'c, Database = Postgres> + Send + 'a>(
100    connection: Connection,
101    clauses: &'a TenantSearchClaims,
102) -> impl Future<Output = Result<Vec<Tenant>, OperationOutcomeError>> + Send + 'a {
103    async move {
104        let mut conn = connection.acquire().await.map_err(StoreError::SQLXError)?;
105        let mut query_builder: QueryBuilder<Postgres> =
106            QueryBuilder::new(r#"SELECT id, subscription_tier FROM tenants WHERE "#);
107
108        if let Some(subscription_tier) = clauses.subscription_tier.as_ref() {
109            query_builder
110                .push(" subscription_tier = ")
111                .push_bind(subscription_tier);
112        }
113
114        let query = query_builder.build_query_as();
115
116        let tenants: Vec<Tenant> = query
117            .fetch_all(&mut *conn)
118            .await
119            .map_err(StoreError::from)?;
120
121        Ok(tenants)
122    }
123}
124
125impl<Key: AsRef<str> + Send + Sync>
126    TenantAuthAdmin<CreateTenant, Tenant, TenantSearchClaims, Tenant, Key> for PGConnection
127{
128    async fn create(
129        &self,
130        _tenant: &TenantId,
131        new_tenant: CreateTenant,
132    ) -> Result<Tenant, OperationOutcomeError> {
133        match self {
134            PGConnection::Pool(pool, _) => {
135                let res = create_tenant(pool, new_tenant).await?;
136                Ok(res)
137            }
138            PGConnection::Transaction(tx, _) => {
139                let mut tx = tx.lock().await;
140                let res = create_tenant(&mut *tx, new_tenant).await?;
141                Ok(res)
142            }
143        }
144    }
145
146    async fn read(
147        &self,
148        _tenant: &TenantId,
149        id: &Key,
150    ) -> Result<Option<Tenant>, haste_fhir_operation_error::OperationOutcomeError> {
151        match self {
152            PGConnection::Pool(pool, _) => {
153                let res = read_tenant(pool, id.as_ref()).await?;
154                Ok(res)
155            }
156            PGConnection::Transaction(tx, _) => {
157                let mut tx = tx.lock().await;
158                let res = read_tenant(&mut *tx, id.as_ref()).await?;
159                Ok(res)
160            }
161        }
162    }
163
164    async fn update(
165        &self,
166        _tenant: &TenantId,
167        model: Tenant,
168    ) -> Result<Tenant, haste_fhir_operation_error::OperationOutcomeError> {
169        match self {
170            PGConnection::Pool(pool, _) => {
171                let res = update_tenant(pool, model).await?;
172                Ok(res)
173            }
174            PGConnection::Transaction(tx, _) => {
175                let mut tx = tx.lock().await;
176                let res = update_tenant(&mut *tx, model).await?;
177                Ok(res)
178            }
179        }
180    }
181
182    async fn delete(
183        &self,
184        _tenant: &TenantId,
185        id: &Key,
186    ) -> Result<(), haste_fhir_operation_error::OperationOutcomeError> {
187        match self {
188            PGConnection::Pool(pool, _) => {
189                let res = delete_tenant(pool, id.as_ref()).await?;
190                Ok(res)
191            }
192            PGConnection::Transaction(tx, _) => {
193                let mut tx = tx.lock().await;
194                let res = delete_tenant(&mut *tx, id.as_ref()).await?;
195                Ok(res)
196            }
197        }
198    }
199
200    async fn search(
201        &self,
202        _tenant: &TenantId,
203        claims: &TenantSearchClaims,
204    ) -> Result<Vec<Tenant>, OperationOutcomeError> {
205        match self {
206            PGConnection::Pool(pool, _) => {
207                let res = search_tenant(pool, claims).await?;
208                Ok(res)
209            }
210            PGConnection::Transaction(tx, _) => {
211                let mut tx = tx.lock().await;
212                let res = search_tenant(&mut *tx, claims).await?;
213                Ok(res)
214            }
215        }
216    }
217}