
beam-duckdb

DuckDB is an in-process database with a focus on high-performance analytics. It is a column-oriented database, which makes it well suited to data science and data engineering workloads.

DuckDB-specific functionality

The beam-duckdb backend is the most recent SQL backend for beam. As such, it might not implement all of the DuckDB features you want. Feel free to raise an issue!

Data sources

One of the key features of DuckDB is its support for popular data formats such as Parquet and Apache Iceberg. beam-duckdb lets you declare files as data sources for your database, so that you can query data from these files as if they were regular database tables.

beam-duckdb exports the DataSourceEntity type, which allows you to declare one or more files as a data source for your database, in the same way that a ViewEntity declares a database view. Like a ViewEntity, a DataSourceEntity only supports selecting data; the type system will prevent you from inserting into or updating a DataSourceEntity. Thanks, compiler!

While DuckDB supports many data sources, beam-duckdb currently supports only the subset described below. Feel free to raise an issue if you'd like us to add support for another data source!

Parquet

Parquet is an open-source file format that is commonly used in data science.

beam-duckdb exports the parquet function, which allows you to declare one or more Parquet files as a data source.

Consider a Parquet file for which the following schema holds:

data ExamT f = Exam
  { _examId :: Columnar f Int32,
    _examName :: Columnar f Text,
    _examScore :: Columnar f Double,
    _examDate :: Columnar f Day
  }

type Exam = ExamT Identity
deriving instance Show Exam
deriving instance Eq Exam

instance Beamable ExamT
instance Table ExamT where
  data PrimaryKey ExamT f = ExamId (Columnar f Int32) deriving (Generic)
  primaryKey = ExamId . _examId
instance Beamable (PrimaryKey ExamT)

Then, we can declare the database as having one "table" sourced from a Parquet file:

data SchoolDB f = SchoolDB
  { _exams :: f (DataSourceEntity ExamT)
  }
  deriving (Generic, Database DuckDB)

schoolDB :: DatabaseSettings DuckDB SchoolDB
schoolDB =
  defaultDbSettings
    `withDbModification` (dbModification @_ @DuckDB)
      { _exams =
          dataSource (parquet (NonEmpty.singleton "data/exams.parquet"))
            <> modifyDataSourceFields
              tableModification
                { _examId = "id",
                  _examName = "name",
                  _examScore = "score",
                  _examDate = "exam_date"
                }
      }

Note that we have specified a single file here, but we could have specified multiple files with the same schema, or even one or more globs.
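As a sketch of the multi-file case, assuming the ExamT and SchoolDB definitions above, the same declaration can pass a longer NonEmpty list to parquet instead of NonEmpty.singleton. The file names, the data/archive/ directory, and the name schoolDBMulti are hypothetical; every matched file is assumed to share the ExamT schema.

```haskell
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE TypeApplications #-}

import Data.List.NonEmpty (NonEmpty ((:|)))

-- Hypothetical layout: two yearly files plus a glob matching every
-- Parquet file under data/archive/. All files must share one schema.
schoolDBMulti :: DatabaseSettings DuckDB SchoolDB
schoolDBMulti =
  defaultDbSettings
    `withDbModification` (dbModification @_ @DuckDB)
      { _exams =
          dataSource
            ( parquet
                ( "data/exams-2024.parquet"
                    :| [ "data/exams-2025.parquet",
                         "data/archive/*.parquet"
                       ]
                )
            )
            <> modifyDataSourceFields
              tableModification
                { _examId = "id",
                  _examName = "name",
                  _examScore = "score",
                  _examDate = "exam_date"
                }
      }
```

Queries against schoolDBMulti are written exactly as against schoolDB; DuckDB is expected to treat the matched files as one combined source.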

Once this is done, you can query the "table" just like any other beam entity. For example, to fetch the maximum exam score:

Just bestScore <-
  runBeamDuckDBDebug putStrLn conn
    $ runSelectReturningOne
      $ select
        $ aggregate_
            (max_ . _examScore)
            (allFromDataSource_ (_exams schoolDB))

print bestScore
Just 97.5
SELECT MAX("t0"."score") AS "res0"
FROM read_parquet('/build/gc8kw31pyha5y3y4k1h6r87ij44g42ip-source/docs/.beam-query-cache/data/exams.parquet') AS "t0";

-- With values: []

Note the one difference: instead of pulling all rows with all_ (for a database table) or allFromView_ (for a database view), we use allFromDataSource_.
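Because allFromDataSource_ yields an ordinary beam query, the usual combinators should compose with it. As a sketch, assuming the conn and schoolDB from the example above (with Database.Beam and Database.Beam.DuckDB in scope), here is a query for the names and scores of every exam scoring at least 90, best first. The threshold of 90 is arbitrary.

```haskell
-- Names and scores of all exams scoring at least 90, highest first.
-- Assumes `conn` and `schoolDB` as defined in the example above.
topExams <-
  runBeamDuckDBDebug putStrLn conn
    $ runSelectReturningList
      $ select
        $ orderBy_ (\(_name, score) -> desc_ score)
          $ do
            exam <- allFromDataSource_ (_exams schoolDB)
            -- Keep only rows whose score column is at least 90.
            guard_ (_examScore exam >=. val_ 90)
            pure (_examName exam, _examScore exam)

-- topExams :: [(Text, Double)]
mapM_ print topExams
```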

Apache Iceberg

Apache Iceberg is an open-source format for large analytics tables.

Assume that we have a large Iceberg table with the same ExamT schema as our Parquet example above. We simply replace parquet in the example above with icebergTable:

schoolDB :: DatabaseSettings DuckDB SchoolDB
schoolDB =
  defaultDbSettings
    `withDbModification` (dbModification @_ @DuckDB)
      { _exams =
          dataSource (icebergTable "s3://.../exams")
            <> modifyDataSourceFields
              tableModification
                { _examId = "id",
                  _examName = "name",
                  _examScore = "score",
                  _examDate = "exam_date"
                }
      }

In this case, the table data is stored in an Amazon S3 bucket. This assumes that you have already set up the appropriate authentication, which beam-duckdb does not handle.

All queries work just as before, provided you use allFromDataSource_ as we did above.

CSV

Finally, DuckDB supports loading data from comma-separated values (CSV) files.

Assume that we have multiple CSV files, with the same ExamT schema as our examples above. Each file has a header line:

id|name|score|exam_date
1|alice|76.5|2025-01-01
2|bob|74.0|2025-01-01
...

You can see the added wrinkle that the separator isn't a comma, but a pipe | instead. The CSV format is full of fun twists like that.

We could tell beam that we'll be querying CSV files using csv. However, unlike parquet and icebergTable, we need to tweak the default CSV options, changing the delimiter and specifying that the files have a header row, so we use csvWith instead:

schoolDB :: DatabaseSettings DuckDB SchoolDB
schoolDB =
  defaultDbSettings
    `withDbModification` (dbModification @_ @DuckDB)
      { _exams =
          dataSource
            ( csvWith
                (NonEmpty.singleton "scores/*.csv")
                (defaultCSVOptions {delim = Just "|", header = Just True})
            )
            <> modifyDataSourceFields
              tableModification
                { _examId = "id",
                  _examName = "name",
                  _examScore = "score",
                  _examDate = "exam_date"
                }
      }

Once more, just like with Parquet and Apache Iceberg, we can perform queries using all of beam's machinery, using allFromDataSource_ instead of all_.

COPY support

beam-duckdb supports bulk-export and bulk-import of data through DuckDB's COPY ... TO and COPY ... FROM statements.

See the cross-backend COPY page for the shared copyTableTo / copySelectTo / copyTableFrom API. The DuckDB-specific pieces are the smart constructors that build a DuckDBCopyToOptions or DuckDBCopyFromOptions value. Each constructor fixes the file format and, optionally, a record of format-specific options.

DuckDB supports three file formats out of the box: CSV, Parquet, and JSON. Each file format has an associated set of options that is exposed by beam-duckdb. All smart constructors and option types are re-exported from Database.Beam.DuckDB.

Here's an example of exporting to CSV:

import Database.Beam.Backend.SQL.BeamExtensions
import Database.Beam.DuckDB
import qualified Database.Beam.DuckDB as DuckDB
runCopyTo $
  copyTableTo
    (playlist chinookDb)
    id -- no projection: export entire table
    ( DuckDB.copyToCSVWith "/tmp/beam-docs-csv-options.csv"
        DuckDB.defaultDuckDBCSVCopyToOptions
          { csvCopyToDelimiter = Just "|"
          , csvCopyToHeader    = Just False
          }
    )
COPY Playlist("PlaylistId", "Name") TO '/tmp/beam-docs-csv-options.csv' (FORMAT CSV,
                                                                         HEADER False,
                                                                         DELIMITER '|');

-- With values: [];

The same export to Parquet, on the other hand, accepts a different set of options:

import Database.Beam.Backend.SQL.BeamExtensions
import Database.Beam.DuckDB
import qualified Database.Beam.DuckDB as DuckDB
runCopyTo $
  copyTableTo
    (playlist chinookDb)
    id -- no projection: export entire table
    ( DuckDB.copyToParquetWith "/tmp/beam-docs-playlists.parquet"
        DuckDB.defaultDuckDBParquetCopyToOptions
          { parquetCopyToCompression = Just DuckDB.ParquetZstd
          }
    )
COPY Playlist("PlaylistId", "Name") TO '/tmp/beam-docs-playlists.parquet' (FORMAT PARQUET,
                                                                           COMPRESSION ZSTD);

-- With values: [];

On the import side, here's an example of a simple round-trip through Parquet:

import Database.Beam
import Database.Beam.Backend.SQL.BeamExtensions
import qualified Database.Beam.DuckDB as DuckDB

-- Export → Parquet
runCopyTo $
  copyTableTo
    (playlist chinookDb)
    id
    (DuckDB.copyToParquet "/tmp/beam-docs-roundtrip.parquet")

-- Clear the table (and its dependents) so the re-import doesn't conflict
-- on primary keys.
runDelete $ delete (playlistTrack chinookDb) (\_ -> val_ True)
runDelete $ delete (playlist chinookDb) (\_ -> val_ True)

-- Re-import.
runCopyFrom $
  copyTableFrom
    (playlist chinookDb)
    id
    (DuckDB.copyFromParquet "/tmp/beam-docs-roundtrip.parquet")
COPY Playlist("PlaylistId", "Name") TO '/tmp/beam-docs-roundtrip.parquet' (FORMAT PARQUET);

-- With values: [];

DELETE
FROM "PlaylistTrack" AS "delete_target"
WHERE ?;

-- With values: [True];

DELETE
FROM "Playlist" AS "delete_target"
WHERE ?;

-- With values: [True];
COPY Playlist("PlaylistId", "Name")
FROM '/tmp/beam-docs-roundtrip.parquet' (FORMAT PARQUET);

-- With values: [];