종목을 조회하기 위해서는 다음과 같은 선제 사항이 되어야 합니다.
- 크레온 플러스 관리자 모드로 접속
- 파이썬 32비트 주피터 노트북 관리자로 접속
- 주피터 노트북 32비트 파일 만들기
- 아래 나오는 코드들은 주피터 노트북 위에서 작업을 진행합니다.
다음 파이썬 코드는 크레온이 연결되었는지 확인합니다.
import ctypes
import sys
import win32com.client
import pandas as pd
# PLUS 공통 OBJECT
g_objCodeMgr = win32com.client.Dispatch('CpUtil.CpCodeMgr')
g_objCpStatus = win32com.client.Dispatch('CpUtil.CpCybos')
g_objCpTrade = win32com.client.Dispatch('CpTrade.CpTdUtil')
def InitPlusCheck():
# 프로세스가 관리자 권한으로 실행 여부
if ctypes.windll.shell32.IsUserAnAdmin():
print('정상: 관리자권한으로 실행된 프로세스입니다.')
else:
print('오류: 일반권한으로 실행됨. 관리자 권한으로 실행해 주세요')
return False
# 연결 여부 체크
if (g_objCpStatus.IsConnect == 0):
print("PLUS가 정상적으로 연결되지 않음. ")
return False
# # 주문 관련 초기화 - 계좌 관련 코드가 있을 때만 사용
# if (g_objCpTrade.TradeInit(0) != 0):
# print("주문 초기화 실패")
# return False
return True
InitPlusCheck()
다음 파이썬 코드는 종목종류를 조회하기 위함입니다.
- 코스닥과 코스피를 전부 조회할 수 있습니다.
- 아래 함수는 조회하기 위해 필요한 함수 및 클래스 입니다.
import sys
import win32com.client
import ctypes
import pandas as pd
################################################
# PLUS 공통 OBJECT
g_objCodeMgr = win32com.client.Dispatch('CpUtil.CpCodeMgr')
g_objCpStatus = win32com.client.Dispatch('CpUtil.CpCybos')
g_objCpTrade = win32com.client.Dispatch('CpTrade.CpTdUtil')
################################################
# PLUS 실행 기본 체크 함수
def InitPlusCheck():
# 프로세스가 관리자 권한으로 실행 여부
if ctypes.windll.shell32.IsUserAnAdmin():
print('정상: 관리자권한으로 실행된 프로세스입니다.')
else:
print('오류: 일반권한으로 실행됨. 관리자 권한으로 실행해 주세요')
return False
# 연결 여부 체크
if (g_objCpStatus.IsConnect == 0):
print("PLUS가 정상적으로 연결되지 않음. ")
return False
# # 주문 관련 초기화 - 계좌 관련 코드가 있을 때만 사용
# if (g_objCpTrade.TradeInit(0) != 0):
# print("주문 초기화 실패")
# return False
return True
class CpMarketEye:
def __init__(self):
self.objRq = win32com.client.Dispatch("CpSysDib.MarketEye")
self.RpFiledIndex = 0
def Request(self, codes, dataInfo):
# 0: 종목코드 4: 현재가 20: 상장주식수
rqField = [0, 4, 20] # 요청 필드
self.objRq.SetInputValue(0, rqField) # 요청 필드
self.objRq.SetInputValue(1, codes) # 종목코드 or 종목코드 리스트
self.objRq.BlockRequest()
# 현재가 통신 및 통신 에러 처리
rqStatus = self.objRq.GetDibStatus()
#print("통신상태", rqStatus, self.objRq.GetDibMsg1())
if rqStatus != 0:
return False
cnt = self.objRq.GetHeaderValue(2)
for i in range(cnt):
code = self.objRq.GetDataValue(0, i) # 코드
cur = self.objRq.GetDataValue(1, i) # 종가
listedStock = self.objRq.GetDataValue(2, i) # 상장주식수
maketAmt = listedStock * cur
if g_objCodeMgr.IsBigListingStock(code) :
maketAmt *= 1000
# print(code, maketAmt)
# key(종목코드) = tuple(상장주식수, 시가총액)
dataInfo[code] = (listedStock, maketAmt)
return True
class CMarketTotal():
def __init__(self):
self.dataInfo = {}
def GetAllMarketTotal(self, stock_type='all'):
if stock_type=='all':
kospi = g_objCodeMgr.GetStockListByMarket(1) # 거래소
kodaq = g_objCodeMgr.GetStockListByMarket(2) # 코스닥
elif stock_type=='kospi':
kospi = g_objCodeMgr.GetStockListByMarket(1) # 거래소
kodaq = ()
elif stock_type=='kodaq':
kospi = () # 거래소
kodaq = g_objCodeMgr.GetStockListByMarket(2) # 코스닥
allcodelist = kospi + kodaq
print('전 종목 코드 %d, 거래소 %d, 코스닥 %d' % (len(allcodelist), len(kospi), len(kodaq)))
objMarket = CpMarketEye()
rqCodeList = []
for i, code in enumerate(allcodelist):
rqCodeList.append(code)
if len(rqCodeList) == 200:
objMarket.Request(rqCodeList, self.dataInfo)
rqCodeList = []
continue
# end of for
if len(rqCodeList) > 0:
objMarket.Request(rqCodeList, self.dataInfo)
def PrintMarketTotal(self):
# 시가총액 순으로 소팅
data2 = sorted(self.dataInfo.items(), key=lambda x: x[1][1], reverse=True)
print('전종목 시가총액 순 조회 (%d 종목)' % (len(data2)))
for item in data2:
name = g_objCodeMgr.CodeToName(item[0])
listed = item[1][0]
markettot = item[1][1]
#print('%s 상장주식수: %s, 시가총액 %s' %(name, format(listed, ','), format(markettot, ',')))
- 위 클래스 및 함수를 이용하여 조회할 수 있습니다.
def stock_confirm(stock_type='all', stock_name=None, stock_code=None, num='all' ):
"""
* stock_type : 'all', 'kospi', 'kodaq'
* stock_name : None,
example) '삼성전자'
* stock_code : None,
example) 'A005930'
* num : 'all'
example) 100, 200, 300
"""
assert stock_type in ['all','kospi','kodaq'], "stock_type은 kospi, kodaq, all 만 올 수 있습니다."
objMarketTotal = CMarketTotal()
objMarketTotal.GetAllMarketTotal(stock_type)
objMarketTotal.PrintMarketTotal()
data2 = sorted(objMarketTotal.dataInfo.items(), key=lambda x: x[1][1], reverse=True)
MarketTotal = []
for item in data2:
name = g_objCodeMgr.CodeToName(item[0])
listed = item[1][0]
markettot = item[1][1]
MarketTotal.append([name, item[0], listed, markettot])
pdMarketTotal = pd.DataFrame(MarketTotal)
if num!='all':
try:
pdMarketTotal = pdMarketTotal[:num]
except:
raise Exception("숫자를 입력하세요.")
pdMarketTotal.columns = "name,code,stock,capitalization".split(",")
if stock_name:
pdMarketTotal = pdMarketTotal[pdMarketTotal['name']==stock_name]
if stock_code:
pdMarketTotal = pdMarketTotal[pdMarketTotal['code']==stock_code]
return pdMarketTotal
df = stock_confirm(stock_type='all', stock_name='삼성전자')
df1 = stock_confirm(stock_type='all')
df1