# etl_tool **Repository Path**: py_7/etl_tool ## Basic Information - **Project Name**: etl_tool - **Description**: 命令行 ETL 工具,支持以下数据源和目标之间的数据同步: - GBase 8a - 达梦(DM) - MySQL - PostgreSQL - CSV 文件(可作为数据源或导出目标) - **Primary Language**: Unknown - **License**: Not specified - **Default Branch**: main - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 0 - **Forks**: 0 - **Created**: 2026-04-24 - **Last Updated**: 2026-04-24 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README # 通用 ETL 同步工具 命令行 ETL 工具,支持多种数据库和 CSV 文件之间的数据同步。 ## 支持的数据源与目标 | 类型 | 作为源 | 作为目标 | |------|--------|----------| | GBase 8a | ✓ | ✓ | | 达梦(DM) | ✓ | ✓ | | MySQL | ✓ | ✓ | | PostgreSQL | ✓ | ✓ | | CSV 文件 | ✓ | ✓(导出模式) | 支持的同步方向:MySQL→PG、PG→MySQL、GBase→DM、CSV→MySQL、MySQL→CSV 等。 ## 快速开始 ```bash pip install -r requirements.txt python etl.py # 使用默认 config.yaml python etl.py -c /path/to/config.yaml # 指定配置文件 ``` ## 配置文件 编辑 `config.yaml`,配置数据源、目标和同步任务。 ### 数据库 → 数据库 ```yaml source: type: mysql host: 127.0.0.1 port: 3306 user: root password: your_password database: mydb target: type: postgresql host: localhost port: 5432 user: postgres password: your_password database: mydb sync: source_table: orders target_table: orders # where: "create_time >= '2024-01-01'" # columns: # - source: user_id # target: user_id # - source: amount * 100 as amount_cent # target: amount_cent workers: 4 batch_size: 10000 commit_every: 50000 ``` ### CSV → 数据库 目标表不存在时自动建表(基于 CSV 数据推断列类型)。 ```yaml source: type: csv file: /path/to/data.csv # encoding: gbk delimiter: ',' target: type: mysql host: 127.0.0.1 port: 3306 user: root password: your_password database: mydb sync: target_table: my_table workers: 4 batch_size: 10000 commit_every: 50000 ``` ### 数据库 → CSV ```yaml source: type: mysql host: 127.0.0.1 port: 3306 user: root password: your_password database: mydb sync: source_table: orders output_csv: ./export/orders.csv batch_size: 50000 encoding: gb18030 ``` ### 多表关联同步 用 `from_clause` 替代 `source_table`,支持 JOIN 查询: ```yaml sync: from_clause: "orders o JOIN customers c ON o.customer_id = c.id" target_table: order_with_customer columns: - source: o.order_id target: order_id - source: c.customer_name target: customer_name - source: o.amount target: amount # where: "o.create_time >= '2024-01-01'" workers: 4 batch_size: 10000 commit_every: 50000 ``` ## 配置项说明 | 配置项 | 说明 | 默认值 | |--------|------|--------| | `workers` | 并行写入线程数,1 为单线程 | 1 | | `batch_size` | 每次读取行数 | 10000 | | `commit_every` | 每 N 行提交一次 | 50000 | | `columns` | 列映射,不配则按同名字段 1:1 映射 | - | | `from_clause` | 替代 source_table,支持 JOIN | - | | `where` | WHERE 过滤条件 | - | | `output_csv` | 配置时进入数据库→CSV 导出模式 | - | ### columns 配置 - 不配时按同名字段 1:1 映射 - 配置后 `source` 支持 SQL 表达式,如 `amount * 100 as amount_cent` - 多表关联时必须配置,指定每个字段的来源和目标名称 ### CSV 源额外配置 | 配置项 | 说明 | 默认值 | |--------|------|--------| | `source.file` | CSV 文件路径 | - | | `source.encoding` | 文件编码 | 自动检测 | | `source.delimiter` | 分隔符 | `,` | ## CSV 自动建表 CSV→数据库时,如果目标表不存在,程序会自动建表: 1. 采样前 100 行推断列类型(INTEGER / FLOAT / DATE / TIMESTAMP / VARCHAR / TEXT) 2. 全文件扫描字符串列的最大长度 3. 最大长度 ≤ 255 → `VARCHAR(255)`,> 255 → `TEXT` ## 目录结构 ``` ├── config.yaml # 数据库连接 + 同步配置 ├── requirements.txt # Python 依赖 ├── etl.py # 入口 + 并行调度 ├── connectors.py # 数据库连接器 + 类型推断 + 建表 ├── mapper.py # 列映射 + SQL 构建 ├── csv_reader.py # CSV 流式读取 ├── csv_writer.py # 数据库导出到 CSV ├── download_deps.sh # 内网部署用:下载离线依赖包 └── offline_packages/ # 离线依赖包存放目录 ``` ## 内网部署 ```bash # 在有网机器上运行,下载离线依赖包 bash download_deps.sh # 将 offline_packages/ 目录拷贝到内网机器 pip install --no-index --find-links=offline_packages/ -r requirements.txt ``` 注意:dmPython 和 gbase-connector-python 包含 C 扩展,需匹配目标平台下载。