Skip to main content

haste_wal_worker/
main.rs

1use std::sync::Arc;
2
3use etl::{
4    config::{BatchConfig, PgConnectionConfig, PipelineConfig, TableSyncCopyConfig, TlsConfig},
5    pipeline::Pipeline,
6    store::both::memory::MemoryStore,
7};
8use haste_config::get_config;
9use haste_fhir_search::elastic_search::ElasticSearchEngine;
10
11use crate::es_search_destination::ESSearchDestination;
12mod es_search_destination;
13
14static PIPELINE_ID: u64 = 1;
15
16pub enum ESSearchWorkerEnvironmentVariables {
17    ElasticSearchURL,
18    ElasticSearchUsername,
19    ElasticSearchPassword,
20}
21
22impl From<ESSearchWorkerEnvironmentVariables> for String {
23    fn from(value: ESSearchWorkerEnvironmentVariables) -> Self {
24        match value {
25            ESSearchWorkerEnvironmentVariables::ElasticSearchURL => "ELASTICSEARCH_URL".to_string(),
26            ESSearchWorkerEnvironmentVariables::ElasticSearchUsername => {
27                "ELASTICSEARCH_USERNAME".to_string()
28            }
29            ESSearchWorkerEnvironmentVariables::ElasticSearchPassword => {
30                "ELASTICSEARCH_PASSWORD".to_string()
31            }
32        }
33    }
34}
35
36#[tokio::main]
37async fn main() -> Result<(), Box<dyn std::error::Error>> {
38    let config = get_config::<ESSearchWorkerEnvironmentVariables>("environment".into());
39    let search_engine = ElasticSearchEngine::new(
40        Arc::new(haste_fhirpath::FPEngine::new()),
41        &config
42            .get(ESSearchWorkerEnvironmentVariables::ElasticSearchURL)
43            .expect(&format!(
44                "'{}' variable not set",
45                String::from(ESSearchWorkerEnvironmentVariables::ElasticSearchURL)
46            )),
47        config
48            .get(ESSearchWorkerEnvironmentVariables::ElasticSearchUsername)
49            .expect(&format!(
50                "'{}' variable not set",
51                String::from(ESSearchWorkerEnvironmentVariables::ElasticSearchUsername)
52            )),
53        config
54            .get(ESSearchWorkerEnvironmentVariables::ElasticSearchPassword)
55            .expect(&format!(
56                "'{}' variable not set",
57                String::from(ESSearchWorkerEnvironmentVariables::ElasticSearchPassword)
58            )),
59    )
60    .expect("Failed to create Elasticsearch client");
61
62    let pg_config = PgConnectionConfig {
63        host: "localhost".to_string(),
64        port: 5432,
65        name: "haste_health".to_string(),
66        username: "postgres".to_string(),
67        password: Some("postgres".to_string().into()), // Update this
68        tls: TlsConfig {
69            enabled: false,
70            trusted_root_certs: String::new(),
71        },
72        keepalive: None,
73    };
74
75    let config = PipelineConfig {
76        id: PIPELINE_ID,
77        publication_name: "my_publication".to_string(),
78        pg_connection: pg_config.clone(),
79        batch: BatchConfig {
80            max_size: 1000,
81            max_fill_ms: 5000,
82        },
83        table_error_retry_delay_ms: 10000,
84        table_error_retry_max_attempts: 5,
85        max_table_sync_workers: 4,
86        table_sync_copy: TableSyncCopyConfig::SkipAllTables,
87    };
88
89    let store = MemoryStore::new();
90    // let store = PostgresStore::new(PIPELINE_ID, pg_config);
91    let destination = ESSearchDestination::new(search_engine)
92        .expect("Failed to create Elasticsearch destination");
93
94    println!("Starting pipeline...");
95    let mut pipeline = Pipeline::new(config, store, destination);
96    pipeline.start().await?;
97    pipeline.wait().await?;
98
99    Ok(())
100}