1use crate::{
2 fhir::{CachePolicy, FHIRRepository, ResourceHistoryValue},
3 pg::{
4 PGConnection, StoreError,
5 utilities::{commit_transaction, create_transaction},
6 },
7 types::{FHIRMethod, SupportedFHIRVersions},
8 utilities,
9};
10use haste_fhir_client::{
11 request::HistoryRequest,
12 url::{ParsedParameter, ParsedParameters},
13};
14use haste_fhir_model::r4::{
15 datetime::parse_datetime,
16 generated::{
17 resources::{Resource, ResourceType},
18 terminology::IssueType,
19 },
20 sqlx::{FHIRJson, FHIRJsonRef},
21};
22use haste_fhir_operation_error::OperationOutcomeError;
23use haste_jwt::{ProjectId, ResourceId, TenantId, VersionId, claims::UserTokenClaims};
24use moka::future::Cache;
25use sqlx::{Acquire, Postgres, QueryBuilder, query_builder::Separated};
26use std::sync::Arc;
27use tokio::sync::Mutex;
28
29#[derive(sqlx::FromRow, Debug)]
30struct ReturnSingularResource {
31 resource: FHIRJson<Resource>,
32}
33
34#[derive(sqlx::FromRow, Debug)]
35struct ReturnVersionedResource {
36 resource: FHIRJson<Resource>,
37 version_id: VersionId,
38}
39
40#[derive(sqlx::FromRow, Debug)]
41struct HistoryValue {
42 pub resource: FHIRJson<Resource>,
43 pub request_method: String,
44}
45
46async fn read_version_ids_from_cache<'a>(
47 cache: &Cache<VersionId, Resource>,
48 version_ids: &'a [&VersionId],
49) -> (Vec<Resource>, Vec<&'a VersionId>) {
50 let mut remaining_version_ids = vec![];
51 let mut cached_resources = vec![];
52 for version_id in version_ids.iter() {
53 if let Some(resource) = cache.get(*version_id).await {
54 cached_resources.push(resource)
55 } else {
56 remaining_version_ids.push(*version_id);
57 }
58 }
59
60 (cached_resources, remaining_version_ids)
61}
62
63impl FHIRRepository for PGConnection {
64 async fn create(
65 &self,
66 tenant: &TenantId,
67 project: &ProjectId,
68 author: &UserTokenClaims,
69 fhir_version: &SupportedFHIRVersions,
70 resource: &mut Resource,
71 ) -> Result<Resource, OperationOutcomeError> {
72 match &self {
73 PGConnection::Pool(_pool, _) => {
74 let tx = create_transaction(self, true).await?;
75 let res = {
76 let mut conn = tx.lock().await;
77 let res =
78 create(&mut *conn, tenant, project, author, fhir_version, resource).await?;
79 res
80 };
81 commit_transaction(tx).await?;
82 Ok(res)
83 }
84 PGConnection::Transaction(tx, _) => {
85 let mut tx = tx.lock().await;
86 let res = create(&mut *tx, tenant, project, author, fhir_version, resource).await?;
87 Ok(res)
88 }
89 }
90 }
91
92 async fn delete(
93 &self,
94 tenant: &TenantId,
95 project: &ProjectId,
96 author: &UserTokenClaims,
97 fhir_version: &SupportedFHIRVersions,
98 resource: &mut Resource,
99 id: &str,
100 ) -> Result<Resource, OperationOutcomeError> {
101 match self {
102 PGConnection::Pool(_pool, _) => {
103 let tx = create_transaction(self, true).await?;
104 let res = {
105 let mut conn = tx.lock().await;
106 let res = delete(
107 &mut *conn,
108 tenant,
109 project,
110 author,
111 fhir_version,
112 resource,
113 id,
114 )
115 .await?;
116 res
117 };
118 commit_transaction(tx).await?;
119 Ok(res)
120 }
121 PGConnection::Transaction(tx, _) => {
122 let mut conn = tx.lock().await;
123 let res = delete(
125 &mut *conn,
126 tenant,
127 project,
128 author,
129 fhir_version,
130 resource,
131 id,
132 )
133 .await?;
134 Ok(res)
135 }
136 }
137 }
138
139 async fn update(
140 &self,
141 tenant: &TenantId,
142 project: &ProjectId,
143 author: &UserTokenClaims,
144 fhir_version: &SupportedFHIRVersions,
145 resource: &mut Resource,
146 id: &str,
147 ) -> Result<Resource, OperationOutcomeError> {
148 match self {
149 PGConnection::Pool(_pool, _) => {
150 let tx = create_transaction(self, true).await?;
151 let res = {
152 let mut conn = tx.lock().await;
153 let res = update(
154 &mut *conn,
155 tenant,
156 project,
157 author,
158 fhir_version,
159 resource,
160 id,
161 )
162 .await?;
163 res
164 };
165
166 commit_transaction(tx).await?;
167 Ok(res)
168 }
169 PGConnection::Transaction(tx, _) => {
170 let mut conn = tx.lock().await;
171 let res = update(
173 &mut *conn,
174 tenant,
175 project,
176 author,
177 fhir_version,
178 resource,
179 id,
180 )
181 .await?;
182 Ok(res)
183 }
184 }
185 }
186
187 async fn read_by_version_ids(
188 &self,
189 tenant_id: &TenantId,
190 project_id: &ProjectId,
191 version_ids: &[&VersionId],
192 cache_policy: CachePolicy,
193 ) -> Result<Vec<Resource>, OperationOutcomeError> {
194 if version_ids.is_empty() {
195 return Ok(vec![]);
196 }
197
198 let (cached_result, remaining_version_ids) =
199 read_version_ids_from_cache(self.cache(), &version_ids).await;
200
201 if remaining_version_ids.is_empty() {
202 return Ok(cached_result);
203 }
204
205 match self {
206 PGConnection::Pool(pool, cache) => {
207 let res = read_by_version_ids(pool, tenant_id, project_id, &remaining_version_ids)
208 .await?;
209
210 if cache_policy == CachePolicy::Cache {
211 for v in res.iter() {
212 cache
213 .insert(v.version_id.clone(), v.resource.0.clone())
214 .await;
215 }
216 }
217
218 Ok(cached_result
219 .into_iter()
220 .chain(res.into_iter().map(|r| r.resource.0))
221 .collect::<Vec<_>>())
222 }
223 PGConnection::Transaction(tx, cache) => {
224 let mut conn = tx.lock().await;
225 let res =
227 read_by_version_ids(&mut *conn, tenant_id, project_id, &remaining_version_ids)
228 .await?;
229
230 if cache_policy == CachePolicy::Cache {
231 for v in res.iter() {
232 cache
233 .insert(v.version_id.clone(), v.resource.0.clone())
234 .await;
235 }
236 }
237
238 Ok(cached_result
239 .into_iter()
240 .chain(res.into_iter().map(|r| r.resource.0))
241 .collect::<Vec<_>>())
242 }
243 }
244 }
245
246 async fn read_latest(
247 &self,
248 tenant_id: &TenantId,
249 project_id: &ProjectId,
250 resource_type: &ResourceType,
251 resource_id: &ResourceId,
252 ) -> Result<Option<Resource>, OperationOutcomeError> {
253 match self {
254 PGConnection::Pool(pool, _) => {
255 let res =
256 read_latest(pool, tenant_id, project_id, resource_type, resource_id).await?;
257 Ok(res)
258 }
259 PGConnection::Transaction(tx, _) => {
260 let mut conn = tx.lock().await;
261 let res = read_latest(
263 &mut *conn,
264 tenant_id,
265 project_id,
266 resource_type,
267 resource_id,
268 )
269 .await?;
270 Ok(res)
271 }
272 }
273 }
274
275 async fn history(
276 &self,
277 tenant_id: &TenantId,
278 project_id: &ProjectId,
279 request: &HistoryRequest,
280 ) -> Result<Vec<ResourceHistoryValue>, OperationOutcomeError> {
281 match self {
282 PGConnection::Pool(pool, _) => {
283 let res = history(pool, tenant_id, project_id, request).await?;
284 Ok(res)
285 }
286 PGConnection::Transaction(tx, _) => {
287 let mut conn = tx.lock().await;
288 let res = history(&mut *conn, tenant_id, project_id, request).await?;
290 Ok(res)
291 }
292 }
293 }
294
295 fn in_transaction(&self) -> bool {
296 match self {
297 PGConnection::Transaction(_tx, _) => true,
298 _ => false,
299 }
300 }
301
302 async fn transaction<'a>(
303 &'a self,
304 is_updating_sequence: bool,
305 ) -> Result<Self, OperationOutcomeError> {
306 let tx = create_transaction(self, is_updating_sequence).await?;
307 Ok(PGConnection::Transaction(tx, self.cache().clone()))
308 }
309
310 async fn commit(self) -> Result<(), OperationOutcomeError> {
311 match self {
312 PGConnection::Pool(_pool, _) => Err(StoreError::NotTransaction.into()),
313 PGConnection::Transaction(tx, _) => commit_transaction(tx).await,
314 }
315 }
316
317 async fn rollback(self) -> Result<(), OperationOutcomeError> {
318 match self {
319 PGConnection::Pool(_pool, _) => Err(StoreError::NotTransaction.into()),
320 PGConnection::Transaction(tx, _) => {
321 let conn = Mutex::into_inner(
322 Arc::try_unwrap(tx).map_err(|_e| StoreError::FailedCommitTransaction)?,
323 );
324
325 let res = conn.rollback().await.map_err(StoreError::from)?;
327 Ok(res)
328 }
329 }
330 }
331}
332
333fn create<'a, 'c, Connection: Acquire<'c, Database = Postgres> + Send + 'a>(
334 connection: Connection,
335 tenant: &'a TenantId,
336 project: &'a ProjectId,
337 author: &'a UserTokenClaims,
338 fhir_version: &'a SupportedFHIRVersions,
339 resource: &'a mut Resource,
340) -> impl Future<Output = Result<Resource, OperationOutcomeError>> + Send + 'a {
341 async move {
342 utilities::set_resource_id(resource, None)?;
343 utilities::set_version_id(resource)?;
344 let mut conn = connection.acquire().await.map_err(StoreError::SQLXError)?;
345 let result = sqlx::query_as!(
346 ReturnSingularResource,
347 r#"INSERT INTO resources (tenant, project, author_id, fhir_version, resource, deleted, request_method, author_type, fhir_method)
348 VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
349 RETURNING resource as "resource: FHIRJson<Resource>""#,
350 tenant.as_ref() as &str,
351 project.as_ref() as &str,
352 author.sub.as_ref() as &str,
353 fhir_version as &SupportedFHIRVersions,
356 &FHIRJsonRef(resource) as &FHIRJsonRef<'_ , Resource>,
357 false, "POST",
359 author.resource_type.as_ref() as &str,
360 &FHIRMethod::Create as &FHIRMethod,
361 ).fetch_one(&mut *conn).await.map_err(StoreError::from)?;
362 Ok(result.resource.0)
363 }
364}
365
366fn delete<'a, 'c, Connection: Acquire<'c, Database = Postgres> + Send + 'a>(
367 connection: Connection,
368 tenant: &'a TenantId,
369 project: &'a ProjectId,
370 author: &'a UserTokenClaims,
371 fhir_version: &'a SupportedFHIRVersions,
372 resource: &'a mut Resource,
373 id: &'a str,
374) -> impl Future<Output = Result<Resource, OperationOutcomeError>> + Send + 'a {
375 async move {
376 utilities::set_resource_id(resource, Some(id.to_string()))?;
377 utilities::set_version_id(resource)?;
378 let mut conn = connection.acquire().await.map_err(StoreError::SQLXError)?;
379 let result = sqlx::query_as!(
380 ReturnSingularResource,
381 r#"INSERT INTO resources (tenant, project, author_id, fhir_version, resource, deleted, request_method, author_type, fhir_method)
382 VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
383 RETURNING resource as "resource: FHIRJson<Resource>""#,
384 tenant.as_ref() as &str,
385 project.as_ref() as &str,
386 author.sub.as_ref() as &str,
387 fhir_version as &SupportedFHIRVersions,
390 &FHIRJsonRef(resource) as &FHIRJsonRef<'_ , Resource>,
391 true, "DELETE",
393 author.resource_type.as_ref() as &str,
394 &FHIRMethod::Delete as &FHIRMethod,
395 ).fetch_one(&mut *conn).await.map_err(StoreError::from)?;
396
397 Ok(result.resource.0)
398 }
399}
400
401fn update<'a, 'c, Connection: Acquire<'c, Database = Postgres> + Send + 'a>(
402 connection: Connection,
403 tenant: &'a TenantId,
404 project: &'a ProjectId,
405 author: &'a UserTokenClaims,
406 fhir_version: &'a SupportedFHIRVersions,
407 resource: &'a mut Resource,
408 id: &'a str,
409) -> impl Future<Output = Result<Resource, OperationOutcomeError>> + Send + 'a {
410 async move {
411 utilities::set_resource_id(resource, Some(id.to_string()))?;
412 utilities::set_version_id(resource)?;
413
414 let mut conn = connection.acquire().await.map_err(StoreError::SQLXError)?;
415
416 let query = sqlx::query_as!(
417 ReturnSingularResource,
418 r#"INSERT INTO resources (tenant, project, author_id, fhir_version, resource, deleted, request_method, author_type, fhir_method)
419 VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
420 RETURNING resource as "resource: FHIRJson<Resource>""#,
421 tenant.as_ref() as &str,
422 project.as_ref() as &str,
423 author.sub.as_ref() as &str,
424 fhir_version as &SupportedFHIRVersions,
427 &FHIRJsonRef(resource) as &FHIRJsonRef<'_, Resource>,
428 false, "PUT",
430 author.resource_type.as_ref() as &str,
431 &FHIRMethod::Update as &FHIRMethod,
432 );
433
434 let result = query
435 .fetch_one(&mut *conn)
436 .await
437 .map_err(StoreError::from)?;
438
439 Ok(result.resource.0)
440 }
441}
442
443fn read_by_version_ids<'a, 'c, Connection: Acquire<'c, Database = Postgres> + Send + 'a>(
444 connection: Connection,
445 tenant_id: &'a TenantId,
446 project_id: &'a ProjectId,
447 version_ids: &'a Vec<&'a VersionId>,
448) -> impl Future<Output = Result<Vec<ReturnVersionedResource>, OperationOutcomeError>> + Send + 'a {
449 async move {
450 let mut conn = connection.acquire().await.map_err(StoreError::SQLXError)?;
451
452 let mut query_builder: QueryBuilder<Postgres> =
453 QueryBuilder::new(r#"SELECT resource, version_id FROM resources WHERE tenant = "#);
454
455 query_builder
456 .push_bind(tenant_id.as_ref())
457 .push(" AND project =")
458 .push_bind(project_id.as_ref());
459
460 query_builder.push(" AND version_id in (");
461
462 let mut separated = query_builder.separated(", ");
463 for version_id in version_ids.iter() {
464 separated.push_bind(version_id.as_ref());
465 }
466 separated.push_unseparated(")");
467
468 query_builder.push(" ORDER BY array_position(array[");
470 let mut order_separator = query_builder.separated(", ");
471 for version_id in version_ids.iter() {
472 order_separator.push_bind(version_id.as_ref());
473 }
474 query_builder.push("], version_id)");
475
476 let query = query_builder.build_query_as();
477 let response: Vec<ReturnVersionedResource> = query
478 .fetch_all(&mut *conn)
479 .await
480 .map_err(StoreError::from)?;
481
482 Ok(response)
483 }
484}
485
486fn read_latest<'a, 'c, Connection: Acquire<'c, Database = Postgres> + Send + 'a>(
487 connection: Connection,
488 tenant_id: &'a TenantId,
489 project_id: &'a ProjectId,
490 resource_type: &'a ResourceType,
491 resource_id: &'a ResourceId,
492) -> impl Future<Output = Result<Option<Resource>, OperationOutcomeError>> + Send + 'a {
493 async move {
494 let mut conn = connection.acquire().await.map_err(StoreError::SQLXError)?;
495 let response = sqlx::query!(
496 r#"SELECT resource as "resource: FHIRJson<Resource>", deleted FROM resources WHERE tenant = $1 AND project = $2 AND id = $3 AND resource_type = $4 ORDER BY sequence DESC LIMIT 1"#,
497 tenant_id.as_ref(),
498 project_id.as_ref(),
499 resource_id.as_ref(),
500 resource_type.as_ref(),
501 ).fetch_optional(&mut *conn).await.map_err(StoreError::from)?;
502
503 if let Some(true) = response.as_ref().map(|r| r.deleted) {
506 Ok(None)
507 } else {
508 Ok(response.map(|r| r.resource.0))
509 }
510 }
511}
512
513fn process_history_parameters<'a>(
514 parameters: &'a ParsedParameters,
515 clauses: &mut Separated<'_, 'a, Postgres, &str>,
516) -> Result<(), OperationOutcomeError> {
517 for parameter in parameters.parameters() {
518 match parameter {
519 ParsedParameter::Result(result_param) => match result_param.name.as_str() {
520 "_since" => {
521 if let Some(value) = &result_param.value.get(0) {
522 let date_time = parse_datetime(value.as_str()).map_err(|e| {
523 OperationOutcomeError::fatal(
524 IssueType::Invalid(None),
525 format!("Invalid _since parameter datetime: {:?}", e),
526 )
527 })?;
528
529 clauses.push(" created_at >= ").push_bind_unseparated(
530 chrono::DateTime::try_from(date_time).map_err(|e| {
531 OperationOutcomeError::fatal(
532 IssueType::Invalid(None),
533 format!("Invalid _since parameter datetime: {:?}", e),
534 )
535 })?,
536 );
537 }
538 }
539 "_count" | "_offset" => {
540 }
542 _ => {}
543 },
544 _ => {
545 return Err(OperationOutcomeError::fatal(
546 IssueType::NotSupported(None),
547 format!(
548 "Parameter '{}' is not supported for history requests.",
549 parameter.name()
550 ),
551 ));
552 }
553 }
554 }
555
556 Ok(())
557}
558
559fn history<'a, 'c, Connection: Acquire<'c, Database = Postgres> + Send + 'a>(
560 connection: Connection,
561 tenant: &'a TenantId,
562 project: &'a ProjectId,
563 history_request: &'a HistoryRequest,
564) -> impl Future<Output = Result<Vec<ResourceHistoryValue>, OperationOutcomeError>> + Send + 'a {
565 async move {
566 let mut conn = connection.acquire().await.map_err(StoreError::from)?;
567
568 let mut query_builder: QueryBuilder<Postgres> =
569 QueryBuilder::new(r#"SELECT resource, request_method FROM resources WHERE "#);
570
571 let mut clauses = query_builder.separated(" AND ");
572 clauses
573 .push(" tenant = ")
574 .push_bind_unseparated(tenant.as_ref())
575 .push(" project = ")
576 .push_bind_unseparated(project.as_ref());
577
578 let history_parameters = match history_request {
579 HistoryRequest::Instance(history_instance_request) => {
580 &history_instance_request.parameters
581 }
582 HistoryRequest::Type(history_type_request) => &history_type_request.parameters,
583 HistoryRequest::System(system_request) => &system_request.parameters,
584 };
585
586 process_history_parameters(&history_parameters, &mut clauses)?;
587
588 match history_request {
589 HistoryRequest::Instance(history_instance_request) => {
590 clauses
591 .push(" resource_type = ")
592 .push_bind_unseparated(history_instance_request.resource_type.as_ref())
593 .push(" id = ")
594 .push_bind_unseparated(&history_instance_request.id);
595 }
596 HistoryRequest::Type(history_type_request) => {
597 clauses
598 .push(" resource_type = ")
599 .push_bind_unseparated(history_type_request.resource_type.as_ref());
600 }
601 HistoryRequest::System(_request) => {}
602 };
603
604 let count =
605 if let Some(ParsedParameter::Result(count_param)) = history_parameters.get("_count") {
606 std::cmp::min(
608 100,
609 count_param
610 .value
611 .get(0)
612 .and_then(|v| v.parse::<usize>().ok())
613 .unwrap_or(100),
614 )
615 } else {
616 100
617 };
618
619 query_builder
620 .push(" ORDER BY sequence DESC LIMIT ")
621 .push_bind(count as i64);
622
623 if let Some(ParsedParameter::Result(offset_param)) = history_parameters.get("_offset") {
624 let offset = std::cmp::max(
625 offset_param
626 .value
627 .get(0)
628 .and_then(|v| v.parse::<usize>().ok())
629 .unwrap_or(0),
630 0,
631 );
632
633 query_builder.push(" OFFSET ").push_bind(offset as i64);
634 }
635
636 let query = query_builder.build_query_as();
637
638 let result: Vec<HistoryValue> = query
639 .fetch_all(&mut *conn)
640 .await
641 .map_err(StoreError::from)?;
642
643 Ok(result
644 .into_iter()
645 .map(|r| ResourceHistoryValue {
646 resource: r.resource.0,
647 request_method: r.request_method,
648 })
649 .collect::<Vec<_>>())
650 }
651}