Я пытаюсь прочитать файлы, присутствующие на Sequenceпутях в scala. Ниже приведен пример (псевдо) кода:
val paths = Seq[String] //Seq of paths
val dataframe = spark.read.parquet(paths: _*)
Теперь в приведенном выше порядке одни пути существуют, а другие нет. Есть ли способ игнорировать отсутствующие пути при чтении parquetфайлов (чтобы избежать org.apache.spark.sql.AnalysisException: Path does not exist)?
Я попробовал следующее, и, похоже, это работает, но в конце концов я дважды прочитал один и тот же путь, которого хотел бы избежать:
val filteredPaths = paths.filter(p => Try(spark.read.parquet(p)).isSuccess)
Я проверил optionsметод, DataFrameReaderно, похоже, нет такой опции, как ignore_if_missing.
Кроме того, эти пути могут быть hdfsлибо s3( Seqэто передается как аргумент метода), и при чтении я не знаю, можно s3ли hdfsиспользовать путь s3или hdfsконкретный API для проверки существования.
Решение
Вы можете отфильтровать ненужные файлы, как в ответе @Psidom. В Spark лучший способ сделать это — использовать внутреннюю конфигурацию Spark для Hadoop. Учитывая, что переменная сеанса Spark называется «spark», вы можете сделать:
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.fs.Path
val hadoopfs: FileSystem = FileSystem.get(spark.sparkContext.hadoopConfiguration)
def testDirExist(path: String): Boolean = {
val p = new Path(path)
hadoopfs.exists(p) && hadoopfs.getFileStatus(p).isDirectory
}
val filteredPaths = paths.filter(p => testDirExists(p))
val dataframe = spark.read.parquet(filteredPaths: _*)
Комментариев нет:
Отправить комментарий