beam-postgres

The beam-postgres backend is the most feature-complete SQL backend for beam. The Postgres RDBMS supports most of the standards beam follows, so you can usually expect most queries to simply work. Additionally, beam-postgres is part of the standard Beam distribution, so it is updated periodically, and new functions are added to achieve feature parity with the latest stable Postgres release.

Postgres-specific data types

Postgres has several data types not available from beam-core. The beam-postgres library provides several types and functions to make working with these easier.

The tsvector and tsquery types

The tsvector and tsquery types form the basis of full-text search in Postgres. They correspond to the Haskell types TsVector and TsQuery, which are just newtype wrappers over ByteString.

pure (Pg.toTsVector (Just Pg.english) (as_ @String (val_ "The quick brown fox jumps over the lazy dog")))
SELECT to_tsvector('english', 'The quick brown fox jumps over the lazy dog') AS "res0"

Postgres extensions

SELECT locking clause

Postgres allows you to explicitly lock rows retrieved during a select using the locking clause.

Beam supports most of the Postgres locking clause. However, there are some invariants that are currently not checked at compile time. For example, Postgres does not allow locking clauses with queries that use UNION, EXCEPT, or INTERSECT or those with aggregates. Since all these queries have the same type in Beam, we cannot catch these errors at compile-time. Current guidance is to only use the locking clause in top-level queries that you know to be safe.

The following example finds all customers living in Dublin and requests a shared row-level lock (FOR SHARE) on each row. This prevents concurrent transactions from updating or deleting these rows until the current transaction is complete.

Pg.lockingAllTablesFor_ Pg.PgSelectLockingStrengthShare Nothing $
  filter_ (\c -> fromMaybe_ "" (addressCity (customerAddress c)) ==. "Dublin") $
  all_ (customer chinookDb)
SELECT "t0"."CustomerId" AS "res0",
       "t0"."FirstName" AS "res1",
       "t0"."LastName" AS "res2",
       "t0"."Company" AS "res3",
       "t0"."Address" AS "res4",
       "t0"."City" AS "res5",
       "t0"."State" AS "res6",
       "t0"."Country" AS "res7",
       "t0"."PostalCode" AS "res8",
       "t0"."Phone" AS "res9",
       "t0"."Fax" AS "res10",
       "t0"."Email" AS "res11",
       "t0"."SupportRepId" AS "res12"
FROM "Customer" AS "t0"
WHERE (COALESCE("t0"."City", '')) = ('Dublin')
  FOR SHARE

Now suppose we intend to update these rows, so we want to lock them for update instead.

Pg.lockingAllTablesFor_ Pg.PgSelectLockingStrengthUpdate Nothing $
  filter_ (\c -> fromMaybe_ "" (addressCity (customerAddress c)) ==. "Dublin") $
  all_ (customer chinookDb)
SELECT "t0"."CustomerId" AS "res0",
       "t0"."FirstName" AS "res1",
       "t0"."LastName" AS "res2",
       "t0"."Company" AS "res3",
       "t0"."Address" AS "res4",
       "t0"."City" AS "res5",
       "t0"."State" AS "res6",
       "t0"."Country" AS "res7",
       "t0"."PostalCode" AS "res8",
       "t0"."Phone" AS "res9",
       "t0"."Fax" AS "res10",
       "t0"."Email" AS "res11",
       "t0"."SupportRepId" AS "res12"
FROM "Customer" AS "t0"
WHERE (COALESCE("t0"."City", '')) = ('Dublin')
  FOR UPDATE

However, because there may be a lot of customers in Dublin that we'd like to update, this may block for a long time. Perhaps we'd only like to lock rows that aren't already locked. The result is inconsistent in general, but we do not always care. Postgres offers the SKIP LOCKED clause for this:

Pg.lockingAllTablesFor_ Pg.PgSelectLockingStrengthUpdate (Just Pg.PgSelectLockingOptionsSkipLocked) $
  filter_ (\c -> fromMaybe_ "" (addressCity (customerAddress c)) ==. "Dublin") $
  all_ (customer chinookDb)
SELECT "t0"."CustomerId" AS "res0",
       "t0"."FirstName" AS "res1",
       "t0"."LastName" AS "res2",
       "t0"."Company" AS "res3",
       "t0"."Address" AS "res4",
       "t0"."City" AS "res5",
       "t0"."State" AS "res6",
       "t0"."Country" AS "res7",
       "t0"."PostalCode" AS "res8",
       "t0"."Phone" AS "res9",
       "t0"."Fax" AS "res10",
       "t0"."Email" AS "res11",
       "t0"."SupportRepId" AS "res12"
FROM "Customer" AS "t0"
WHERE (COALESCE("t0"."City", '')) = ('Dublin')
  FOR UPDATE SKIP LOCKED

Or, if we do care but don't want to wait, we can ask Postgres to fail early instead of blocking, using NOWAIT:

Pg.lockingAllTablesFor_ Pg.PgSelectLockingStrengthUpdate (Just Pg.PgSelectLockingOptionsNoWait) $
  filter_ (\c -> fromMaybe_ "" (addressCity (customerAddress c)) ==. "Dublin") $
  all_ (customer chinookDb)
SELECT "t0"."CustomerId" AS "res0",
       "t0"."FirstName" AS "res1",
       "t0"."LastName" AS "res2",
       "t0"."Company" AS "res3",
       "t0"."Address" AS "res4",
       "t0"."City" AS "res5",
       "t0"."State" AS "res6",
       "t0"."Country" AS "res7",
       "t0"."PostalCode" AS "res8",
       "t0"."Phone" AS "res9",
       "t0"."Fax" AS "res10",
       "t0"."Email" AS "res11",
       "t0"."SupportRepId" AS "res12"
FROM "Customer" AS "t0"
WHERE (COALESCE("t0"."City", '')) = ('Dublin')
  FOR UPDATE NOWAIT

We can also specify locking clauses when JOINing. Suppose we want to get all customers who live in London and have a support rep who lives in Paris, skipping any rows that we can't lock.

Pg.lockingAllTablesFor_ Pg.PgSelectLockingStrengthShare (Just Pg.PgSelectLockingOptionsSkipLocked) $
  do customer <- filter_ (\c -> fromMaybe_ "" (addressCity (customerAddress c)) ==. "London") $
                 all_ (customer chinookDb)
     employee <- join_ (employee chinookDb)
                       (\e -> fromMaybe_ "" (addressCity (employeeAddress e)) ==. "Paris" &&.
                              just_ (pk e) ==. customerSupportRep customer)
     pure (customerFirstName customer, customerLastName customer, pk employee)
SELECT "t0"."FirstName" AS "res0",
       "t0"."LastName" AS "res1",
       "t1"."EmployeeId" AS "res2"
FROM "Customer" AS "t0"
INNER JOIN "Employee" AS "t1" ON ((COALESCE("t1"."City", '')) = ('Paris'))
AND (("t1"."EmployeeId") IS NOT DISTINCT
     FROM ("t0"."SupportRepId"))
WHERE (COALESCE("t0"."City", '')) = ('London')
  FOR SHARE SKIP LOCKED

You may notice that this query will lock rows in both the customers and employees tables. This may not be what you want. You can also specify which tables to lock by using the lockingFor_ function. This requires you to specify which locks you want to hold by returning them from your query. For example, to lock only the customers table:

Pg.lockingFor_ Pg.PgSelectLockingStrengthShare (Just Pg.PgSelectLockingOptionsSkipLocked) $
  do (customerLock, customer) <- Pg.locked_ (customer chinookDb)
     guard_ (fromMaybe_ "" (addressCity (customerAddress customer)) ==. "London")
     employee <- filter_ (\e -> fromMaybe_ "" (addressCity (employeeAddress e)) ==. "Paris" &&.
                                just_ (pk e) ==. customerSupportRep customer) $
                 all_ (employee chinookDb)
     pure ((customerFirstName customer, customerLastName customer, pk employee) `Pg.withLocks_` customerLock)
SELECT "t0"."FirstName" AS "res0",
       "t0"."LastName" AS "res1",
       "t1"."EmployeeId" AS "res2"
FROM "Customer" AS "t0"
CROSS JOIN "Employee" AS "t1"
WHERE ((COALESCE("t0"."City", '')) = ('London'))
  AND (((COALESCE("t1"."City", '')) = ('Paris'))
       AND (("t1"."EmployeeId") IS NOT DISTINCT
            FROM ("t0"."SupportRepId")))
  FOR SHARE OF "t0" SKIP LOCKED

In order to use the explicit locking clause, you need to use the locked_ function to get a reference to a lock for a particular table. This forces the locked table to be part of the join, which is a requirement for the Postgres locking clause. You can think of locked_ as exactly like all_, except it returns a table lock as the first return value.

Tip

Locks can be combined monoidally, using mappend or (<>). You can use this to lock multiple tables, by passing the result of mappend to withLocks_.

If you return mempty as the first argument, then this recovers the standard behavior of locking all tables.
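As a rough sketch of combining locks, the fragment below takes a lock reference for both tables and merges them with (<>). It is written in the same style as the other examples on this page and assumes the same Chinook schema (chinookDb) and imports; the names custLock, empLock, cust, and emp are illustrative only. It has not been run against a database, but it should render a locking clause that names both tables.

```haskell
-- Assumes the Chinook schema (chinookDb) and the imports used by the
-- surrounding examples, including:
--   import qualified Database.Beam.Postgres as Pg
Pg.lockingFor_ Pg.PgSelectLockingStrengthUpdate Nothing $
  do (custLock, cust) <- Pg.locked_ (customer chinookDb)
     (empLock, emp)   <- Pg.locked_ (employee chinookDb)
     guard_ (just_ (pk emp) ==. customerSupportRep cust)
     -- (<>) merges the two table locks into one value for withLocks_
     pure ((customerFirstName cust, pk emp) `Pg.withLocks_` (custLock <> empLock))
```

Leaving out one of the two lock references from the combined value should restrict the lock to the remaining table, as in the customers-only example above.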

lockingFor_ is the most general locking combinator. You can recover the same behavior as lockingAllTablesFor_ by using the lockAll_ function.

Pg.lockingFor_ Pg.PgSelectLockingStrengthShare (Just Pg.PgSelectLockingOptionsSkipLocked) $
  do (customerLock, customer) <- Pg.locked_ (customer chinookDb)
     guard_ (fromMaybe_ "" (addressCity (customerAddress customer)) ==. "London")
     employee <- filter_ (\e -> fromMaybe_ "" (addressCity (employeeAddress e)) ==. "Paris" &&.
                                just_ (pk e) ==. customerSupportRep customer) $
                 all_ (employee chinookDb)
     pure (Pg.lockAll_ (customerFirstName customer, customerLastName customer, pk employee))
SELECT "t0"."FirstName" AS "res0",
       "t0"."LastName" AS "res1",
       "t1"."EmployeeId" AS "res2"
FROM "Customer" AS "t0"
CROSS JOIN "Employee" AS "t1"
WHERE ((COALESCE("t0"."City", '')) = ('London'))
  AND (((COALESCE("t1"."City", '')) = ('Paris'))
       AND (("t1"."EmployeeId") IS NOT DISTINCT
            FROM ("t0"."SupportRepId")))
  FOR SHARE SKIP LOCKED

Tip

Table locks have the type PgLockedTables s, where s is the thread parameter, as described here

DISTINCT ON support

Postgres supports the DISTINCT ON clause with selects to return distinct results based on a particular key. The beam-postgres package provides the pgNubBy_ function to use this feature.

For example, to get an arbitrary customer from each distinct postal code:

Pg.pgNubBy_ (addressPostalCode . customerAddress) $
  all_ (customer chinookDb)
SELECT DISTINCT ON ("t0"."PostalCode") "t0"."CustomerId" AS "res0",
                   "t0"."FirstName" AS "res1",
                   "t0"."LastName" AS "res2",
                   "t0"."Company" AS "res3",
                   "t0"."Address" AS "res4",
                   "t0"."City" AS "res5",
                   "t0"."State" AS "res6",
                   "t0"."Country" AS "res7",
                   "t0"."PostalCode" AS "res8",
                   "t0"."Phone" AS "res9",
                   "t0"."Fax" AS "res10",
                   "t0"."Email" AS "res11",
                   "t0"."SupportRepId" AS "res12"
FROM "Customer" AS "t0"

Aggregates

string_agg

The Postgres string_agg aggregate combines all column values in a group separated by a given separator. beam-postgres provides pgStringAgg and pgStringAggOver to use the unquantified and quantified versions of the string_agg aggregate, respectively.

For example, to put together a list of all cities in each of the postal codes we have for customers:

aggregate_ (\c -> ( group_ (addressPostalCode (customerAddress c))
                  , Pg.pgStringAgg (coalesce_ [addressCity (customerAddress c)] "") ",") ) $
  all_ (customer chinookDb)
SELECT "t0"."PostalCode" AS "res0",
       string_agg(COALESCE("t0"."City", ''), ',') AS "res1"
FROM "Customer" AS "t0"
GROUP BY "t0"."PostalCode"

The above will include a city multiple times if it's shared by multiple customers. To list each city only once per group, use pgStringAggOver with distinctInGroup_:

aggregate_ (\c -> ( group_ (addressPostalCode (customerAddress c))
                  , Pg.pgStringAggOver distinctInGroup_ (coalesce_ [addressCity (customerAddress c)] "") ",") ) $
  all_ (customer chinookDb)
SELECT "t0"."PostalCode" AS "res0",
       string_agg(DISTINCT COALESCE("t0"."City", ''), ',') AS "res1"
FROM "Customer" AS "t0"
GROUP BY "t0"."PostalCode"

ON CONFLICT

Postgres supports targeting a particular constraint as the target of an ON CONFLICT clause. You can use conflictingConstraint with the name of the constraint with the regular insertOnConflict function to use this functionality.

For example, to update the row only on conflicts relating to the "PK_Customer" constraint:

--! import Database.Beam.Backend.SQL.BeamExtensions (BeamHasInsertOnConflict(..))
--! import qualified Database.Beam.Postgres as Pg
let
  newCustomer = Customer 42 "John" "Doe" Nothing (Address (Just "Street") (Just "City") (Just "State") Nothing Nothing) Nothing Nothing "john.doe@johndoe.com" nothing_

runInsert $
  insertOnConflict (customer chinookDb) (insertValues [newCustomer])
    (Pg.conflictingConstraint "PK_Customer")
    (onConflictUpdateSet (\fields _ -> fields <-. val_ newCustomer))
INSERT INTO "Customer"("CustomerId",
                       "FirstName",
                       "LastName",
                       "Company",
                       "Address",
                       "City",
                       "State",
                       "Country",
                       "PostalCode",
                       "Phone",
                       "Fax",
                       "Email",
                       "SupportRepId")
VALUES (42, 'John', 'Doe', null, 'Street', 'City', 'State', null, null, null, null, 'john.doe@johndoe.com', null) ON CONFLICT ON CONSTRAINT "PK_Customer" DO
UPDATE
SET "CustomerId"=(42),
    "FirstName"=('John'),
    "LastName"=('Doe'),
    "Company"=(null),
    "Address"=('Street'),
    "City"=('City'),
    "State"=('State'),
    "Country"=(null),
    "PostalCode"=(null),
    "Phone"=(null),
    "Fax"=(null),
    "Email"=('john.doe@johndoe.com'),
    "SupportRepId"=(null);

Specifying actions

Often, you do not want to update every field on a conflict. For example, for upserts, you rarely want to update the primary key. The function onConflictUpdateInstead allows you to restrict which fields are updated in the case of a conflict. The required function argument is a projection of the fields that ought to be updated.

In the example below, we insert a new row, but if a row with the given primary key already exists, we update only the first and last name.

-- import qualified Database.Beam.Postgres as Pg
let
  newCustomer = Customer 42 "John" "Doe" Nothing (Address (Just "Street") (Just "City") (Just "State") Nothing Nothing) Nothing Nothing "john.doe@johndoe.com" nothing_

runInsert $
  Pg.insert (customer chinookDb) (insertValues [newCustomer]) $
    Pg.onConflict
      (Pg.conflictingFields primaryKey)
      (Pg.onConflictUpdateInstead
         (\c -> ( customerFirstName c
                , customerLastName c )))
INSERT INTO "Customer"("CustomerId",
                       "FirstName",
                       "LastName",
                       "Company",
                       "Address",
                       "City",
                       "State",
                       "Country",
                       "PostalCode",
                       "Phone",
                       "Fax",
                       "Email",
                       "SupportRepId")
VALUES (42, 'John', 'Doe', null, 'Street', 'City', 'State', null, null, null, null, 'john.doe@johndoe.com', null) ON CONFLICT ("CustomerId") DO
UPDATE
SET "FirstName"=("excluded"."FirstName"),
    "LastName"=("excluded"."LastName");

You can also specify a more specific update, using the onConflictUpdateSet function. This is the most general form of the Postgres ON CONFLICT action. The excluded table is provided as the second argument. The syntax of the updates is similar to that of update.

In the following example, we append the new first name to the old first name and replace the old last name.

-- import qualified Database.Beam.Postgres as Pg
let
  newCustomer = Customer 42 "John" "Doe" Nothing (Address (Just "Street") (Just "City") (Just "State") Nothing Nothing) Nothing Nothing "john.doe@johndoe.com" nothing_

runInsert $
  Pg.insert (customer chinookDb) (insertValues [newCustomer]) $
    Pg.onConflict
      (Pg.conflictingFields primaryKey)
      (Pg.onConflictUpdateSet
        -- tbl is the old row, tblExcluded is the row proposed for insertion
        (\tbl tblExcluded -> mconcat
          [ customerFirstName tbl <-. concat_ [ current_ (customerFirstName tbl),  customerFirstName tblExcluded ]
          , customerLastName tbl <-. customerLastName tblExcluded ]
        )
      )
INSERT INTO "Customer"("CustomerId",
                       "FirstName",
                       "LastName",
                       "Company",
                       "Address",
                       "City",
                       "State",
                       "Country",
                       "PostalCode",
                       "Phone",
                       "Fax",
                       "Email",
                       "SupportRepId")
VALUES (42, 'John', 'Doe', null, 'Street', 'City', 'State', null, null, null, null, 'john.doe@johndoe.com', null) ON CONFLICT ("CustomerId") DO
UPDATE
SET "FirstName"=(CONCAT("Customer"."FirstName", "excluded"."FirstName")),
    "LastName"=("excluded"."LastName");

Inner CTEs

Standard SQL only allows CTEs (WITH expressions) at the top-level SELECT. However, PostgreSQL allows them anywhere, including in subqueries for joins.

For example, the following is valid Postgres, but not valid standard SQL.

SELECT a.column1, b.column2
FROM (WITH RECURSIVE ... SELECT ...) a
INNER JOIN b

beam-core enforces this by forcing selectWith to only return a SqlSelect, which represents a top-level SQL SELECT statement that can be executed against a backend. However, if we want to allow WITH expressions to appear within joins, then we will need a function similar to selectWith but returning a Q value, which is a re-usable query. beam-postgres provides this function for PostgreSQL, named pgSelectWith. For beam-postgres, select (pgSelectWith x) is equivalent to selectWith x. But, with the new type, we can reuse CTEs (including recursive ones) within other queries.

As an example using our Chinook schema, suppose we had an error with all orders in the month of September 2024, and needed to send out employees to customer homes to correct the issue. We want to find, for each order, an employee who lives in the same city as the customer, but we only want the highest ranking employee for each customer.

First, we order the employees by org structure so that managers appear first, followed by direct reports. We use a recursive query for this, and then join it against the orders.

aggregate_ (\(cust, emp) -> (group_ cust, Pg.pgArrayAgg (employeeId emp))) $ do
  inv <- filter_ (\i -> invoiceDate i >=. val_ (read "2024-09-01 00:00:00.000000")  &&. invoiceDate i <=. val_ (read "2024-10-01 00:00:00.000000")) $ all_ (invoice chinookDb)
  cust <- filter_ (\c -> pk c ==. invoiceCustomer inv) $ all_ (customer chinookDb)
  -- Lookup all employees and their levels
  (employee, _, _) <-
    Pg.pgSelectWith $ do
      let topLevelEmployees =
            fmap (\e -> (e, as_ @Int32 (val_ 0))) $
            filter_ (\e -> isNothing_ (employeeReportsTo e)) $ all_ (employee chinookDb)
      rec employeeOrgChart <-
            selecting (topLevelEmployees `unionAll_`
                        do { (manager, managerLevel) <- reuse employeeOrgChart
                          ; report <- filter_ (\e -> employeeReportsTo e ==. just_ (pk manager)) $ all_ (employee chinookDb)
                          ; pure (report, managerLevel + val_ 1) })
      pure $ filter_ (\(_, level, minLevel) -> level ==. minLevel)
            $ withWindow_ (\(employee, _) -> frame_ (partitionBy_ (addressCity (employeeAddress employee))) noOrder_ noBounds_)
                          (\(employee, level) cityFrame ->
                            (employee, level, coalesce_ [min_ level `over_` cityFrame] (val_ 0)))
                          (reuse employeeOrgChart)
  -- Limit the search only to employees that live in the same city
  guard_ (addressCity (employeeAddress employee) ==. addressCity (customerAddress cust))
  pure (cust, employee)
SELECT "t1"."CustomerId" AS "res0",
       "t1"."FirstName" AS "res1",
       "t1"."LastName" AS "res2",
       "t1"."Company" AS "res3",
       "t1"."Address" AS "res4",
       "t1"."City" AS "res5",
       "t1"."State" AS "res6",
       "t1"."Country" AS "res7",
       "t1"."PostalCode" AS "res8",
       "t1"."Phone" AS "res9",
       "t1"."Fax" AS "res10",
       "t1"."Email" AS "res11",
       "t1"."SupportRepId" AS "res12",
       array_agg("t2"."res0") AS "res13"
FROM "Invoice" AS "t0"
CROSS JOIN "Customer" AS "t1"
CROSS JOIN
  (WITH RECURSIVE "cte0"("res0",
                         "res1",
                         "res2",
                         "res3",
                         "res4",
                         "res5",
                         "res6",
                         "res7",
                         "res8",
                         "res9",
                         "res10",
                         "res11",
                         "res12",
                         "res13",
                         "res14",
                         "res15") AS (
                                        (SELECT "cte0_0"."EmployeeId" AS "res0",
                                                "cte0_0"."LastName" AS "res1",
                                                "cte0_0"."FirstName" AS "res2",
                                                "cte0_0"."Title" AS "res3",
                                                "cte0_0"."ReportsTo" AS "res4",
                                                "cte0_0"."BirthDate" AS "res5",
                                                "cte0_0"."HireDate" AS "res6",
                                                "cte0_0"."Address" AS "res7",
                                                "cte0_0"."City" AS "res8",
                                                "cte0_0"."State" AS "res9",
                                                "cte0_0"."Country" AS "res10",
                                                "cte0_0"."PostalCode" AS "res11",
                                                "cte0_0"."Phone" AS "res12",
                                                "cte0_0"."Fax" AS "res13",
                                                "cte0_0"."Email" AS "res14",
                                                0 AS "res15"
                                         FROM "Employee" AS "cte0_0"
                                         WHERE ("cte0_0"."ReportsTo") IS NULL)
                                      UNION ALL
                                        (SELECT "cte0_1"."EmployeeId" AS "res0",
                                                "cte0_1"."LastName" AS "res1",
                                                "cte0_1"."FirstName" AS "res2",
                                                "cte0_1"."Title" AS "res3",
                                                "cte0_1"."ReportsTo" AS "res4",
                                                "cte0_1"."BirthDate" AS "res5",
                                                "cte0_1"."HireDate" AS "res6",
                                                "cte0_1"."Address" AS "res7",
                                                "cte0_1"."City" AS "res8",
                                                "cte0_1"."State" AS "res9",
                                                "cte0_1"."Country" AS "res10",
                                                "cte0_1"."PostalCode" AS "res11",
                                                "cte0_1"."Phone" AS "res12",
                                                "cte0_1"."Fax" AS "res13",
                                                "cte0_1"."Email" AS "res14",
                                                ("cte0_0"."res15") + (1) AS "res15"
                                         FROM "cte0" AS "cte0_0"
                                         CROSS JOIN "Employee" AS "cte0_1"
                                         WHERE ("cte0_1"."ReportsTo") IS NOT DISTINCT
                                           FROM ("cte0_0"."res0")))SELECT "sub_t0"."res0" AS "res0",
                                                                          "sub_t0"."res1" AS "res1",
                                                                          "sub_t0"."res2" AS "res2",
                                                                          "sub_t0"."res3" AS "res3",
                                                                          "sub_t0"."res4" AS "res4",
                                                                          "sub_t0"."res5" AS "res5",
                                                                          "sub_t0"."res6" AS "res6",
                                                                          "sub_t0"."res7" AS "res7",
                                                                          "sub_t0"."res8" AS "res8",
                                                                          "sub_t0"."res9" AS "res9",
                                                                          "sub_t0"."res10" AS "res10",
                                                                          "sub_t0"."res11" AS "res11",
                                                                          "sub_t0"."res12" AS "res12",
                                                                          "sub_t0"."res13" AS "res13",
                                                                          "sub_t0"."res14" AS "res14",
                                                                          "sub_t0"."res15" AS "res15",
                                                                          "sub_t0"."res16" AS "res16"
   FROM
     (SELECT "sub_t0"."res0" AS "res0",
             "sub_t0"."res1" AS "res1",
             "sub_t0"."res2" AS "res2",
             "sub_t0"."res3" AS "res3",
             "sub_t0"."res4" AS "res4",
             "sub_t0"."res5" AS "res5",
             "sub_t0"."res6" AS "res6",
             "sub_t0"."res7" AS "res7",
             "sub_t0"."res8" AS "res8",
             "sub_t0"."res9" AS "res9",
             "sub_t0"."res10" AS "res10",
             "sub_t0"."res11" AS "res11",
             "sub_t0"."res12" AS "res12",
             "sub_t0"."res13" AS "res13",
             "sub_t0"."res14" AS "res14",
             "sub_t0"."res15" AS "res15",
             COALESCE(MIN("sub_t0"."res15") OVER (PARTITION BY "sub_t0"."res8"), 0) AS "res16"
      FROM "cte0" AS "sub_t0") AS "sub_t0"
   WHERE ("sub_t0"."res15") = ("sub_t0"."res16")) AS "t2"("res0",
                                                          "res1",
                                                          "res2",
                                                          "res3",
                                                          "res4",
                                                          "res5",
                                                          "res6",
                                                          "res7",
                                                          "res8",
                                                          "res9",
                                                          "res10",
                                                          "res11",
                                                          "res12",
                                                          "res13",
                                                          "res14",
                                                          "res15",
                                                          "res16")
WHERE (((("t0"."InvoiceDate") >= ('2024-09-01 00:00:00'))
        AND (("t0"."InvoiceDate") <= ('2024-10-01 00:00:00')))
       AND (("t1"."CustomerId") = ("t0"."CustomerId")))
  AND (("t2"."res8") IS NOT DISTINCT
       FROM ("t1"."City"))
GROUP BY "t1"."CustomerId",
         "t1"."FirstName",
         "t1"."LastName",
         "t1"."Company",
         "t1"."Address",
         "t1"."City",
         "t1"."State",
         "t1"."Country",
         "t1"."PostalCode",
         "t1"."Phone",
         "t1"."Fax",
         "t1"."Email",
         "t1"."SupportRepId"

COPY support

beam-postgres provides instances of MonadBeamCopyTo and MonadBeamCopyFrom (from Database.Beam.Backend.SQL.BeamExtensions) for PostgreSQL's file-mode COPY statement. See the cross-backend COPY page for the shared copyTableTo / copySelectTo / copyTableFrom API.

Server-side files only

PostgreSQL's COPY ... TO 'path' and COPY ... FROM 'path' operate on files on the database server, not on the client. As a result, the calling role must either be a member of the pg_write_server_files role (for COPY TO) or the pg_read_server_files role (for COPY FROM), or be a superuser. This is not a beam-postgres choice; it is a PostgreSQL security policy.

PostgreSQL supports CSV and a text format by default. Each format has its own options record, so the type system prevents mixing options across formats. Each record has a default value whose fields you can selectively override.

Here's an example of exporting to the text format:

--! import Database.Beam.Backend.SQL.BeamExtensions
runCopyTo $
  copyTableTo
    (playlist chinookDb)
    id -- no projection: entire table
    (Pg.copyToText "/tmp/beam-docs-playlists.txt")
COPY "Playlist"("PlaylistId",
                "Name") TO '/tmp/beam-docs-playlists.txt' (FORMAT text);

For CSV, the same export can be done this way:

--! import Database.Beam.Backend.SQL.BeamExtensions
--! import Database.Beam.Postgres
runCopyTo $
  copyTableTo
    (playlist chinookDb)
    id
    ( Pg.copyToCSVWith "/tmp/beam-docs-csv-options.csv"
        Pg.defaultPgCSVCopyToOptions
          { pgCsvCopyToDelimiter = Just '|'
          , pgCsvCopyToHeader    = Just True
          }
    )
COPY "Playlist"("PlaylistId",
                "Name") TO '/tmp/beam-docs-csv-options.csv' (FORMAT csv,
                                                             DELIMITER '|',
                                                                       HEADER TRUE);

Streaming COPY

beam-postgres also provides instances of MonadBeamCopyToStream and MonadBeamCopyFromStream for PostgreSQL's COPY ... TO STDOUT / COPY ... FROM STDIN statements. The data flows through the client connection rather than to/from a server-side file, so no special role attribute is required — this is the usual choice for application code.

The shared statement-builder API (copyTableToStream / copySelectToStream / copyTableFromStream) is documented on the cross-backend COPY page. The PostgreSQL-specific pieces are the smart constructors that build the options record:

| Smart constructor | Wire format |
|---|---|
| copyToTextStream / copyToTextStreamWith | PostgreSQL text format |
| copyToCSVStream / copyToCSVStreamWith | csv format |
| copyFromTextStream / copyFromTextStreamWith | text format |
| copyFromCSVStream / copyFromCSVStreamWith | csv format |

The format-specific option records (PgTextCopyToOptions, PgCSVCopyToOptions, …) are the same ones used by the file-mode API, so overriding e.g. the delimiter or the header flag works identically.

The *Stream runners take an IO-typed callback that participates in the streaming protocol. For runCopyToStream, the callback is a ByteString -> IO () sink invoked once per chunk emitted by the server. The example below prints every chunk without materializing the whole dataset:

--! import Database.Beam.Backend.SQL.BeamExtensions
--! import Database.Beam.Postgres
--! import qualified Data.ByteString.Char8 as BS

runCopyToStream
  (copyTableToStream
     (playlist chinookDb)
      id
      Pg.copyToCSVStream
  )
  BS.putStrLn -- print every chunk
1,Music
2,Movies
3,TV Shows
4,Audiobooks
5,90s Music
6,Audiobooks
7,Movies
8,Music
9,Music Videos
10,TV Shows
11,Brazilian Music
12,Classical
13,Classical 101 - Deep Cuts
14,Classical 101 - Next Steps
15,Classical 101 - The Basics
16,Grunge
17,Heavy Metal Classic
18,On-The-Go 1
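Because the sink is just a ByteString -> IO () function, it can fold over the stream instead of printing it. The sketch below is a standalone illustration using only base and bytestring: countingSink and Totals are hypothetical names, not part of beam-postgres, and the chunks fed to it in main are a stand-in for what the server would emit. Counting newlines approximates the row count only when no CSV field contains an embedded newline.

```haskell
import qualified Data.ByteString.Char8 as BS
import Data.IORef (IORef, modifyIORef', newIORef, readIORef)

-- | Running totals kept by the sink: (bytes seen, newline characters seen).
type Totals = (Int, Int)

-- | A sink with the ByteString -> IO () shape. It updates the totals and
-- then drops the chunk, so the dataset is never held in memory.
countingSink :: IORef Totals -> BS.ByteString -> IO ()
countingSink ref chunk =
  modifyIORef' ref $ \(bytes, rows) ->
    let bytes' = bytes + BS.length chunk
        rows'  = rows + BS.count '\n' chunk
    in bytes' `seq` rows' `seq` (bytes', rows')

main :: IO ()
main = do
  ref <- newIORef (0, 0)
  -- Stand-in chunks; real chunk boundaries need not align with rows.
  mapM_ (countingSink ref . BS.pack) ["1,Music\n2,Mov", "ies\n3,TV Shows\n"]
  readIORef ref >>= print  -- prints (28,3)
```

In a real program, countingSink ref would be passed as the second argument of runCopyToStream in place of BS.putStrLn.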

For runCopyFromStream, the callback is an IO (Maybe ByteString) source that is pulled until it returns Nothing. The example below replays the bytes captured above back into the table — first deleting the existing rows so the re-import does not conflict on the primary key. As with the file-mode example, the doc runner rolls back the surrounding transaction so the chinook database stays unchanged:

--! import Database.Beam.Backend.SQL.BeamExtensions
--! import Database.Beam.Postgres
--! import qualified Data.ByteString as BS
--! import Data.IORef

-- Capture rows via streaming COPY ... TO STDOUT.
chunksRef <- liftIO $ newIORef []
runCopyToStream
  (copyTableToStream (playlist chinookDb) id Pg.copyToCSVStream)
  (\chunk -> modifyIORef' chunksRef (chunk :))
payload <- liftIO (BS.concat . Prelude.reverse <$> readIORef chunksRef)

-- Clear the playlist table (and its dependents) so the re-import doesn't
-- conflict on primary keys.
runDelete $ delete (playlistTrack chinookDb) (\_ -> val_ True)
runDelete $ delete (playlist chinookDb) (\_ -> val_ True)

-- Replay the captured payload via streaming COPY ... FROM STDIN.
sourceRef <- liftIO $ newIORef (Just payload)
runCopyFromStream
  (copyTableFromStream (playlist chinookDb) id Pg.copyFromCSVStream)
  (do mchunk <- readIORef sourceRef
      writeIORef sourceRef Nothing
      pure mchunk)
DELETE
FROM "PlaylistTrack" AS "delete_target"
WHERE true;


DELETE
FROM "Playlist" AS "delete_target"
WHERE true;
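The single-shot source above hands over the whole payload in one chunk. When the data is already split into several chunks, the same IO (Maybe ByteString) shape can be built from a list. The following is a minimal standalone sketch using only base and bytestring: listSource and drain are hypothetical helpers, not beam-postgres functions, and drain merely imitates how a runner would pull from the source until it returns Nothing.

```haskell
import qualified Data.ByteString.Char8 as BS
import Data.IORef (atomicModifyIORef', newIORef)

-- | Turn a list of chunks into a pull-based source. Each call yields the
-- next chunk; once the list is exhausted it keeps returning Nothing.
listSource :: [BS.ByteString] -> IO (IO (Maybe BS.ByteString))
listSource chunks = do
  ref <- newIORef chunks
  pure $ atomicModifyIORef' ref $ \remaining ->
    case remaining of
      []       -> ([], Nothing)
      (c : cs) -> (cs, Just c)

-- | Pull from a source until it signals the end with Nothing, collecting
-- the chunks in order. This stands in for the COPY ... FROM STDIN runner.
drain :: IO (Maybe BS.ByteString) -> IO [BS.ByteString]
drain source = do
  next <- source
  case next of
    Nothing -> pure []
    Just c  -> (c :) <$> drain source

main :: IO ()
main = do
  source <- listSource (map BS.pack ["1,Music\n", "2,Movies\n"])
  drain source >>= mapM_ BS.putStr
```

Inside the database monad, the source would be created with liftIO (listSource chunks) and then passed as the callback argument of runCopyFromStream.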

The format options behave the same as in the file-mode case. Switching the above example to Pg.copyToCSVStreamWith / Pg.copyFromCSVStreamWith lets you override the CSV delimiter, the header flag, the quote character, and so on.