[SSUDA] 텐서플로 사용하기
import numpy as np
import tensorflow as tf
from tensorflow import keras
assert tf.__version__ >= "2.4"
# 버전이 2.4보다 커야합니다.
np.random.seed(42)
tf.random.set_seed(42)
넘파이와 텐서플로 패키지를 설치합니다.
tf.constant([[1., 2., 3.], [4., 5., 6.]])
constant 함수로 텐서를 생성했습니다. 넘파이 행렬과 유사합니다.
유의할 점은 텐서는 변경이 불가능한 객체입니다.
tf.constant(42)
스칼라 값도 입력이 가능합니다.
t = tf.constant([[1., 2., 3.], [4., 5., 6.]])
t.shape
t.dtype
넘파이 행렬과 비슷하게 크기(shape)와 데이터 타입(dtype)을 가집니다.
t[:, 1:]
t[:, 1, tf.newaxis]
tf.square(t)
t @ tf.transpose(t)
텐서 데이터 타입은 넘파이와 마찬가지로 여러가지 연산이 가능합니다.
a = np.array([2., 4., 5.])
tf.constant(a)
t.numpy()
텐서롤 넘파이로, 넘파이를 텐서로 쉽게 변형이 가능합니다.
다만 넘파이는 64비트 기반 텐서는 32비트 기반이기 때문에 넘파이 배열로 텐서를 만들때는 dtype = tf.float32로 해야합니다.
v = tf.Variable([[1.,2.,3.], [4., 5., 6.]])
v.assign(2 * v)
v[0, 1].assign(42)
tf.Tensor은 앞서 말한데로 변경이 불가능한 객체입니다. 이 객체만으로는 지속적으로 업데이트되는 신경망 가중치등을 사용할 수 없습니다.
tf.Variable은 이 경우에 필요합니다. assign 함수를 이용해 변수 값을 바꿀 수 있습니다.
from sklearn.datasets import fetch_california_housing
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
housing = fetch_california_housing()
X_train_full, X_test, y_train_full, y_test = train_test_split(
housing.data, housing.target.reshape(-1, 1), random_state=42)
X_train, X_valid, y_train, y_valid = train_test_split(
X_train_full, y_train_full, random_state=42)
scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)
X_valid_scaled = scaler.transform(X_valid)
X_test_scaled = scaler.transform(X_test)
주택 가격 데이터를 입력해 트레인, 벨리드, 테스트 데이터로 분할하고 스케일링 해줍니다.
input_shape = X_train.shape[1:]
model = keras.models.Sequential([
keras.layers.Dense(30, activation = 'selu', kernel_initializer= 'lecun_normal',
# LeCun 정규분포에서 값을 추출해 가중치 초기값을 설정합니다.
input_shape = input_shape),
keras.layers.Dense(1)
])
간단한 딥러닝 모델을 구축합니다. 이때 활성화 함수로 자기정규화를 일으키는 selu을 사용합니다.
def huber_fn(y_true, y_pred):
error = y_true - y_pred
is_small_error = tf.abs(error) < 1 # 작은 오차라면
squared_loss = tf.square(error) / 2 # 오차 제곱의 절반값
linear_loss = tf.abs(error) - 0.5 # 큰 오차라면 오차의 절대값 - 0.5
# where 함수는 조건에 따른 벡터 출력에 최적화 되어있습니다.
# 참일때는 2번 입력값을 거짓일때는 3번입력값을 출력한 것을 묶어 벡터로 출력합니다.
return tf.where(is_small_error, squared_loss, linear_loss)
손실함수를 직접 구현합니다. 평균 제곱 오차는 큰 오차에 너무 과한 벌칙이 있고 절대값 오차는 이상치에 너무 관대합니다.
그러므로 앞서 말한 단점을 보안한 후버 손실 함수를 직접 구현하였습니다.
model.compile(loss = huber_fn, optimizer = 'nadam', metrics = 'mae')
model.fit(X_train_scaled, y_train, epochs = 2, validation_data=(X_valid_scaled, y_valid))
앞서 정의한 손실함수를 사용해 간단한 모델을 적합시켰습니다.
model.save('my_model_with_first')
model = keras.models.load_model('my_model_with_first',
custom_objects = {'huber_fn' : huber_fn})
model.fit(X_train_scaled, y_train, epochs = 2,
validation_data = (X_valid_scaled, y_valid))
모델을 저장하고 다시 불러왔습니다. 앞선 모델과 꽤 유사하게 정상적으로 잘 작동합니다.
class HuberLoss(keras.losses.Loss): # 케라스 loss 클래스 상속
def __init__(self, threshold = 1.0, **kwargs):
self.threshold = threshold
super().__init__(**kwargs)
def call(self, y_true, y_pred):
error = y_true - y_pred
is_small_error = tf.abs(error) < self.threshold
squared_loss = tf.square(error) / 2
linear_loss = self.threshold * tf.abs(error) - self.threshold ** 2/ 2
return tf.where(is_small_error, squared_loss, linear_loss)
def get_config(self): # 이 함수를 구현하여 threshold 값을 모델 로드시에도 유지할 수 있습니다.
base_config = super().get_config()
return {**base_config, 'threshold' : self.threshold}
손실함수를 함수로 구현한다면 함수 내 매개변수가 있을 때 그 값은 모델 로드시 날아갑니다.
이를 방지하려면 클래스로 손실함수를 구현해야합니다. 케라스 loss 클래스를 상속하고 get_config 함수를 구현하면 됩니다.
model.compile(loss = HuberLoss(2.), optimizer='nadam', metrics = 'mae')
model.fit(X_train_scaled, y_train, epochs = 2,
validation_data = (X_valid_scaled, y_valid))
model.save('my_model_with_second')
model = keras.models.load_model('my_model_with_second',
custom_objects = {'HuberLoss' : HuberLoss})
model.fit(X_train_scaled, y_train, epochs = 2,
validation_data = (X_valid_scaled, y_valid))
모델을 처음 만들었을때와 저장 후 로드한 모델을 새로 사용했을 때 비슷한 결과를 보이는 것을 알 수 있습니다.
이는 매개변수가 잘 저장됬다는 것을 보여줍니다. (threshold = 2)
keras.backend.clear_session() # 환경을 초기화 해줍니다.
np.random.seed(42)
tf.random.set_seed(42)
def my_softplus(z):
return tf.math.log(tf.exp(z) + 1.0)
def my_glorot_initializer(shape, dtype = tf.float32):
stddev = tf.sqrt(2. / (shape[0] + shape[1]))
return tf.random.normal(shape, stddev = stddev, dtype = dtype)
def my_l1_regularizer(weights):
return tf.reduce_sum(tf.abs(0.01 * weights))
def my_positive_weights(weights):
return tf.where(weights < 0., tf.zeros_like(weights), weights)
활성화 함수, 가중치 초기값 부여방식, 규제, 제한을 이미 텐서플로 내 존재하지만 직접 구현해보았습니다.
model = keras.models.Sequential([
keras.layers.Dense(30, activation = 'selu', kernel_initializer='lecun_normal',
input_shape = input_shape),
keras.layers.Dense(1, activation= my_softplus,
kernel_regularizer = my_l1_regularizer,
kernel_constraint = my_positive_weights,
kernel_initializer = my_glorot_initializer),
])
model.compile(loss = 'mse', optimizer = 'nadam', metrics = ['mae'])
model.fit(X_train_scaled, y_train, epochs = 2,
validation_data = (X_valid_scaled, y_valid))
이를 적용시킨 모델을 만들었습니다.
model.save('my_model_with_third')
model = keras.models.load_model('my_model_with_third',
custom_objects={
"my_l1_regularizer": my_l1_regularizer,
"my_positive_weights": my_positive_weights,
"my_glorot_initializer": my_glorot_initializer,
"my_softplus": my_softplus,
})
model.fit(X_train_scaled, y_train, epochs = 2,
validation_data = (X_valid_scaled, y_valid))
잘 작동합니다.
class MyDense(keras.layers.Layer):
def __init__(self, units, activation = None, **kwargs):
super().__init__(**kwargs)
self.units = units
self.activation = keras.activations.get(activation)
def build(self, batch_input_shape):
# 층이 처음 사용될 때 호출되는 함수
self.kernel = self.add_weight(
name = 'kernel', shape = [batch_input_shape[-1], self.units],
initializer = 'glorot_normal'
)
self.bias = self.add_weight(
name = 'bias', shape = [self.units], initializer = 'zeros'
)
super().build(batch_input_shape)
def call(self, X):
return self.activation(X @ self.kernel + self.bias)
def compute_output_shape(self, batch_input_shape):
return tf.TensorShape(batch_input_shape.as_list()[:-1] + [self.units])
def get_config(self):
# 환경설정 저장
base_config = super().get_config()
return {**base_config, 'units': self.units,
'activation' : keras.activations.serialize(self.activation)}
층을 직접 구현해보았습니다. 이 사용자 정의 층은 보통의 층과 동일하게 사용할 수 있습니다.
model = keras.models.Sequential([
MyDense(30, activation = 'relu', input_shape = input_shape),
MyDense(1)
])
model.compile(loss = 'mse', optimizer = 'nadam')
model.fit(X_train_scaled, y_train, epochs = 2,
validation_data = (X_valid_scaled, y_valid))
model.evaluate(X_test_scaled, y_test)
사용자 정의 층을 사용해 모델을 만들어봤어요.
X_train_A, X_train_B = X_train_scaled[:, :5], X_train_scaled[:, 2:]
# 0~4까지 첫번째 입력, 2~7까지 두번째 입력
X_valid_A, X_valid_B = X_valid_scaled[:, :5], X_valid_scaled[:, 2:]
X_test_A, X_test_B = X_test_scaled[:, :5], X_test_scaled[:, 2:]
X_new_A, X_new_B = X_test_A[:3], X_test_B[:3]
# 입력층이 2개인 모델 구축
input_A = keras.layers.Input(shape = [5], name = 'wide_input')
input_B = keras.layers.Input(shape = [6], name = 'depp_input')
hidden1 = keras.layers.Dense(30, activation = 'relu')(input_B)
hidden2 = keras.layers.Dense(30, activation = 'relu')(hidden1)
concat = keras.layers.concatenate([input_A, hidden2])
output = keras.layers.Dense(1, name = 'output')(concat)
model = keras.Model(inputs = [input_A, input_B], outputs = [output])
입력층이 2개인 모델을 만들기 위해 데이터를 먼저 분할합니다. 그 다음 입력층을 두 개 받습니다.
입력층_B에서 은닉층 2개를 통과한 뒤 입력층_A와 층 연결을 하고 아웃풋을 출력하는 모델을 만들었습니다.
model.compile(loss = 'mse', optimizer = keras.optimizers.SGD(lr = 1e-3))
history = model.fit((X_train_A, X_train_B), y_train, epochs = 20,
validation_data = ((X_valid_A, X_valid_B), y_valid))
mse_test = model.evaluate((X_test_A, X_test_B), y_test)
y_pred = model.predict((X_new_A, X_new_B))
모델을 적합시키고 출력시켰습니다. 성능은 비슷한 것 같아요.
비슷한 매커니즘으로 출력을 여러개 하는 것 또한 가능합니다. 이 때 손실함수는 각각 필요하며, 어느 출력물에 가중치를 늘리는것도 가능합니다.
class WideAndDeepModel(keras.models.Model):
def __init__(self, units = 30, activation = 'relu', **kwargs):
super().__init__(**kwargs)
self.hidden1 = keras.layers.Dense(units, activation = activation)
self.hidden2 = keras.layers.Dense(units, activation = activation)
self.main_output = keras.layers.Dense(1)
self.aux_output = keras.layers.Dense(1)
def call(self, inputs):
input_A, input_B = inputs
hidden1 = self.hidden1(input_B)
hidden2 = self.hidden2(hidden1)
concat = keras.layers.concatenate([input_A, hidden2])
main_output = self.main_output(concat)
aux_output = self.aux_output(hidden2)
return main_output, aux_output
model = WideAndDeepModel(30, activation = 'relu')
앞서 만든 함수형 API는 정적이기 때문에 반복문/조건문 등 여러 동적인 구조를 필요로 하는 경우 사용이 힘듭니다.
이를 극복하기 위해 서브클레싱 API 모델을 만들었습니다. 이전과 동일하나 출력층이 2개인 구조입니다.
생성자에 층 구성을 하고 call 함수로 정방향 계산을 만들었습니다. 이때 input 클래스의 객체는 만들 필요가 없습니다.
model.compile(loss = 'mse', loss_weights= [0.9, 0.1], optimizer = keras.optimizers.SGD(learning_rate=1e-3))
history = model.fit((X_train_A, X_train_B), (y_train, y_train), epochs=10,
validation_data = ((X_valid_A, X_valid_B), (y_valid, y_valid)))
total_loss, main_loss, aux_loss = model.evaluate((X_test_A, X_test_B), (y_test, y_test))
y_pred_main, y_pred_aux = model.predict((X_new_A, X_new_B))
서브클래싱 API로 만든 모델입니다. 잘 실행됩니다.
다음과 같은 모델을 직접 정의하려고 합니다.
class ResidualBlock(keras.layers.Layer):
def __init__(self, n_layers, n_neurons, **kwargs):
super().__init__(**kwargs)
# 히든층을 만들어줍니다. n_layers 개수만큼 층을 쌓습니다.
self.hidden = [keras.layers.Dense(n_neurons, activation = 'elu',
kernel_initializer = 'he_normal')
for _ in range(n_layers)]
def call(self, inputs):
# 활성화 함수를 쓸때 쓰는 함수
# 인풋을 받으면 계속 히든 층에 레이어에 투입시킵니다.
Z = inputs
for layer in self.hidden:
Z = layer(Z)
return inputs + Z
레이어의 수를 입력받아 그만큼 히든층을 만드는 작업을 해줍니다. 이때 잔차 블록은 출력에 입력을 더합니다.
class ResidualRegressor(keras.Model):
def __init__(self, output_dim, **kwargs):
super().__init__(**kwargs)
self.hidden1 = keras.layers.Dense(30, activation = 'elu',
kernel_initializer = 'he_normal')
self.block1 = ResidualBlock(2, 30)
self.block2 = ResidualBlock(2, 30)
self.out = keras.layers.Dense(output_dim)
def call(self, inputs):
Z = self.hidden1(inputs)
for _ in range(1 + 3):
Z = self.block1(Z)
Z = self.block2(Z)
return self.out(Z)
생성자에서 층을 만들고 call 메서드에서 이를 사용합니다. 이 모델은 다른 일반 모델처럼 사용도 가능합니다.
model = ResidualRegressor(1)
model.compile(loss = 'mse', optimizer = 'nadam')
history = model.fit(X_train_scaled, y_train, epochs = 5)
score = model.evaluate(X_test_scaled, y_test)
방금 정의한 모델을 적용시킨 결과입니다.