1use std::sync::Arc;
2
3use etl::{
4 config::{BatchConfig, PgConnectionConfig, PipelineConfig, TableSyncCopyConfig, TlsConfig},
5 pipeline::Pipeline,
6 store::both::memory::MemoryStore,
7};
8use haste_config::get_config;
9use haste_fhir_search::elastic_search::ElasticSearchEngine;
10
11use crate::es_search_destination::ESSearchDestination;
12mod es_search_destination;
13
14static PIPELINE_ID: u64 = 1;
15
16pub enum ESSearchWorkerEnvironmentVariables {
17 ElasticSearchURL,
18 ElasticSearchUsername,
19 ElasticSearchPassword,
20}
21
22impl From<ESSearchWorkerEnvironmentVariables> for String {
23 fn from(value: ESSearchWorkerEnvironmentVariables) -> Self {
24 match value {
25 ESSearchWorkerEnvironmentVariables::ElasticSearchURL => "ELASTICSEARCH_URL".to_string(),
26 ESSearchWorkerEnvironmentVariables::ElasticSearchUsername => {
27 "ELASTICSEARCH_USERNAME".to_string()
28 }
29 ESSearchWorkerEnvironmentVariables::ElasticSearchPassword => {
30 "ELASTICSEARCH_PASSWORD".to_string()
31 }
32 }
33 }
34}
35
36#[tokio::main]
37async fn main() -> Result<(), Box<dyn std::error::Error>> {
38 let config = get_config::<ESSearchWorkerEnvironmentVariables>("environment".into());
39 let search_engine = ElasticSearchEngine::new(
40 Arc::new(haste_fhirpath::FPEngine::new()),
41 &config
42 .get(ESSearchWorkerEnvironmentVariables::ElasticSearchURL)
43 .expect(&format!(
44 "'{}' variable not set",
45 String::from(ESSearchWorkerEnvironmentVariables::ElasticSearchURL)
46 )),
47 config
48 .get(ESSearchWorkerEnvironmentVariables::ElasticSearchUsername)
49 .expect(&format!(
50 "'{}' variable not set",
51 String::from(ESSearchWorkerEnvironmentVariables::ElasticSearchUsername)
52 )),
53 config
54 .get(ESSearchWorkerEnvironmentVariables::ElasticSearchPassword)
55 .expect(&format!(
56 "'{}' variable not set",
57 String::from(ESSearchWorkerEnvironmentVariables::ElasticSearchPassword)
58 )),
59 )
60 .expect("Failed to create Elasticsearch client");
61
62 let pg_config = PgConnectionConfig {
63 host: "localhost".to_string(),
64 port: 5432,
65 name: "haste_health".to_string(),
66 username: "postgres".to_string(),
67 password: Some("postgres".to_string().into()), tls: TlsConfig {
69 enabled: false,
70 trusted_root_certs: String::new(),
71 },
72 keepalive: None,
73 };
74
75 let config = PipelineConfig {
76 id: PIPELINE_ID,
77 publication_name: "my_publication".to_string(),
78 pg_connection: pg_config.clone(),
79 batch: BatchConfig {
80 max_size: 1000,
81 max_fill_ms: 5000,
82 },
83 table_error_retry_delay_ms: 10000,
84 table_error_retry_max_attempts: 5,
85 max_table_sync_workers: 4,
86 table_sync_copy: TableSyncCopyConfig::SkipAllTables,
87 };
88
89 let store = MemoryStore::new();
90 let destination = ESSearchDestination::new(search_engine)
92 .expect("Failed to create Elasticsearch destination");
93
94 println!("Starting pipeline...");
95 let mut pipeline = Pipeline::new(config, store, destination);
96 pipeline.start().await?;
97 pipeline.wait().await?;
98
99 Ok(())
100}