이전에 스파크를 설치해보았다. 지금은 스파크의 간단한 코드를 작성해볼 것이다. 환경은 파이썬 내에서 실행한다. 파이썬에서 스파크를 실행하기 위해서는 Session을 생성해주어야한다. 아래와 같이 Session을 생성하고 변수에 받을 수 있다.
import pyspark
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
# Session 생성
spark = SparkSession.builder.appName('Basics').getOrCreate()
# make DataFrame
myRange = spark.range(1000).toDF('number')
myRange는 0부터 999까지의 데이터를 받아 데이터 프레임 형태로 변환한 값을 받고 있다. 스파크에서는 'action'을 수행하지 않으면 출력이 되지 않으므로, myRange를 실행해도 아무런 값이 도출되지 않는다.
divisBy2 = myRange.where('number % 2 = 0')
# action
myRange.count() # 500
divisBy2.count() # 1000
myRange에 where이라는 함수를 사용하여 myRange의 값 중에서 짝수인 값만 호출해 divisBy2라는 변수에 할당해주었다. 그 후 count()를 사용해 action 을 해주면 500, 1000 이렇게 각각의 값이 출력되는 것을 확인할 수 있을 것이다.
스파크의 액션에는 총 3가지 유형이 존재한다.
1. 콘솔에서 데이터를 보는 액션
2. 각 언어로 된 네이티브 객체에 데이터를 모으는 액션
3. 출력 데이터소스에 저장하는 액션
연습문제
flightData2015 = spark.read.option('inferSchema', 'true').option('header', 'true').csv(file)
flightData2015.take(3)
spark에서는 schema를 가지고 올 수 있다. option('inferSchema', 'true')는 스키마 추론 기능을 사용한다는 것을 의미하며, 데이터 프레임의 스키마를 추론할 수 있다. 예를 들어 count라는 변수의 구성이 number로 되어 있으면 숫자의 형태로 받아오고, string 형태로 되어 있으면 string으로 받아오는 것을 의미한다.
option('header', 'true')는 파이썬에서 csv 파일을 불러올 때 header = True 의 의미와 동일하다. option('header', 'true')를 수행하면 맨 첫번째 열을 header로 인식해서 column으로 받아올 수 있다. 그 후 csv(file)을 통해 csv 파일을 불러온다. file에는 파일이 위치한 경로와 파일 이름을 설정하면 된다.
아래에 보이는 take는 파이썬에서 head와 동일하다 보면 된다. 상위 3개의 정보를 가져와 출력하는데 [Row(col1 = data1), Row(col2 = data2), ...] 의 구조로 데이터를 가지고 온다. csv 파일을 읽어 리스트 형태로 변환하는 과정을 수행해 가지고 온다고 보면 된다.
flightData2015.sort('count').take(2)
from pyspark.sql.functions import max
flightData2015.select(max('count')).take(1)
flightData2015.sort('count').take(2)는 flightData2015라는 데이터프레임을 count 컬럼을 기준으로 sorting한 후 상위 2개를 가져오는 코드이다. max는 파이썬과 같이 count라는 컬럼에서 최댓값을 가져오는데 1개만 가져온다는 의미이다.
from pyspark.sql.functions import desc
flightData2015.groupBy('DEST_COUNTRY_NAME').sum('count')\
.withColumnRenamed('sum(count)', 'destination_total')\
.sort(desc('destination_total')).limit(5).show()
flightData2015 데이터를 DEST_COUNTRY_NAME을 기준으로 그룹화 해준 후 count를 합산한다. 그 후 withColumnRenamed를 통해 column 이름을 변환해주고 destination_total의 역순을 기준으로 정렬 해준다. 마지막으로 limit(5)를 통해 5개를 제한한 후 show()로 출력해준다. 위 코드를 수행한 결과는 아래와 같다.
'Python > Pyspark' 카테고리의 다른 글
[pyspark] Example of pyspark ML (0) | 2022.02.22 |
---|---|
[pyspark] csv 파일 불러오기 (0) | 2022.02.21 |
[pyspark] 환경 설정 (0) | 2022.02.17 |