Как Apache Spark может окрылить аналитику авиакомпаний?

Опубликовано: 2015-10-30

Мировая индустрия авиаперевозок продолжает быстро расти, но стабильной и стабильной прибыльности пока не видно. По данным Международной ассоциации воздушного транспорта (IATA), за последнее десятилетие доходы отрасли удвоились: с 369 млрд долларов США в 2005 году до ожидаемых 727 млрд долларов США в 2015 году.



В секторе коммерческой авиации каждый игрок в цепочке создания стоимости — аэропорты, производители самолетов, производители реактивных двигателей, турагенты и сервисные компании — получают приличную прибыль.

Все эти игроки по отдельности генерируют чрезвычайно большие объемы данных из-за более высокой текучести транзакций рейсов. Ключевым моментом здесь является выявление и учет спроса, который предоставляет авиакомпаниям гораздо больше возможностей для дифференциации. Таким образом, авиационная промышленность может использовать анализ больших данных для увеличения продаж и увеличения прибыли.

Большие данные — это термин для сбора наборов данных, настолько больших и сложных, что их вычисление не может быть выполнено с помощью традиционных систем обработки данных или имеющихся инструментов СУБД.

Apache Spark — это среда распределенных кластерных вычислений с открытым исходным кодом, специально разработанная для интерактивных запросов и итерационных алгоритмов.

Абстракция Spark DataFrame — это табличный объект данных, аналогичный собственному фрейму данных R или пакету pandas для Python, но хранящийся в среде кластера.

Согласно последнему опросу Fortunes, Apache Spark является самой популярной технологией 2015 года.

Крупнейший поставщик Hadoop Cloudera также прощается с Hadoop MapReduce и приветствует Spark.

Что действительно дает Spark преимущество над Hadoop, так это скорость . Spark выполняет большинство своих операций в памяти , копируя их из распределенного физического хранилища в гораздо более быструю логическую оперативную память. Это сокращает время, затрачиваемое на запись и чтение на медленные, неуклюжие механические жесткие диски и с них, которые необходимо выполнять в системе Hadoops MapReduce.

Кроме того, Spark включает в себя инструменты (обработка в реальном времени, машинное обучение и интерактивный SQL), которые хорошо разработаны для решения бизнес-задач, таких как анализ данных в реальном времени путем объединения исторических данных с подключенных устройств, также известных как Интернет вещей .

Сегодня давайте проанализируем примеры данных об аэропортах с помощью Apache Spark .

В предыдущем блоге мы увидели, как обрабатывать структурированные и частично структурированные данные в Spark с помощью нового API Dataframes, а также рассказали, как эффективно обрабатывать данные JSON.

В этом блоге мы поймем, как запрашивать данные в DataFrames с помощью SQL , а также сохранять вывод в файловую систему в формате CSV.

Использование библиотеки синтаксического анализа Databricks CSV

Для этого я собираюсь использовать библиотеку разбора CSV, предоставленную Databricks, компанией, основанной создателями Apache Spark и которая в настоящее время занимается разработкой и распространением Spark.

Сообщество Spark состоит примерно из 600 участников, что делает его самым активным проектом во всем Apache Software Foundation, главном руководящем органе для программного обеспечения с открытым исходным кодом, с точки зрения количества участников.

Библиотека Spark-csv помогает нам анализировать и запрашивать данные csv в искре. Мы можем использовать эту библиотеку как для чтения, так и для записи данных csv в любую файловую систему, совместимую с Hadoop, и из нее.

Загрузка данных в Spark DataFrames

Давайте загрузим наши входные файлы в Spark DataFrames, используя библиотеку синтаксического анализа spark-csv от Databricks.

Вы можете использовать эту библиотеку в оболочке Spark, указав –packages com.databricks: spark-csv_2.10:1.0.3

При запуске оболочки, как показано ниже:

$ bin/spark-shell – пакеты com.databricks:spark-csv_2.10:1.0.3

Помните, что вы должны быть подключены к Интернету, потому что пакет spark-csv будет автоматически загружен, когда вы дадите эту команду. Я использую искру версии 1.4.0

Давайте создадим sqlContext с уже созданным объектом SparkContext(sc)

val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.implicits._

Теперь давайте загрузим наши данные csv из файла airports.csv (csv аэропорта github), схема которого приведена ниже.


схема аэропорта csv

scala> val airportDF = sqlContext.load("com.databricks.spark.csv", Map("path" -> "/home /poonam/airports.csv", "header" -> "true"))

Операция загрузки проанализирует файл *.csv с помощью библиотеки искры-csv Databricks и вернет кадр данных с именами столбцов, такими же, как в первой строке заголовка в файле.

Ниже приведены параметры, передаваемые методу загрузки.

  1. Источник : «com.databricks.spark.csv» сообщает искре, что мы хотим загрузить файл csv.
  2. Опции:
  • path – путь к файлу, где он находится.
  • Заголовок : «заголовок» -> «истина» указывает искре сопоставить первую строку файла с именами столбцов для результирующего фрейма данных.

Давайте посмотрим, что такое схема нашего Dataframe

Проверьте образцы данных в нашем фрейме данных

scala> airportDF.show



Запрос данных CSV с использованием временных таблиц:

Чтобы выполнить запрос к таблице, мы вызываем метод sql() в SQLContext.

Мы создали DataFrame аэропортов и загрузили данные CSV, чтобы запросить эти данные DF, мы должны зарегистрировать их как временную таблицу с именем airports.

>
scala> airportDF.registerTempTable("airports")

Давайте узнаем, сколько аэропортов в юго-восточной части нашего набора данных.

scala> sqlContext.sql("select AirportID, Name, Latitude, Longitude from airports where Latitude<0 and Longitude>0").collect.foreach(println)

[1,Goroka,-6.081689,145.391881]
[2,Madang,-5.207083,145.7887]
[3,Mount Hagen,-5.826789,144.295861
]
[4,Nadzab,-6.569828,146.726242]
[5,Port Moresby Jacksons Intl,-9.443383,147.22005]
[6,Wewak Intl,-3.583828,143.669186]

Мы можем делать агрегации в sql-запросах на Spark.
Узнаем, сколько уникальных городов имеют аэропорты в каждой стране

scala> sqlContext.sql("select Country, count(distinct(City)) from airports group by Country").collect.foreach(println)
[Iceland,10]
[Greenland,4]
[Canada,131]
[Papua New Guinea,6]

Какова средняя высота (в футах) аэропортов в каждой стране?

scala> sqlContext.sql("select Country , avg(Altitude) from airports group by Country").collect
res6: Array[org.apache.spark.sql.Row] =Array(
[Iceland,72.8],
[Greenland,202.75],

[Canada,852.6666666666666],
[Papua New Guinea,1849.0])

Теперь, чтобы узнать, сколько аэропортов работает в каждом часовом поясе?

scala> sqlContext.sql("select Tz , count(Tz) from airports group by Tz").collect.foreach(println)
[America/Dawson_Creek,1]
[America/Coral_Harbour,3]
[America/Halifax,9]
[America/Toronto,48]
[America/Vancouver,19]
[America/Godthab,3]
[Pacific/Port_Moresby,6]
[Atlantic/Reykjavik,10]
[America/Thule,1]
[America/St_Johns,4]
[America/Winnipeg,14]
[America/Edmonton,27]
[America/Regina,10]

Мы также можем рассчитать среднюю широту и долготу для этих аэропортов в каждой стране.

scala> sqlContext.sql("select Country, avg(Latitude), avg(Longitude) from airports group by Country").collect.foreach(println)
[Iceland,65.0477736,-19.5969224]
[Greenland,67.22490275,-54.124131999999996]
[Canada,53.94868565185185,-93.950036237037]
[Papua New Guinea,-6.118766666666666,145.51532]

Давайте посчитаем, сколько существует различных DST

scala> sqlContext.sql("select count(distinct(DST)) from airports").collect.foreach(println)
[4]

Сохранение данных в формате CSV
До сих пор мы загружали и запрашивали данные csv. Теперь мы увидим, как сохранить результаты в формате CSV обратно в файловую систему.
Предположим, мы хотим отправить клиенту отчет обо всех аэропортах в северо-западной части всех стран.
Давайте посчитаем это сначала.

scala> val NorthWestAirportsDF=sqlContext.sql("select AirportID, Name, Latitude, Longitude from airports where Latitude>0 and Longitude<0")
NorthWestAirportsDF: org.apache.spark.sql.DataFrame = [AirportID: string, Name: string, Latitude: string, Longitude: string]

И сохраните его в файл CSV

scala> NorthWestAirportsDF.save("com.databricks.spark.csv", org.apache.spark.sql.SaveMode.ErrorIfExists, Map("path" -> "/home/poonam/NorthWestAirports.csv","header"->"true"))

Ниже приведены параметры, передаваемые в метод сохранения.

  • Источник : то же, что и метод загрузки com.databricks.spark.csv , который указывает искре сохранять данные как csv.
  • SaveMode : это позволяет пользователю заранее указать, что необходимо сделать, если данный выходной путь уже существует. Так что существующие данные не будут потеряны/переписаны по ошибке. Вы можете выдать ошибку, добавить или перезаписать. Здесь мы выдали ошибку ErrorIfExists , так как не хотим перезаписывать какой-либо существующий файл.
  • Параметры : эти параметры аналогичны тем, которые мы передали методу загрузки . Опции:
  • path – путь к файлу, где он должен храниться.
  • Заголовок : «header» -> «true» указывает искре отображать имена столбцов фрейма данных в первую строку результирующего выходного файла.

Преобразование других форматов данных в CSV
Мы также можем преобразовать любой другой формат данных, такой как JSON, паркет, текст в CSV, используя эту библиотеку.

В предыдущем блоге мы создали данные json. вы можете найти его на гитхабе

scala> val employeeDF = sqlContext.read.json("/home/poonam/employee.json")

давайте просто сохраним его как CSV.

scala> employeeDF.save("com.databricks.spark.csv", org.apache.spark.sql.SaveMode.ErrorIfExists, Map("path" -> "/home/poonam/employee.csv", "header"->"true"))

Заключение:

В этом посте мы собрали некоторые сведения о данных аэропортов с помощью интерактивных запросов SparkSQL.
и изучили библиотеку парсинга csv от Spark

В следующем блоге мы рассмотрим очень важный компонент Spark, т.е. Spark Streaming.

Spark Streaming позволяет пользователям собирать данные в реальном времени в Spark и обрабатывать их по мере их поступления, мгновенно выдавая результаты.