1use crate::{
2 admin::TenantAuthAdmin,
3 pg::{PGConnection, StoreError},
4 types::tenant::{CreateTenant, Tenant, TenantSearchClaims},
5 utilities::{generate_id, validate_id},
6};
7use haste_fhir_operation_error::OperationOutcomeError;
8use haste_jwt::TenantId;
9use sqlx::{Acquire, Postgres, QueryBuilder};
10
11fn create_tenant<'a, 'c, Connection: Acquire<'c, Database = Postgres> + Send + 'a>(
12 connection: Connection,
13 tenant: CreateTenant,
14) -> impl Future<Output = Result<Tenant, OperationOutcomeError>> + Send + 'a {
15 async move {
16 let mut conn = connection.acquire().await.map_err(StoreError::SQLXError)?;
17 let id = tenant.id.unwrap_or(TenantId::new(generate_id(None)));
18 validate_id(id.as_ref())?;
19
20 let result = sqlx::query_as!(
21 Tenant,
22 r#"INSERT INTO tenants (id, subscription_tier) VALUES ($1, $2) RETURNING id as "id: TenantId", subscription_tier"#,
23 id as TenantId,
24 tenant.subscription_tier.unwrap_or("free".to_string())
25 )
26 .fetch_one(&mut *conn)
27 .await;
28
29 if let Err(res) = result.as_ref()
30 && let sqlx::Error::Database(db_error) = res
31 && db_error.code().as_deref() == Some("23505")
32 {
33 println!("Duplicate tenant ID detected");
34 Err(StoreError::Duplicate.into())
35 } else {
36 Ok(result.map_err(StoreError::SQLXError)?)
37 }
38 }
39}
40
41fn read_tenant<'a, 'c, Connection: Acquire<'c, Database = Postgres> + Send + 'a>(
42 connection: Connection,
43 id: &'a str,
44) -> impl Future<Output = Result<Option<Tenant>, OperationOutcomeError>> + Send + 'a {
45 async move {
46 let mut conn = connection.acquire().await.map_err(StoreError::SQLXError)?;
47 let tenant = sqlx::query_as!(
48 Tenant,
49 r#"SELECT id as "id: TenantId", subscription_tier FROM tenants where id = $1"#,
50 id
51 )
52 .fetch_optional(&mut *conn)
53 .await
54 .map_err(StoreError::SQLXError)?;
55
56 Ok(tenant)
57 }
58}
59
60fn update_tenant<'a, 'c, Connection: Acquire<'c, Database = Postgres> + Send + 'a>(
61 connection: Connection,
62 tenant: Tenant,
63) -> impl Future<Output = Result<Tenant, OperationOutcomeError>> + Send + 'a {
64 async move {
65 let mut conn = connection.acquire().await.map_err(StoreError::SQLXError)?;
66 let updated_tenant = sqlx::query_as!(
67 Tenant,
68 r#"UPDATE tenants SET subscription_tier = $1 WHERE id = $2 RETURNING id as "id: TenantId", subscription_tier"#,
69 tenant.subscription_tier,
70 tenant.id as TenantId,
71 )
72 .fetch_one(&mut *conn)
73 .await
74 .map_err(StoreError::SQLXError)?;
75
76 Ok(updated_tenant)
77 }
78}
79
80fn delete_tenant<'a, 'c, Connection: Acquire<'c, Database = Postgres> + Send + 'a>(
81 connection: Connection,
82 id: &'a str,
83) -> impl Future<Output = Result<(), OperationOutcomeError>> + Send + 'a {
84 async move {
85 let mut conn = connection.acquire().await.map_err(StoreError::SQLXError)?;
86 let _deleted_tenant = sqlx::query_as!(
87 Tenant,
88 r#"DELETE FROM tenants WHERE id = $1 RETURNING id as "id: TenantId", subscription_tier"#,
89 id
90 )
91 .fetch_optional(&mut *conn)
92 .await
93 .map_err(StoreError::SQLXError)?;
94
95 Ok(())
96 }
97}
98
99fn search_tenant<'a, 'c, Connection: Acquire<'c, Database = Postgres> + Send + 'a>(
100 connection: Connection,
101 clauses: &'a TenantSearchClaims,
102) -> impl Future<Output = Result<Vec<Tenant>, OperationOutcomeError>> + Send + 'a {
103 async move {
104 let mut conn = connection.acquire().await.map_err(StoreError::SQLXError)?;
105 let mut query_builder: QueryBuilder<Postgres> =
106 QueryBuilder::new(r#"SELECT id, subscription_tier FROM tenants WHERE "#);
107
108 if let Some(subscription_tier) = clauses.subscription_tier.as_ref() {
109 query_builder
110 .push(" subscription_tier = ")
111 .push_bind(subscription_tier);
112 }
113
114 let query = query_builder.build_query_as();
115
116 let tenants: Vec<Tenant> = query
117 .fetch_all(&mut *conn)
118 .await
119 .map_err(StoreError::from)?;
120
121 Ok(tenants)
122 }
123}
124
125impl<Key: AsRef<str> + Send + Sync>
126 TenantAuthAdmin<CreateTenant, Tenant, TenantSearchClaims, Tenant, Key> for PGConnection
127{
128 async fn create(
129 &self,
130 _tenant: &TenantId,
131 new_tenant: CreateTenant,
132 ) -> Result<Tenant, OperationOutcomeError> {
133 match self {
134 PGConnection::Pool(pool, _) => {
135 let res = create_tenant(pool, new_tenant).await?;
136 Ok(res)
137 }
138 PGConnection::Transaction(tx, _) => {
139 let mut tx = tx.lock().await;
140 let res = create_tenant(&mut *tx, new_tenant).await?;
141 Ok(res)
142 }
143 }
144 }
145
146 async fn read(
147 &self,
148 _tenant: &TenantId,
149 id: &Key,
150 ) -> Result<Option<Tenant>, haste_fhir_operation_error::OperationOutcomeError> {
151 match self {
152 PGConnection::Pool(pool, _) => {
153 let res = read_tenant(pool, id.as_ref()).await?;
154 Ok(res)
155 }
156 PGConnection::Transaction(tx, _) => {
157 let mut tx = tx.lock().await;
158 let res = read_tenant(&mut *tx, id.as_ref()).await?;
159 Ok(res)
160 }
161 }
162 }
163
164 async fn update(
165 &self,
166 _tenant: &TenantId,
167 model: Tenant,
168 ) -> Result<Tenant, haste_fhir_operation_error::OperationOutcomeError> {
169 match self {
170 PGConnection::Pool(pool, _) => {
171 let res = update_tenant(pool, model).await?;
172 Ok(res)
173 }
174 PGConnection::Transaction(tx, _) => {
175 let mut tx = tx.lock().await;
176 let res = update_tenant(&mut *tx, model).await?;
177 Ok(res)
178 }
179 }
180 }
181
182 async fn delete(
183 &self,
184 _tenant: &TenantId,
185 id: &Key,
186 ) -> Result<(), haste_fhir_operation_error::OperationOutcomeError> {
187 match self {
188 PGConnection::Pool(pool, _) => {
189 let res = delete_tenant(pool, id.as_ref()).await?;
190 Ok(res)
191 }
192 PGConnection::Transaction(tx, _) => {
193 let mut tx = tx.lock().await;
194 let res = delete_tenant(&mut *tx, id.as_ref()).await?;
195 Ok(res)
196 }
197 }
198 }
199
200 async fn search(
201 &self,
202 _tenant: &TenantId,
203 claims: &TenantSearchClaims,
204 ) -> Result<Vec<Tenant>, OperationOutcomeError> {
205 match self {
206 PGConnection::Pool(pool, _) => {
207 let res = search_tenant(pool, claims).await?;
208 Ok(res)
209 }
210 PGConnection::Transaction(tx, _) => {
211 let mut tx = tx.lock().await;
212 let res = search_tenant(&mut *tx, claims).await?;
213 Ok(res)
214 }
215 }
216 }
217}