Перейти к основному содержимому
Перейти к основному содержимому

Коннектор Spark

Этот коннектор использует специфические для ClickHouse оптимизации, такие как продвинутое партиционирование и сокращение предикатов, для улучшения производительности запросов и обработки данных. Коннектор основан на официальном JDBC коннекторе ClickHouse и управляет своим собственным каталогом.

До версии Spark 3.0 в Spark не существовало концепции встроенного каталога, поэтому пользователи обычно полагались на внешние системы каталогов, такие как Hive Metastore или AWS Glue. С этими внешними решениями пользователи должны были вручную регистрировать свои источники данных перед доступом к ним в Spark. Однако с введением концепции каталога в Spark 3.0 теперь Spark может автоматически обнаруживать таблицы, регистрируя плагины каталогов.

Каталог по умолчанию для Spark — spark_catalog, а таблицы идентифицируются по формату {catalog name}.{database}.{table}. С новой функцией каталога теперь возможно добавлять и работать с несколькими каталогами в одном приложении Spark.

Требования

  • Java 8 или 17
  • Scala 2.12 или 2.13
  • Apache Spark 3.3, 3.4 или 3.5

Матрица совместимости

ВерсияСовместимые версии SparkВерсия ClickHouse JDBC
mainSpark 3.3, 3.4, 3.50.6.3
0.8.1Spark 3.3, 3.4, 3.50.6.3
0.8.0Spark 3.3, 3.4, 3.50.6.3
0.7.3Spark 3.3, 3.40.4.6
0.6.0Spark 3.30.3.2-patch11
0.5.0Spark 3.2, 3.30.3.2-patch11
0.4.0Spark 3.2, 3.3Не зависит от
0.3.0Spark 3.2, 3.3Не зависит от
0.2.1Spark 3.2Не зависит от
0.1.2Spark 3.2Не зависит от

Установка и настройка

Для интеграции ClickHouse с Spark существует несколько вариантов установки, подходящих для различных проектных настроек. Вы можете добавить коннектор ClickHouse Spark в качестве зависимости непосредственно в файле сборки вашего проекта (например, в pom.xml для Maven или build.sbt для SBT). Кроме того, вы можете поместить необходимые JAR-файлы в папку $SPARK_HOME/jars/ или передать их непосредственно в качестве опции Spark, используя флаг --jars в команде spark-submit. Оба подхода обеспечивают доступность коннектора ClickHouse в вашей среде Spark.

Импорт как зависимость

Добавьте следующий репозиторий, если вы хотите использовать версию SNAPSHOT.

Скачивание библиотеки

Шаблон имени двоичного JAR:

Вы можете найти все доступные выпущенные JAR-файлы в Maven Central Repository и все дневные сборки SNAPSHOT JAR-файлы в Sonatype OSS Snapshots Repository.

к сведению

Важно включать clickhouse-jdbc JAR с классификатором "all", так как коннектор зависит от clickhouse-http и clickhouse-client — оба из которых объединены в clickhouse-jdbc:all. В качестве альтернативы вы можете добавлять clickhouse-client JAR и clickhouse-http по отдельности, если не хотите использовать полный пакет JDBC.

В любом случае убедитесь, что версии пакетов совместимы согласно Матрице совместимости.

Регистрация каталога (обязательно)

Чтобы получить доступ к вашим таблицам ClickHouse, вы должны настроить новый каталог Spark с следующими параметрами:

СвойствоЗначениеЗначение по умолчаниюОбязательно
spark.sql.catalog.<catalog_name>com.clickhouse.spark.ClickHouseCatalogN/AДа
spark.sql.catalog.<catalog_name>.host<clickhouse_host>localhostНет
spark.sql.catalog.<catalog_name>.protocolhttphttpНет
spark.sql.catalog.<catalog_name>.http_port<clickhouse_port>8123Нет
spark.sql.catalog.<catalog_name>.user<clickhouse_username>defaultНет
spark.sql.catalog.<catalog_name>.password<clickhouse_password>(пустая строка)Нет
spark.sql.catalog.<catalog_name>.database<database>defaultНет
spark.<catalog_name>.write.formatjsonarrowНет

Эти настройки могут быть заданы одним из следующих способов:

  • Редактировать/создать spark-defaults.conf.
  • Передать конфигурацию вашей команде spark-submit (или вашим командам spark-shell/spark-sql CLI).
  • Добавить конфигурацию при инициализации вашего контекста.
к сведению

При работе с кластером ClickHouse вы должны задать уникальное имя каталога для каждого экземпляра. Например:

Таким образом, вы сможете получить доступ к таблице clickhouse1 <ck_db>.<ck_table> из Spark SQL с помощью clickhouse1.<ck_db>.<ck_table>, а к таблице clickhouse2 <ck_db>.<ck_table> с помощью clickhouse2.<ck_db>.<ck_table>.

Чтение данных

Запись данных

Операции DDL

Вы можете выполнять операции DDL на вашем экземпляре ClickHouse, используя Spark SQL, при этом все изменения немедленно сохраняются в ClickHouse. Spark SQL позволяет вам писать запросы точно так же, как вы делали бы это в ClickHouse, поэтому вы можете напрямую выполнять команды, такие как CREATE TABLE, TRUNCATE и другие - без изменений, например:

Приведенные примеры демонстрируют запросы Spark SQL, которые вы можете выполнять в своем приложении, используя любой API — Java, Scala, PySpark или оболочку.

Конфигурации

Следующие Adjustable конфигурации доступны в коннекторе:


КлючЗначение по умолчаниюОписаниеС версии
spark.clickhouse.ignoreUnsupportedTransformfalseClickHouse поддерживает использование сложных выражений в качестве ключей шардирования или значений партиций, например cityHash64(col_1, col_2), которые в настоящее время не поддерживаются Spark. Если установлено true, игнорировать неподдерживаемые выражения, в противном случае быстро завершить с исключением. Обратите внимание, что при включении spark.clickhouse.write.distributed.convertLocal, игнорирование неподдерживаемых ключей шардирования может испортить данные.0.4.0
spark.clickhouse.read.compression.codeclz4Кодек, используемый для распаковки данных для чтения. Поддерживаемые кодеки: none, lz4.0.5.0
spark.clickhouse.read.distributed.convertLocaltrueПри чтении распределенной таблицы, читать локальную таблицу вместо самой себя. Если установлено true, игнорировать spark.clickhouse.read.distributed.useClusterNodes.0.1.0
spark.clickhouse.read.fixedStringAsbinaryЧтение ClickHouse типа FixedString как указанный тип данных Spark. Поддерживаемые типы: binary, string0.8.0
spark.clickhouse.read.formatjsonФормат сериализации для чтения. Поддерживаемые форматы: json, binary0.6.0
spark.clickhouse.read.runtimeFilter.enabledfalseВключение фильтра в реальном времени для чтения.0.8.0
spark.clickhouse.read.splitByPartitionIdtrueЕсли установлено true, создать фильтр входной партиции по виртуальной колонке _partition_id, а не по значению партиции. Известны проблемы с составлением SQL предикатов по значению партиции. Эта функция требует ClickHouse Server v21.6+0.4.0
spark.clickhouse.useNullableQuerySchemafalseЕсли установлено true, пометить все поля схемы запроса как допустимые при выполнении CREATE/REPLACE TABLE ... AS SELECT ... для создания таблицы. Обратите внимание, что эта конфигурация требует SPARK-43390 (доступно в Spark 3.5), без этой правки всегда будет действовать как true.0.8.0
spark.clickhouse.write.batchSize10000Количество записей на партию при записи в ClickHouse.0.1.0
spark.clickhouse.write.compression.codeclz4Кодек, используемый для сжатия данных при записи. Поддерживаемые кодеки: none, lz4.0.3.0
spark.clickhouse.write.distributed.convertLocalfalseПри записи в распределенную таблицу, писать локальную таблицу вместо самой себя. Если установлено true, игнорировать spark.clickhouse.write.distributed.useClusterNodes.0.1.0
spark.clickhouse.write.distributed.useClusterNodestrueЗапись на все узлы кластера при записи в распределенную таблицу.0.1.0
spark.clickhouse.write.formatarrowФормат сериализации для записи. Поддерживаемые форматы: json, arrow0.4.0
spark.clickhouse.write.localSortByKeytrueЕсли установлено true, выполнить локальную сортировку по ключам сортировки перед записью.0.3.0
spark.clickhouse.write.localSortByPartitionзначение spark.clickhouse.write.repartitionByPartitionЕсли установлено true, выполнить локальную сортировку по партиции перед записью. Если не установлено, будет эквивалентно spark.clickhouse.write.repartitionByPartition.0.3.0
spark.clickhouse.write.maxRetry3Максимальное количество попыток записи, которые мы будем делать для одной записи, завершившейся неудачей с кодами, подлежащими повторной попытке.0.1.0
spark.clickhouse.write.repartitionByPartitiontrueСледует ли перераспределять данные по ключам партиции ClickHouse, чтобы соответствовать распределению таблицы ClickHouse перед записью.0.3.0
spark.clickhouse.write.repartitionNum0Перераспределение данных, чтобы соответствовать распределению таблицы ClickHouse, требуется перед записью; используйте эту конфигурацию, чтобы указать номер перераспределения; значение меньше 1 означает отсутствие требований.0.1.0
spark.clickhouse.write.repartitionStrictlyfalseЕсли установлено true, Spark будет строго распределять входящие записи по партициям, чтобы удовлетворить требуемое распределение перед передачей записей в таблицу источника данных на запись. В противном случае Spark может применять определенные оптимизации для ускорения запроса, но нарушить требование распределения. Обратите внимание, что эта конфигурация требует SPARK-37523 (доступно в Spark 3.4), без этой правки всегда будет действовать как true.0.3.0
spark.clickhouse.write.retryInterval10sИнтервал в секундах между попытками записи.0.1.0
spark.clickhouse.write.retryableErrorCodes241Коды ошибок, подлежащие повторной попытке, возвращаемые сервером ClickHouse при неудаче записи.0.1.0

Поддерживаемые типы данных

В этом разделе описывается сопоставление типов данных между Spark и ClickHouse. Таблицы ниже предоставляют быстрое руководство по преобразованию типов данных при чтении из ClickHouse в Spark и при вставке данных из Spark в ClickHouse.

Чтение данных из ClickHouse в Spark

Тип данных ClickHouseТип данных SparkПоддерживаетсяЯвляется примитивнымПримечания
NothingNullTypeДа
BoolBooleanTypeДа
UInt8, Int16ShortTypeДа
Int8ByteTypeДа
UInt16,Int32IntegerTypeДа
UInt32,Int64, UInt64LongTypeДа
Int128,UInt128, Int256, UInt256DecimalType(38, 0)Да
Float32FloatTypeДа
Float64DoubleTypeДа
String, JSON, UUID, Enum8, Enum16, IPv4, IPv6StringTypeДа
FixedStringBinaryType, StringTypeДаКонтролируется конфигурацией READ_FIXED_STRING_AS
DecimalDecimalTypeДаТочность и масштаб до Decimal128
Decimal32DecimalType(9, scale)Да
Decimal64DecimalType(18, scale)Да
Decimal128DecimalType(38, scale)Да
Date, Date32DateTypeДа
DateTime, DateTime32, DateTime64TimestampTypeДа
ArrayArrayTypeНетТип элемента массива также преобразуется
MapMapTypeНетКлючи ограничены StringType
IntervalYearYearMonthIntervalType(Year)Да
IntervalMonthYearMonthIntervalType(Month)Да
IntervalDay, IntervalHour, IntervalMinute, IntervalSecondDayTimeIntervalTypeНетИспользуется конкретный тип интервала
Object
Nested
Tuple
Point
Polygon
MultiPolygon
Ring
IntervalQuarter
IntervalWeek
Decimal256
AggregateFunction
SimpleAggregateFunction

Вставка данных из Spark в ClickHouse

Тип данных SparkТип данных ClickHouseПоддерживаетсяЯвляется примитивнымПримечания
BooleanTypeUInt8Да
ByteTypeInt8Да
ShortTypeInt16Да
IntegerTypeInt32Да
LongTypeInt64Да
FloatTypeFloat32Да
DoubleTypeFloat64Да
StringTypeStringДа
VarcharTypeStringДа
CharTypeStringДа
DecimalTypeDecimal(p, s)ДаТочность и масштаб до Decimal128
DateTypeDateДа
TimestampTypeDateTimeДа
ArrayType (список, кортеж или массив)ArrayНетТип элемента массива также преобразуется
MapTypeMapНетКлючи ограничены StringType
Object
Nested

Участие и поддержка

Если вы хотите внести свой вклад в проект или сообщить о любых проблемах, мы приветствуем ваше мнение! Посетите наш репозиторий на GitHub, чтобы открыть проблему, предложить улучшения или отправить запрос на изменение. Ваш вклад приветствуется! Пожалуйста, ознакомьтесь с руководством по участию в репозитории перед началом. Спасибо за помощь в улучшении нашего коннектора ClickHouse Spark!