Open In Colab

데이터 API

import sys
import sklearn
import tensorflow as tf
from tensorflow import keras

import numpy as np
import pandas as pd

import os
import matplotlib.pyplot as plt
X = tf.range(10)
dataset = tf.data.Dataset.from_tensor_slices(X) # 주어진 데이터 소스를 여러 텐서로 자릅니다.
dataset # tf.data.Dataset.range(10)과 동등함.
<TensorSliceDataset shapes: (), types: tf.int32>
for item in dataset:
    print(item)
tf.Tensor(0, shape=(), dtype=int32)
tf.Tensor(1, shape=(), dtype=int32)
tf.Tensor(2, shape=(), dtype=int32)
tf.Tensor(3, shape=(), dtype=int32)
tf.Tensor(4, shape=(), dtype=int32)
tf.Tensor(5, shape=(), dtype=int32)
tf.Tensor(6, shape=(), dtype=int32)
tf.Tensor(7, shape=(), dtype=int32)
tf.Tensor(8, shape=(), dtype=int32)
tf.Tensor(9, shape=(), dtype=int32)

from_tensor_slices 함수는 텐서를 받아 10개의 아이템(0~9)으로 쪼갭니다.

이 때 shapes가 () 인것은 크기가 1인 텐서 10개로 쪼개졌다는 것을 의미합니다.

dataset = dataset.repeat(3).batch(7)
for item in dataset:
    print(item)
tf.Tensor([0 1 2 3 4 5 6], shape=(7,), dtype=int32)
tf.Tensor([7 8 9 0 1 2 3], shape=(7,), dtype=int32)
tf.Tensor([4 5 6 7 8 9 0], shape=(7,), dtype=int32)
tf.Tensor([1 2 3 4 5 6 7], shape=(7,), dtype=int32)
tf.Tensor([8 9], shape=(2,), dtype=int32)

원래 dataset에 repeat 함수를 적용하면 원본 데이터 아이템을 세차레 반복하는 새로운 데이터셋을 반환합니다.

그 뒤 batch 함수를 적용하면 아이템을 7개씩 그룹으로 묶습니다. 이 때 마지막 데이터셋은 2개가 됩니다.

dataset = dataset.map(lambda x: x*2)

for item in dataset:
    print(item)
tf.Tensor([ 0  2  4  6  8 10 12], shape=(7,), dtype=int32)
tf.Tensor([14 16 18  0  2  4  6], shape=(7,), dtype=int32)
tf.Tensor([ 8 10 12 14 16 18  0], shape=(7,), dtype=int32)
tf.Tensor([ 2  4  6  8 10 12 14], shape=(7,), dtype=int32)
tf.Tensor([16 18], shape=(2,), dtype=int32)

map 함수로 데이터 셋 내 아이템을 변환할 수 있습니다.

tf.random.set_seed(42)

dataset = tf.data.Dataset.range(10).repeat(3)
dataset = dataset.shuffle(buffer_size=5, seed = 42).batch(7)

for item in dataset:
    print(item)
tf.Tensor([0 1 6 5 7 3 9], shape=(7,), dtype=int64)
tf.Tensor([8 2 1 0 4 6 4], shape=(7,), dtype=int64)
tf.Tensor([7 2 5 9 2 1 3], shape=(7,), dtype=int64)
tf.Tensor([4 3 8 7 9 5 0], shape=(7,), dtype=int64)
tf.Tensor([8 6], shape=(2,), dtype=int64)

buffer_size 만큼 버퍼를 만든 뒤 데이터를 앞 부분부터 순서대로 버퍼 최대 크기만큼 뽑아 버퍼를 채웁니다. 그 후 버퍼 중 한 개 데이터를 뽑습니다.

뽑은 데이터는 첫번째 결과값으로 지정하고 다시 데이터 셋에서 다음 순서 데이터를 뽑아 버퍼를 채웁니다.

그 후 버퍼 중 렌덤하게 한 개 데이터를 뽑아 두번째 결과값으로 지정합니다. 이 과정을 반복하면 데이터가 셔플하게 됩니다.

하지만 여전히 앞 부분 데이터는 앞 부분에 있을 확률이 커 셔플이 됬다고 하기 어렵습니다. 버퍼 크기가 현저히 작을 경우 그런 경향이 더 심합니다.

이를 해결하기 위해 원본 데이터를 에포크마다 한번씩 섞어줘야 합니다.

여러 CSV 파일에서 한 줄씩 번갈아 읽기

from sklearn.datasets import fetch_california_housing
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler

housing = fetch_california_housing()
X_train_full, X_test, y_train_full, y_test = train_test_split(
    housing.data, housing.target.reshape(-1, 1), random_state=42)
X_train, X_valid, y_train, y_valid = train_test_split(
    X_train_full, y_train_full, random_state=42)

scaler = StandardScaler()
scaler.fit(X_train)
X_mean = scaler.mean_
X_std = scaler.scale_

주택 데이터셋을 다운받습니다. 훈련/검증/테스트 로 나누고 스케일을 조정합니다.

def save_to_multiple_csv_files(data, name_prefix, header=None, n_parts=10):
    housing_dir = os.path.join("datasets", "housing")
    os.makedirs(housing_dir, exist_ok=True)
    path_format = os.path.join(housing_dir, "my_{}_{:02d}.csv")

    filepaths = []
    m = len(data)
    for file_idx, row_indices in enumerate(np.array_split(np.arange(m), n_parts)):
        part_csv = path_format.format(name_prefix, file_idx)
        filepaths.append(part_csv)
        with open(part_csv, "wt", encoding="utf-8") as f:
            if header is not None:
                f.write(header)
                f.write("\n")
            for row_idx in row_indices:
                f.write(",".join([repr(col) for col in data[row_idx]]))
                f.write("\n")
    return filepaths

train_data = np.c_[X_train, y_train]
valid_data = np.c_[X_valid, y_valid]
test_data = np.c_[X_test, y_test]
header_cols = housing.feature_names + ["MedianHouseValue"]
header = ",".join(header_cols)

train_filepaths = save_to_multiple_csv_files(train_data, "train", header, n_parts=20)
valid_filepaths = save_to_multiple_csv_files(valid_data, "valid", header, n_parts=10)
test_filepaths = save_to_multiple_csv_files(test_data, "test", header, n_parts=10)

데이터를 여러개의 CSV파일로 쪼갰습니다. (이 부분은 핸즈온머신러닝 깃허브 코드를 복사했습니다.)

train_filepaths
['datasets/housing/my_train_00.csv',
 'datasets/housing/my_train_01.csv',
 'datasets/housing/my_train_02.csv',
 'datasets/housing/my_train_03.csv',
 'datasets/housing/my_train_04.csv',
 'datasets/housing/my_train_05.csv',
 'datasets/housing/my_train_06.csv',
 'datasets/housing/my_train_07.csv',
 'datasets/housing/my_train_08.csv',
 'datasets/housing/my_train_09.csv',
 'datasets/housing/my_train_10.csv',
 'datasets/housing/my_train_11.csv',
 'datasets/housing/my_train_12.csv',
 'datasets/housing/my_train_13.csv',
 'datasets/housing/my_train_14.csv',
 'datasets/housing/my_train_15.csv',
 'datasets/housing/my_train_16.csv',
 'datasets/housing/my_train_17.csv',
 'datasets/housing/my_train_18.csv',
 'datasets/housing/my_train_19.csv']

분할된 CSV파일 저장 경로 리스트 입니다.

filepath_dataset = tf.data.Dataset.list_files(train_filepaths, seed = 42)

for filepath in filepath_dataset:
    print(filepath)
tf.Tensor(b'datasets/housing/my_train_15.csv', shape=(), dtype=string)
tf.Tensor(b'datasets/housing/my_train_08.csv', shape=(), dtype=string)
tf.Tensor(b'datasets/housing/my_train_03.csv', shape=(), dtype=string)
tf.Tensor(b'datasets/housing/my_train_01.csv', shape=(), dtype=string)
tf.Tensor(b'datasets/housing/my_train_10.csv', shape=(), dtype=string)
tf.Tensor(b'datasets/housing/my_train_05.csv', shape=(), dtype=string)
tf.Tensor(b'datasets/housing/my_train_19.csv', shape=(), dtype=string)
tf.Tensor(b'datasets/housing/my_train_16.csv', shape=(), dtype=string)
tf.Tensor(b'datasets/housing/my_train_02.csv', shape=(), dtype=string)
tf.Tensor(b'datasets/housing/my_train_09.csv', shape=(), dtype=string)
tf.Tensor(b'datasets/housing/my_train_00.csv', shape=(), dtype=string)
tf.Tensor(b'datasets/housing/my_train_07.csv', shape=(), dtype=string)
tf.Tensor(b'datasets/housing/my_train_12.csv', shape=(), dtype=string)
tf.Tensor(b'datasets/housing/my_train_04.csv', shape=(), dtype=string)
tf.Tensor(b'datasets/housing/my_train_17.csv', shape=(), dtype=string)
tf.Tensor(b'datasets/housing/my_train_11.csv', shape=(), dtype=string)
tf.Tensor(b'datasets/housing/my_train_14.csv', shape=(), dtype=string)
tf.Tensor(b'datasets/housing/my_train_18.csv', shape=(), dtype=string)
tf.Tensor(b'datasets/housing/my_train_06.csv', shape=(), dtype=string)
tf.Tensor(b'datasets/housing/my_train_13.csv', shape=(), dtype=string)

list_files 함수는 파일 경로를 섞은 데이터셋을 반환합니다.

n_readers = 5
dataset = filepath_dataset.interleave(
    lambda filepath: tf.data.TextLineDataset(filepath).skip(1), # 파일의 첫번째 줄은 열이름이므로 skip함.
    cycle_length = n_readers
)

for line in dataset.take(5):
    print(line.numpy())
b'4.6477,38.0,5.03728813559322,0.911864406779661,745.0,2.5254237288135593,32.64,-117.07,1.504'
b'8.72,44.0,6.163179916317992,1.0460251046025104,668.0,2.794979079497908,34.2,-118.18,4.159'
b'3.8456,35.0,5.461346633416459,0.9576059850374065,1154.0,2.8778054862842892,37.96,-122.05,1.598'
b'3.3456,37.0,4.514084507042254,0.9084507042253521,458.0,3.2253521126760565,36.67,-121.7,2.526'
b'3.6875,44.0,4.524475524475524,0.993006993006993,457.0,3.195804195804196,34.04,-118.15,1.625'

interleave 함수는 데이터셋(파일경로) 내 다섯 개에서 데이터를 읽는 데이터셋을 만듭니다.

이 때 함수 내 lambda 함수를 이용해 새로운 데이터셋을 만들 것입니다.

인터러브 데이터셋을 반복 구문에 적용하면 다섯 개의 TextLineDataset을 순회하고, 모든 데이터셋의 아이템이 소진될때까지 한 줄씩 읽습니다.

출력된 값은 바이트 스트링입니다.

데이터 전처리 하기

[0.] * 8 + [tf.constant([], dtype = tf.float32)]
[0.0,
 0.0,
 0.0,
 0.0,
 0.0,
 0.0,
 0.0,
 0.0,
 <tf.Tensor: shape=(0,), dtype=float32, numpy=array([], dtype=float32)>]
n_inputs = X_train.shape[-1]

def preprocess(line):
    defs = [0.] * n_inputs + [tf.constant([], dtype = tf.float32)] # 디폴트값 텐서 형태로 만들기
    fields = tf.io.decode_csv(line, record_defaults=defs) # 스칼라 텐서의 리스트를 반환
    x = tf.stack(fields[:-1]) # 마지막 전까지
    y = tf.stack(fields[-1:])
    return (x-X_mean) / X_std, y

preprocess(b'4.6477,38.0,5.03728813559322,0.911864406779661,745.0,2.5254237288135593,32.64,-117.07,1.504')
(<tf.Tensor: shape=(8,), dtype=float32, numpy=
 array([ 0.39593136,  0.74167496, -0.16415128, -0.40340805, -0.6199179 ,
        -0.18355484, -1.4084505 ,  1.2565969 ], dtype=float32)>,
 <tf.Tensor: shape=(1,), dtype=float32, numpy=array([1.504], dtype=float32)>)

전처리 하는 함수를 만들었습니다.

데이터 적재, 전처리 함수 만들기

def csv_reader_dataset(filepaths, repeat = 1, n_readers = 5,
                       n_read_threads = None, shuffle_buffer_size = 10000,
                       n_parse_threads = 5, batch_size = 32):
    dataset = tf.data.Dataset.list_files(filepaths).repeat(repeat)
    # 파일 경로를 섞은 데이터셋 반환
    dataset = dataset.interleave( # 한번에 n_readers 파일 수만큼 한 줄씩 번갈아 읽음.
        lambda filepath: tf.data.TextLineDataset(filepath).skip(1), # 첫줄은 열이름이므로 제외.
        cycle_length = n_readers, num_parallel_calls = n_read_threads # 여러파일에서 병렬로 읽고싶을때 사용.
    )
    dataset = dataset.shuffle(shuffle_buffer_size) # 데이터를 셔플합니다. 버퍼는 shuffle_buffer_size만큼.
    dataset = dataset.map(preprocess, num_parallel_calls = n_parse_threads) # preprocess 함수를 데이터 내 적용.

    return dataset.batch(batch_size).prefetch(1) # batch_size만큼 배치 함수를 이용해 데이터를 묶습니다.
    # 프리페치는 속도를 향상시킵니다.

데이터를 적재하고 전처리 하는 부분까지 한번에 하는 함수를 구현했습니다.

tf.random.set_seed(42)

train_set = csv_reader_dataset(train_filepaths, batch_size = 3)
for X_batch, y_batch in train_set.take(2):
    print('X = ', X_batch)
    print('y =', y_batch)
X =  tf.Tensor(
[[ 0.5804519  -0.20762321  0.05616303 -0.15191229  0.01343246  0.00604472
   1.2525111  -1.3671792 ]
 [ 5.818099    1.8491895   1.1784915   0.28173092 -1.2496178  -0.3571987
   0.7231292  -1.0023477 ]
 [-0.9253566   0.5834586  -0.7807257  -0.28213993 -0.36530012  0.27389365
  -0.76194876  0.72684526]], shape=(3, 8), dtype=float32)
y = tf.Tensor(
[[1.752]
 [1.313]
 [1.535]], shape=(3, 1), dtype=float32)
X =  tf.Tensor(
[[-0.8324941   0.6625668  -0.20741376 -0.18699841 -0.14536144  0.09635526
   0.9807942  -0.67250353]
 [-0.62183803  0.5834586  -0.19862501 -0.3500319  -1.1437552  -0.3363751
   1.107282   -0.8674123 ]
 [ 0.8683102   0.02970133  0.3427381  -0.29872298  0.7124906   0.28026953
  -0.72915536  0.86178064]], shape=(3, 8), dtype=float32)
y = tf.Tensor(
[[0.919]
 [1.028]
 [2.182]], shape=(3, 1), dtype=float32)

직접 구현한 함수를 통해 여러개로 나누어진 csv의 경로를 랜덤하게 배치 사이즈만큼 적재하고 전처리한 모습입니다.

train_set = csv_reader_dataset(train_filepaths, repeat = None)
valid_set = csv_reader_dataset(valid_filepaths)
test_set = csv_reader_dataset(test_filepaths)

model = keras.models.Sequential([
    keras.layers.Dense(30, activation="relu", input_shape=X_train.shape[1:]),
    keras.layers.Dense(1),
])

model.compile(loss="mse", optimizer=keras.optimizers.SGD(learning_rate=1e-3))

csv 경로를 이용해 데이터셋을 적재하고 전처리 한 뒤 케라스로 간단한 딥러닝 모델을 구축합니다.

batch_size = 32
model.fit(train_set, steps_per_epoch=len(X_train) // batch_size, epochs=10,
          validation_data=valid_set) 
        # steps_per_epoch 은 한 에포크당 몇번 진행하는지. 입력값이 train_set이기 때문에 확실히 모름.
Epoch 1/10
362/362 [==============================] - 1s 3ms/step - loss: 2.3908 - val_loss: 1.7373
Epoch 2/10
362/362 [==============================] - 1s 3ms/step - loss: 0.8812 - val_loss: 0.7719
Epoch 3/10
362/362 [==============================] - 1s 2ms/step - loss: 0.7535 - val_loss: 0.7118
Epoch 4/10
362/362 [==============================] - 1s 2ms/step - loss: 0.7236 - val_loss: 1.0734
Epoch 5/10
362/362 [==============================] - 1s 2ms/step - loss: 0.6724 - val_loss: 0.6422
Epoch 6/10
362/362 [==============================] - 1s 3ms/step - loss: 0.6682 - val_loss: 0.6402
Epoch 7/10
362/362 [==============================] - 1s 3ms/step - loss: 0.6197 - val_loss: 0.8125
Epoch 8/10
362/362 [==============================] - 1s 3ms/step - loss: 0.6181 - val_loss: 0.5822
Epoch 9/10
362/362 [==============================] - 1s 3ms/step - loss: 0.5769 - val_loss: 1.0522
Epoch 10/10
362/362 [==============================] - 1s 3ms/step - loss: 0.5596 - val_loss: 0.6446
<keras.callbacks.History at 0x7fa4918802d0>

잘 진행되는 모습입니다.

느낀점

데이터 API를 이용해 텐서플로에서 데이터 적재하는 방법을 간단히 알아봤습니다.

실제로 대용량 데이터를 저장하고 효율적으로 읽기 위해선 TFRecord를 활용한다고 하는데 이후에 추가로 공부해보겠습니다.

딥러닝에 대해 지금까지 가볍게 공부하고 있는데 솔직히 막연하다는 생각이 듭니다. 실전 경험이 없어서일까요.

이와 별개로 개인적으로 스스로에게 휴식을 주고자 합니다. 몸상태가 좋지 못하고, 학기종강 이후 쉼을 못준 것 같네요.

아마 당분간은 주 1회 SSUDA 스터디 코드만 올라갈 것 같습니다.