1use crate::{
2 admin::TenantAuthAdmin,
3 pg::{PGConnection, StoreError},
4 types::tenant::{CreateTenant, Tenant, TenantSearchClaims},
5 utilities::{generate_id, validate_id},
6};
7use haste_fhir_operation_error::OperationOutcomeError;
8use haste_jwt::TenantId;
9use sqlx::{Acquire, Postgres, QueryBuilder};
10
11fn create_tenant<'a, 'c, Connection: Acquire<'c, Database = Postgres> + Send + 'a>(
12 connection: Connection,
13 tenant: CreateTenant,
14) -> impl Future<Output = Result<Tenant, OperationOutcomeError>> + Send + 'a {
15 async move {
16 let mut conn = connection.acquire().await.map_err(StoreError::SQLXError)?;
17 let id = tenant.id.unwrap_or(TenantId::new(generate_id(None)));
18 validate_id(id.as_ref())?;
19
20 let tenant = sqlx::query_as!(
21 Tenant,
22 r#"INSERT INTO tenants (id, subscription_tier) VALUES ($1, $2) RETURNING id as "id: TenantId", subscription_tier"#,
23 id as TenantId,
24 tenant.subscription_tier.unwrap_or("free".to_string())
25 )
26 .fetch_one(&mut *conn)
27 .await
28 .map_err(StoreError::SQLXError)?;
29
30 Ok(tenant)
31 }
32}
33
34fn read_tenant<'a, 'c, Connection: Acquire<'c, Database = Postgres> + Send + 'a>(
35 connection: Connection,
36 id: &'a str,
37) -> impl Future<Output = Result<Option<Tenant>, OperationOutcomeError>> + Send + 'a {
38 async move {
39 let mut conn = connection.acquire().await.map_err(StoreError::SQLXError)?;
40 let tenant = sqlx::query_as!(
41 Tenant,
42 r#"SELECT id as "id: TenantId", subscription_tier FROM tenants where id = $1"#,
43 id
44 )
45 .fetch_optional(&mut *conn)
46 .await
47 .map_err(StoreError::SQLXError)?;
48
49 Ok(tenant)
50 }
51}
52
53fn update_tenant<'a, 'c, Connection: Acquire<'c, Database = Postgres> + Send + 'a>(
54 connection: Connection,
55 tenant: Tenant,
56) -> impl Future<Output = Result<Tenant, OperationOutcomeError>> + Send + 'a {
57 async move {
58 let mut conn = connection.acquire().await.map_err(StoreError::SQLXError)?;
59 let updated_tenant = sqlx::query_as!(
60 Tenant,
61 r#"UPDATE tenants SET subscription_tier = $1 WHERE id = $2 RETURNING id as "id: TenantId", subscription_tier"#,
62 tenant.subscription_tier,
63 tenant.id as TenantId,
64 )
65 .fetch_one(&mut *conn)
66 .await
67 .map_err(StoreError::SQLXError)?;
68
69 Ok(updated_tenant)
70 }
71}
72
73fn delete_tenant<'a, 'c, Connection: Acquire<'c, Database = Postgres> + Send + 'a>(
74 connection: Connection,
75 id: &'a str,
76) -> impl Future<Output = Result<(), OperationOutcomeError>> + Send + 'a {
77 async move {
78 let mut conn = connection.acquire().await.map_err(StoreError::SQLXError)?;
79 let _deleted_tenant = sqlx::query_as!(
80 Tenant,
81 r#"DELETE FROM tenants WHERE id = $1 RETURNING id as "id: TenantId", subscription_tier"#,
82 id
83 )
84 .fetch_optional(&mut *conn)
85 .await
86 .map_err(StoreError::SQLXError)?;
87
88 Ok(())
89 }
90}
91
92fn search_tenant<'a, 'c, Connection: Acquire<'c, Database = Postgres> + Send + 'a>(
93 connection: Connection,
94 clauses: &'a TenantSearchClaims,
95) -> impl Future<Output = Result<Vec<Tenant>, OperationOutcomeError>> + Send + 'a {
96 async move {
97 let mut conn = connection.acquire().await.map_err(StoreError::SQLXError)?;
98 let mut query_builder: QueryBuilder<Postgres> =
99 QueryBuilder::new(r#"SELECT id, subscription_tier FROM tenants WHERE "#);
100
101 if let Some(subscription_tier) = clauses.subscription_tier.as_ref() {
102 query_builder
103 .push(" subscription_tier = ")
104 .push_bind(subscription_tier);
105 }
106
107 let query = query_builder.build_query_as();
108
109 let tenants: Vec<Tenant> = query
110 .fetch_all(&mut *conn)
111 .await
112 .map_err(StoreError::from)?;
113
114 Ok(tenants)
115 }
116}
117
118impl<Key: AsRef<str> + Send + Sync>
119 TenantAuthAdmin<CreateTenant, Tenant, TenantSearchClaims, Tenant, Key> for PGConnection
120{
121 async fn create(
122 &self,
123 _tenant: &TenantId,
124 new_tenant: CreateTenant,
125 ) -> Result<Tenant, OperationOutcomeError> {
126 match self {
127 PGConnection::Pool(pool, _) => {
128 let res = create_tenant(pool, new_tenant).await?;
129 Ok(res)
130 }
131 PGConnection::Transaction(tx, _) => {
132 let mut tx = tx.lock().await;
133 let res = create_tenant(&mut *tx, new_tenant).await?;
134 Ok(res)
135 }
136 }
137 }
138
139 async fn read(
140 &self,
141 _tenant: &TenantId,
142 id: &Key,
143 ) -> Result<Option<Tenant>, haste_fhir_operation_error::OperationOutcomeError> {
144 match self {
145 PGConnection::Pool(pool, _) => {
146 let res = read_tenant(pool, id.as_ref()).await?;
147 Ok(res)
148 }
149 PGConnection::Transaction(tx, _) => {
150 let mut tx = tx.lock().await;
151 let res = read_tenant(&mut *tx, id.as_ref()).await?;
152 Ok(res)
153 }
154 }
155 }
156
157 async fn update(
158 &self,
159 _tenant: &TenantId,
160 model: Tenant,
161 ) -> Result<Tenant, haste_fhir_operation_error::OperationOutcomeError> {
162 match self {
163 PGConnection::Pool(pool, _) => {
164 let res = update_tenant(pool, model).await?;
165 Ok(res)
166 }
167 PGConnection::Transaction(tx, _) => {
168 let mut tx = tx.lock().await;
169 let res = update_tenant(&mut *tx, model).await?;
170 Ok(res)
171 }
172 }
173 }
174
175 async fn delete(
176 &self,
177 _tenant: &TenantId,
178 id: &Key,
179 ) -> Result<(), haste_fhir_operation_error::OperationOutcomeError> {
180 match self {
181 PGConnection::Pool(pool, _) => {
182 let res = delete_tenant(pool, id.as_ref()).await?;
183 Ok(res)
184 }
185 PGConnection::Transaction(tx, _) => {
186 let mut tx = tx.lock().await;
187 let res = delete_tenant(&mut *tx, id.as_ref()).await?;
188 Ok(res)
189 }
190 }
191 }
192
193 async fn search(
194 &self,
195 _tenant: &TenantId,
196 claims: &TenantSearchClaims,
197 ) -> Result<Vec<Tenant>, OperationOutcomeError> {
198 match self {
199 PGConnection::Pool(pool, _) => {
200 let res = search_tenant(pool, claims).await?;
201 Ok(res)
202 }
203 PGConnection::Transaction(tx, _) => {
204 let mut tx = tx.lock().await;
205 let res = search_tenant(&mut *tx, claims).await?;
206 Ok(res)
207 }
208 }
209 }
210}