1use crate::{
2 fhir::{CachePolicy, FHIRRepository, ResourcePollingValue},
3 pg::{PGConnection, StoreError},
4 types::{FHIRMethod, SupportedFHIRVersions},
5 utilities,
6};
7use haste_fhir_client::request::HistoryRequest;
8use haste_fhir_model::r4::{
9 generated::resources::{Resource, ResourceType},
10 sqlx::{FHIRJson, FHIRJsonRef},
11};
12use haste_fhir_operation_error::OperationOutcomeError;
13use haste_jwt::{ProjectId, ResourceId, TenantId, VersionId, claims::UserTokenClaims};
14use moka::future::Cache;
15use sqlx::{Acquire, Postgres, QueryBuilder, Transaction};
16use std::sync::Arc;
17use tokio::sync::Mutex;
18
19#[derive(sqlx::FromRow, Debug)]
20struct ReturnSingularResource {
21 resource: FHIRJson<Resource>,
22}
23
24#[derive(sqlx::FromRow, Debug)]
25struct ReturnVersionedResource {
26 resource: FHIRJson<Resource>,
27 version_id: VersionId,
28}
29
30async fn read_version_ids_from_cache<'a>(
31 cache: &Cache<VersionId, Resource>,
32 version_ids: &'a [&VersionId],
33) -> (Vec<Resource>, Vec<&'a VersionId>) {
34 let mut remaining_version_ids = vec![];
35 let mut cached_resources = vec![];
36 for version_id in version_ids.iter() {
37 if let Some(resource) = cache.get(*version_id).await {
38 cached_resources.push(resource)
39 } else {
40 remaining_version_ids.push(*version_id);
41 }
42 }
43
44 (cached_resources, remaining_version_ids)
45}
46
47async fn create_transaction(
48 connection: &PGConnection,
49 is_updating_sequence: bool,
50) -> Result<Arc<Mutex<Transaction<'static, Postgres>>>, OperationOutcomeError> {
51 match connection {
52 PGConnection::Pool(pool, _cache) => {
53 let tx = if is_updating_sequence {
54 pool.begin_with(
55 "BEGIN; SELECT register_sequence_transaction('resources_sequence_seq')",
56 )
57 .await
58 .map_err(StoreError::from)?
59 } else {
60 pool.begin().await.map_err(StoreError::from)?
61 };
62
63 Ok(Arc::new(Mutex::new(tx)))
64 }
65 PGConnection::Transaction(tx, _) => Ok(tx.clone()), }
67}
68
69async fn commit_transaction(
70 tx: Arc<Mutex<Transaction<'static, Postgres>>>,
71) -> Result<(), OperationOutcomeError> {
72 let conn = Mutex::into_inner(Arc::try_unwrap(tx).map_err(|e| {
73 println!("Error during commit: {:?}", e);
74 StoreError::FailedCommitTransaction
75 })?);
76
77 let res = conn.commit().await.map_err(StoreError::from)?;
79 Ok(res)
80}
81
82impl FHIRRepository for PGConnection {
83 async fn create(
84 &self,
85 tenant: &TenantId,
86 project: &ProjectId,
87 author: &UserTokenClaims,
88 fhir_version: &SupportedFHIRVersions,
89 resource: &mut Resource,
90 ) -> Result<Resource, OperationOutcomeError> {
91 match &self {
92 PGConnection::Pool(_pool, _) => {
93 let tx = create_transaction(self, true).await?;
94 let res = {
95 let mut conn = tx.lock().await;
96 let res =
97 create(&mut *conn, tenant, project, author, fhir_version, resource).await?;
98 res
99 };
100 commit_transaction(tx).await?;
101 Ok(res)
102 }
103 PGConnection::Transaction(tx, _) => {
104 let mut tx = tx.lock().await;
105 let res = create(&mut *tx, tenant, project, author, fhir_version, resource).await?;
106 Ok(res)
107 }
108 }
109 }
110
111 async fn delete(
112 &self,
113 tenant: &TenantId,
114 project: &ProjectId,
115 author: &UserTokenClaims,
116 fhir_version: &SupportedFHIRVersions,
117 resource: &mut Resource,
118 id: &str,
119 ) -> Result<Resource, OperationOutcomeError> {
120 match self {
121 PGConnection::Pool(_pool, _) => {
122 let tx = create_transaction(self, true).await?;
123 let res = {
124 let mut conn = tx.lock().await;
125 let res = delete(
126 &mut *conn,
127 tenant,
128 project,
129 author,
130 fhir_version,
131 resource,
132 id,
133 )
134 .await?;
135 res
136 };
137 commit_transaction(tx).await?;
138 Ok(res)
139 }
140 PGConnection::Transaction(tx, _) => {
141 let mut conn = tx.lock().await;
142 let res = delete(
144 &mut *conn,
145 tenant,
146 project,
147 author,
148 fhir_version,
149 resource,
150 id,
151 )
152 .await?;
153 Ok(res)
154 }
155 }
156 }
157
158 async fn update(
159 &self,
160 tenant: &TenantId,
161 project: &ProjectId,
162 author: &UserTokenClaims,
163 fhir_version: &SupportedFHIRVersions,
164 resource: &mut Resource,
165 id: &str,
166 ) -> Result<Resource, OperationOutcomeError> {
167 match self {
168 PGConnection::Pool(_pool, _) => {
169 let tx = create_transaction(self, true).await?;
170 let res = {
171 let mut conn = tx.lock().await;
172 let res = update(
173 &mut *conn,
174 tenant,
175 project,
176 author,
177 fhir_version,
178 resource,
179 id,
180 )
181 .await?;
182 res
183 };
184
185 commit_transaction(tx).await?;
186 Ok(res)
187 }
188 PGConnection::Transaction(tx, _) => {
189 let mut conn = tx.lock().await;
190 let res = update(
192 &mut *conn,
193 tenant,
194 project,
195 author,
196 fhir_version,
197 resource,
198 id,
199 )
200 .await?;
201 Ok(res)
202 }
203 }
204 }
205
206 async fn read_by_version_ids(
207 &self,
208 tenant_id: &TenantId,
209 project_id: &ProjectId,
210 version_ids: &[&VersionId],
211 cache_policy: CachePolicy,
212 ) -> Result<Vec<Resource>, OperationOutcomeError> {
213 if version_ids.is_empty() {
214 return Ok(vec![]);
215 }
216
217 let (cached_result, remaining_version_ids) =
218 read_version_ids_from_cache(self.cache(), &version_ids).await;
219
220 if remaining_version_ids.is_empty() {
221 return Ok(cached_result);
222 }
223
224 match self {
225 PGConnection::Pool(pool, cache) => {
226 let res = read_by_version_ids(pool, tenant_id, project_id, &remaining_version_ids)
227 .await?;
228
229 if cache_policy == CachePolicy::Cache {
230 for v in res.iter() {
231 cache
232 .insert(v.version_id.clone(), v.resource.0.clone())
233 .await;
234 }
235 }
236
237 Ok(cached_result
238 .into_iter()
239 .chain(res.into_iter().map(|r| r.resource.0))
240 .collect::<Vec<_>>())
241 }
242 PGConnection::Transaction(tx, cache) => {
243 let mut conn = tx.lock().await;
244 let res =
246 read_by_version_ids(&mut *conn, tenant_id, project_id, &remaining_version_ids)
247 .await?;
248
249 if cache_policy == CachePolicy::Cache {
250 for v in res.iter() {
251 cache
252 .insert(v.version_id.clone(), v.resource.0.clone())
253 .await;
254 }
255 }
256
257 Ok(cached_result
258 .into_iter()
259 .chain(res.into_iter().map(|r| r.resource.0))
260 .collect::<Vec<_>>())
261 }
262 }
263 }
264
265 async fn read_latest(
266 &self,
267 tenant_id: &TenantId,
268 project_id: &ProjectId,
269 resource_type: &ResourceType,
270 resource_id: &ResourceId,
271 ) -> Result<Option<Resource>, OperationOutcomeError> {
272 match self {
273 PGConnection::Pool(pool, _) => {
274 let res =
275 read_latest(pool, tenant_id, project_id, resource_type, resource_id).await?;
276 Ok(res)
277 }
278 PGConnection::Transaction(tx, _) => {
279 let mut conn = tx.lock().await;
280 let res = read_latest(
282 &mut *conn,
283 tenant_id,
284 project_id,
285 resource_type,
286 resource_id,
287 )
288 .await?;
289 Ok(res)
290 }
291 }
292 }
293
294 async fn history(
295 &self,
296 tenant_id: &TenantId,
297 project_id: &ProjectId,
298 request: &HistoryRequest,
299 ) -> Result<Vec<Resource>, OperationOutcomeError> {
300 match self {
301 PGConnection::Pool(pool, _) => {
302 let res = history(pool, tenant_id, project_id, request).await?;
303 Ok(res)
304 }
305 PGConnection::Transaction(tx, _) => {
306 let mut conn = tx.lock().await;
307 let res = history(&mut *conn, tenant_id, project_id, request).await?;
309 Ok(res)
310 }
311 }
312 }
313
314 async fn get_sequence(
315 &self,
316 tenant_id: &TenantId,
317 sequence_id: u64,
318 count: Option<u64>,
319 ) -> Result<Vec<ResourcePollingValue>, OperationOutcomeError> {
320 match self {
321 PGConnection::Pool(pool, _) => {
322 let res = get_sequence(pool, tenant_id, sequence_id, count).await?;
323 Ok(res)
324 }
325 PGConnection::Transaction(tx, _) => {
326 let mut conn = tx.lock().await;
327
328 let res = get_sequence(&mut *conn, tenant_id, sequence_id, count).await?;
330 Ok(res)
331 }
332 }
333 }
334
335 fn in_transaction(&self) -> bool {
336 match self {
337 PGConnection::Transaction(_tx, _) => true,
338 _ => false,
339 }
340 }
341
342 async fn transaction<'a>(
343 &'a self,
344 is_updating_sequence: bool,
345 ) -> Result<Self, OperationOutcomeError> {
346 let tx = create_transaction(self, is_updating_sequence).await?;
347 Ok(PGConnection::Transaction(tx, self.cache().clone()))
348 }
349
350 async fn commit(self) -> Result<(), OperationOutcomeError> {
351 match self {
352 PGConnection::Pool(_pool, _) => Err(StoreError::NotTransaction.into()),
353 PGConnection::Transaction(tx, _) => commit_transaction(tx).await,
354 }
355 }
356
357 async fn rollback(self) -> Result<(), OperationOutcomeError> {
358 match self {
359 PGConnection::Pool(_pool, _) => Err(StoreError::NotTransaction.into()),
360 PGConnection::Transaction(tx, _) => {
361 let conn = Mutex::into_inner(
362 Arc::try_unwrap(tx).map_err(|_e| StoreError::FailedCommitTransaction)?,
363 );
364
365 let res = conn.rollback().await.map_err(StoreError::from)?;
367 Ok(res)
368 }
369 }
370 }
371}
372
373fn create<'a, 'c, Connection: Acquire<'c, Database = Postgres> + Send + 'a>(
374 connection: Connection,
375 tenant: &'a TenantId,
376 project: &'a ProjectId,
377 author: &'a UserTokenClaims,
378 fhir_version: &'a SupportedFHIRVersions,
379 resource: &'a mut Resource,
380) -> impl Future<Output = Result<Resource, OperationOutcomeError>> + Send + 'a {
381 async move {
382 utilities::set_resource_id(resource, None)?;
383 utilities::set_version_id(resource)?;
384 let mut conn = connection.acquire().await.map_err(StoreError::SQLXError)?;
385 let result = sqlx::query_as!(
386 ReturnSingularResource,
387 r#"INSERT INTO resources (tenant, project, author_id, fhir_version, resource, deleted, request_method, author_type, fhir_method)
388 VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
389 RETURNING resource as "resource: FHIRJson<Resource>""#,
390 tenant.as_ref() as &str,
391 project.as_ref() as &str,
392 author.sub.as_ref() as &str,
393 fhir_version as &SupportedFHIRVersions,
396 &FHIRJsonRef(resource) as &FHIRJsonRef<'_ , Resource>,
397 false, "POST",
399 author.resource_type.as_ref() as &str,
400 &FHIRMethod::Create as &FHIRMethod,
401 ).fetch_one(&mut *conn).await.map_err(StoreError::from)?;
402 Ok(result.resource.0)
403 }
404}
405
406fn delete<'a, 'c, Connection: Acquire<'c, Database = Postgres> + Send + 'a>(
407 connection: Connection,
408 tenant: &'a TenantId,
409 project: &'a ProjectId,
410 author: &'a UserTokenClaims,
411 fhir_version: &'a SupportedFHIRVersions,
412 resource: &'a mut Resource,
413 id: &'a str,
414) -> impl Future<Output = Result<Resource, OperationOutcomeError>> + Send + 'a {
415 async move {
416 utilities::set_resource_id(resource, Some(id.to_string()))?;
417 utilities::set_version_id(resource)?;
418 let mut conn = connection.acquire().await.map_err(StoreError::SQLXError)?;
419 let result = sqlx::query_as!(
420 ReturnSingularResource,
421 r#"INSERT INTO resources (tenant, project, author_id, fhir_version, resource, deleted, request_method, author_type, fhir_method)
422 VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
423 RETURNING resource as "resource: FHIRJson<Resource>""#,
424 tenant.as_ref() as &str,
425 project.as_ref() as &str,
426 author.sub.as_ref() as &str,
427 fhir_version as &SupportedFHIRVersions,
430 &FHIRJsonRef(resource) as &FHIRJsonRef<'_ , Resource>,
431 true, "DELETE",
433 author.resource_type.as_ref() as &str,
434 &FHIRMethod::Delete as &FHIRMethod,
435 ).fetch_one(&mut *conn).await.map_err(StoreError::from)?;
436
437 Ok(result.resource.0)
438 }
439}
440
441fn update<'a, 'c, Connection: Acquire<'c, Database = Postgres> + Send + 'a>(
442 connection: Connection,
443 tenant: &'a TenantId,
444 project: &'a ProjectId,
445 author: &'a UserTokenClaims,
446 fhir_version: &'a SupportedFHIRVersions,
447 resource: &'a mut Resource,
448 id: &'a str,
449) -> impl Future<Output = Result<Resource, OperationOutcomeError>> + Send + 'a {
450 async move {
451 utilities::set_resource_id(resource, Some(id.to_string()))?;
452 utilities::set_version_id(resource)?;
453
454 let mut conn = connection.acquire().await.map_err(StoreError::SQLXError)?;
455
456 let query = sqlx::query_as!(
457 ReturnSingularResource,
458 r#"INSERT INTO resources (tenant, project, author_id, fhir_version, resource, deleted, request_method, author_type, fhir_method)
459 VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
460 RETURNING resource as "resource: FHIRJson<Resource>""#,
461 tenant.as_ref() as &str,
462 project.as_ref() as &str,
463 author.sub.as_ref() as &str,
464 fhir_version as &SupportedFHIRVersions,
467 &FHIRJsonRef(resource) as &FHIRJsonRef<'_, Resource>,
468 false, "PUT",
470 author.resource_type.as_ref() as &str,
471 &FHIRMethod::Update as &FHIRMethod,
472 );
473
474 let result = query
475 .fetch_one(&mut *conn)
476 .await
477 .map_err(StoreError::from)?;
478
479 Ok(result.resource.0)
480 }
481}
482
483fn read_by_version_ids<'a, 'c, Connection: Acquire<'c, Database = Postgres> + Send + 'a>(
484 connection: Connection,
485 tenant_id: &'a TenantId,
486 project_id: &'a ProjectId,
487 version_ids: &'a Vec<&'a VersionId>,
488) -> impl Future<Output = Result<Vec<ReturnVersionedResource>, OperationOutcomeError>> + Send + 'a {
489 async move {
490 let mut conn = connection.acquire().await.map_err(StoreError::SQLXError)?;
491
492 let mut query_builder: QueryBuilder<Postgres> =
493 QueryBuilder::new(r#"SELECT resource, version_id FROM resources WHERE tenant = "#);
494
495 query_builder
496 .push_bind(tenant_id.as_ref())
497 .push(" AND project =")
498 .push_bind(project_id.as_ref());
499
500 query_builder.push(" AND version_id in (");
501
502 let mut separated = query_builder.separated(", ");
503 for version_id in version_ids.iter() {
504 separated.push_bind(version_id.as_ref());
505 }
506 separated.push_unseparated(")");
507
508 query_builder.push(" ORDER BY array_position(array[");
510 let mut order_separator = query_builder.separated(", ");
511 for version_id in version_ids.iter() {
512 order_separator.push_bind(version_id.as_ref());
513 }
514 query_builder.push("], version_id)");
515
516 let query = query_builder.build_query_as();
517 let response: Vec<ReturnVersionedResource> = query
518 .fetch_all(&mut *conn)
519 .await
520 .map_err(StoreError::from)?;
521
522 Ok(response)
523 }
524}
525
526fn read_latest<'a, 'c, Connection: Acquire<'c, Database = Postgres> + Send + 'a>(
527 connection: Connection,
528 tenant_id: &'a TenantId,
529 project_id: &'a ProjectId,
530 resource_type: &'a ResourceType,
531 resource_id: &'a ResourceId,
532) -> impl Future<Output = Result<Option<Resource>, OperationOutcomeError>> + Send + 'a {
533 async move {
534 let mut conn = connection.acquire().await.map_err(StoreError::SQLXError)?;
535 let response = sqlx::query!(
536 r#"SELECT resource as "resource: FHIRJson<Resource>", deleted FROM resources WHERE tenant = $1 AND project = $2 AND id = $3 AND resource_type = $4 ORDER BY sequence DESC"#,
537 tenant_id.as_ref(),
538 project_id.as_ref(),
539 resource_id.as_ref(),
540 resource_type.as_ref(),
541 ).fetch_optional(&mut *conn).await.map_err(StoreError::from)?;
542
543 if let Some(true) = response.as_ref().map(|r| r.deleted) {
546 Ok(None)
547 } else {
548 Ok(response.map(|r| r.resource.0))
549 }
550 }
551}
552
553fn history<'a, 'c, Connection: Acquire<'c, Database = Postgres> + Send + 'a>(
554 connection: Connection,
555 tenant_id: &'a TenantId,
556 project_id: &'a ProjectId,
557 history_request: &'a HistoryRequest,
558) -> impl Future<Output = Result<Vec<Resource>, OperationOutcomeError>> + Send + 'a {
559 async move {
560 let mut conn = connection.acquire().await.map_err(StoreError::from)?;
561 match history_request {
562 HistoryRequest::Instance(history_instance_request) => {
563 let response = sqlx::query_as!(ReturnSingularResource,
564 r#"SELECT resource as "resource: FHIRJson<Resource>" FROM resources WHERE tenant = $1 AND project = $2 AND id = $3 AND resource_type = $4 ORDER BY sequence DESC LIMIT 100"#,
565 tenant_id.as_ref() as &str,
566 project_id.as_ref() as &str,
567 history_instance_request.id.as_ref() as &str,
568 history_instance_request.resource_type.as_ref() as &str
569 ).fetch_all(&mut *conn).await.map_err(StoreError::from)?;
570
571 Ok(response.into_iter().map(|r| r.resource.0).collect())
572 }
573 HistoryRequest::Type(history_type_request) => {
574 let response = sqlx::query_as!(ReturnSingularResource,
575 r#"SELECT resource as "resource: FHIRJson<Resource>" FROM resources WHERE tenant = $1 AND project = $2 AND resource_type = $3 ORDER BY sequence DESC LIMIT 100"#,
576 tenant_id.as_ref() as &str,
577 project_id.as_ref() as &str,
578 history_type_request.resource_type.as_ref() as &str
579 ).fetch_all(&mut *conn).await.map_err(StoreError::from)?;
580
581 Ok(response.into_iter().map(|r| r.resource.0).collect())
582 }
583 HistoryRequest::System(_request) => {
584 let response = sqlx::query_as!(ReturnSingularResource,
585 r#"SELECT resource as "resource: FHIRJson<Resource>" FROM resources WHERE tenant = $1 AND project = $2 ORDER BY sequence DESC LIMIT 100"#,
586 tenant_id.as_ref() as &str,
587 project_id.as_ref() as &str
588 ).fetch_all(&mut *conn).await.map_err(StoreError::from)?;
589
590 Ok(response.into_iter().map(|r| r.resource.0).collect())
591 }
592 }
593 }
594}
595
596fn get_sequence<'a, 'c, Connection: Acquire<'c, Database = Postgres> + Send + 'a>(
597 connection: Connection,
598 tenant_id: &'a TenantId,
599 cur_sequence: u64,
600 count: Option<u64>,
601) -> impl Future<Output = Result<Vec<ResourcePollingValue>, OperationOutcomeError>> + Send + 'a {
602 async move {
603 let mut conn = connection.acquire().await.map_err(StoreError::from)?;
604 let safe_sequence =
609 sqlx::query!("SELECT max_safe_seq('resources_sequence_seq') as max_safe_seq")
610 .fetch_one(&mut *conn)
611 .await
612 .map_err(StoreError::from)?
613 .max_safe_seq
614 .unwrap_or(0);
615
616 let result = sqlx::query_as!(
617 ResourcePollingValue,
618 r#"SELECT id as "id: ResourceId",
619 tenant as "tenant: TenantId",
620 project as "project: ProjectId",
621 version_id,
622 resource_type as "resource_type: ResourceType",
623 fhir_method as "fhir_method: FHIRMethod",
624 sequence,
625 resource as "resource: FHIRJson<Resource>"
626 FROM resources WHERE tenant = $1 AND sequence > $2 AND sequence <= $3 ORDER BY sequence LIMIT $4 "#,
627 tenant_id.as_ref() as &str,
628 cur_sequence as i64,
629 safe_sequence,
630 count.unwrap_or(100) as i64
631 )
632 .fetch_all(&mut *conn)
633 .await
634 .map_err(StoreError::from)?;
635
636 Ok(result)
641 }
642}