1use crate::{
2 fhir::{CachePolicy, FHIRRepository, ResourcePollingValue},
3 pg::{
4 PGConnection, StoreError,
5 utilities::{commit_transaction, create_transaction},
6 },
7 types::{FHIRMethod, SupportedFHIRVersions},
8 utilities,
9};
10use haste_fhir_client::{
11 request::HistoryRequest,
12 url::{ParsedParameter, ParsedParameters},
13};
14use haste_fhir_model::r4::{
15 datetime::parse_datetime,
16 generated::{
17 resources::{Resource, ResourceType},
18 terminology::IssueType,
19 },
20 sqlx::{FHIRJson, FHIRJsonRef},
21};
22use haste_fhir_operation_error::OperationOutcomeError;
23use haste_jwt::{ProjectId, ResourceId, TenantId, VersionId, claims::UserTokenClaims};
24use moka::future::Cache;
25use sqlx::{Acquire, Postgres, QueryBuilder, query_builder::Separated};
26use std::sync::Arc;
27use tokio::sync::Mutex;
28
/// Row shape for queries that return only the `resource` JSON column.
#[derive(sqlx::FromRow, Debug)]
struct ReturnSingularResource {
    /// The FHIR resource decoded from the `resource` column.
    resource: FHIRJson<Resource>,
}
33
/// Row shape for queries that return a resource together with its `version_id`.
#[derive(sqlx::FromRow, Debug)]
struct ReturnVersionedResource {
    /// The FHIR resource decoded from the `resource` column.
    resource: FHIRJson<Resource>,
    /// Version identifier of the row; used as the cache key when results are cached.
    version_id: VersionId,
}
39
40async fn read_version_ids_from_cache<'a>(
41 cache: &Cache<VersionId, Resource>,
42 version_ids: &'a [&VersionId],
43) -> (Vec<Resource>, Vec<&'a VersionId>) {
44 let mut remaining_version_ids = vec![];
45 let mut cached_resources = vec![];
46 for version_id in version_ids.iter() {
47 if let Some(resource) = cache.get(*version_id).await {
48 cached_resources.push(resource)
49 } else {
50 remaining_version_ids.push(*version_id);
51 }
52 }
53
54 (cached_resources, remaining_version_ids)
55}
56
57impl FHIRRepository for PGConnection {
58 async fn create(
59 &self,
60 tenant: &TenantId,
61 project: &ProjectId,
62 author: &UserTokenClaims,
63 fhir_version: &SupportedFHIRVersions,
64 resource: &mut Resource,
65 ) -> Result<Resource, OperationOutcomeError> {
66 match &self {
67 PGConnection::Pool(_pool, _) => {
68 let tx = create_transaction(self, true).await?;
69 let res = {
70 let mut conn = tx.lock().await;
71 let res =
72 create(&mut *conn, tenant, project, author, fhir_version, resource).await?;
73 res
74 };
75 commit_transaction(tx).await?;
76 Ok(res)
77 }
78 PGConnection::Transaction(tx, _) => {
79 let mut tx = tx.lock().await;
80 let res = create(&mut *tx, tenant, project, author, fhir_version, resource).await?;
81 Ok(res)
82 }
83 }
84 }
85
86 async fn delete(
87 &self,
88 tenant: &TenantId,
89 project: &ProjectId,
90 author: &UserTokenClaims,
91 fhir_version: &SupportedFHIRVersions,
92 resource: &mut Resource,
93 id: &str,
94 ) -> Result<Resource, OperationOutcomeError> {
95 match self {
96 PGConnection::Pool(_pool, _) => {
97 let tx = create_transaction(self, true).await?;
98 let res = {
99 let mut conn = tx.lock().await;
100 let res = delete(
101 &mut *conn,
102 tenant,
103 project,
104 author,
105 fhir_version,
106 resource,
107 id,
108 )
109 .await?;
110 res
111 };
112 commit_transaction(tx).await?;
113 Ok(res)
114 }
115 PGConnection::Transaction(tx, _) => {
116 let mut conn = tx.lock().await;
117 let res = delete(
119 &mut *conn,
120 tenant,
121 project,
122 author,
123 fhir_version,
124 resource,
125 id,
126 )
127 .await?;
128 Ok(res)
129 }
130 }
131 }
132
133 async fn update(
134 &self,
135 tenant: &TenantId,
136 project: &ProjectId,
137 author: &UserTokenClaims,
138 fhir_version: &SupportedFHIRVersions,
139 resource: &mut Resource,
140 id: &str,
141 ) -> Result<Resource, OperationOutcomeError> {
142 match self {
143 PGConnection::Pool(_pool, _) => {
144 let tx = create_transaction(self, true).await?;
145 let res = {
146 let mut conn = tx.lock().await;
147 let res = update(
148 &mut *conn,
149 tenant,
150 project,
151 author,
152 fhir_version,
153 resource,
154 id,
155 )
156 .await?;
157 res
158 };
159
160 commit_transaction(tx).await?;
161 Ok(res)
162 }
163 PGConnection::Transaction(tx, _) => {
164 let mut conn = tx.lock().await;
165 let res = update(
167 &mut *conn,
168 tenant,
169 project,
170 author,
171 fhir_version,
172 resource,
173 id,
174 )
175 .await?;
176 Ok(res)
177 }
178 }
179 }
180
181 async fn read_by_version_ids(
182 &self,
183 tenant_id: &TenantId,
184 project_id: &ProjectId,
185 version_ids: &[&VersionId],
186 cache_policy: CachePolicy,
187 ) -> Result<Vec<Resource>, OperationOutcomeError> {
188 if version_ids.is_empty() {
189 return Ok(vec![]);
190 }
191
192 let (cached_result, remaining_version_ids) =
193 read_version_ids_from_cache(self.cache(), &version_ids).await;
194
195 if remaining_version_ids.is_empty() {
196 return Ok(cached_result);
197 }
198
199 match self {
200 PGConnection::Pool(pool, cache) => {
201 let res = read_by_version_ids(pool, tenant_id, project_id, &remaining_version_ids)
202 .await?;
203
204 if cache_policy == CachePolicy::Cache {
205 for v in res.iter() {
206 cache
207 .insert(v.version_id.clone(), v.resource.0.clone())
208 .await;
209 }
210 }
211
212 Ok(cached_result
213 .into_iter()
214 .chain(res.into_iter().map(|r| r.resource.0))
215 .collect::<Vec<_>>())
216 }
217 PGConnection::Transaction(tx, cache) => {
218 let mut conn = tx.lock().await;
219 let res =
221 read_by_version_ids(&mut *conn, tenant_id, project_id, &remaining_version_ids)
222 .await?;
223
224 if cache_policy == CachePolicy::Cache {
225 for v in res.iter() {
226 cache
227 .insert(v.version_id.clone(), v.resource.0.clone())
228 .await;
229 }
230 }
231
232 Ok(cached_result
233 .into_iter()
234 .chain(res.into_iter().map(|r| r.resource.0))
235 .collect::<Vec<_>>())
236 }
237 }
238 }
239
240 async fn read_latest(
241 &self,
242 tenant_id: &TenantId,
243 project_id: &ProjectId,
244 resource_type: &ResourceType,
245 resource_id: &ResourceId,
246 ) -> Result<Option<Resource>, OperationOutcomeError> {
247 match self {
248 PGConnection::Pool(pool, _) => {
249 let res =
250 read_latest(pool, tenant_id, project_id, resource_type, resource_id).await?;
251 Ok(res)
252 }
253 PGConnection::Transaction(tx, _) => {
254 let mut conn = tx.lock().await;
255 let res = read_latest(
257 &mut *conn,
258 tenant_id,
259 project_id,
260 resource_type,
261 resource_id,
262 )
263 .await?;
264 Ok(res)
265 }
266 }
267 }
268
269 async fn history(
270 &self,
271 tenant_id: &TenantId,
272 project_id: &ProjectId,
273 request: &HistoryRequest,
274 ) -> Result<Vec<Resource>, OperationOutcomeError> {
275 match self {
276 PGConnection::Pool(pool, _) => {
277 let res = history(pool, tenant_id, project_id, request).await?;
278 Ok(res)
279 }
280 PGConnection::Transaction(tx, _) => {
281 let mut conn = tx.lock().await;
282 let res = history(&mut *conn, tenant_id, project_id, request).await?;
284 Ok(res)
285 }
286 }
287 }
288
289 async fn get_sequence(
290 &self,
291 tenant_id: &TenantId,
292 sequence_id: u64,
293 count: Option<u64>,
294 ) -> Result<Vec<ResourcePollingValue>, OperationOutcomeError> {
295 match self {
296 PGConnection::Pool(pool, _) => {
297 let res = get_sequence(pool, tenant_id, sequence_id, count).await?;
298 Ok(res)
299 }
300 PGConnection::Transaction(tx, _) => {
301 let mut conn = tx.lock().await;
302
303 let res = get_sequence(&mut *conn, tenant_id, sequence_id, count).await?;
305 Ok(res)
306 }
307 }
308 }
309
310 fn in_transaction(&self) -> bool {
311 match self {
312 PGConnection::Transaction(_tx, _) => true,
313 _ => false,
314 }
315 }
316
317 async fn transaction<'a>(
318 &'a self,
319 is_updating_sequence: bool,
320 ) -> Result<Self, OperationOutcomeError> {
321 let tx = create_transaction(self, is_updating_sequence).await?;
322 Ok(PGConnection::Transaction(tx, self.cache().clone()))
323 }
324
325 async fn commit(self) -> Result<(), OperationOutcomeError> {
326 match self {
327 PGConnection::Pool(_pool, _) => Err(StoreError::NotTransaction.into()),
328 PGConnection::Transaction(tx, _) => commit_transaction(tx).await,
329 }
330 }
331
332 async fn rollback(self) -> Result<(), OperationOutcomeError> {
333 match self {
334 PGConnection::Pool(_pool, _) => Err(StoreError::NotTransaction.into()),
335 PGConnection::Transaction(tx, _) => {
336 let conn = Mutex::into_inner(
337 Arc::try_unwrap(tx).map_err(|_e| StoreError::FailedCommitTransaction)?,
338 );
339
340 let res = conn.rollback().await.map_err(StoreError::from)?;
342 Ok(res)
343 }
344 }
345 }
346}
347
348fn create<'a, 'c, Connection: Acquire<'c, Database = Postgres> + Send + 'a>(
349 connection: Connection,
350 tenant: &'a TenantId,
351 project: &'a ProjectId,
352 author: &'a UserTokenClaims,
353 fhir_version: &'a SupportedFHIRVersions,
354 resource: &'a mut Resource,
355) -> impl Future<Output = Result<Resource, OperationOutcomeError>> + Send + 'a {
356 async move {
357 utilities::set_resource_id(resource, None)?;
358 utilities::set_version_id(resource)?;
359 let mut conn = connection.acquire().await.map_err(StoreError::SQLXError)?;
360 let result = sqlx::query_as!(
361 ReturnSingularResource,
362 r#"INSERT INTO resources (tenant, project, author_id, fhir_version, resource, deleted, request_method, author_type, fhir_method)
363 VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
364 RETURNING resource as "resource: FHIRJson<Resource>""#,
365 tenant.as_ref() as &str,
366 project.as_ref() as &str,
367 author.sub.as_ref() as &str,
368 fhir_version as &SupportedFHIRVersions,
371 &FHIRJsonRef(resource) as &FHIRJsonRef<'_ , Resource>,
372 false, "POST",
374 author.resource_type.as_ref() as &str,
375 &FHIRMethod::Create as &FHIRMethod,
376 ).fetch_one(&mut *conn).await.map_err(StoreError::from)?;
377 Ok(result.resource.0)
378 }
379}
380
381fn delete<'a, 'c, Connection: Acquire<'c, Database = Postgres> + Send + 'a>(
382 connection: Connection,
383 tenant: &'a TenantId,
384 project: &'a ProjectId,
385 author: &'a UserTokenClaims,
386 fhir_version: &'a SupportedFHIRVersions,
387 resource: &'a mut Resource,
388 id: &'a str,
389) -> impl Future<Output = Result<Resource, OperationOutcomeError>> + Send + 'a {
390 async move {
391 utilities::set_resource_id(resource, Some(id.to_string()))?;
392 utilities::set_version_id(resource)?;
393 let mut conn = connection.acquire().await.map_err(StoreError::SQLXError)?;
394 let result = sqlx::query_as!(
395 ReturnSingularResource,
396 r#"INSERT INTO resources (tenant, project, author_id, fhir_version, resource, deleted, request_method, author_type, fhir_method)
397 VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
398 RETURNING resource as "resource: FHIRJson<Resource>""#,
399 tenant.as_ref() as &str,
400 project.as_ref() as &str,
401 author.sub.as_ref() as &str,
402 fhir_version as &SupportedFHIRVersions,
405 &FHIRJsonRef(resource) as &FHIRJsonRef<'_ , Resource>,
406 true, "DELETE",
408 author.resource_type.as_ref() as &str,
409 &FHIRMethod::Delete as &FHIRMethod,
410 ).fetch_one(&mut *conn).await.map_err(StoreError::from)?;
411
412 Ok(result.resource.0)
413 }
414}
415
416fn update<'a, 'c, Connection: Acquire<'c, Database = Postgres> + Send + 'a>(
417 connection: Connection,
418 tenant: &'a TenantId,
419 project: &'a ProjectId,
420 author: &'a UserTokenClaims,
421 fhir_version: &'a SupportedFHIRVersions,
422 resource: &'a mut Resource,
423 id: &'a str,
424) -> impl Future<Output = Result<Resource, OperationOutcomeError>> + Send + 'a {
425 async move {
426 utilities::set_resource_id(resource, Some(id.to_string()))?;
427 utilities::set_version_id(resource)?;
428
429 let mut conn = connection.acquire().await.map_err(StoreError::SQLXError)?;
430
431 let query = sqlx::query_as!(
432 ReturnSingularResource,
433 r#"INSERT INTO resources (tenant, project, author_id, fhir_version, resource, deleted, request_method, author_type, fhir_method)
434 VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
435 RETURNING resource as "resource: FHIRJson<Resource>""#,
436 tenant.as_ref() as &str,
437 project.as_ref() as &str,
438 author.sub.as_ref() as &str,
439 fhir_version as &SupportedFHIRVersions,
442 &FHIRJsonRef(resource) as &FHIRJsonRef<'_, Resource>,
443 false, "PUT",
445 author.resource_type.as_ref() as &str,
446 &FHIRMethod::Update as &FHIRMethod,
447 );
448
449 let result = query
450 .fetch_one(&mut *conn)
451 .await
452 .map_err(StoreError::from)?;
453
454 Ok(result.resource.0)
455 }
456}
457
458fn read_by_version_ids<'a, 'c, Connection: Acquire<'c, Database = Postgres> + Send + 'a>(
459 connection: Connection,
460 tenant_id: &'a TenantId,
461 project_id: &'a ProjectId,
462 version_ids: &'a Vec<&'a VersionId>,
463) -> impl Future<Output = Result<Vec<ReturnVersionedResource>, OperationOutcomeError>> + Send + 'a {
464 async move {
465 let mut conn = connection.acquire().await.map_err(StoreError::SQLXError)?;
466
467 let mut query_builder: QueryBuilder<Postgres> =
468 QueryBuilder::new(r#"SELECT resource, version_id FROM resources WHERE tenant = "#);
469
470 query_builder
471 .push_bind(tenant_id.as_ref())
472 .push(" AND project =")
473 .push_bind(project_id.as_ref());
474
475 query_builder.push(" AND version_id in (");
476
477 let mut separated = query_builder.separated(", ");
478 for version_id in version_ids.iter() {
479 separated.push_bind(version_id.as_ref());
480 }
481 separated.push_unseparated(")");
482
483 query_builder.push(" ORDER BY array_position(array[");
485 let mut order_separator = query_builder.separated(", ");
486 for version_id in version_ids.iter() {
487 order_separator.push_bind(version_id.as_ref());
488 }
489 query_builder.push("], version_id)");
490
491 let query = query_builder.build_query_as();
492 let response: Vec<ReturnVersionedResource> = query
493 .fetch_all(&mut *conn)
494 .await
495 .map_err(StoreError::from)?;
496
497 Ok(response)
498 }
499}
500
501fn read_latest<'a, 'c, Connection: Acquire<'c, Database = Postgres> + Send + 'a>(
502 connection: Connection,
503 tenant_id: &'a TenantId,
504 project_id: &'a ProjectId,
505 resource_type: &'a ResourceType,
506 resource_id: &'a ResourceId,
507) -> impl Future<Output = Result<Option<Resource>, OperationOutcomeError>> + Send + 'a {
508 async move {
509 let mut conn = connection.acquire().await.map_err(StoreError::SQLXError)?;
510 let response = sqlx::query!(
511 r#"SELECT resource as "resource: FHIRJson<Resource>", deleted FROM resources WHERE tenant = $1 AND project = $2 AND id = $3 AND resource_type = $4 ORDER BY sequence DESC"#,
512 tenant_id.as_ref(),
513 project_id.as_ref(),
514 resource_id.as_ref(),
515 resource_type.as_ref(),
516 ).fetch_optional(&mut *conn).await.map_err(StoreError::from)?;
517
518 if let Some(true) = response.as_ref().map(|r| r.deleted) {
521 Ok(None)
522 } else {
523 Ok(response.map(|r| r.resource.0))
524 }
525 }
526}
527
528fn process_history_parameters<'a>(
529 parameters: &'a ParsedParameters,
530 clauses: &mut Separated<'_, 'a, Postgres, &str>,
531) -> Result<(), OperationOutcomeError> {
532 for parameter in parameters.parameters() {
533 match parameter {
534 ParsedParameter::Result(result_param) => match result_param.name.as_str() {
535 "_since" => {
536 if let Some(value) = &result_param.value.get(0) {
537 let date_time = parse_datetime(value.as_str()).map_err(|e| {
538 OperationOutcomeError::fatal(
539 IssueType::Invalid(None),
540 format!("Invalid _since parameter datetime: {:?}", e),
541 )
542 })?;
543
544 clauses.push(" created_at >= ").push_bind_unseparated(
545 chrono::DateTime::try_from(date_time).map_err(|e| {
546 OperationOutcomeError::fatal(
547 IssueType::Invalid(None),
548 format!("Invalid _since parameter datetime: {:?}", e),
549 )
550 })?,
551 );
552 }
553 }
554 _ => {}
555 },
556 _ => {
557 return Err(OperationOutcomeError::fatal(
558 IssueType::NotSupported(None),
559 format!(
560 "Parameter '{}' is not supported for history requests.",
561 parameter.name()
562 ),
563 ));
564 }
565 }
566 }
567
568 Ok(())
569}
570
571fn history<'a, 'c, Connection: Acquire<'c, Database = Postgres> + Send + 'a>(
572 connection: Connection,
573 tenant: &'a TenantId,
574 project: &'a ProjectId,
575 history_request: &'a HistoryRequest,
576) -> impl Future<Output = Result<Vec<Resource>, OperationOutcomeError>> + Send + 'a {
577 async move {
578 let mut conn = connection.acquire().await.map_err(StoreError::from)?;
579
580 let mut query_builder: QueryBuilder<Postgres> =
581 QueryBuilder::new(r#"SELECT resource FROM resources WHERE "#);
582
583 let mut clauses = query_builder.separated(" AND ");
584 clauses
585 .push(" tenant = ")
586 .push_bind_unseparated(tenant.as_ref())
587 .push(" project = ")
588 .push_bind_unseparated(project.as_ref());
589
590 let history_parameters = match history_request {
591 HistoryRequest::Instance(history_instance_request) => {
592 &history_instance_request.parameters
593 }
594 HistoryRequest::Type(history_type_request) => &history_type_request.parameters,
595 HistoryRequest::System(system_request) => &system_request.parameters,
596 };
597
598 process_history_parameters(&history_parameters, &mut clauses)?;
599
600 match history_request {
601 HistoryRequest::Instance(history_instance_request) => {
602 clauses
603 .push(" resource_type = ")
604 .push_bind_unseparated(history_instance_request.resource_type.as_ref())
605 .push(" id = ")
606 .push_bind_unseparated(&history_instance_request.id);
607 }
608 HistoryRequest::Type(history_type_request) => {
609 clauses
610 .push(" resource_type = ")
611 .push_bind_unseparated(history_type_request.resource_type.as_ref());
612 }
613 HistoryRequest::System(_request) => {}
614 };
615
616 query_builder.push(" ORDER BY sequence DESC LIMIT 100");
617
618 let query = query_builder.build_query_as();
619
620 let result: Vec<ReturnSingularResource> = query
621 .fetch_all(&mut *conn)
622 .await
623 .map_err(StoreError::from)?;
624
625 Ok(result.into_iter().map(|r| r.resource.0).collect::<Vec<_>>())
626 }
627}
628
629fn get_sequence<'a, 'c, Connection: Acquire<'c, Database = Postgres> + Send + 'a>(
630 connection: Connection,
631 tenant_id: &'a TenantId,
632 cur_sequence: u64,
633 count: Option<u64>,
634) -> impl Future<Output = Result<Vec<ResourcePollingValue>, OperationOutcomeError>> + Send + 'a {
635 async move {
636 let mut conn = connection.acquire().await.map_err(StoreError::from)?;
637 let safe_sequence =
642 sqlx::query!("SELECT max_safe_seq('resources_sequence_seq') as max_safe_seq")
643 .fetch_one(&mut *conn)
644 .await
645 .map_err(StoreError::from)?
646 .max_safe_seq
647 .unwrap_or(0);
648
649 let result = sqlx::query_as!(
650 ResourcePollingValue,
651 r#"SELECT id as "id: ResourceId",
652 tenant as "tenant: TenantId",
653 project as "project: ProjectId",
654 version_id,
655 resource_type as "resource_type: ResourceType",
656 fhir_method as "fhir_method: FHIRMethod",
657 sequence,
658 resource as "resource: FHIRJson<Resource>"
659 FROM resources WHERE tenant = $1 AND sequence > $2 AND sequence <= $3 ORDER BY sequence LIMIT $4 "#,
660 tenant_id.as_ref() as &str,
661 cur_sequence as i64,
662 safe_sequence,
663 count.unwrap_or(100) as i64
664 )
665 .fetch_all(&mut *conn)
666 .await
667 .map_err(StoreError::from)?;
668
669 Ok(result)
674 }
675}