Ingestion API هي طريقة خفيفة لفهرسة المستندات. إذا كنت بحاجة إلى مزيد من التحكم،
فكر في إنشاء موصل في لوحة الإدارة أو اتباع دليل إنشاء موصل.
متى تستخدم Ingestion API؟
استخدم Ingestion API من أجل:- مصادر غير مدعومة: إضافة محتوى من أنظمة ليس لديها موصلات مدمجة
- بيانات تكميلية: إضافة سياق إضافي لبيانات الموصل الموجودة (مثل ملفات README لـ GitLab)
- تحرير المستندات: تعديل المستندات في Gorbit عندما لا يمكن للمسؤول تحديث المستند الأصلي في المصدر
- سير العمل البرمجي: دمج فهرسة المستندات في خطوط بياناتك الحالية
الدليل
انتقل إلى قسم الكود الكامل إذا كنت لا تريد الدليل التفصيلي. في هذا المثال، سنفهرس ملفات محلية باستخدام Ingestion API.عنوان URL الأساسي لـ Ingestion API هو
https://cloud.gorbit.app/api/gorbit-api/ingestion أو نطاقك الخاص.جهّز موصلك
بما أننا نريد أن تظهر هذه الملفات في صفحة الموصلات، سنحتاج أولاً إلى إنشاء موصل.
لهذا المثال، سننشئ موصل ملف مع ملف نصي وهمي.إذا كان لديك بالفعل موصل تريد ربط ملفات Ingestion به،
انقر على الموصل وانسخ
cc_pair_id من URL.https://cloud.gorbit.app/admin/connector/308 -> cc_pair_id هو 243جهّز طلبك
نسخ
اسأل الذكاء الاصطناعي
import requests
API_BASE_URL = "https://cloud.gorbit.app/api" # أو نطاقك الخاص
API_KEY = "YOUR_KEY_HERE"
headers = {
"Authorization": f"Bearer {API_KEY}",
"Content-Type": "application/json"
}
معالجة مستنداتك إلى كائن JSON المناسب
إذا كنت تريد أن تظهر مستنداتك في صفحة الموصلات، يجب تحديد
cc_pair_id في البيانات!IngestionDocument.البيانات الدنيا الصالحة
البيانات الدنيا الصالحة
JSON
نسخ
اسأل الذكاء الاصطناعي
{
"document": {
"id": "my_unique_id_1",
"semantic_identifier": "Gorbit FAQ v1",
"sections": [
{ "text": "What is Gorbit?\nGorbit is..." }
],
"source": "file",
"metadata": {
"category": "faq"
}
},
"cc_pair_id": 243
}
البيانات الكاملة مع جميع الحقول الاختيارية
البيانات الكاملة مع جميع الحقول الاختيارية
نسخ
اسأل الذكاء الاصطناعي
{
"document": {
"id": "my_unique_id_1",
"semantic_identifier": "Gorbit FAQ - العنوان المعروض في الواجهة",
"title": "Gorbit FAQ v1 - العنوان للبحث",
"sections": [
{
"text": "ما هو Gorbit؟\nGorbit هو...",
"link": "https://docs.gorbit.app/faq#what-is-gorbit"
},
{
"text": "كيف أبدأ؟\nللبدء...",
"link": "https://docs.gorbit.app/faq#getting-started"
},
{
"image_file_id": "uuid_generated_by_gorbit_file_store",
"text": "متقدم - يجب رفع الصورة أولاً! POST /user/file/upload",
"link": "https://docs.gorbit.app/faq#about"
}
],
"source": "file",
"metadata": {
"category": "faq",
"tags": ["frequently-asked", "help"]
},
"doc_updated_at": "2025-09-19T08:20:00Z",
"chunk_count": 15,
"primary_owners": [
{
"display_name": "Alex Chen",
"first_name": "Alex",
"middle_initial": null,
"last_name": "Chen",
"email": "alex@gorbit.app"
}
],
"secondary_owners": [
{
"display_name": "Sarah Johnson",
"first_name": "Sarah",
"middle_initial": "M",
"last_name": "Johnson",
"email": "sarah@gorbit.app"
}
],
"from_ingestion_api": true
},
"cc_pair_id": 243
}
نسخ
اسأل الذكاء الاصطناعي
import os
from pathlib import Path
import PyPDF2
import pandas as pd
from docx import Document as DocxDocument
def read_files_from_folder(folder_path, cc_pair_id=243):
"""
قراءة ملفات PDF و TXT و DOCX و CSV و XLSX وإنشاء بيانات ingestion دنيا
"""
documents = []
folder = Path(folder_path)
# امتدادات الملفات المدعومة
supported_extensions = {'.pdf', '.txt', '.docx', '.csv', '.xlsx'}
for file_path in folder.rglob('*'):
if file_path.is_file() and file_path.suffix.lower() in supported_extensions:
try:
# استخراج النص بناءً على نوع الملف
if file_path.suffix.lower() == '.txt':
with open(file_path, 'r', encoding='utf-8') as f:
content = f.read()
elif file_path.suffix.lower() == '.pdf':
with open(file_path, 'rb') as f:
reader = PyPDF2.PdfReader(f)
content = ""
for page in reader.pages:
content += page.extract_text() + "\n"
elif file_path.suffix.lower() == '.docx':
doc = DocxDocument(file_path)
content = "\n".join([paragraph.text for paragraph in doc.paragraphs])
elif file_path.suffix.lower() in ['.csv', '.xlsx']:
df = pd.read_csv(file_path) if file_path.suffix.lower() == '.csv' else pd.read_excel(file_path)
content = df.to_string(index=False)
# تخطي الملفات الفارغة
if not content.strip():
continue
# إنشاء بيانات دنيا صالحة
document_payload = {
"document": {
"semantic_identifier": file_path.name,
"sections": [
{"text": content}
],
"source": "file",
"metadata": {
"file_type": file_path.suffix.lower()
}
},
"cc_pair_id": cc_pair_id
}
documents.append(document_payload)
print(f"تمت المعالجة: {file_path.name}")
except Exception as e:
print(f"خطأ في قراءة {file_path.name}: {e}")
continue
return documents
# مثال على الاستخدام
LOCAL_FOLDER = "./documents" # غيّر إلى مسار مجلدك
CC_PAIR_ID = 243 # استخدم cc_pair_id الفعلي الخاص بك
# قراءة ومعالجة الملفات
documents_to_ingest = read_files_from_folder(LOCAL_FOLDER, CC_PAIR_ID)
print(f"\nتم العثور على {len(documents_to_ingest)} مستند جاهز للفهرسة")
قم بإجراء الطلب إلى نقطة النهاية /ingestion
نسخ
اسأل الذكاء الاصطناعي
successful_ingestions = 0
failed_ingestions = 0
for i, document_data in enumerate(documents_to_ingest):
print(f"جاري فهرسة المستند {i+1}/{len(documents_to_ingest)}: {document_data['document']['semantic_identifier']}")
response = requests.post(
f"{API_BASE_URL}/gorbit-api/ingestion",
headers=headers,
json=document_data
)
if response.status_code == 200:
print(f"تمت الفهرسة بنجاح: {document_data['document']['semantic_identifier']}")
successful_ingestions += 1
else:
print(f"فشلت الفهرسة {document_data['document']['semantic_identifier']}: {response.status_code}")
print(f" خطأ: {response.text}")
failed_ingestions += 1
print(f"\nاكتملت الفهرسة: {successful_ingestions} ناجحة، {failed_ingestions} فاشلة")
سيعيد API استجابة نجاح إذا تم قبول المستند للمعالجة.
تحدث الفهرسة الفعلية بشكل غير متزامن.
الكود الكامل
Python
نسخ
اسأل الذكاء الاصطناعي
import requests
import os
from pathlib import Path
import time
import PyPDF2
import pandas as pd
from docx import Document as DocxDocument
API_BASE_URL = "https://cloud.gorbit.app/api" # أو نطاقك الخاص
API_KEY = "YOUR_KEY_HERE"
CC_PAIR_ID = 243 # استبدل بـ cc_pair_id الفعلي الخاص بك
LOCAL_FOLDER = "./documents" # غيّر هذا إلى مسار مجلدك
headers = {
"Authorization": f"Bearer {API_KEY}",
"Content-Type": "application/json"
}
def read_files_from_folder(folder_path, cc_pair_id=243):
"""
قراءة ملفات PDF و TXT و DOCX و CSV و XLSX وإنشاء بيانات ingestion دنيا
"""
documents = []
folder = Path(folder_path)
if not folder.exists():
print(f"المجلد غير موجود: {folder_path}")
return documents
supported_extensions = {'.pdf', '.txt', '.docx', '.csv', '.xlsx'}
print(f"جاري قراءة الملفات من: {folder.absolute()}")
for file_path in folder.rglob('*'):
if file_path.is_file() and file_path.suffix.lower() in supported_extensions:
try:
# استخراج النص بناءً على نوع الملف
if file_path.suffix.lower() == '.txt':
with open(file_path, 'r', encoding='utf-8') as f:
content = f.read()
elif file_path.suffix.lower() == '.pdf':
with open(file_path, 'rb') as f:
reader = PyPDF2.PdfReader(f)
content = ""
for page in reader.pages:
content += page.extract_text() + "\n"
elif file_path.suffix.lower() == '.docx':
doc = DocxDocument(file_path)
content = "\n".join([paragraph.text for paragraph in doc.paragraphs])
elif file_path.suffix.lower() in ['.csv', '.xlsx']:
df = pd.read_csv(file_path) if file_path.suffix.lower() == '.csv' else pd.read_excel(file_path)
content = df.to_string(index=False)
# تخطي الملفات الفارغة
if not content.strip():
print(f"تخطي الملف الفارغ: {file_path.name}")
continue
# إنشاء بيانات دنيا صالحة
document_payload = {
"document": {
"semantic_identifier": file_path.name,
"sections": [
{"text": content}
],
"source": "file",
"metadata": {
"file_type": file_path.suffix.lower()
}
},
"cc_pair_id": cc_pair_id
}
documents.append(document_payload)
print(f"تمت المعالجة: {file_path.name}")
except Exception as e:
print(f"خطأ في قراءة {file_path.name}: {e}")
continue
return documents
# قراءة جميع المستندات من المجلد
print("جاري بدء عملية فهرسة الملفات...")
documents_to_ingest = read_files_from_folder(LOCAL_FOLDER, CC_PAIR_ID)
if not documents_to_ingest:
print("لم يتم العثور على مستندات للفهرسة. تحقق من مسار المجلد وأنواع الملفات.")
exit(1)
print(f"تم العثور على {len(documents_to_ingest)} مستند للفهرسة")
# إجراء طلبات الفهرسة لجميع المستندات
successful_ingestions = 0
failed_ingestions = 0
print("\nجاري بدء الفهرسة...")
for i, document_data in enumerate(documents_to_ingest):
print(f"[{i+1}/{len(documents_to_ingest)}] جاري فهرسة: {document_data['document']['semantic_identifier']}")
response = requests.post(
f"{API_BASE_URL}/gorbit-api/ingestion",
headers=headers,
json=document_data
)
if response.status_code == 200:
result = response.json()
print(f"نجاح: {document_data['document']['semantic_identifier']}")
if result.get('already_existed'):
print(" تم تحديث المستند (كان موجوداً بالفعل)")
else:
print(" تم إنشاء مستند جديد")
successful_ingestions += 1
else:
print(f"فشل: {document_data['document']['semantic_identifier']}")
print(f" الحالة: {response.status_code}")
print(f" الخطأ: {response.text}")
failed_ingestions += 1
print(f"\nملخص الفهرسة:")
print(f" ناجح: {successful_ingestions}")
print(f" فاشل: {failed_ingestions}")
print(f" الإجمالي: {len(documents_to_ingest)}")
مثال إضافي
هذا النص البرمجي يزحف إلى مجلد محدد للبحث عن ملفات JSON ويرسل المحتويات إلى Ingestion API.Python
نسخ
اسأل الذكاء الاصطناعي
#!/usr/bin/env python3
"""
نص برمجي للفهرسة
"""
import argparse
import json
import logging
import sys
from pathlib import Path
from typing import Any, List, Optional
import requests
from requests.exceptions import RequestException
from tqdm import tqdm # type: ignore
def setup_logging() -> None:
"""تكوين التسجيل للتطبيق."""
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
handlers=[logging.StreamHandler()],
)
def parse_arguments() -> argparse.Namespace:
"""تحليل معاملات سطر الأوامر."""
parser = argparse.ArgumentParser(
description="الزحف إلى مجلد للبحث عن ملفات JSON وإرسالها إلى نقطة نهاية API."
)
parser.add_argument(
"-d", "--directory",
required=True,
type=str,
help="المجلد للزحف للبحث عن ملفات JSON"
)
parser.add_argument(
"-t", "--tracking-file",
required=True,
type=str,
help="ملف لتتبع معرفات المستندات المعالجة (واحد في كل سطر)"
)
parser.add_argument(
"-u", "--url",
type=str,
help="عنوان URL لنقطة نهاية API لإرسال بيانات JSON إليه",
default="http://localhost:8080/api"
)
parser.add_argument(
"--headers",
type=str,
help="سلسلة JSON للعناوين لتضمينها في الطلب"
)
parser.add_argument(
"--timeout",
type=int,
default=30,
help="مهلة الطلب بالثواني (الافتراضي: 30)"
)
parser.add_argument(
"--dry-run",
action="store_true",
help="لا ترسل الطلبات فعلياً، فقط اطبع ما سيتم إرساله"
)
parser.add_argument(
"--cc-pair-id",
type=int,
help="معرف زوج بيانات اعتماد الموصل لربط المستندات به"
)
return parser.parse_args()
def find_json_files(directory: str) -> List[Path]:
"""
العثور على جميع ملفات JSON في المجلد المحدد والمجلدات الفرعية له.
المعاملات:
directory: المجلد للبحث فيه
العائدات:
قائمة بكائنات Path لجميع ملفات JSON التي تم العثور عليها
"""
directory_path = Path(directory)
if not directory_path.exists() or not directory_path.is_dir():
raise ValueError(f"المسار المحدد '{directory}' ليس مجلداً صالحاً")
json_files = list(directory_path.glob("**/*.json"))
return json_files
def load_processed_ids(tracking_file: str) -> set:
"""
تحميل معرفات المستندات المعالجة بالفعل من ملف التتبع.
المعاملات:
tracking_file: مسار ملف التتبع
العائدات:
مجموعة من معرفات المستندات التي تمت معالجتها بالفعل
"""
tracking_path = Path(tracking_file)
if not tracking_path.exists():
print(f"ملف التتبع '{tracking_file}' غير موجود. جاري إنشاء ملف جديد.")
tracking_path.parent.mkdir(parents=True, exist_ok=True)
tracking_path.touch()
return set()
try:
with open(tracking_path, "r", encoding="utf-8") as f:
processed_ids = {line.strip() for line in f if line.strip()}
return processed_ids
except Exception as e:
logging.getLogger(__name__).warning(f"خطأ في قراءة ملف التتبع: {e}. البدء بمجموعة فارغة.")
return set()
def save_processed_id(tracking_file: str, document_id: str) -> None:
"""
حفظ معرف مستند معالج في ملف التتبع.
المعاملات:
tracking_file: مسار ملف التتبع
document_id: معرف المستند للحفظ
"""
try:
with open(tracking_file, "a", encoding="utf-8") as f:
f.write(f"{document_id}\n")
except Exception as e:
logging.getLogger(__name__).error(f"خطأ في حفظ معرف المستند في ملف التتبع: {e}")
def transform_json_to_document_format(file_path: Path, cc_pair_id: int | None = None) -> dict:
"""
تحويل ملف JSON إلى تنسيق المستند المطلوب من قبل API.
المعاملات:
file_path: مسار ملف JSON
cc_pair_id: معرف زوج بيانات اعتماد الموصل اختياري
العائدات:
قاموس بالتنسيق المطلوب للمستند
"""
logger = logging.getLogger(__name__)
try:
with open(file_path, "r", encoding="utf-8") as f:
data = json.load(f)
# إنشاء هيكل المستند
document: dict[str, Any] = {
"document": {
"id": data["id"],
"sections": [
{
"text": data["page_content"],
"link": data["source"]
}
],
"semantic_identifier": data["title"],
"metadata": {
"tags": data["metadata"]["tags"]
},
"source": "web"
}
}
# إضافة cc_pair_id إذا تم توفيره
if cc_pair_id is not None:
document["cc_pair_id"] = cc_pair_id
logger.debug(f"تمت إضافة cc_pair_id {cc_pair_id} للمستند من {file_path}")
return document
except Exception as e:
logger.error(f"خطأ في تحويل {file_path}: {str(e)}")
return {}
def send_json_to_api(
file_path: Path,
api_url: str,
tracking_file: str,
document_id: str,
headers: Optional[dict] = None,
timeout: int = 30,
dry_run: bool = False,
cc_pair_id: Optional[int] = None
) -> bool:
"""
قراءة ملف JSON، وتحويله إلى التنسيق المطلوب، وإرساله إلى API.
المعاملات:
file_path: مسار ملف JSON
api_url: عنوان URL لنقطة نهاية API
tracking_file: مسار ملف التتبع
document_id: معرف المستند للتتبع
headers: عناوين اختيارية لتضمينها في الطلب
timeout: مهلة الطلب بالثواني
dry_run: إذا كان True، لا ترسل الطلب فعلياً
cc_pair_id: معرف زوج بيانات اعتماد الموصل اختياري
العائدات:
True إذا نجح، False خلاف ذلك
"""
logger = logging.getLogger(__name__)
try:
# تحويل JSON إلى التنسيق المطلوب
transformed_data = transform_json_to_document_format(file_path, cc_pair_id)
if not transformed_data:
logger.error(f"فشل تحويل البيانات من {file_path}")
return False
if dry_run:
logger.info(f"DRY RUN: سيتم إرسال البيانات المحولة من {file_path} إلى {api_url}")
save_processed_id(tracking_file, document_id)
return True
response = requests.post(
api_url,
json=transformed_data,
headers=headers or {},
timeout=timeout
)
response.raise_for_status()
logger.info(f"تم إرسال البيانات المحولة من {file_path} إلى API بنجاح")
# حفظ معرف المستند في ملف التتبع عند المعالجة الناجحة
save_processed_id(tracking_file, document_id)
return True
except json.JSONDecodeError:
logger.error(f"فشل تحليل JSON من {file_path}")
except RequestException as e:
logger.error(f"فشل طلب API لـ {file_path}: {str(e)}")
except Exception as e:
logger.error(f"خطأ غير متوقع في معالجة {file_path}: {str(e)}")
return False
def main() -> int:
"""الوظيفة الرئيسية لتشغيل النص البرمجي."""
setup_logging()
logger = logging.getLogger(__name__)
try:
args = parse_arguments()
# تحليل العناوين إذا تم توفيرها
headers = None
if args.headers:
try:
headers = json.loads(args.headers)
except json.JSONDecodeError:
logger.error("فشل تحليل سلسلة JSON للعناوين")
return 1
# تسجيل cc_pair_id إذا تم توفيره
if args.cc_pair_id is not None:
logger.info(f"استخدام معرف زوج بيانات اعتماد الموصل: {args.cc_pair_id}")
# تحميل معرفات المستندات المعالجة بالفعل
processed_ids = load_processed_ids(args.tracking_file)
logger.info(f"تم تحميل {len(processed_ids)} معرف مستند معالج بالفعل")
# العثور على جميع ملفات JSON
logger.info(f"البحث عن ملفات JSON في {args.directory}")
json_files = find_json_files(args.directory)
logger.info(f"تم العثور على {len(json_files)} ملف JSON")
if not json_files:
logger.warning("لم يتم العثور على ملفات JSON. الخروج.")
return 0
# تصفية الملفات المعالجة بالفعل
unprocessed_files = []
for file_path in json_files:
try:
# استخراج معرف المستند من ملف JSON
with open(file_path, "r", encoding="utf-8") as f:
data = json.load(f)
document_id = data.get("id")
if not document_id:
logger.warning(f"لم يتم العثور على حقل 'id' في {file_path}، تخطي")
continue
if document_id not in processed_ids:
unprocessed_files.append((file_path, document_id))
else:
logger.debug(f"تخطي المستند المعالج بالفعل: {document_id}")
except Exception as e:
logger.warning(f"خطأ في قراءة معرف المستند من {file_path}: {e}، تخطي")
continue
logger.info(f"تم العثور على {len(unprocessed_files)} ملف غير معالج من أصل {len(json_files)} ملف إجمالي")
if not unprocessed_files:
logger.info("تمت معالجة جميع الملفات بالفعل. الخروج.")
return 0
# معالجة كل ملف JSON غير معالج
success_count = 0
failure_count = 0
for file_path, document_id in tqdm(unprocessed_files, desc="معالجة الملفات"):
success = send_json_to_api(
file_path=file_path,
api_url=args.url,
tracking_file=args.tracking_file,
document_id=document_id,
headers=headers,
timeout=args.timeout,
dry_run=args.dry_run,
cc_pair_id=args.cc_pair_id
)
if success:
success_count += 1
else:
failure_count += 1
# الإبلاغ عن النتائج
logger.info(f"اكتملت المعالجة. نجاح: {success_count}، فشل: {failure_count}")
return 0 if failure_count == 0 else 1
except KeyboardInterrupt:
logger.info("تمت مقاطعة العملية من قبل المستخدم")
return 130
except Exception as e:
logger.exception(f"استثناء غير معالج: {str(e)}")
return 1
if __name__ == "__main__":
main()