이 문서에서는 Azure Data Explorer Python 라이브러리를 사용하여 데이터를 수집합니다. Azure 데이터 탐색기는 로그 및 원격 분석 데이터에 사용 가능한 빠르고 확장성이 우수한 데이터 탐색 서비스입니다. Azure Data Explorer는 Python용 두 개의 클라이언트 라이브러리인 수집 라이브러리 와 데이터 라이브러리를 제공합니다. 이러한 라이브러리를 사용하면 클러스터에 데이터를 수집하거나 로드하고 코드에서 데이터를 쿼리할 수 있습니다.
먼저 클러스터에서 테이블 및 데이터 매핑을 만듭니다. 그런 다음 클러스터에 데이터를 수집 대기열에 넣고 결과를 검증합니다.
필수 조건
- Microsoft 계정 또는 Microsoft Entra 사용자 ID입니다. Azure 구독은 필요하지 않습니다.
- Azure Data Explorer 클러스터 및 데이터베이스. 클러스터 및 데이터베이스를 만듭니다.
- Python 3.4 이상.
데이터 설치 및 라이브러리 수집
azure-kusto-data 및 azure-kusto-ingest를 설치합니다.
pip install azure-kusto-data
pip install azure-kusto-ingest
import 문장 및 상수 추가
azure-kusto-data에서 클래스를 가져옵니다.
from azure.kusto.data import KustoClient, KustoConnectionStringBuilder
from azure.kusto.data.exceptions import KustoServiceError
from azure.kusto.data.helpers import dataframe_from_result_table
애플리케이션을 인증하기 위해 Azure Data Explorer는 Microsoft Entra 테넌트 ID를 사용합니다. 테넌트 ID를 찾으려면 다음 URL을 사용하여 YourDomain에 대한 도메인을 대체하십시오.
https://login.microsoftonline.com/<YourDomain>/.well-known/openid-configuration/
예를 들어 도메인이 contoso.com 경우 URL은 다음과 https://login.microsoftonline.com/contoso.com/.well-known/openid-configuration/같습니다. 이 URL을 클릭하여 결과를 확인합니다. 첫 번째 줄은 다음과 같습니다.
"authorization_endpoint":"https://login.microsoftonline.com/6babcaad-604b-40ac-a9d7-9fd97c0b779f/oauth2/authorize"
이 경우 테넌트 ID는 .입니다 aaaabbbb-0000-cccc-1111-dddd2222eeee
. 이 코드를 실행하기 전에 AAD_TENANT_ID, KUSTO_URI, KUSTO_INGEST_URI 및 KUSTO_DATABASE 값을 설정합니다.
AAD_TENANT_ID = "<TenantId>"
KUSTO_URI = "https://<ClusterName>.<Region>.kusto.windows.net/"
KUSTO_INGEST_URI = "https://ingest-<ClusterName>.<Region>.kusto.windows.net/"
KUSTO_DATABASE = "<DatabaseName>"
이제 연결 문자열을 생성합니다. 다음 예제에서는 디바이스 인증을 사용하여 클러스터에 액세스합니다. 관리 ID 인증, Microsoft Entra 애플리케이션 인증서, Microsoft Entra 애플리케이션 키 및 Microsoft Entra사용자 및 암호를 사용할 수도 있습니다.
이후 단계에서 대상 테이블 및 매핑을 만듭니다.
KCSB_INGEST = KustoConnectionStringBuilder.with_interactive_login(
KUSTO_INGEST_URI)
KCSB_DATA = KustoConnectionStringBuilder.with_interactive_login(
KUSTO_URI)
DESTINATION_TABLE = "StormEvents"
DESTINATION_TABLE_COLUMN_MAPPING = "StormEvents_CSV_Mapping"
원본 파일 정보 설정
추가 클래스를 가져오고 데이터 원본 파일에 대한 상수 설정 이 예제에서는 Azure Blob Storage에서 호스트되는 샘플 파일을 사용합니다. StormEvents 샘플 데이터 세트에는 국립 환경 정보 센터의 날씨 관련 데이터가 포함되어 있습니다.
from azure.kusto.data import DataFormat
from azure.kusto.ingest import QueuedIngestClient, IngestionProperties, FileDescriptor, BlobDescriptor, DataFormat, ReportLevel, ReportMethod
CONTAINER = "samplefiles"
ACCOUNT_NAME = "kustosamples"
SAS_TOKEN = "" # If relevant add SAS token
FILE_PATH = "StormEvents.csv"
FILE_SIZE = 64158321 # in bytes
BLOB_PATH = "https://" + ACCOUNT_NAME + ".blob.core.windows.net/" + \
CONTAINER + "/" + FILE_PATH + SAS_TOKEN
클러스터에 테이블 만들기
StormEvents.csv 파일의 데이터 스키마와 일치하는 테이블을 만듭니다. 이 코드가 실행되면 다음과 같은 메시지가 반환됩니다. 로그인하려면 웹 브라우저를 사용하여 페이지를 https://microsoft.com/devicelogin 열고 인증할 코드 F3W4VWZDM 입력합니다. 단계에 따라 로그인한 다음, 다음 코드 블록을 실행하도록 돌아갑니다. 연결을 만드는 후속 코드 블록을 사용하려면 다시 로그인해야 합니다.
KUSTO_CLIENT = KustoClient(KCSB_DATA)
CREATE_TABLE_COMMAND = ".create table StormEvents (StartTime: datetime, EndTime: datetime, EpisodeId: int, EventId: int, State: string, EventType: string, InjuriesDirect: int, InjuriesIndirect: int, DeathsDirect: int, DeathsIndirect: int, DamageProperty: int, DamageCrops: int, Source: string, BeginLocation: string, EndLocation: string, BeginLat: real, BeginLon: real, EndLat: real, EndLon: real, EpisodeNarrative: string, EventNarrative: string, StormSummary: dynamic)"
RESPONSE = KUSTO_CLIENT.execute_mgmt(KUSTO_DATABASE, CREATE_TABLE_COMMAND)
dataframe_from_result_table(RESPONSE.primary_results[0])
수집 매핑 정의
테이블을 만들 때 사용되는 열 이름 및 데이터 형식에 들어오는 CSV 데이터를 매핑합니다. 원본 데이터 필드를 대상 테이블 열에 매핑합니다.
CREATE_MAPPING_COMMAND = """.create table StormEvents ingestion csv mapping 'StormEvents_CSV_Mapping' '[{"Name":"StartTime","datatype":"datetime","Ordinal":0}, {"Name":"EndTime","datatype":"datetime","Ordinal":1},{"Name":"EpisodeId","datatype":"int","Ordinal":2},{"Name":"EventId","datatype":"int","Ordinal":3},{"Name":"State","datatype":"string","Ordinal":4},{"Name":"EventType","datatype":"string","Ordinal":5},{"Name":"InjuriesDirect","datatype":"int","Ordinal":6},{"Name":"InjuriesIndirect","datatype":"int","Ordinal":7},{"Name":"DeathsDirect","datatype":"int","Ordinal":8},{"Name":"DeathsIndirect","datatype":"int","Ordinal":9},{"Name":"DamageProperty","datatype":"int","Ordinal":10},{"Name":"DamageCrops","datatype":"int","Ordinal":11},{"Name":"Source","datatype":"string","Ordinal":12},{"Name":"BeginLocation","datatype":"string","Ordinal":13},{"Name":"EndLocation","datatype":"string","Ordinal":14},{"Name":"BeginLat","datatype":"real","Ordinal":16},{"Name":"BeginLon","datatype":"real","Ordinal":17},{"Name":"EndLat","datatype":"real","Ordinal":18},{"Name":"EndLon","datatype":"real","Ordinal":19},{"Name":"EpisodeNarrative","datatype":"string","Ordinal":20},{"Name":"EventNarrative","datatype":"string","Ordinal":21},{"Name":"StormSummary","datatype":"dynamic","Ordinal":22}]'"""
RESPONSE = KUSTO_CLIENT.execute_mgmt(KUSTO_DATABASE, CREATE_MAPPING_COMMAND)
dataframe_from_result_table(RESPONSE.primary_results[0])
수집을 위해 메시지 큐에 대기
Blob Storage에서 데이터를 끌어오고 해당 데이터를 Azure Data Explorer로 수집하는 메시지를 큐에 넣습니다.
INGESTION_CLIENT = QueuedIngestClient(KCSB_INGEST)
# All ingestion properties are documented here: https://learn.microsoft.com/azure/kusto/management/data-ingest#ingestion-properties
INGESTION_PROPERTIES = IngestionProperties(database=KUSTO_DATABASE, table=DESTINATION_TABLE, data_format=DataFormat.CSV,
ingestion_mapping_reference=DESTINATION_TABLE_COLUMN_MAPPING, additional_properties={'ignoreFirstRecord': 'true'})
# FILE_SIZE is the raw size of the data in bytes
BLOB_DESCRIPTOR = BlobDescriptor(BLOB_PATH, FILE_SIZE)
INGESTION_CLIENT.ingest_from_blob(
BLOB_DESCRIPTOR, ingestion_properties=INGESTION_PROPERTIES)
print('Done queuing up ingestion with Azure Data Explorer')
테이블에 수집된 쿼리 데이터
대기열에 있는 데이터 수집이 예약되고 데이터가 Azure Data Explorer에 로드될 때까지 5~10분 기다립니다. 그런 다음, 다음 코드를 실행하여 StormEvents 테이블의 레코드 수를 가져옵니다.
QUERY = "StormEvents | count"
RESPONSE = KUSTO_CLIENT.execute_query(KUSTO_DATABASE, QUERY)
dataframe_from_result_table(RESPONSE.primary_results[0])
문제 해결 쿼리 실행
로그인하고 https://dataexplorer.azure.com 클러스터에 연결합니다. 데이터베이스에서 다음 명령을 실행하여 지난 4시간 동안 데이터 입력 실패가 있었는지 확인하세요. 실행하기 전에 데이터베이스 이름을 바꿉다.
.show ingestion failures
| where FailedOn > ago(4h) and Database == "<DatabaseName>"
다음 명령을 실행하여 지난 4시간 동안의 모든 수집 작업의 상태를 확인합니다. 실행하기 전에 데이터베이스 이름을 바꿉다.
.show operations
| where StartedOn > ago(4h) and Database == "<DatabaseName>" and Table == "StormEvents" and Operation == "DataIngestPull"
| summarize arg_max(LastUpdatedOn, *) by OperationId
자원을 정리하세요
만약 귀하가 우리의 다른 문서를 따르려는 경우, 만든 리소스를 보관하세요. 그렇지 않은 경우 데이터베이스에서 다음 명령을 실행하여 StormEvents 테이블을 정리합니다.
.drop table StormEvents